uds5501 commented on code in PR #18466:
URL: https://github.com/apache/druid/pull/18466#discussion_r2371594031
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1749,135 @@ public Response getUnparseableEvents(
return
Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
}
+ @POST
+ @Path("/updateConfig")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response updateConfig(TaskConfigUpdateRequest request, @Context final
HttpServletRequest req)
+ throws InterruptedException
+ {
+ authorizationCheck(req);
+ if (!waitForConfigUpdate.get()) {
+ return Response.status(409).entity("Task must be paused for checkpoint
completion before updating config").build();
+ }
+ try {
+ log.info("Attempting to update config to [%s]", request.getIoConfig());
+
+ SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType>
newIoConfig = (SeekableStreamIndexTaskIOConfig<PartitionIdType,
SequenceOffsetType>)
+ toolbox.getJsonMapper().convertValue(request.getIoConfig(),
SeekableStreamIndexTaskIOConfig.class);
+ setIOConfig(newIoConfig);
+ createNewSequenceFromIoConfig(newIoConfig);
+
+ assignment = assignPartitions(recordSupplier);
+ boolean shouldResume = true;
+ if (!assignment.isEmpty()) {
+ possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment);
+ seekToStartingSequence(recordSupplier, assignment);
+ } else {
+ // if there is no assignment, It means that there was no partition
assigned to this task after scaling down.
+ pause();
+ shouldResume = false;
+ }
+
+ log.info("Config updated to [%s]", this.ioConfig);
+ toolbox.getEmitter().emit(ServiceMetricEvent.builder()
+ .setDimension(DruidMetrics.TASK_ID,
task.getId())
+ .setDimension(DruidMetrics.TASK_TYPE,
task.getType())
+ .setDimension(DruidMetrics.DATASOURCE,
task.getDataSource())
+ .setMetric("task/config/update/success", 1)
+ .build(ImmutableMap.of()));
+ if (shouldResume) {
+ resume();
+ }
+ waitForConfigUpdate.set(false);
+ return Response.ok().build();
+ }
+ catch (Exception e) {
+ log.makeAlert(e, "Failed to update task config");
+ waitForConfigUpdate.set(false);
+ return Response.serverError().entity(e.getMessage()).build();
+ }
+ }
+
+ private void setIOConfig(
+ SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType>
ioConfig
+ )
+ {
+ this.ioConfig = ioConfig;
+ this.stream = ioConfig.getStartSequenceNumbers().getStream();
+ this.endOffsets = new
ConcurrentHashMap<>(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
+ this.minMessageTime =
Configs.valueOrDefault(ioConfig.getMinimumMessageTime(), DateTimes.MIN);
+ this.maxMessageTime =
Configs.valueOrDefault(ioConfig.getMaximumMessageTime(), DateTimes.MAX);
+ }
+
+ /**
+ * Creates new sequences for the ingestion process. It currently accepts the
ioConfig given by the request as the correct offsets
+ * and ignores the offsets it may have stored in currOffsets and endOffsets.
+ */
+ private void
createNewSequenceFromIoConfig(SeekableStreamIndexTaskIOConfig<PartitionIdType,
SequenceOffsetType> ioConfig)
+ throws IOException
+ {
+ Map<PartitionIdType, SequenceOffsetType> partitionStartOffsets =
ioConfig.getStartSequenceNumbers()
+
.getPartitionSequenceNumberMap();
+ Map<PartitionIdType, SequenceOffsetType> partitionEndSequences =
ioConfig.getEndSequenceNumbers()
+
.getPartitionSequenceNumberMap();
+
+ final Set<PartitionIdType> exclusiveStartPartitions =
computeExclusiveStartPartitionsForSequence(
+ partitionStartOffsets);
+ final SequenceMetadata<PartitionIdType, SequenceOffsetType> newSequence =
new SequenceMetadata<>(
+ sequences.isEmpty() ? 0 : getLastSequenceMetadata().getSequenceId() +
1,
+ StringUtils.format(
+ "%s_%d",
+ ioConfig.getBaseSequenceName(),
+ sequences.isEmpty() ? 0 :
getLastSequenceMetadata().getSequenceId() + 1
+ ),
+ partitionStartOffsets,
+ partitionEndSequences,
+ false,
+ exclusiveStartPartitions,
+ getTaskLockType()
+ );
+ log.info("Attempting adding new sequence [%s]", newSequence);
+
+ currOffsets.clear();
+ currOffsets.putAll(partitionStartOffsets);
+ endOffsets.clear();
+ endOffsets.putAll(partitionEndSequences);
+
+ addSequence(newSequence);
+ persistSequences();
+ log.info(
+ "Created new sequence [%s] with start offsets [%s]",
+ newSequence.getSequenceName(), partitionStartOffsets
+ );
+ }
+
+ private void checkpointSequences()
+ {
+ try {
+ final SequenceMetadata<PartitionIdType, SequenceOffsetType>
latestSequence = getLastSequenceMetadata();
+ if (!latestSequence.isCheckpointed()) {
Review Comment:
Hmm, I see. In this scenario, the thread is trying to complete the checkpoint
of a sequence that has already been checkpointed.
Note: checkpointing is a two-phase process:
1. The runner sends a checkpoint action for the latest data it has read to the
supervisor and pauses itself.
2. The supervisor returns the end offsets to the runner, and the runner
attempts to set those end offsets.
> Is it an error scenario?
Maybe, but not in the current context. It might be one for the supervisor's
checkpoint handling (step 2 above). I need to double-check the supervisor's
checkpoint handling to see whether it could leave the dynamic config handling
stuck forever.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]