mcvsubbu commented on code in PR #8986:
URL: https://github.com/apache/pinot/pull/8986#discussion_r912297405
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -49,27 +50,71 @@
HttpHeaders.AUTHORIZATION, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY)))
@Path("/")
public class PinotRealtimeTableResource {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PinotRealtimeTableResource.class);
+
@Inject
PinotHelixResourceManager _pinotHelixResourceManager;
+ @Inject
+ PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+
+ @POST
+ @Path("/tables/{tableName}/pauseConsumption")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Pause consumption of a realtime table",
+ notes = "Pause the consumption of a realtime table")
+ public Response pauseConsumption(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName) {
+ String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+ validate(tableNameWithType);
+ try {
+ return
Response.ok(_pinotLLCRealtimeSegmentManager.pauseConsumption(tableNameWithType)).build();
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
@POST
@Path("/tables/{tableName}/resumeConsumption")
@Produces(MediaType.APPLICATION_JSON)
- @Consumes(MediaType.APPLICATION_JSON)
- @ApiOperation(value = "Resume the consumption of a realtime table",
- notes = "Resume the consumption of a realtime table")
- public String resumeConsumption(
- @ApiParam(value = "Name of the table", required = true)
- @PathParam("tableName") String tableName) throws JsonProcessingException
{
- // TODO: Add util method for invoking periodic tasks
+ @ApiOperation(value = "Resume consumption of a realtime table",
+ notes = "Resume the consumption for a realtime table")
+ public Response resumeConsumption(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName) {
String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
- Map<String, String> taskProperties = new HashMap<>();
-
taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY,
"true");
+ validate(tableNameWithType);
+ try {
+ return
Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType)).build();
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
- Pair<String, Integer> taskExecutionDetails = _pinotHelixResourceManager
- .invokeControllerPeriodicTask(tableNameWithType,
Constants.REALTIME_SEGMENT_VALIDATION_MANAGER, taskProperties);
+ @GET
+ @Path("/tables/{tableName}/pauseStatus")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Return pause status of a realtime table",
+ notes = "Return pause status of a realtime table along with list of
consuming segments.")
+ public Response getConsumptionStatus(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName) {
+ String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+ validate(tableNameWithType);
+ try {
+ return
Response.ok().entity(_pinotLLCRealtimeSegmentManager.getPauseStatus(tableNameWithType)).build();
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
- return "{\"Log Request Id\": \"" + taskExecutionDetails.getLeft()
- + "\",\"Controllers notified\":" + (taskExecutionDetails.getRight() >
0) + "}";
+ private void validate(String tableNameWithType) {
+ IdealState idealState =
_pinotHelixResourceManager.getTableIdealState(tableNameWithType);
+ if (idealState == null) {
+ throw new ControllerApplicationException(LOGGER, "Ideal State is null
for table " + tableNameWithType,
Review Comment:
Maybe say "Table not found" instead.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -916,11 +930,15 @@ void updateIdealStateOnSegmentCompletion(String
realtimeTableName, String commit
"Exceeded max segment completion time for segment " +
committingSegmentName);
}
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
committingSegmentName,
- newSegmentName, segmentAssignment, instancePartitionsMap);
+ isTablePaused(idealState) ? null : newSegmentName,
segmentAssignment, instancePartitionsMap);
Review Comment:
This method is called with `newSegmentName` set to `null` if the table
is being paused, right? Why check the ideal state again?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -3613,7 +3613,27 @@ public Pair<String, Integer>
invokeControllerPeriodicTask(String tableName, Stri
LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to
{} controllers.", periodicTaskRequestId,
messageCount);
- return Pair.of(periodicTaskRequestId, messageCount);
+ return new PeriodicTaskInvocationResponse(periodicTaskRequestId,
messageCount > 0);
+ }
+
+ public static class PeriodicTaskInvocationResponse {
Review Comment:
Same with this class. It is returned externally, so move it to some api (or
perhaps spi) package. Not sure what convention we are following here.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1399,4 +1417,120 @@ public void uploadToDeepStoreIfMissing(TableConfig
tableConfig, List<SegmentZKMe
}
}
}
+
+ /**
+ * Pause consumption on a table by
+ * 1) setting "isTablePaused" in ideal states to true and
+ * 2) sending force commit messages to servers
+ */
+ public PauseStatus pauseConsumption(String tableNameWithType) {
+ IdealState updatedIdealState =
updatePauseStatusInIdealState(tableNameWithType, true);
+ Set<String> consumingSegments = findConsumingSegments(updatedIdealState);
+ sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
+ return new PauseStatus(true, consumingSegments,
consumingSegments.isEmpty() ? null : "Pause flag is set."
Review Comment:
I would prefer returning a different class here than `PauseStatus`, since
`PauseStatus` is an externally visible class, and we need to keep
compatibility. For internal methods, we should be able to change things as
needed
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1399,4 +1417,120 @@ public void uploadToDeepStoreIfMissing(TableConfig
tableConfig, List<SegmentZKMe
}
}
}
+
+ /**
+ * Pause consumption on a table by
+ * 1) setting "isTablePaused" in ideal states to true and
+ * 2) sending force commit messages to servers
+ */
+ public PauseStatus pauseConsumption(String tableNameWithType) {
+ IdealState updatedIdealState =
updatePauseStatusInIdealState(tableNameWithType, true);
+ Set<String> consumingSegments = findConsumingSegments(updatedIdealState);
+ sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
+ return new PauseStatus(true, consumingSegments,
consumingSegments.isEmpty() ? null : "Pause flag is set."
+ + " Consuming segments are being committed."
+ + " Use /pauseStatus endpoint in a few moments to check if all
consuming segments have been committed.");
+ }
+
+ /**
+ * Resume consumption on a table by
+ * 1) setting "isTablePaused" in ideal states to false and
+ * 2) triggering segment validation job to create new consuming segments
in ideal states
+ */
+ public PauseStatus resumeConsumption(String tableNameWithType) {
+ IdealState updatedIdealState =
updatePauseStatusInIdealState(tableNameWithType, false);
+
+ // trigger realtime segment validation job to resume consumption
+ Map<String, String> taskProperties = new HashMap<>();
+
taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY,
"true");
Review Comment:
Create an issue to see if this is still needed. Now that we have
pause/resume, I see no need for a user to delete a consuming segment. We can
disable such deletion in the API.
##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java:
##########
@@ -169,6 +173,34 @@ public HelixTaskResult handleMessage()
}
}
+ private class ForceCommitMessageHandler extends DefaultMessageHandler {
+
+ private String _tableName;
+ private Set<String> _segmentNames;
+
+ public ForceCommitMessageHandler(ForceCommitMessage forceCommitMessage,
ServerMetrics metrics,
+ NotificationContext ctx) {
+ super(forceCommitMessage, metrics, ctx);
+ _tableName = forceCommitMessage.getTableName();
+ _segmentNames = forceCommitMessage.getSegmentNames();
+ }
+
+ @Override
+ public HelixTaskResult handleMessage()
+ throws InterruptedException {
+ HelixTaskResult helixTaskResult = new HelixTaskResult();
+ _logger.info("Handling force commit message for table {} segments {}",
_tableName, _segmentNames);
+ try {
+ _instanceDataManager.forceCommit(_tableName, _segmentNames);
+ helixTaskResult.setSuccess(true);
+ } catch (Exception e) {
+ _metrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.DELETE_TABLE_FAILURES, 1);
+ Utils.rethrowException(e);
Review Comment:
Not sure what Helix does with thrown exceptions. Hopefully it does not
retry.
Another option is to just log the exception and move on.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -552,12 +564,12 @@ private void commitSegmentMetadataInternal(String
realtimeTableName,
.collect(Collectors.toSet());
int numPartitionGroups = newPartitionGroupMetadataList.size();
- // Only if committingSegment's partitionGroup is present in the
newPartitionGroupMetadataList, we create new
- // segment metadata
String newConsumingSegmentName = null;
- String rawTableName =
TableNameBuilder.extractRawTableName(realtimeTableName);
- long newSegmentCreationTimeMs = getCurrentTimeMs();
- if (newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
+ if (!isTablePaused(idealState) &&
newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
+ // Only if committingSegment's partitionGroup is present in the
newPartitionGroupMetadataList, we create new
+ // segment metadata
+ String rawTableName =
TableNameBuilder.extractRawTableName(realtimeTableName);
Review Comment:
nice refactor moving these in
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java:
##########
@@ -169,6 +170,39 @@ public void deleteSegments(String tableName, TableType
tableType)
}
}
+ public PinotLLCRealtimeSegmentManager.PauseStatus pauseConsumption(String
tableName)
+ throws IOException {
+ try {
+ SimpleHttpResponse response =
HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(new URL(
+
_controllerRequestURLBuilder.forPauseConsumption(tableName)).toURI(), null));
+ return JsonUtils.stringToObject(response.getResponse(),
PinotLLCRealtimeSegmentManager.PauseStatus.class);
Review Comment:
We don't want `ControllerRequestClient` to depend on
`PinotLLCRealtimeSegmentManager`, right? Should we move `PauseStatus` to the
controller/api package? We may rename or move the PinotLLCRealtimeSegmentManager
at some point, and we don't want to rebuild clients just because of that. There may
be other such uses in this file; we should pay attention and fix those
as well (in a separate PR/issue, of course), but we can stop introducing new ones.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]