kfaraz commented on code in PR #18466:
URL: https://github.com/apache/druid/pull/18466#discussion_r2320778816


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1717,98 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }
 
+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest<PartitionIdType, SequenceOffsetType> req) throws InterruptedException
+  {
+    try {
+      requestPause();
+      checkpointSequences();
+
+      this.ioConfig = req.getIoConfig();
+      this.stream = ioConfig.getStartSequenceNumbers().getStream();
+      this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
+      minMessageTime = Configs.valueOrDefault(ioConfig.getMinimumMessageTime(), DateTimes.MIN);
+      maxMessageTime = Configs.valueOrDefault(ioConfig.getMaximumMessageTime(), DateTimes.MAX);
+
+      createNewSequenceFromIoConfig(req.getIoConfig());
+      resume();
+      return Response.ok().build();
+    } catch (Exception e) {
+      log.makeAlert(e, "Failed to update task config");
+      return Response.serverError().entity(e.getMessage()).build();
+    }
+  }
+
+  /**
+   * Creates new sequences for the ingestion process. It currently accepts the ioConfig given by the request as the correct offsets
+   * and ignores the offsets it may have stored in currOffsets and endOffsets.
+   */
+  private void createNewSequenceFromIoConfig(SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig)

Review Comment:
   The `SeekableStreamIndexTaskRunner` must already be performing the actions of these 2 new methods somewhere in its normal flow.
   Let's extract them into common methods so that the same code is used in the normal flow as well as on config update.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1717,98 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }
 
+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest<PartitionIdType, SequenceOffsetType> req) throws InterruptedException
+  {
+    try {
+      requestPause();

Review Comment:
   We should call `pause()` instead of `requestPause()`, since the latter only requests a pause but doesn't ensure that we reach a paused state.
   If the `pause()` call returns a non-OK response, we should return the same response immediately.
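
   A minimal sketch of that guard, not the actual Druid code: HTTP status ints stand in for the JAX-RS `Response`, `PauseGuardSketch` and its parameters are hypothetical names, and treating only status 200 as OK is an assumption.

```java
import java.util.function.IntSupplier;

// Hypothetical stand-in: plain HTTP status ints replace the JAX-RS Response.
final class PauseGuardSketch
{
  static final int OK = 200;

  /**
   * Applies the update only after the blocking pause reports OK.
   * Any other pause status is returned unchanged and nothing is applied.
   */
  static int updateConfig(IntSupplier pause, Runnable applyUpdate)
  {
    final int pauseStatus = pause.getAsInt();
    if (pauseStatus != OK) {
      // Propagate the pause response as-is instead of continuing.
      return pauseStatus;
    }
    applyUpdate.run();
    return OK;
  }
}
```

   The point of the early return is that the config is never swapped while the task may still be reading.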



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1717,98 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }
 
+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest<PartitionIdType, SequenceOffsetType> req) throws InterruptedException
+  {
+    try {
+      requestPause();
+      checkpointSequences();
+
+      this.ioConfig = req.getIoConfig();
+      this.stream = ioConfig.getStartSequenceNumbers().getStream();
+      this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
+      minMessageTime = Configs.valueOrDefault(ioConfig.getMinimumMessageTime(), DateTimes.MIN);
+      maxMessageTime = Configs.valueOrDefault(ioConfig.getMaximumMessageTime(), DateTimes.MAX);

Review Comment:
   Maybe put this part in a new method so that the constructor can also reuse the code.
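
   Roughly what I have in mind, as a simplified sketch: `IoConfigStateSketch`, its nested `IoConfig` and `applyIoConfig` are hypothetical names, and plain `Long` timestamps stand in for the real types.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class IoConfigStateSketch
{
  // Hypothetical, simplified stand-in for SeekableStreamIndexTaskIOConfig.
  static final class IoConfig
  {
    final String stream;
    final Map<Integer, Long> endOffsets;
    final Long minMessageTime; // null means "no lower bound"
    final Long maxMessageTime; // null means "no upper bound"

    IoConfig(String stream, Map<Integer, Long> endOffsets, Long minMessageTime, Long maxMessageTime)
    {
      this.stream = stream;
      this.endOffsets = endOffsets;
      this.minMessageTime = minMessageTime;
      this.maxMessageTime = maxMessageTime;
    }
  }

  String stream;
  Map<Integer, Long> endOffsets;
  long minMessageTime;
  long maxMessageTime;

  IoConfigStateSketch(IoConfig ioConfig)
  {
    applyIoConfig(ioConfig);
  }

  void updateConfig(IoConfig ioConfig)
  {
    applyIoConfig(ioConfig);
  }

  // Single place that derives the runner state from an ioConfig.
  private void applyIoConfig(IoConfig ioConfig)
  {
    this.stream = ioConfig.stream;
    this.endOffsets = new ConcurrentHashMap<>(ioConfig.endOffsets);
    this.minMessageTime = ioConfig.minMessageTime != null ? ioConfig.minMessageTime : Long.MIN_VALUE;
    this.maxMessageTime = ioConfig.maxMessageTime != null ? ioConfig.maxMessageTime : Long.MAX_VALUE;
  }
}
```

   With one method doing the derivation, the constructor and the update path cannot drift apart.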



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1717,98 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }
 
+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest<PartitionIdType, SequenceOffsetType> req) throws InterruptedException
+  {
+    try {

Review Comment:
   Needs to have an `authorizationCheck()` first.
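
   A rough sketch of the ordering, assuming the check signals failure by throwing: `AuthFirstSketch` is a hypothetical name, `SecurityException` stands in for whatever the real check throws, and status ints stand in for `Response`. The check sits before the `try`, so the catch-all does not turn an authorization failure into a generic server error.

```java
final class AuthFirstSketch
{
  /**
   * Runs the authorization check before any state is touched.
   * The check is outside the try block, so its exception reaches the caller unchanged.
   */
  static int updateConfig(Runnable authorizationCheck, Runnable doUpdate)
  {
    authorizationCheck.run();
    try {
      doUpdate.run();
      return 200;
    }
    catch (RuntimeException e) {
      // Only failures of the update itself are mapped to a server error.
      return 500;
    }
  }
}
```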



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1717,98 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }
 
+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest<PartitionIdType, SequenceOffsetType> req) throws InterruptedException
+  {
+    try {
+      requestPause();

Review Comment:
   Before starting the pause, we can log the new config that we are trying to update to.
   After the update finishes, let's log the new config again and also emit an event.
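
   Something along these lines, as a sketch only: `UpdateAuditSketch` is a hypothetical name, the two `Consumer<String>` parameters stand in for the real logger and emitter, and the event name `task/config/updated` is made up for illustration.

```java
import java.util.function.Consumer;

final class UpdateAuditSketch
{
  // Hypothetical event name, used here only for illustration.
  static final String EVENT_NAME = "task/config/updated";

  /**
   * Logs the target config before the update starts, then logs again and
   * emits an event once the update has finished without throwing.
   */
  static void updateWithAudit(String newConfig, Runnable doUpdate, Consumer<String> log, Consumer<String> emitEvent)
  {
    log.accept("Updating task config to " + newConfig);
    doUpdate.run();
    log.accept("Updated task config to " + newConfig);
    emitEvent.accept(EVENT_NAME);
  }
}
```

   If `doUpdate` throws, neither the second log line nor the event is produced, which keeps the event meaning "update applied".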



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

