This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 37f2e28 Segment reset API (#6336)
37f2e28 is described below
commit 37f2e28a37df53f14daf4ce9205a71f9318dce1e
Author: Neha Pawar <[email protected]>
AuthorDate: Wed Dec 30 13:47:40 2020 -0800
Segment reset API (#6336)
Adding a reset API, for a single segment as well as for all segments of a table.
This API will disable and then re-enable the segment(s). It will be useful for
resetting consumers which are stuck, as reported in #6308.
* If the segment is in ERROR state, invoking this API will reset it: it is
transitioned first to OFFLINE, the API waits for the EV to stabilize, and the
segment is then transitioned back to ONLINE/CONSUMING.
* If the segment is ONLINE/CONSUMING, invoking this API will disable it: it is
transitioned first to OFFLINE, the API waits for the EV to stabilize, and the
segment is then re-enabled back to ONLINE/CONSUMING.
---
.../api/resources/PinotSegmentRestletResource.java | 65 ++++++++-
.../helix/core/PinotHelixResourceManager.java | 155 +++++++++++++++++++++
2 files changed, 218 insertions(+), 2 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index b62ce61..0078c54 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.api.resources;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
@@ -32,11 +33,11 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -63,7 +64,6 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
import org.apache.pinot.controller.util.TableMetadataReader;
-import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -355,6 +355,67 @@ public class PinotSegmentRestletResource {
}
}
+  /**
+   * Resets a single segment of the table by disabling and then enabling it.
+   * The segment is taken to OFFLINE state, the call waits for the External View to stabilize, and the segment is then
+   * brought back to ONLINE/CONSUMING state, which makes this effective in resetting segments or consumers in error states.
+   *
+   * @param tableNameWithType name of the table with type; responds with 404 if no table type can be derived from it
+   * @param segmentName URL-encoded name of the segment; decoded before use
+   * @param maxWaitTimeMs maximum time in milliseconds to wait for the External View; when not positive, the controller's
+   *                      server admin request timeout is used instead
+   * @return a success response once the segment has been re-enabled
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/{segmentName}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets a segment by first disabling it, waiting for external view to stabilize, and finally enabling it again", notes = "Resets a segment by disabling and then enabling the segment")
+  public SuccessResponse resetSegment(
+      @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
+      @ApiParam(value = "Maximum time in milliseconds to wait for reset to be completed. By default, uses serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long maxWaitTimeMs) {
+    segmentName = URIUtils.decode(segmentName);
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    try {
+      Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType);
+      // Multiply as long so a large configured timeout cannot overflow int arithmetic
+      long waitTimeMs = maxWaitTimeMs > 0 ? maxWaitTimeMs : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000L;
+      _pinotHelixResourceManager.resetSegment(tableNameWithType, segmentName, waitTimeMs);
+      return new SuccessResponse(
+          String.format("Successfully reset segment: %s of table: %s", segmentName, tableNameWithType));
+    } catch (IllegalStateException e) {
+      // Missing table type, ideal state, external view or segment
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segment: %s of table: %s. %s", segmentName, tableNameWithType, e.getMessage()),
+          Status.NOT_FOUND);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag instead of silently swallowing it
+      Thread.currentThread().interrupt();
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segment: %s of table: %s. %s", segmentName, tableNameWithType, e.getMessage()),
+          Status.INTERNAL_SERVER_ERROR);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segment: %s of table: %s. %s", segmentName, tableNameWithType, e.getMessage()),
+          Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  /**
+   * Resets all segments of the given table by disabling and then enabling them.
+   * The segments are taken to OFFLINE state, the call waits for the External View to stabilize, and the segments are
+   * then brought back to ONLINE/CONSUMING state, which makes this effective in resetting segments or consumers in error
+   * states.
+   *
+   * @param tableNameWithType name of the table with type; responds with 404 if no table type can be derived from it
+   * @param maxWaitTimeMs maximum time in milliseconds to wait for the External View; when not positive, the controller's
+   *                      server admin request timeout is used instead
+   * @return a success response once all segments have been re-enabled
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets all segments of the table, by first disabling them, waiting for external view to stabilize, and finally enabling the segments", notes = "Resets all segments of the table by disabling and then enabling them")
+  public SuccessResponse resetAllSegments(
+      @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Maximum time in milliseconds to wait for reset to be completed. By default, uses serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long maxWaitTimeMs) {
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    try {
+      Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType);
+      // Multiply as long so a large configured timeout cannot overflow int arithmetic
+      long waitTimeMs = maxWaitTimeMs > 0 ? maxWaitTimeMs : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000L;
+      _pinotHelixResourceManager.resetAllSegments(tableNameWithType, waitTimeMs);
+      return new SuccessResponse(String.format("Successfully reset all segments of table: %s", tableNameWithType));
+    } catch (IllegalStateException e) {
+      // Missing table type, ideal state or external view
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segments in table: %s. %s", tableNameWithType, e.getMessage()),
+          Status.NOT_FOUND);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag instead of silently swallowing it
+      Thread.currentThread().interrupt();
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segments in table: %s. %s", tableNameWithType, e.getMessage()),
+          Status.INTERNAL_SERVER_ERROR);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segments in table: %s. %s", tableNameWithType, e.getMessage()),
+          Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
@Deprecated
@POST
@Path("tables/{tableName}/segments/{segmentName}/reload")
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 6b5168f..6eb6ebe 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -25,6 +25,7 @@ import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Lists;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -45,6 +46,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.AccessOption;
@@ -1778,6 +1780,159 @@ public class PinotHelixResourceManager {
}
/**
+ * Resets a segment. The steps involved are
+ * 1. If segment is in ERROR state in the External View, invoke
resetPartition, else invoke disablePartition
+ * 2. Wait for the external view to stabilize. Step 1 should turn the
segment to OFFLINE state
+ * 3. Invoke enablePartition on the segment
+ */
+ public void resetSegment(String tableNameWithType, String segmentName, long
externalViewWaitTimeMs)
+ throws InterruptedException, TimeoutException {
+ IdealState idealState = getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Could not find ideal state
for table: %s", tableNameWithType);
+ ExternalView externalView = getTableExternalView(tableNameWithType);
+ Preconditions.checkState(externalView != null, "Could not find external
view for table: %s", tableNameWithType);
+ Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+ Preconditions
+ .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find
segment: %s in ideal state for table: %s");
+ Map<String, String> externalViewStateMap =
externalView.getStateMap(segmentName);
+
+ // First, disable or reset the segment
+ for (String instance : instanceSet) {
+ if (externalViewStateMap == null ||
!SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+ LOGGER.info("Disabling segment: {} of table: {}", segmentName,
tableNameWithType);
+ // enablePartition takes a segment which is NOT in ERROR state, to
OFFLINE state
+ _helixAdmin
+ .enablePartition(false, _helixClusterName, instance,
tableNameWithType, Lists.newArrayList(segmentName));
+ } else {
+ LOGGER.info("Resetting segment: {} of table: {}", segmentName,
tableNameWithType);
+ // resetPartition takes a segment which is in ERROR state, to OFFLINE
state
+ _helixAdmin.resetPartition(_helixClusterName, instance,
tableNameWithType, Lists.newArrayList(segmentName));
+ }
+ }
+
+ // Wait for external view to stabilize
+ LOGGER.info("Waiting {} ms for external view to stabilize after
disable/reset of segment: {} of table: {}",
+ externalViewWaitTimeMs, segmentName, tableNameWithType);
+ long startTime = System.currentTimeMillis();
+ Set<String> instancesToCheck = new HashSet<>(instanceSet);
+ while (!instancesToCheck.isEmpty() && System.currentTimeMillis() -
startTime < externalViewWaitTimeMs) {
+ ExternalView newExternalView = getTableExternalView(tableNameWithType);
+ Preconditions
+ .checkState(newExternalView != null, "Could not find external view
for table: %s", tableNameWithType);
+ Map<String, String> newExternalViewStateMap =
newExternalView.getStateMap(segmentName);
+ if (newExternalViewStateMap == null) {
+ continue;
+ }
+ instancesToCheck.removeIf(instance ->
SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+ Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
+ }
+ if (!instancesToCheck.isEmpty()) {
+ throw new TimeoutException(String.format(
+ "Timed out waiting for external view to stabilize after call to
disable/reset segment: %s of table: %s. "
+ + "Disable/reset might complete in the background, but skipping
enable of segment.", segmentName,
+ tableNameWithType));
+ }
+
+ // Lastly, enable segment
+ LOGGER.info("Enabling segment: {} of table: {}", segmentName,
tableNameWithType);
+ for (String instance : instanceSet) {
+ _helixAdmin
+ .enablePartition(true, _helixClusterName, instance,
tableNameWithType, Lists.newArrayList(segmentName));
+ }
+ }
+
+ /**
+ * Resets all segments of a table. The steps involved are
+ * 1. If segment is in ERROR state in the External View, invoke
resetPartition, else invoke disablePartition
+ * 2. Wait for the external view to stabilize. Step 1 should turn all
segments to OFFLINE state
+ * 3. Invoke enablePartition on the segments
+ */
+ public void resetAllSegments(String tableNameWithType, long
externalViewWaitTimeMs)
+ throws InterruptedException, TimeoutException {
+ IdealState idealState = getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Could not find ideal state
for table: %s", tableNameWithType);
+ ExternalView externalView = getTableExternalView(tableNameWithType);
+ Preconditions.checkState(externalView != null, "Could not find external
view for table: %s", tableNameWithType);
+
+ Map<String, Set<String>> instanceToResetSegmentsMap = new HashMap<>();
+ Map<String, Set<String>> instanceToDisableSegmentsMap = new HashMap<>();
+ Map<String, Set<String>> segmentInstancesToCheck = new HashMap<>();
+
+ for (String segmentName : idealState.getPartitionSet()) {
+ Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+ Map<String, String> externalViewStateMap =
externalView.getStateMap(segmentName);
+ for (String instance : instanceSet) {
+ if (externalViewStateMap == null ||
!SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+ instanceToDisableSegmentsMap.computeIfAbsent(instance, i -> new
HashSet<>()).add(segmentName);
+ } else {
+ instanceToResetSegmentsMap.computeIfAbsent(instance, i -> new
HashSet<>()).add(segmentName);
+ }
+ }
+ segmentInstancesToCheck.put(segmentName, new HashSet<>(instanceSet));
+ }
+
+ // First, disable/reset the segments
+ LOGGER.info("Disabling/resetting segments of table: {}",
tableNameWithType);
+ for (Map.Entry<String, Set<String>> entry :
instanceToResetSegmentsMap.entrySet()) {
+ // resetPartition takes a segment which is in ERROR state, to OFFLINE
state
+ _helixAdmin
+ .resetPartition(_helixClusterName, entry.getKey(),
tableNameWithType, Lists.newArrayList(entry.getValue()));
+ } for (Map.Entry<String, Set<String>> entry :
instanceToDisableSegmentsMap.entrySet()) {
+ // enablePartition takes a segment which is NOT in ERROR state, to
OFFLINE state
+ _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(),
tableNameWithType,
+ Lists.newArrayList(entry.getValue()));
+ }
+
+ // Wait for external view to stabilize
+ LOGGER.info("Waiting {} ms for external view to stabilize after
disable/reset of segments of table: {}",
+ externalViewWaitTimeMs, tableNameWithType);
+ long startTime = System.currentTimeMillis();
+ while (!segmentInstancesToCheck.isEmpty() && System.currentTimeMillis() -
startTime < externalViewWaitTimeMs) {
+ ExternalView newExternalView = getTableExternalView(tableNameWithType);
+ Preconditions
+ .checkState(newExternalView != null, "Could not find external view
for table: %s", tableNameWithType);
+ Iterator<Map.Entry<String, Set<String>>> iterator =
segmentInstancesToCheck.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, Set<String>> entryToCheck = iterator.next();
+ String segmentToCheck = entryToCheck.getKey();
+ Set<String> instancesToCheck = entryToCheck.getValue();
+ Map<String, String> newExternalViewStateMap =
newExternalView.getStateMap(segmentToCheck);
+ if (newExternalViewStateMap == null) {
+ continue;
+ }
+ boolean allOffline = true;
+ for (String instance : instancesToCheck) {
+ if
(!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) {
+ allOffline = false;
+ break;
+ }
+ }
+ if (allOffline) {
+ iterator.remove();
+ }
+ }
+ Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
+ }
+ if (!segmentInstancesToCheck.isEmpty()) {
+ throw new TimeoutException(String.format(
+ "Timed out waiting for external view to stabilize after call to
disable/reset segments. "
+ + "Disable/reset might complete in the background, but skipping
enable of segments of table: %s",
+ tableNameWithType));
+ }
+
+ // Lastly, enable segments
+ LOGGER.info("Enabling segments of table: {}", tableNameWithType);
+ for (Map.Entry<String, Set<String>> entry :
instanceToResetSegmentsMap.entrySet()) {
+ _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(),
tableNameWithType,
+ Lists.newArrayList(entry.getValue()));
+ }
+ for (Map.Entry<String, Set<String>> entry :
instanceToDisableSegmentsMap.entrySet()) {
+ _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(),
tableNameWithType,
+ Lists.newArrayList(entry.getValue()));
+ }
+ }
+
+ /**
* Sends a segment refresh message to:
* <ul>
* <li>Server: Refresh (replace) the segment by downloading a new one
based on the segment ZK metadata</li>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]