mcvsubbu commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r548076840
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType,
String segmentName) {
return numMessagesSent;
}
+ /**
+ * Resets a segment by disabling and then enabling the segment
+ */
+ public void resetSegment(String tableNameWithType, String segmentName, long
externalViewWaitTimeMs) {
+ IdealState idealState = getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Could not find ideal state
for table: %s", tableNameWithType);
+ ExternalView externalView = getTableExternalView(tableNameWithType);
+ Preconditions.checkState(externalView != null, "Could not find external
view for table: %s", tableNameWithType);
+ Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+ Preconditions
+ .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find
segment: %s in ideal state for table: %s");
+ Map<String, String> externalViewStateMap =
externalView.getStateMap(segmentName);
+ List<String> partitions = Lists.newArrayList(segmentName);
+
+ // First, disable or reset partition
+ for (String instance : instanceSet) {
+ if (externalViewStateMap == null ||
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+ LOGGER.info("Resetting partition: {} of table: {}", segmentName,
tableNameWithType);
Review comment:
Can we keep the log messages consistent? Let us call it a segment
instead of partition. (please check other log messages as well)
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType,
String segmentName) {
return numMessagesSent;
}
+ /**
+ * Resets a segment by disabling and then enabling the segment
+ */
+ public void resetSegment(String tableNameWithType, String segmentName, long
externalViewWaitTimeMs) {
+ IdealState idealState = getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Could not find ideal state
for table: %s", tableNameWithType);
Review comment:
This should be a 4xx error (unless Pinot messed up really badly. :-))
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType,
String segmentName) {
return numMessagesSent;
}
+ /**
+ * Resets a segment by disabling and then enabling the segment
+ */
+ public void resetSegment(String tableNameWithType, String segmentName, long
externalViewWaitTimeMs) {
+ IdealState idealState = getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Could not find ideal state
for table: %s", tableNameWithType);
+ ExternalView externalView = getTableExternalView(tableNameWithType);
+ Preconditions.checkState(externalView != null, "Could not find external
view for table: %s", tableNameWithType);
+ Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+ Preconditions
+ .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find
segment: %s in ideal state for table: %s");
+ Map<String, String> externalViewStateMap =
externalView.getStateMap(segmentName);
+ List<String> partitions = Lists.newArrayList(segmentName);
+
+ // First, disable or reset partition
+ for (String instance : instanceSet) {
+ if (externalViewStateMap == null ||
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+ LOGGER.info("Resetting partition: {} of table: {}", segmentName,
tableNameWithType);
+ _helixAdmin.resetPartition(_helixClusterName, instance,
tableNameWithType, partitions);
+ } else {
+ LOGGER.info("Disabling partition: {} of table: {}", segmentName,
tableNameWithType);
+ _helixAdmin.enablePartition(false, _helixClusterName, instance,
tableNameWithType, partitions);
+ }
+ }
+
+ // Wait for external view to stabilize
+ LOGGER.info("Waiting {} ms for external view to stabilize after
disable/reset of partition: {} of table: {}",
+ externalViewWaitTimeMs, segmentName, tableNameWithType);
+ long startTime = System.currentTimeMillis();
+ Set<String> instancesToCheck = new HashSet<>(instanceSet);
+ while (!instancesToCheck.isEmpty() && System.currentTimeMillis() -
startTime < externalViewWaitTimeMs) {
+ ExternalView newExternalView = getTableExternalView(tableNameWithType);
+ Preconditions
+ .checkState(newExternalView != null, "Could not find external view
for table: %s", tableNameWithType);
+ Map<String, String> newExternalViewStateMap =
newExternalView.getStateMap(segmentName);
+ if (newExternalViewStateMap == null) {
+ continue;
+ }
+ instancesToCheck.removeIf(instance ->
SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+ }
Review comment:
Please add a `Thread.sleep` here instead of a busy-wait loop. Suggestion:
```
Thread.sleep(min(100,maxWaitTimeMillis/10))
```
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -355,6 +354,61 @@ public SuccessResponse reloadSegment(
}
}
+ /**
+ * Resets the segment of the table, by disabling and then enabling it.
+ * This API will take segments to OFFLINE state, wait for External View to
stabilize, and then back to ONLINE/CONSUMING state,
+ * thus effective in resetting segments or consumers in error states.
+ */
+ @POST
+ @Path("segments/{tableNameWithType}/{segmentName}/reset")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Resets a segment by first disabling it, waiting for
external view to stabilize, and finally enabling it again",
+ notes = "Resets a segment by disabling and then enabling a segment")
+ public SuccessResponse resetSegment(
+ @ApiParam(value = "Name of the table with type", required = true)
@PathParam("tableNameWithType") String tableNameWithType,
+ @ApiParam(value = "Name of the segment", required = true)
@PathParam("segmentName") @Encoded String segmentName,
+ @ApiParam(value = "Time in millis to wait for external view to
converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) {
+ segmentName = URIUtils.decode(segmentName);
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ try {
+ Preconditions.checkState(tableType != null, "Must provide table name
with type: %s", tableNameWithType);
+ _pinotHelixResourceManager.resetSegment(tableNameWithType, segmentName,
+ externalViewWaitTimeMs > 0 ? externalViewWaitTimeMs
+ : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+ return new SuccessResponse("Successfully invoked segment reset");
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Failed to reset segment: %s in table: %s. %s",
segmentName, tableNameWithType, e.getMessage()),
+ Status.NOT_FOUND);
+ }
+ }
+
+ /**
+ * Resets all segments of the given table
+ * This API will take segments to OFFLINE state, wait for External View to
stabilize, and then back to ONLINE/CONSUMING state,
+ * thus effective in resetting segments or consumers in error states.
+ */
+ @POST
+ @Path("segments/{tableNameWithType}/reset")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Resets all segments of the table, by first disabling
them, waiting for external view to stabilize, and finally enabling the
segments",
+ notes = "Resets a segment by disabling and then enabling a segment")
+ public SuccessResponse resetAllSegments(
+ @ApiParam(value = "Name of the table with type", required = true)
@PathParam("tableNameWithType") String tableNameWithType,
+ @ApiParam(value = "Time in millis to wait for external view to
converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ try {
+ Preconditions.checkState(tableType != null, "Must provide table name
with type: %s", tableNameWithType);
+ _pinotHelixResourceManager.resetAllSegments(tableNameWithType,
externalViewWaitTimeMs > 0 ? externalViewWaitTimeMs
+ : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+ return new SuccessResponse("Successfully invoked segment reset");
Review comment:
better to include the table name in the message. Also, you may want to
word it such that it clearly implies that the reset is completed.
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -355,6 +354,61 @@ public SuccessResponse reloadSegment(
}
}
+ /**
+ * Resets the segment of the table, by disabling and then enabling it.
+ * This API will take segments to OFFLINE state, wait for External View to
stabilize, and then back to ONLINE/CONSUMING state,
+ * thus effective in resetting segments or consumers in error states.
+ */
+ @POST
+ @Path("segments/{tableNameWithType}/{segmentName}/reset")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Resets a segment by first disabling it, waiting for
external view to stabilize, and finally enabling it again",
+ notes = "Resets a segment by disabling and then enabling a segment")
+ public SuccessResponse resetSegment(
+ @ApiParam(value = "Name of the table with type", required = true)
@PathParam("tableNameWithType") String tableNameWithType,
+ @ApiParam(value = "Name of the segment", required = true)
@PathParam("segmentName") @Encoded String segmentName,
+ @ApiParam(value = "Time in millis to wait for external view to
converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) {
Review comment:
All that this parameter does is to override the admin command wait time.
Why not call it something like that? We can then add it to any admin command
now or later with the same name.
Suggested:
```suggestion
@ApiParam(value = "Maximum time in milliseconds to wait for reset to
be completed") @QueryParam("maxWaitTimeMs") long externalViewWaitTimeMs) {
```
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType,
String segmentName) {
return numMessagesSent;
}
+ /**
+ * Resets a segment by disabling and then enabling the segment
+ */
+ public void resetSegment(String tableNameWithType, String segmentName, long
externalViewWaitTimeMs) {
+ IdealState idealState = getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Could not find ideal state
for table: %s", tableNameWithType);
+ ExternalView externalView = getTableExternalView(tableNameWithType);
+ Preconditions.checkState(externalView != null, "Could not find external
view for table: %s", tableNameWithType);
+ Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+ Preconditions
+ .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find
segment: %s in ideal state for table: %s");
+ Map<String, String> externalViewStateMap =
externalView.getStateMap(segmentName);
+ List<String> partitions = Lists.newArrayList(segmentName);
+
+ // First, disable or reset partition
+ for (String instance : instanceSet) {
+ if (externalViewStateMap == null ||
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+ LOGGER.info("Resetting partition: {} of table: {}", segmentName,
tableNameWithType);
+ _helixAdmin.resetPartition(_helixClusterName, instance,
tableNameWithType, partitions);
+ } else {
+ LOGGER.info("Disabling partition: {} of table: {}", segmentName,
tableNameWithType);
+ _helixAdmin.enablePartition(false, _helixClusterName, instance,
tableNameWithType, partitions);
Review comment:
Don't you have to enable the partition (segment) again after this call?
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType,
String segmentName) {
return numMessagesSent;
}
+ /**
+ * Resets a segment by disabling and then enabling the segment
+ */
+ public void resetSegment(String tableNameWithType, String segmentName, long
externalViewWaitTimeMs) {
+ IdealState idealState = getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Could not find ideal state
for table: %s", tableNameWithType);
+ ExternalView externalView = getTableExternalView(tableNameWithType);
+ Preconditions.checkState(externalView != null, "Could not find external
view for table: %s", tableNameWithType);
+ Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+ Preconditions
+ .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find
segment: %s in ideal state for table: %s");
+ Map<String, String> externalViewStateMap =
externalView.getStateMap(segmentName);
+ List<String> partitions = Lists.newArrayList(segmentName);
+
+ // First, disable or reset partition
+ for (String instance : instanceSet) {
+ if (externalViewStateMap == null ||
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+ LOGGER.info("Resetting partition: {} of table: {}", segmentName,
tableNameWithType);
+ _helixAdmin.resetPartition(_helixClusterName, instance,
tableNameWithType, partitions);
+ } else {
+ LOGGER.info("Disabling partition: {} of table: {}", segmentName,
tableNameWithType);
+ _helixAdmin.enablePartition(false, _helixClusterName, instance,
tableNameWithType, partitions);
+ }
+ }
+
+ // Wait for external view to stabilize
+ LOGGER.info("Waiting {} ms for external view to stabilize after
disable/reset of partition: {} of table: {}",
+ externalViewWaitTimeMs, segmentName, tableNameWithType);
+ long startTime = System.currentTimeMillis();
+ Set<String> instancesToCheck = new HashSet<>(instanceSet);
+ while (!instancesToCheck.isEmpty() && System.currentTimeMillis() -
startTime < externalViewWaitTimeMs) {
+ ExternalView newExternalView = getTableExternalView(tableNameWithType);
+ Preconditions
+ .checkState(newExternalView != null, "Could not find external view
for table: %s", tableNameWithType);
+ Map<String, String> newExternalViewStateMap =
newExternalView.getStateMap(segmentName);
+ if (newExternalViewStateMap == null) {
+ continue;
+ }
+ instancesToCheck.removeIf(instance ->
SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+ }
+ if (!instancesToCheck.isEmpty()) {
+ throw new IllegalStateException(String.format(
+ "Timed out waiting for external view to stabilize after
disable/reset call. Skipping enable of partition: %s of table: %s",
+ segmentName, tableNameWithType));
+ }
+
+ // Enable partition
+ LOGGER.info("Enabling partition: {} of table: {}", segmentName,
tableNameWithType);
+ for (String instance : instanceSet) {
+ _helixAdmin.enablePartition(true, _helixClusterName, instance,
tableNameWithType, partitions);
+ }
+ }
+
+ /**
+ * Resets all segments of a table by disabling and then enabling the segments
+ */
+ public void resetAllSegments(String tableNameWithType, long
externalViewWaitTimeMs) {
+ IdealState idealState = getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Could not find ideal state
for table: %s", tableNameWithType);
+ ExternalView externalView = getTableExternalView(tableNameWithType);
+ Preconditions.checkState(externalView != null, "Could not find external
view for table: %s", tableNameWithType);
+
+ Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();
Review comment:
Suggest naming the variables with `segments` instead of `partitions`.
We already have two other meanings of "partition" that are confusing (stream
partitions, and partitioning of data in segment assignment).
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType,
String segmentName) {
return numMessagesSent;
}
+ /**
+ * Resets a segment by disabling and then enabling the segment
+ */
+ public void resetSegment(String tableNameWithType, String segmentName, long
externalViewWaitTimeMs) {
+ IdealState idealState = getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Could not find ideal state
for table: %s", tableNameWithType);
+ ExternalView externalView = getTableExternalView(tableNameWithType);
+ Preconditions.checkState(externalView != null, "Could not find external
view for table: %s", tableNameWithType);
+ Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+ Preconditions
+ .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find
segment: %s in ideal state for table: %s");
+ Map<String, String> externalViewStateMap =
externalView.getStateMap(segmentName);
+ List<String> partitions = Lists.newArrayList(segmentName);
+
+ // First, disable or reset partition
+ for (String instance : instanceSet) {
+ if (externalViewStateMap == null ||
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+ LOGGER.info("Resetting partition: {} of table: {}", segmentName,
tableNameWithType);
+ _helixAdmin.resetPartition(_helixClusterName, instance,
tableNameWithType, partitions);
+ } else {
+ LOGGER.info("Disabling partition: {} of table: {}", segmentName,
tableNameWithType);
+ _helixAdmin.enablePartition(false, _helixClusterName, instance,
tableNameWithType, partitions);
+ }
+ }
+
+ // Wait for external view to stabilize
+ LOGGER.info("Waiting {} ms for external view to stabilize after
disable/reset of partition: {} of table: {}",
+ externalViewWaitTimeMs, segmentName, tableNameWithType);
+ long startTime = System.currentTimeMillis();
+ Set<String> instancesToCheck = new HashSet<>(instanceSet);
+ while (!instancesToCheck.isEmpty() && System.currentTimeMillis() -
startTime < externalViewWaitTimeMs) {
+ ExternalView newExternalView = getTableExternalView(tableNameWithType);
+ Preconditions
+ .checkState(newExternalView != null, "Could not find external view
for table: %s", tableNameWithType);
+ Map<String, String> newExternalViewStateMap =
newExternalView.getStateMap(segmentName);
+ if (newExternalViewStateMap == null) {
+ continue;
+ }
+ instancesToCheck.removeIf(instance ->
SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+ }
+ if (!instancesToCheck.isEmpty()) {
+ throw new IllegalStateException(String.format(
+ "Timed out waiting for external view to stabilize after
disable/reset call. Skipping enable of partition: %s of table: %s",
+ segmentName, tableNameWithType));
+ }
+
+ // Enable partition
+ LOGGER.info("Enabling partition: {} of table: {}", segmentName,
tableNameWithType);
+ for (String instance : instanceSet) {
+ _helixAdmin.enablePartition(true, _helixClusterName, instance,
tableNameWithType, partitions);
+ }
+ }
+
+ /**
+ * Resets all segments of a table by disabling and then enabling the segments
+ */
+ public void resetAllSegments(String tableNameWithType, long
externalViewWaitTimeMs) {
+ IdealState idealState = getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Could not find ideal state
for table: %s", tableNameWithType);
+ ExternalView externalView = getTableExternalView(tableNameWithType);
+ Preconditions.checkState(externalView != null, "Could not find external
view for table: %s", tableNameWithType);
+
+ Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();
+ Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>();
+ Map<String, Set<String>> partitionInstancesToCheck = new HashMap<>();
+
+ for (String partition : idealState.getPartitionSet()) {
+ Set<String> instanceSet = idealState.getInstanceSet(partition);
+ Map<String, String> externalViewStateMap =
externalView.getStateMap(partition);
+ for (String instance : instanceSet) {
+ if (externalViewStateMap == null ||
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+ resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new
HashSet<>()).add(partition);
+ } else {
+ disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new
HashSet<>()).add(partition);
+ }
+ }
+ partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet));
+ }
+
+ // First, disable/reset the partitions
+ LOGGER.info("Disabling/resetting partitions of table: {}",
tableNameWithType);
+ for (Map.Entry<String, Set<String>> entry :
resetInstanceToPartitionsMap.entrySet()) {
+ ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+ _helixAdmin.resetPartition(_helixClusterName, entry.getKey(),
tableNameWithType, partitions);
+ }
+ for (Map.Entry<String, Set<String>> entry :
disableInstanceToPartitionsMap.entrySet()) {
+ ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+ _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(),
tableNameWithType, partitions);
+ }
+
+ // Wait for external view to stabilize
+ LOGGER.info("Waiting {} ms for external view to stabilize after
disable/reset of partitions of table: {}",
+ externalViewWaitTimeMs, tableNameWithType);
+ long startTime = System.currentTimeMillis();
+ while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis()
- startTime < externalViewWaitTimeMs) {
+ ExternalView newExternalView = getTableExternalView(tableNameWithType);
+ Preconditions
Review comment:
Not sure if we can start off with no external view at this point, since
Helix will still be processing the reset calls.
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -355,6 +354,61 @@ public SuccessResponse reloadSegment(
}
}
+ /**
+ * Resets the segment of the table, by disabling and then enabling it.
+ * This API will take segments to OFFLINE state, wait for External View to
stabilize, and then back to ONLINE/CONSUMING state,
+ * thus effective in resetting segments or consumers in error states.
+ */
+ @POST
+ @Path("segments/{tableNameWithType}/{segmentName}/reset")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Resets a segment by first disabling it, waiting for
external view to stabilize, and finally enabling it again",
+ notes = "Resets a segment by disabling and then enabling a segment")
+ public SuccessResponse resetSegment(
+ @ApiParam(value = "Name of the table with type", required = true)
@PathParam("tableNameWithType") String tableNameWithType,
+ @ApiParam(value = "Name of the segment", required = true)
@PathParam("segmentName") @Encoded String segmentName,
+ @ApiParam(value = "Time in millis to wait for external view to
converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) {
+ segmentName = URIUtils.decode(segmentName);
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ try {
+ Preconditions.checkState(tableType != null, "Must provide table name
with type: %s", tableNameWithType);
Review comment:
Please make sure that this error message shows up on the console or curl
command if table type is not given. Sometimes we see that the precondition
check error message does not show up, and we get a 5xx error (this should be a
4xx error)
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType,
String segmentName) {
return numMessagesSent;
}
+ /**
+ * Resets a segment by disabling and then enabling the segment
+ */
+ public void resetSegment(String tableNameWithType, String segmentName, long
externalViewWaitTimeMs) {
+ IdealState idealState = getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Could not find ideal state
for table: %s", tableNameWithType);
+ ExternalView externalView = getTableExternalView(tableNameWithType);
+ Preconditions.checkState(externalView != null, "Could not find external
view for table: %s", tableNameWithType);
+ Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+ Preconditions
+ .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find
segment: %s in ideal state for table: %s");
Review comment:
should be a 4xx error
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType,
String segmentName) {
return numMessagesSent;
}
+ /**
+ * Resets a segment by disabling and then enabling the segment
+ */
+ public void resetSegment(String tableNameWithType, String segmentName, long
externalViewWaitTimeMs) {
+ IdealState idealState = getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Could not find ideal state
for table: %s", tableNameWithType);
+ ExternalView externalView = getTableExternalView(tableNameWithType);
+ Preconditions.checkState(externalView != null, "Could not find external
view for table: %s", tableNameWithType);
+ Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+ Preconditions
+ .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find
segment: %s in ideal state for table: %s");
+ Map<String, String> externalViewStateMap =
externalView.getStateMap(segmentName);
+ List<String> partitions = Lists.newArrayList(segmentName);
+
+ // First, disable or reset partition
+ for (String instance : instanceSet) {
+ if (externalViewStateMap == null ||
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+ LOGGER.info("Resetting partition: {} of table: {}", segmentName,
tableNameWithType);
+ _helixAdmin.resetPartition(_helixClusterName, instance,
tableNameWithType, partitions);
+ } else {
+ LOGGER.info("Disabling partition: {} of table: {}", segmentName,
tableNameWithType);
+ _helixAdmin.enablePartition(false, _helixClusterName, instance,
tableNameWithType, partitions);
+ }
+ }
+
+ // Wait for external view to stabilize
+ LOGGER.info("Waiting {} ms for external view to stabilize after
disable/reset of partition: {} of table: {}",
+ externalViewWaitTimeMs, segmentName, tableNameWithType);
+ long startTime = System.currentTimeMillis();
+ Set<String> instancesToCheck = new HashSet<>(instanceSet);
+ while (!instancesToCheck.isEmpty() && System.currentTimeMillis() -
startTime < externalViewWaitTimeMs) {
+ ExternalView newExternalView = getTableExternalView(tableNameWithType);
+ Preconditions
+ .checkState(newExternalView != null, "Could not find external view
for table: %s", tableNameWithType);
+ Map<String, String> newExternalViewStateMap =
newExternalView.getStateMap(segmentName);
+ if (newExternalViewStateMap == null) {
+ continue;
+ }
+ instancesToCheck.removeIf(instance ->
SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+ }
+ if (!instancesToCheck.isEmpty()) {
+ throw new IllegalStateException(String.format(
+ "Timed out waiting for external view to stabilize after
disable/reset call. Skipping enable of partition: %s of table: %s",
+ segmentName, tableNameWithType));
+ }
+
+ // Enable partition
+ LOGGER.info("Enabling partition: {} of table: {}", segmentName,
tableNameWithType);
+ for (String instance : instanceSet) {
+ _helixAdmin.enablePartition(true, _helixClusterName, instance,
tableNameWithType, partitions);
+ }
+ }
+
+ /**
+ * Resets all segments of a table by disabling and then enabling the segments
+ */
+ public void resetAllSegments(String tableNameWithType, long
externalViewWaitTimeMs) {
+ IdealState idealState = getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Could not find ideal state
for table: %s", tableNameWithType);
+ ExternalView externalView = getTableExternalView(tableNameWithType);
+ Preconditions.checkState(externalView != null, "Could not find external
view for table: %s", tableNameWithType);
+
+ Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();
+ Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>();
+ Map<String, Set<String>> partitionInstancesToCheck = new HashMap<>();
+
+ for (String partition : idealState.getPartitionSet()) {
+ Set<String> instanceSet = idealState.getInstanceSet(partition);
+ Map<String, String> externalViewStateMap =
externalView.getStateMap(partition);
+ for (String instance : instanceSet) {
+ if (externalViewStateMap == null ||
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+ resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new
HashSet<>()).add(partition);
+ } else {
+ disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new
HashSet<>()).add(partition);
+ }
+ }
+ partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet));
+ }
+
+ // First, disable/reset the partitions
+ LOGGER.info("Disabling/resetting partitions of table: {}",
tableNameWithType);
+ for (Map.Entry<String, Set<String>> entry :
resetInstanceToPartitionsMap.entrySet()) {
+ ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+ _helixAdmin.resetPartition(_helixClusterName, entry.getKey(),
tableNameWithType, partitions);
+ }
+ for (Map.Entry<String, Set<String>> entry :
disableInstanceToPartitionsMap.entrySet()) {
+ ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+ _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(),
tableNameWithType, partitions);
+ }
+
+ // Wait for external view to stabilize
+ LOGGER.info("Waiting {} ms for external view to stabilize after
disable/reset of partitions of table: {}",
+ externalViewWaitTimeMs, tableNameWithType);
+ long startTime = System.currentTimeMillis();
+ while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis()
- startTime < externalViewWaitTimeMs) {
+ ExternalView newExternalView = getTableExternalView(tableNameWithType);
+ Preconditions
+ .checkState(newExternalView != null, "Could not find external view
for table: %s", tableNameWithType);
+ Iterator<Map.Entry<String, Set<String>>> iterator =
partitionInstancesToCheck.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, Set<String>> entryToCheck = iterator.next();
+ String partitionToCheck = entryToCheck.getKey();
+ Set<String> instancesToCheck = entryToCheck.getValue();
+ Map<String, String> newExternalViewStateMap =
newExternalView.getStateMap(partitionToCheck);
+ if (newExternalViewStateMap == null) {
+ continue;
+ }
+ boolean allOffline = true;
+ for (String instance : instancesToCheck) {
+ if
(!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) {
+ allOffline = false;
+ break;
+ }
+ }
+ if (allOffline) {
+ iterator.remove();
+ }
+ }
+ }
+ if (!partitionInstancesToCheck.isEmpty()) {
+ throw new IllegalStateException(String.format(
+ "Timed out waiting for external view to stabilize after
disable/reset call. Skipping enable of segments of table: %s",
+ tableNameWithType));
+ }
+
+ // Enable partitions
+ LOGGER.info("Enabling partitions of table: {}", tableNameWithType);
+ for (Map.Entry<String, Set<String>> entry :
resetInstanceToPartitionsMap.entrySet()) {
+ ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+ _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(),
tableNameWithType, partitions);
+ }
+ for (Map.Entry<String, Set<String>> entry :
disableInstanceToPartitionsMap.entrySet()) {
+ ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+ _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(),
tableNameWithType, partitions);
Review comment:
Why do we have an enable here and one in line 1869? Can you clarify
again if Helix expects two of these in the reset API?
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType,
String segmentName) {
return numMessagesSent;
}
+ /**
+ * Resets a segment by disabling and then enabling the segment
+ */
+ public void resetSegment(String tableNameWithType, String segmentName, long
externalViewWaitTimeMs) {
+ IdealState idealState = getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Could not find ideal state
for table: %s", tableNameWithType);
+ ExternalView externalView = getTableExternalView(tableNameWithType);
+ Preconditions.checkState(externalView != null, "Could not find external
view for table: %s", tableNameWithType);
+ Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+ Preconditions
+ .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find
segment: %s in ideal state for table: %s");
+ Map<String, String> externalViewStateMap =
externalView.getStateMap(segmentName);
+ List<String> partitions = Lists.newArrayList(segmentName);
+
+ // First, disable or reset partition
+ for (String instance : instanceSet) {
+ if (externalViewStateMap == null ||
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+ LOGGER.info("Resetting partition: {} of table: {}", segmentName,
tableNameWithType);
+ _helixAdmin.resetPartition(_helixClusterName, instance,
tableNameWithType, partitions);
+ } else {
+ LOGGER.info("Disabling partition: {} of table: {}", segmentName,
tableNameWithType);
+ _helixAdmin.enablePartition(false, _helixClusterName, instance,
tableNameWithType, partitions);
+ }
+ }
+
+ // Wait for external view to stabilize
+ LOGGER.info("Waiting {} ms for external view to stabilize after
disable/reset of partition: {} of table: {}",
+ externalViewWaitTimeMs, segmentName, tableNameWithType);
+ long startTime = System.currentTimeMillis();
+ Set<String> instancesToCheck = new HashSet<>(instanceSet);
+ while (!instancesToCheck.isEmpty() && System.currentTimeMillis() -
startTime < externalViewWaitTimeMs) {
+ ExternalView newExternalView = getTableExternalView(tableNameWithType);
+ Preconditions
+ .checkState(newExternalView != null, "Could not find external view
for table: %s", tableNameWithType);
+ Map<String, String> newExternalViewStateMap =
newExternalView.getStateMap(segmentName);
+ if (newExternalViewStateMap == null) {
+ continue;
+ }
+ instancesToCheck.removeIf(instance ->
SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+ }
+ if (!instancesToCheck.isEmpty()) {
+ throw new IllegalStateException(String.format(
+ "Timed out waiting for external view to stabilize after
disable/reset call. Skipping enable of partition: %s of table: %s",
+ segmentName, tableNameWithType));
+ }
+
+ // Enable partition
+ LOGGER.info("Enabling partition: {} of table: {}", segmentName,
tableNameWithType);
+ for (String instance : instanceSet) {
+ _helixAdmin.enablePartition(true, _helixClusterName, instance,
tableNameWithType, partitions);
+ }
+ }
+
+ /**
+ * Resets all segments of a table by disabling and then enabling the segments
+ */
+ public void resetAllSegments(String tableNameWithType, long
externalViewWaitTimeMs) {
+ IdealState idealState = getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Could not find ideal state
for table: %s", tableNameWithType);
+ ExternalView externalView = getTableExternalView(tableNameWithType);
+ Preconditions.checkState(externalView != null, "Could not find external
view for table: %s", tableNameWithType);
+
+ Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();
+ Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>();
+ Map<String, Set<String>> partitionInstancesToCheck = new HashMap<>();
+
+ for (String partition : idealState.getPartitionSet()) {
+ Set<String> instanceSet = idealState.getInstanceSet(partition);
+ Map<String, String> externalViewStateMap =
externalView.getStateMap(partition);
+ for (String instance : instanceSet) {
+ if (externalViewStateMap == null ||
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+ resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new
HashSet<>()).add(partition);
+ } else {
+ disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new
HashSet<>()).add(partition);
+ }
+ }
+ partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet));
+ }
+
+ // First, disable/reset the partitions
+ LOGGER.info("Disabling/resetting partitions of table: {}",
tableNameWithType);
+ for (Map.Entry<String, Set<String>> entry :
resetInstanceToPartitionsMap.entrySet()) {
+ ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+ _helixAdmin.resetPartition(_helixClusterName, entry.getKey(),
tableNameWithType, partitions);
+ }
+ for (Map.Entry<String, Set<String>> entry :
disableInstanceToPartitionsMap.entrySet()) {
+ ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+ _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(),
tableNameWithType, partitions);
+ }
+
+ // Wait for external view to stabilize
+ LOGGER.info("Waiting {} ms for external view to stabilize after
disable/reset of partitions of table: {}",
+ externalViewWaitTimeMs, tableNameWithType);
+ long startTime = System.currentTimeMillis();
+ while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis()
- startTime < externalViewWaitTimeMs) {
+ ExternalView newExternalView = getTableExternalView(tableNameWithType);
+ Preconditions
+ .checkState(newExternalView != null, "Could not find external view
for table: %s", tableNameWithType);
+ Iterator<Map.Entry<String, Set<String>>> iterator =
partitionInstancesToCheck.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, Set<String>> entryToCheck = iterator.next();
+ String partitionToCheck = entryToCheck.getKey();
+ Set<String> instancesToCheck = entryToCheck.getValue();
+ Map<String, String> newExternalViewStateMap =
newExternalView.getStateMap(partitionToCheck);
+ if (newExternalViewStateMap == null) {
+ continue;
+ }
+ boolean allOffline = true;
+ for (String instance : instancesToCheck) {
+ if
(!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) {
+ allOffline = false;
+ break;
+ }
+ }
+ if (allOffline) {
+ iterator.remove();
+ }
+ }
+ }
Review comment:
Please add a sleep here like in the other case, instead of busy-waiting.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]