bsyk commented on code in PR #18466: URL: https://github.com/apache/druid/pull/18466#discussion_r2323552462
########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java: ########## @@ -1717,6 +1717,98 @@ public Response getUnparseableEvents( return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build(); } + @POST + @Path("/updateConfig") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response updateConfig(TaskConfigUpdateRequest<PartitionIdType, SequenceOffsetType> req) throws InterruptedException + { + try { + requestPause(); + checkpointSequences(); + + this.ioConfig = req.getIoConfig(); + this.stream = ioConfig.getStartSequenceNumbers().getStream(); + this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap()); + minMessageTime = Configs.valueOrDefault(ioConfig.getMinimumMessageTime(), DateTimes.MIN); + maxMessageTime = Configs.valueOrDefault(ioConfig.getMaximumMessageTime(), DateTimes.MAX); + + createNewSequenceFromIoConfig(req.getIoConfig()); + resume(); + return Response.ok().build(); + } catch (Exception e) { + log.makeAlert(e, "Failed to update task config"); + return Response.serverError().entity(e.getMessage()).build(); + } + } + + /** + * Creates new sequences for the ingestion process. It currently accepts the ioConfig given by the request as the correct offsets + * and ignores the offsets it may have stored in currOffsets and endOffsets. + */ + private void createNewSequenceFromIoConfig(SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig) Review Comment: Consider looking at the Kafka [StickyAssignor](https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html) implementation for inspiration here. The Kafka StickyAssignor is a partition assignment strategy for Kafka consumers within a consumer group. Its primary goal is to achieve both a balanced distribution of partitions among consumers and to minimize the movement of partitions during rebalances. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org