bsyk commented on code in PR #18466:
URL: https://github.com/apache/druid/pull/18466#discussion_r2323552462


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1717,98 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }
 
+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest<PartitionIdType, SequenceOffsetType> req) throws InterruptedException
+  {
+    try {
+      requestPause();
+      checkpointSequences();
+
+      this.ioConfig = req.getIoConfig();
+      this.stream = ioConfig.getStartSequenceNumbers().getStream();
+      this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
+      minMessageTime = Configs.valueOrDefault(ioConfig.getMinimumMessageTime(), DateTimes.MIN);
+      maxMessageTime = Configs.valueOrDefault(ioConfig.getMaximumMessageTime(), DateTimes.MAX);
+
+      createNewSequenceFromIoConfig(req.getIoConfig());
+      resume();
+      return Response.ok().build();
+    } catch (Exception e) {
+      log.makeAlert(e, "Failed to update task config");
+      return Response.serverError().entity(e.getMessage()).build();
+    }
+  }
+
+  /**
+   * Creates new sequences for the ingestion process. It currently accepts the ioConfig given by the request as the correct offsets
+   * and ignores the offsets it may have stored in currOffsets and endOffsets.
+   */
+  private void createNewSequenceFromIoConfig(SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig)

Review Comment:
   Consider looking at Kafka's [StickyAssignor](https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html) implementation for inspiration here.
   StickyAssignor is a partition assignment strategy for Kafka consumers within a consumer group. It has two primary goals: to distribute partitions evenly among consumers, and to minimize the movement of partitions during rebalances.
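
   To make the idea concrete, here is a minimal sketch of a sticky assignment. It is not Kafka's actual StickyAssignor algorithm and not existing Druid code: the class name `StickyAssignmentSketch`, the `assign` method, and the use of plain `String` consumer ids and `Integer` partition ids are all hypothetical, chosen only for illustration. It keeps as many previously owned partitions as the balance limit allows, then hands the rest to the least-loaded consumer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; NOT Kafka's StickyAssignor and not Druid code.
// Assumes consumer ids and partition ids are each distinct.
class StickyAssignmentSketch
{
  static Map<String, List<Integer>> assign(
      Map<String, List<Integer>> previous,
      List<String> consumers,
      List<Integer> partitions
  )
  {
    Map<String, List<Integer>> result = new LinkedHashMap<>();
    for (String consumer : consumers) {
      result.put(consumer, new ArrayList<>());
    }
    if (consumers.isEmpty()) {
      return result;
    }

    // In a balanced result every consumer owns `floor` partitions,
    // and exactly `extra` consumers own one more.
    int floor = partitions.size() / consumers.size();
    int extra = partitions.size() % consumers.size();
    Set<Integer> unassigned = new LinkedHashSet<>(partitions);

    // Pass 1 (stickiness): each consumer keeps up to `floor` of the
    // partitions it owned before, provided they still exist.
    for (String consumer : consumers) {
      List<Integer> owned = result.get(consumer);
      for (Integer partition : previous.getOrDefault(consumer, List.of())) {
        if (owned.size() < floor && unassigned.remove(partition)) {
          owned.add(partition);
        }
      }
    }

    // Pass 2 (stickiness): up to `extra` consumers keep one more of
    // their previous partitions.
    for (String consumer : consumers) {
      if (extra == 0) {
        break;
      }
      for (Integer partition : previous.getOrDefault(consumer, List.of())) {
        if (unassigned.remove(partition)) {
          result.get(consumer).add(partition);
          extra--;
          break;
        }
      }
    }

    // Pass 3 (balance): every remaining partition goes to whichever
    // consumer currently owns the fewest.
    for (Integer partition : unassigned) {
      List<Integer> smallest = null;
      for (List<Integer> owned : result.values()) {
        if (smallest == null || owned.size() < smallest.size()) {
          smallest = owned;
        }
      }
      smallest.add(partition);
    }
    return result;
  }

  public static void main(String[] args)
  {
    // Consumer "c" joins a group where "a" and "b" each owned three partitions.
    Map<String, List<Integer>> previous = Map.of("a", List.of(0, 1, 2), "b", List.of(3, 4, 5));
    System.out.println(assign(previous, List.of("a", "b", "c"), List.of(0, 1, 2, 3, 4, 5)));
    // prints {a=[0, 1], b=[3, 4], c=[2, 5]}
  }
}
```

   In this example only partitions 2 and 5 change owner, whereas recomputing the assignment from scratch could reshuffle most of them. Applied to tasks instead of consumers, the same approach would limit how many tasks receive a changed set of partitions on a config update.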



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

