Jackie-Jiang commented on code in PR #9203:
URL: https://github.com/apache/pinot/pull/9203#discussion_r946254317
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -507,14 +507,12 @@ public SuccessResponse resetSegment(
@ApiParam(value = "Name of the table with type", required = true)
@PathParam("tableNameWithType")
String tableNameWithType,
@ApiParam(value = "Name of the segment", required = true)
@PathParam("segmentName") @Encoded String segmentName,
- @ApiParam(value = "Maximum time in milliseconds to wait for reset to be
completed. By default, uses "
- + "serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long
maxWaitTimeMs) {
+ @ApiParam(value = "Name of the target instance to reset")
@QueryParam("targetInstance") String targetInstance) {
Review Comment:
Annotate the `targetInstance` parameter as `@Nullable`, since this query parameter is optional.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2369,143 +2370,137 @@ public void resetSegment(String tableNameWithType,
String segmentName, long exte
"Could not find segment: %s in ideal state for table: %s",
segmentName, tableNameWithType);
Map<String, String> externalViewStateMap =
externalView.getStateMap(segmentName);
- // First, disable or reset the segment
for (String instance : instanceSet) {
- if (externalViewStateMap == null ||
!SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
- LOGGER.info("Disabling segment: {} of table: {}", segmentName,
tableNameWithType);
- // enablePartition takes a segment which is NOT in ERROR state, to
OFFLINE state
- // TODO: If the controller fails to re-enable the partition, it will
be left in disabled state
- _helixAdmin.enablePartition(false, _helixClusterName, instance,
tableNameWithType,
- Lists.newArrayList(segmentName));
- } else {
- LOGGER.info("Resetting segment: {} of table: {}", segmentName,
tableNameWithType);
- // resetPartition takes a segment which is in ERROR state, to OFFLINE
state
- _helixAdmin.resetPartition(_helixClusterName, instance,
tableNameWithType, Lists.newArrayList(segmentName));
- }
- }
-
- // Wait for external view to stabilize
- LOGGER.info("Waiting {} ms for external view to stabilize after
disable/reset of segment: {} of table: {}",
- externalViewWaitTimeMs, segmentName, tableNameWithType);
- long startTime = System.currentTimeMillis();
- Set<String> instancesToCheck = new HashSet<>(instanceSet);
- while (!instancesToCheck.isEmpty() && System.currentTimeMillis() -
startTime < externalViewWaitTimeMs) {
- ExternalView newExternalView = getTableExternalView(tableNameWithType);
- Preconditions.checkState(newExternalView != null, "Could not find
external view for table: %s",
- tableNameWithType);
- Map<String, String> newExternalViewStateMap =
newExternalView.getStateMap(segmentName);
- if (newExternalViewStateMap == null) {
- continue;
+ if (targetInstance == null || targetInstance.equals(instance)) {
+ if (externalViewStateMap == null ||
SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) {
+ LOGGER.info("Skipping reset for segment: {} of table: {} on
instance: {}", segmentName, tableNameWithType,
+ instance);
+ } else {
+ LOGGER.info("Resetting segment: {} of table: {} on instance: {}",
segmentName, tableNameWithType, instance);
+ resetPartitionAllState(instance, tableNameWithType,
Collections.singleton(segmentName));
+ }
}
- instancesToCheck.removeIf(instance ->
SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
- Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
- }
- if (!instancesToCheck.isEmpty()) {
- throw new TimeoutException(String.format(
- "Timed out waiting for external view to stabilize after call to
disable/reset segment: %s of table: %s. "
- + "Disable/reset might complete in the background, but skipping
enable of segment.", segmentName,
- tableNameWithType));
- }
-
- // Lastly, enable segment
- LOGGER.info("Enabling segment: {} of table: {}", segmentName,
tableNameWithType);
- for (String instance : instanceSet) {
- _helixAdmin.enablePartition(true, _helixClusterName, instance,
tableNameWithType,
- Lists.newArrayList(segmentName));
}
}
/**
- * Resets all segments of a table. The steps involved are
- * 1. If segment is in ERROR state in the External View, invoke
resetPartition, else invoke disablePartition
- * 2. Wait for the external view to stabilize. Step 1 should turn all
segments to OFFLINE state
- * 3. Invoke enablePartition on the segments
+ * Resets all segments of a table. This operation invoke resetPartition via
state transition message.
*/
- public void resetAllSegments(String tableNameWithType, long
externalViewWaitTimeMs)
+ public void resetAllSegments(String tableNameWithType, @Nullable String
targetInstance)
throws InterruptedException, TimeoutException {
IdealState idealState = getTableIdealState(tableNameWithType);
Preconditions.checkState(idealState != null, "Could not find ideal state
for table: %s", tableNameWithType);
ExternalView externalView = getTableExternalView(tableNameWithType);
Preconditions.checkState(externalView != null, "Could not find external
view for table: %s", tableNameWithType);
Map<String, Set<String>> instanceToResetSegmentsMap = new HashMap<>();
- Map<String, Set<String>> instanceToDisableSegmentsMap = new HashMap<>();
- Map<String, Set<String>> segmentInstancesToCheck = new HashMap<>();
+ Map<String, Set<String>> instanceToSkippedSegmentsMap = new HashMap<>();
for (String segmentName : idealState.getPartitionSet()) {
Set<String> instanceSet = idealState.getInstanceSet(segmentName);
Map<String, String> externalViewStateMap =
externalView.getStateMap(segmentName);
for (String instance : instanceSet) {
- if (externalViewStateMap == null ||
!SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
- instanceToDisableSegmentsMap.computeIfAbsent(instance, i -> new
HashSet<>()).add(segmentName);
+ if (externalViewStateMap == null ||
SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) {
+ instanceToSkippedSegmentsMap.computeIfAbsent(instance, i -> new
HashSet<>()).add(segmentName);
} else {
instanceToResetSegmentsMap.computeIfAbsent(instance, i -> new
HashSet<>()).add(segmentName);
}
}
- segmentInstancesToCheck.put(segmentName, new HashSet<>(instanceSet));
}
- // First, disable/reset the segments
- LOGGER.info("Disabling/resetting segments of table: {}",
tableNameWithType);
+ LOGGER.info("Resetting segments of table: {}", tableNameWithType);
for (Map.Entry<String, Set<String>> entry :
instanceToResetSegmentsMap.entrySet()) {
- // resetPartition takes a segment which is in ERROR state, to OFFLINE
state
- _helixAdmin.resetPartition(_helixClusterName, entry.getKey(),
tableNameWithType,
- Lists.newArrayList(entry.getValue()));
- }
- for (Map.Entry<String, Set<String>> entry :
instanceToDisableSegmentsMap.entrySet()) {
- // enablePartition takes a segment which is NOT in ERROR state, to
OFFLINE state
- // TODO: If the controller fails to re-enable the partition, it will be
left in disabled state
- _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(),
tableNameWithType,
- Lists.newArrayList(entry.getValue()));
- }
-
- // Wait for external view to stabilize
- LOGGER.info("Waiting {} ms for external view to stabilize after
disable/reset of segments of table: {}",
- externalViewWaitTimeMs, tableNameWithType);
- long startTime = System.currentTimeMillis();
- while (!segmentInstancesToCheck.isEmpty() && System.currentTimeMillis() -
startTime < externalViewWaitTimeMs) {
- ExternalView newExternalView = getTableExternalView(tableNameWithType);
- Preconditions.checkState(newExternalView != null, "Could not find
external view for table: %s",
- tableNameWithType);
- Iterator<Map.Entry<String, Set<String>>> iterator =
segmentInstancesToCheck.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<String, Set<String>> entryToCheck = iterator.next();
- String segmentToCheck = entryToCheck.getKey();
- Set<String> instancesToCheck = entryToCheck.getValue();
- Map<String, String> newExternalViewStateMap =
newExternalView.getStateMap(segmentToCheck);
- if (newExternalViewStateMap == null) {
- continue;
- }
- boolean allOffline = true;
- for (String instance : instancesToCheck) {
- if
(!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) {
- allOffline = false;
- break;
- }
- }
- if (allOffline) {
- iterator.remove();
- }
+ if (targetInstance == null || targetInstance.equals(entry.getKey())) {
Review Comment:
When `targetInstance` is provided, the current logic is inefficient
because it still collects segments for all the other instances, and the
contents of `instanceToSkippedSegmentsMap` for those instances are not
relevant. Suggest adding a separate code path for the single-instance case.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2369,143 +2370,137 @@ public void resetSegment(String tableNameWithType,
String segmentName, long exte
"Could not find segment: %s in ideal state for table: %s",
segmentName, tableNameWithType);
Map<String, String> externalViewStateMap =
externalView.getStateMap(segmentName);
- // First, disable or reset the segment
for (String instance : instanceSet) {
- if (externalViewStateMap == null ||
!SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
- LOGGER.info("Disabling segment: {} of table: {}", segmentName,
tableNameWithType);
- // enablePartition takes a segment which is NOT in ERROR state, to
OFFLINE state
- // TODO: If the controller fails to re-enable the partition, it will
be left in disabled state
- _helixAdmin.enablePartition(false, _helixClusterName, instance,
tableNameWithType,
- Lists.newArrayList(segmentName));
- } else {
- LOGGER.info("Resetting segment: {} of table: {}", segmentName,
tableNameWithType);
- // resetPartition takes a segment which is in ERROR state, to OFFLINE
state
- _helixAdmin.resetPartition(_helixClusterName, instance,
tableNameWithType, Lists.newArrayList(segmentName));
- }
- }
-
- // Wait for external view to stabilize
- LOGGER.info("Waiting {} ms for external view to stabilize after
disable/reset of segment: {} of table: {}",
- externalViewWaitTimeMs, segmentName, tableNameWithType);
- long startTime = System.currentTimeMillis();
- Set<String> instancesToCheck = new HashSet<>(instanceSet);
- while (!instancesToCheck.isEmpty() && System.currentTimeMillis() -
startTime < externalViewWaitTimeMs) {
- ExternalView newExternalView = getTableExternalView(tableNameWithType);
- Preconditions.checkState(newExternalView != null, "Could not find
external view for table: %s",
- tableNameWithType);
- Map<String, String> newExternalViewStateMap =
newExternalView.getStateMap(segmentName);
- if (newExternalViewStateMap == null) {
- continue;
+ if (targetInstance == null || targetInstance.equals(instance)) {
+ if (externalViewStateMap == null ||
SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) {
+ LOGGER.info("Skipping reset for segment: {} of table: {} on
instance: {}", segmentName, tableNameWithType,
+ instance);
+ } else {
+ LOGGER.info("Resetting segment: {} of table: {} on instance: {}",
segmentName, tableNameWithType, instance);
+ resetPartitionAllState(instance, tableNameWithType,
Collections.singleton(segmentName));
+ }
}
- instancesToCheck.removeIf(instance ->
SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
- Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
- }
- if (!instancesToCheck.isEmpty()) {
- throw new TimeoutException(String.format(
- "Timed out waiting for external view to stabilize after call to
disable/reset segment: %s of table: %s. "
- + "Disable/reset might complete in the background, but skipping
enable of segment.", segmentName,
- tableNameWithType));
- }
-
- // Lastly, enable segment
- LOGGER.info("Enabling segment: {} of table: {}", segmentName,
tableNameWithType);
- for (String instance : instanceSet) {
- _helixAdmin.enablePartition(true, _helixClusterName, instance,
tableNameWithType,
- Lists.newArrayList(segmentName));
}
}
/**
- * Resets all segments of a table. The steps involved are
- * 1. If segment is in ERROR state in the External View, invoke
resetPartition, else invoke disablePartition
- * 2. Wait for the external view to stabilize. Step 1 should turn all
segments to OFFLINE state
- * 3. Invoke enablePartition on the segments
+ * Resets all segments of a table. This operation invoke resetPartition via
state transition message.
*/
- public void resetAllSegments(String tableNameWithType, long
externalViewWaitTimeMs)
+ public void resetAllSegments(String tableNameWithType, @Nullable String
targetInstance)
throws InterruptedException, TimeoutException {
IdealState idealState = getTableIdealState(tableNameWithType);
Preconditions.checkState(idealState != null, "Could not find ideal state
for table: %s", tableNameWithType);
ExternalView externalView = getTableExternalView(tableNameWithType);
Preconditions.checkState(externalView != null, "Could not find external
view for table: %s", tableNameWithType);
Map<String, Set<String>> instanceToResetSegmentsMap = new HashMap<>();
- Map<String, Set<String>> instanceToDisableSegmentsMap = new HashMap<>();
- Map<String, Set<String>> segmentInstancesToCheck = new HashMap<>();
+ Map<String, Set<String>> instanceToSkippedSegmentsMap = new HashMap<>();
for (String segmentName : idealState.getPartitionSet()) {
Set<String> instanceSet = idealState.getInstanceSet(segmentName);
Map<String, String> externalViewStateMap =
externalView.getStateMap(segmentName);
for (String instance : instanceSet) {
- if (externalViewStateMap == null ||
!SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
- instanceToDisableSegmentsMap.computeIfAbsent(instance, i -> new
HashSet<>()).add(segmentName);
+ if (externalViewStateMap == null ||
SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) {
+ instanceToSkippedSegmentsMap.computeIfAbsent(instance, i -> new
HashSet<>()).add(segmentName);
} else {
instanceToResetSegmentsMap.computeIfAbsent(instance, i -> new
HashSet<>()).add(segmentName);
}
}
- segmentInstancesToCheck.put(segmentName, new HashSet<>(instanceSet));
}
- // First, disable/reset the segments
- LOGGER.info("Disabling/resetting segments of table: {}",
tableNameWithType);
+ LOGGER.info("Resetting segments of table: {}", tableNameWithType);
for (Map.Entry<String, Set<String>> entry :
instanceToResetSegmentsMap.entrySet()) {
- // resetPartition takes a segment which is in ERROR state, to OFFLINE
state
- _helixAdmin.resetPartition(_helixClusterName, entry.getKey(),
tableNameWithType,
- Lists.newArrayList(entry.getValue()));
- }
- for (Map.Entry<String, Set<String>> entry :
instanceToDisableSegmentsMap.entrySet()) {
- // enablePartition takes a segment which is NOT in ERROR state, to
OFFLINE state
- // TODO: If the controller fails to re-enable the partition, it will be
left in disabled state
- _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(),
tableNameWithType,
- Lists.newArrayList(entry.getValue()));
- }
-
- // Wait for external view to stabilize
- LOGGER.info("Waiting {} ms for external view to stabilize after
disable/reset of segments of table: {}",
- externalViewWaitTimeMs, tableNameWithType);
- long startTime = System.currentTimeMillis();
- while (!segmentInstancesToCheck.isEmpty() && System.currentTimeMillis() -
startTime < externalViewWaitTimeMs) {
- ExternalView newExternalView = getTableExternalView(tableNameWithType);
- Preconditions.checkState(newExternalView != null, "Could not find
external view for table: %s",
- tableNameWithType);
- Iterator<Map.Entry<String, Set<String>>> iterator =
segmentInstancesToCheck.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<String, Set<String>> entryToCheck = iterator.next();
- String segmentToCheck = entryToCheck.getKey();
- Set<String> instancesToCheck = entryToCheck.getValue();
- Map<String, String> newExternalViewStateMap =
newExternalView.getStateMap(segmentToCheck);
- if (newExternalViewStateMap == null) {
- continue;
- }
- boolean allOffline = true;
- for (String instance : instancesToCheck) {
- if
(!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) {
- allOffline = false;
- break;
- }
- }
- if (allOffline) {
- iterator.remove();
- }
+ if (targetInstance == null || targetInstance.equals(entry.getKey())) {
+ resetPartitionAllState(entry.getKey(), tableNameWithType,
+ entry.getValue());
}
- Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
}
- if (!segmentInstancesToCheck.isEmpty()) {
- throw new TimeoutException(String.format(
- "Timed out waiting for external view to stabilize after call to
disable/reset segments. "
- + "Disable/reset might complete in the background, but skipping
enable of segments of table: %s",
- tableNameWithType));
+
+ LOGGER.info("Reset segments for table {} finished. WIth the following
segments skipped: {}", tableNameWithType,
+ instanceToSkippedSegmentsMap);
+ }
+
+ /**
+ * This util is similar to {@link HelixAdmin#resetPartition(String, String,
String, List)}.
+ * However instead of resetting only the ERROR state to its initial state.
we reset all state regardless.
+ */
+ private void resetPartitionAllState(String instanceName, String resourceName,
+ Set<String> resetPartitionNames) {
+ LOGGER.info("Reset partitions {} for resource {} on instance {} in cluster
{}.",
+ resetPartitionNames == null ? "NULL" : resetPartitionNames,
resourceName,
+ instanceName, _helixClusterName);
+ HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ // check the instance is alive
+ LiveInstance liveInstance =
accessor.getProperty(keyBuilder.liveInstance(instanceName));
+ if (liveInstance == null) {
+ // check if the instance exists in the cluster
+ String instanceConfigPath =
PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
+ throw new RuntimeException(String.format("Can't find instance: %s on
%s", instanceName, instanceConfigPath));
}
- // Lastly, enable segments
- LOGGER.info("Enabling segments of table: {}", tableNameWithType);
- for (Map.Entry<String, Set<String>> entry :
instanceToResetSegmentsMap.entrySet()) {
- _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(),
tableNameWithType,
- Lists.newArrayList(entry.getValue()));
+ // gather metadata for sending state transition message.
+ // we skip through the sanity checks normally done on Helix because in
Pinot these are guaranteed to be safe.
+ IdealState idealState =
accessor.getProperty(keyBuilder.idealStates(resourceName));
Review Comment:
We don't need to read the ideal state here, because everything extracted
from it is static for Pinot.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2369,143 +2370,137 @@ public void resetSegment(String tableNameWithType,
String segmentName, long exte
"Could not find segment: %s in ideal state for table: %s",
segmentName, tableNameWithType);
Map<String, String> externalViewStateMap =
externalView.getStateMap(segmentName);
- // First, disable or reset the segment
for (String instance : instanceSet) {
- if (externalViewStateMap == null ||
!SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
- LOGGER.info("Disabling segment: {} of table: {}", segmentName,
tableNameWithType);
- // enablePartition takes a segment which is NOT in ERROR state, to
OFFLINE state
- // TODO: If the controller fails to re-enable the partition, it will
be left in disabled state
- _helixAdmin.enablePartition(false, _helixClusterName, instance,
tableNameWithType,
- Lists.newArrayList(segmentName));
- } else {
- LOGGER.info("Resetting segment: {} of table: {}", segmentName,
tableNameWithType);
- // resetPartition takes a segment which is in ERROR state, to OFFLINE
state
- _helixAdmin.resetPartition(_helixClusterName, instance,
tableNameWithType, Lists.newArrayList(segmentName));
- }
- }
-
- // Wait for external view to stabilize
- LOGGER.info("Waiting {} ms for external view to stabilize after
disable/reset of segment: {} of table: {}",
- externalViewWaitTimeMs, segmentName, tableNameWithType);
- long startTime = System.currentTimeMillis();
- Set<String> instancesToCheck = new HashSet<>(instanceSet);
- while (!instancesToCheck.isEmpty() && System.currentTimeMillis() -
startTime < externalViewWaitTimeMs) {
- ExternalView newExternalView = getTableExternalView(tableNameWithType);
- Preconditions.checkState(newExternalView != null, "Could not find
external view for table: %s",
- tableNameWithType);
- Map<String, String> newExternalViewStateMap =
newExternalView.getStateMap(segmentName);
- if (newExternalViewStateMap == null) {
- continue;
+ if (targetInstance == null || targetInstance.equals(instance)) {
+ if (externalViewStateMap == null ||
SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) {
+ LOGGER.info("Skipping reset for segment: {} of table: {} on
instance: {}", segmentName, tableNameWithType,
+ instance);
+ } else {
+ LOGGER.info("Resetting segment: {} of table: {} on instance: {}",
segmentName, tableNameWithType, instance);
+ resetPartitionAllState(instance, tableNameWithType,
Collections.singleton(segmentName));
+ }
}
- instancesToCheck.removeIf(instance ->
SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
- Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
- }
- if (!instancesToCheck.isEmpty()) {
- throw new TimeoutException(String.format(
- "Timed out waiting for external view to stabilize after call to
disable/reset segment: %s of table: %s. "
- + "Disable/reset might complete in the background, but skipping
enable of segment.", segmentName,
- tableNameWithType));
- }
-
- // Lastly, enable segment
- LOGGER.info("Enabling segment: {} of table: {}", segmentName,
tableNameWithType);
- for (String instance : instanceSet) {
- _helixAdmin.enablePartition(true, _helixClusterName, instance,
tableNameWithType,
- Lists.newArrayList(segmentName));
}
}
/**
- * Resets all segments of a table. The steps involved are
- * 1. If segment is in ERROR state in the External View, invoke
resetPartition, else invoke disablePartition
- * 2. Wait for the external view to stabilize. Step 1 should turn all
segments to OFFLINE state
- * 3. Invoke enablePartition on the segments
+ * Resets all segments of a table. This operation invoke resetPartition via
state transition message.
*/
- public void resetAllSegments(String tableNameWithType, long
externalViewWaitTimeMs)
+ public void resetAllSegments(String tableNameWithType, @Nullable String
targetInstance)
throws InterruptedException, TimeoutException {
IdealState idealState = getTableIdealState(tableNameWithType);
Preconditions.checkState(idealState != null, "Could not find ideal state
for table: %s", tableNameWithType);
ExternalView externalView = getTableExternalView(tableNameWithType);
Preconditions.checkState(externalView != null, "Could not find external
view for table: %s", tableNameWithType);
Map<String, Set<String>> instanceToResetSegmentsMap = new HashMap<>();
- Map<String, Set<String>> instanceToDisableSegmentsMap = new HashMap<>();
- Map<String, Set<String>> segmentInstancesToCheck = new HashMap<>();
+ Map<String, Set<String>> instanceToSkippedSegmentsMap = new HashMap<>();
for (String segmentName : idealState.getPartitionSet()) {
Set<String> instanceSet = idealState.getInstanceSet(segmentName);
Map<String, String> externalViewStateMap =
externalView.getStateMap(segmentName);
for (String instance : instanceSet) {
- if (externalViewStateMap == null ||
!SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
- instanceToDisableSegmentsMap.computeIfAbsent(instance, i -> new
HashSet<>()).add(segmentName);
+ if (externalViewStateMap == null ||
SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) {
+ instanceToSkippedSegmentsMap.computeIfAbsent(instance, i -> new
HashSet<>()).add(segmentName);
} else {
instanceToResetSegmentsMap.computeIfAbsent(instance, i -> new
HashSet<>()).add(segmentName);
}
}
- segmentInstancesToCheck.put(segmentName, new HashSet<>(instanceSet));
}
- // First, disable/reset the segments
- LOGGER.info("Disabling/resetting segments of table: {}",
tableNameWithType);
+ LOGGER.info("Resetting segments of table: {}", tableNameWithType);
for (Map.Entry<String, Set<String>> entry :
instanceToResetSegmentsMap.entrySet()) {
- // resetPartition takes a segment which is in ERROR state, to OFFLINE
state
- _helixAdmin.resetPartition(_helixClusterName, entry.getKey(),
tableNameWithType,
- Lists.newArrayList(entry.getValue()));
- }
- for (Map.Entry<String, Set<String>> entry :
instanceToDisableSegmentsMap.entrySet()) {
- // enablePartition takes a segment which is NOT in ERROR state, to
OFFLINE state
- // TODO: If the controller fails to re-enable the partition, it will be
left in disabled state
- _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(),
tableNameWithType,
- Lists.newArrayList(entry.getValue()));
- }
-
- // Wait for external view to stabilize
- LOGGER.info("Waiting {} ms for external view to stabilize after
disable/reset of segments of table: {}",
- externalViewWaitTimeMs, tableNameWithType);
- long startTime = System.currentTimeMillis();
- while (!segmentInstancesToCheck.isEmpty() && System.currentTimeMillis() -
startTime < externalViewWaitTimeMs) {
- ExternalView newExternalView = getTableExternalView(tableNameWithType);
- Preconditions.checkState(newExternalView != null, "Could not find
external view for table: %s",
- tableNameWithType);
- Iterator<Map.Entry<String, Set<String>>> iterator =
segmentInstancesToCheck.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<String, Set<String>> entryToCheck = iterator.next();
- String segmentToCheck = entryToCheck.getKey();
- Set<String> instancesToCheck = entryToCheck.getValue();
- Map<String, String> newExternalViewStateMap =
newExternalView.getStateMap(segmentToCheck);
- if (newExternalViewStateMap == null) {
- continue;
- }
- boolean allOffline = true;
- for (String instance : instancesToCheck) {
- if
(!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) {
- allOffline = false;
- break;
- }
- }
- if (allOffline) {
- iterator.remove();
- }
+ if (targetInstance == null || targetInstance.equals(entry.getKey())) {
+ resetPartitionAllState(entry.getKey(), tableNameWithType,
+ entry.getValue());
}
- Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
}
- if (!segmentInstancesToCheck.isEmpty()) {
- throw new TimeoutException(String.format(
- "Timed out waiting for external view to stabilize after call to
disable/reset segments. "
- + "Disable/reset might complete in the background, but skipping
enable of segments of table: %s",
- tableNameWithType));
+
+ LOGGER.info("Reset segments for table {} finished. WIth the following
segments skipped: {}", tableNameWithType,
Review Comment:
```suggestion
LOGGER.info("Reset segments for table {} finished. With the following
segments skipped: {}", tableNameWithType,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]