kfaraz commented on code in PR #18466:
URL: https://github.com/apache/druid/pull/18466#discussion_r2368319551


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -251,6 +254,8 @@ public enum Status
   private volatile DateTime minMessageTime;
   private volatile DateTime maxMessageTime;
   private final ScheduledExecutorService rejectionPeriodUpdaterExec;
+  private AtomicBoolean waitForConfigUpdate = new AtomicBoolean(false);

Review Comment:
   ```suggestion
     private final AtomicBoolean waitForConfigUpdate = new AtomicBoolean(false);
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumber.java:
##########
@@ -98,7 +98,18 @@ public boolean isAvailableWithEarliest(OrderedSequenceNumber<SequenceOffsetType>
   public boolean isMoreToReadBeforeReadingRecord(OrderedSequenceNumber<SequenceOffsetType> end,
                                                  boolean isEndOffsetExclusive)
   {
-    final int compareToEnd = this.compareTo(end);
-    return isEndOffsetExclusive ? compareToEnd < 0 : compareToEnd <= 0;
+    // This happens in the situations where earlier sequences had a different partition mapping and has now been updated.
+    // Since the end is not defined, we can't really say if there is more to read or not.
+    try {
+      if (end.sequenceNumber == null) {
+        return false;
+      }
+      final int compareToEnd = this.compareTo(end);
+      return isEndOffsetExclusive ? compareToEnd < 0 : compareToEnd <= 0;
+    }
+    catch (Exception e) {

Review Comment:
   What kind of exception can happen here? I don't think we should be catching 
it.
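   If the only case is the undefined end offset, a sketch without the catch-all (based on the diff above) could be:
   ```java
   if (end.sequenceNumber == null) {
     // end is undefined (partition mapping changed), so we can't say if there is more to read
     return false;
   }
   final int compareToEnd = this.compareTo(end);
   return isEndOffsetExclusive ? compareToEnd < 0 : compareToEnd <= 0;
   ```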



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -206,14 +207,16 @@ public enum Status
   protected final Condition isAwaitingRetry = pollRetryLock.newCondition();
 
   private final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> task;
-  private final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
+  private volatile Set<StreamPartition<PartitionIdType>> assignment;
+  private volatile RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier;
+  private SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
   private final SeekableStreamIndexTaskTuningConfig tuningConfig;
   private final InputRowSchema inputRowSchema;
   @Nullable
   private final InputFormat inputFormat;
   @Nullable
   private final InputRowParser<ByteBuffer> parser;
-  private final String stream;
+  private String stream;

Review Comment:
   The input source stream never changes for a supervisor. See 
`SeekableStreamSupervisorSpec.validateSpecUpdateTo()`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -617,13 +624,15 @@ public void run()
             // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign
             // partitions upon resuming. Don't call "seekToStartingSequence" after "assignPartitions", because there's
             // no need to re-seek here. All we're going to be doing is dropping partitions.
-            assignment = assignPartitions(recordSupplier);
-            possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment);
+            assignment = assignPartitions(this.recordSupplier);
+            possiblyResetDataSourceMetadata(toolbox, this.recordSupplier, assignment);
 
             if (assignment.isEmpty()) {
-              log.debug("All partitions have been fully read.");
-              publishOnStop.set(true);
-              stopRequested.set(true);
+              if (!task.isPerpetuallyRunning()) {

Review Comment:
   Combine the two nested ifs using an `&&`.
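   e.g. (sketch based on the diff above):
   ```java
   if (assignment.isEmpty() && !task.isPerpetuallyRunning()) {
     log.debug("All partitions have been fully read.");
     publishOnStop.set(true);
     stopRequested.set(true);
   }
   ```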



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -647,12 +656,12 @@ public void run()
          // calling getRecord() ensures that exceptions specific to kafka/kinesis like OffsetOutOfRangeException
          // are handled in the subclasses.
          List<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType>> records = getRecords(
-              recordSupplier,
+              this.recordSupplier,

Review Comment:
   Nit: Please remove all the instances of `this.recordSupplier` to shorten the 
diff.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -599,15 +605,16 @@ public void run()
       // restart publishing of sequences (if any)
       maybePersistAndPublishSequences(committerSupplier);
 
-      Set<StreamPartition<PartitionIdType>> assignment = assignPartitions(recordSupplier);
-      possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment);
-      seekToStartingSequence(recordSupplier, assignment);
+      assignment = assignPartitions(this.recordSupplier);
+      possiblyResetDataSourceMetadata(toolbox, this.recordSupplier, assignment);
+      seekToStartingSequence(this.recordSupplier, assignment);

Review Comment:
   Nit: these changes can be avoided since the `this.` is not really necessary. 



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -452,9 +457,10 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
 
     //milliseconds waited for created segments to be handed off
     long handoffWaitMs = 0L;
-
+    log.info("Task perpetually running: %s", task.isPerpetuallyRunning());

Review Comment:
   Move this log line to `runInternal`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -848,66 +857,15 @@ public void onFailure(Throwable t)
       // We need to copy sequences here, because the success callback in publishAndRegisterHandoff removes items from
       // the sequence list. If a publish finishes before we finish iterating through the sequence list, we can
       // end up skipping some sequences.
-      List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> sequencesSnapshot = new ArrayList<>(sequences);
-      for (int i = 0; i < sequencesSnapshot.size(); i++) {
-        final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata = sequencesSnapshot.get(i);
-        if (!publishingSequences.contains(sequenceMetadata.getSequenceName())
-            && !publishedSequences.contains(sequenceMetadata.getSequenceName())) {
-          final boolean isLast = i == (sequencesSnapshot.size() - 1);
-          if (isLast) {
-            // Shorten endOffsets of the last sequence to match currOffsets.
-            sequenceMetadata.setEndOffsets(currOffsets);
-          }
-
-          // Update assignments of the sequence, which should clear them. (This will be checked later, when the
-          // Committer is built.)
-          sequenceMetadata.updateAssignments(currOffsets, this::isMoreToReadAfterReadingRecord);
-          publishingSequences.add(sequenceMetadata.getSequenceName());
-          // persist already done in finally, so directly add to publishQueue
-          publishAndRegisterHandoff(sequenceMetadata);
-        }
-      }
+      populateSequencesToPublish();
 
       if (backgroundThreadException != null) {
         throw new RuntimeException(backgroundThreadException);
       }
 
-      // Wait for publish futures to complete.
-      Futures.allAsList(publishWaitList).get();
-
-      // Wait for handoff futures to complete.
-      // Note that every publishing task (created by calling AppenderatorDriver.publish()) has a corresponding
-      // handoffFuture. handoffFuture can throw an exception if 1) the corresponding publishFuture failed or 2) it
-      // failed to persist sequences. It might also return null if handoff failed, but was recoverable.
-      // See publishAndRegisterHandoff() for details.
-      List<SegmentsAndCommitMetadata> handedOffList = Collections.emptyList();
-      ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
-      if (tuningConfig.getHandoffConditionTimeout() == 0) {
-        handedOffList = Futures.allAsList(handOffWaitList).get();
-      } else {
-        final long start = System.nanoTime();
-        try {
-          handedOffList = Futures.allAsList(handOffWaitList)
-                                 .get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
-        }
-        catch (TimeoutException e) {
-          // Handoff timeout is not an indexing failure, but coordination failure. We simply ignore timeout exception
-          // here.
-          log.makeAlert("Timeout waiting for handoff")
-             .addData("taskId", task.getId())
-             .addData("handoffConditionTimeout", tuningConfig.getHandoffConditionTimeout())
-             .emit();
-        }
-        finally {
-          handoffWaitMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-        }
-      }
-
-      for (SegmentsAndCommitMetadata handedOff : handedOffList) {
-        log.info(
-            "Handoff complete for segments: %s",
-            String.join(", ", Lists.transform(handedOff.getSegments(), DataSegment::toString))
-        );

Review Comment:
   Has the code only been moved or has the code changed too?
   Even if there are changes, we should keep the code here so that the changes 
are easy to see.
   We can do the refactor in a separate PR.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1749,135 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }
 
+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest request, @Context final HttpServletRequest req)
+      throws InterruptedException
+  {
+    authorizationCheck(req);
+    if (!waitForConfigUpdate.get()) {
+      return Response.status(409).entity("Task must be paused for checkpoint completion before updating config").build();
+    }
+    try {
+      log.info("Attempting to update config to [%s]", request.getIoConfig());
+
+      SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> newIoConfig = (SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType>)
+          toolbox.getJsonMapper().convertValue(request.getIoConfig(), SeekableStreamIndexTaskIOConfig.class);
+      setIOConfig(newIoConfig);
+      createNewSequenceFromIoConfig(newIoConfig);
+
+      assignment = assignPartitions(recordSupplier);
+      boolean shouldResume = true;
+      if (!assignment.isEmpty()) {
+        possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment);
+        seekToStartingSequence(recordSupplier, assignment);
+      } else {
+        // if there is no assignment, It means that there was no partition assigned to this task after scaling down.
+        pause();
+        shouldResume = false;
+      }
+
+      log.info("Config updated to [%s]", this.ioConfig);
+      toolbox.getEmitter().emit(ServiceMetricEvent.builder()
+                                    .setDimension(DruidMetrics.TASK_ID, task.getId())
+                                    .setDimension(DruidMetrics.TASK_TYPE, task.getType())
+                                    .setDimension(DruidMetrics.DATASOURCE, task.getDataSource())
+                                    .setMetric("task/config/update/success", 1)
+                                    .build(ImmutableMap.of()));
+      if (shouldResume) {
+        resume();
+      }
+      waitForConfigUpdate.set(false);
+      return Response.ok().build();
+    }
+    catch (Exception e) {
+      log.makeAlert(e, "Failed to update task config");
+      waitForConfigUpdate.set(false);
+      return Response.serverError().entity(e.getMessage()).build();
+    }
+  }
+
+  private void setIOConfig(
+      SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig
+  )
+  {
+    this.ioConfig = ioConfig;
+    this.stream = ioConfig.getStartSequenceNumbers().getStream();
+    this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
+    this.minMessageTime = Configs.valueOrDefault(ioConfig.getMinimumMessageTime(), DateTimes.MIN);
+    this.maxMessageTime = Configs.valueOrDefault(ioConfig.getMaximumMessageTime(), DateTimes.MAX);
+  }
+
+  /**
+   * Creates new sequences for the ingestion process. It currently accepts the ioConfig given by the request as the correct offsets
+   * and ignores the offsets it may have stored in currOffsets and endOffsets.
+   */
+  private void createNewSequenceFromIoConfig(SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig)
+      throws IOException
+  {
+    Map<PartitionIdType, SequenceOffsetType> partitionStartOffsets = ioConfig.getStartSequenceNumbers()
+                                                                             .getPartitionSequenceNumberMap();
+    Map<PartitionIdType, SequenceOffsetType> partitionEndSequences = ioConfig.getEndSequenceNumbers()
+                                                                             .getPartitionSequenceNumberMap();
+
+    final Set<PartitionIdType> exclusiveStartPartitions = computeExclusiveStartPartitionsForSequence(
+        partitionStartOffsets);
+    final SequenceMetadata<PartitionIdType, SequenceOffsetType> newSequence = new SequenceMetadata<>(
+        sequences.isEmpty() ? 0 : getLastSequenceMetadata().getSequenceId() + 1,
+        StringUtils.format(
+            "%s_%d",
+            ioConfig.getBaseSequenceName(),
+            sequences.isEmpty() ? 0 : getLastSequenceMetadata().getSequenceId() + 1
+        ),
+        partitionStartOffsets,
+        partitionEndSequences,
+        false,
+        exclusiveStartPartitions,
+        getTaskLockType()
+    );
+    log.info("Attempting adding new sequence [%s]", newSequence);
+
+    currOffsets.clear();
+    currOffsets.putAll(partitionStartOffsets);
+    endOffsets.clear();
+    endOffsets.putAll(partitionEndSequences);
+
+    addSequence(newSequence);
+    persistSequences();
+    log.info(
+        "Created new sequence [%s] with start offsets [%s]",
+        newSequence.getSequenceName(), partitionStartOffsets
+    );
+  }
+
+  private void checkpointSequences()
+  {
+    try {
+      final SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence = getLastSequenceMetadata();
+      if (!latestSequence.isCheckpointed()) {
+        final CheckPointDataSourceMetadataAction checkpointAciton = new CheckPointDataSourceMetadataAction(
+            getSupervisorId(),
+            ioConfig.getTaskGroupId(),
+            null,
+            createDataSourceMetadata(
+                new SeekableStreamStartSequenceNumbers<>(
+                    stream,
+                    latestSequence.getStartOffsets(),
+                    latestSequence.getExclusiveStartPartitions()
+                )
+            )
+        );
+        toolbox.getTaskActionClient().submit(checkpointAciton);

Review Comment:
   We should throw an exception if this returns `false`, same as existing 
codeflow.
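   e.g. a sketch along the lines of the existing flow (exact message is illustrative):
   ```java
   if (!toolbox.getTaskActionClient().submit(checkpointAciton)) {
     throw new ISE("Checkpoint request with sequences [%s] failed, dying", latestSequence);
   }
   ```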



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1749,135 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }
 
+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest request, @Context final HttpServletRequest req)
+      throws InterruptedException
+  {
+    authorizationCheck(req);
+    if (!waitForConfigUpdate.get()) {
+      return Response.status(409).entity("Task must be paused for checkpoint completion before updating config").build();
+    }
+    try {
+      log.info("Attempting to update config to [%s]", request.getIoConfig());
+
+      SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> newIoConfig = (SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType>)
+          toolbox.getJsonMapper().convertValue(request.getIoConfig(), SeekableStreamIndexTaskIOConfig.class);

Review Comment:
   Why convert? Isn't the payload already a `SeekableStreamIndexTaskIOConfig`?



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1749,135 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }
 
+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest request, @Context final HttpServletRequest req)
+      throws InterruptedException
+  {
+    authorizationCheck(req);
+    if (!waitForConfigUpdate.get()) {
+      return Response.status(409).entity("Task must be paused for checkpoint completion before updating config").build();
+    }
+    try {
+      log.info("Attempting to update config to [%s]", request.getIoConfig());
+
+      SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> newIoConfig = (SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType>)
+          toolbox.getJsonMapper().convertValue(request.getIoConfig(), SeekableStreamIndexTaskIOConfig.class);
+      setIOConfig(newIoConfig);
+      createNewSequenceFromIoConfig(newIoConfig);
+
+      assignment = assignPartitions(recordSupplier);
+      boolean shouldResume = true;

Review Comment:
   Rather than this boolean, just call `resume()` inside the `if`.
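   i.e. something like (sketch; placement relative to the metric emission is up to you):
   ```java
   if (!assignment.isEmpty()) {
     possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment);
     seekToStartingSequence(recordSupplier, assignment);
     resume();
   } else {
     // no partition was assigned to this task after scaling down
     pause();
   }
   ```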



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1749,135 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }
 
+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest request, @Context final HttpServletRequest req)
+      throws InterruptedException
+  {
+    authorizationCheck(req);
+    if (!waitForConfigUpdate.get()) {
+      return Response.status(409).entity("Task must be paused for checkpoint completion before updating config").build();
+    }
+    try {
+      log.info("Attempting to update config to [%s]", request.getIoConfig());
+
+      SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> newIoConfig = (SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType>)
+          toolbox.getJsonMapper().convertValue(request.getIoConfig(), SeekableStreamIndexTaskIOConfig.class);
+      setIOConfig(newIoConfig);
+      createNewSequenceFromIoConfig(newIoConfig);
+
+      assignment = assignPartitions(recordSupplier);
+      boolean shouldResume = true;
+      if (!assignment.isEmpty()) {
+        possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment);
+        seekToStartingSequence(recordSupplier, assignment);
+      } else {
+        // if there is no assignment, It means that there was no partition assigned to this task after scaling down.
+        pause();

Review Comment:
   Wouldn't the task already be in a paused state?



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -848,66 +857,15 @@ public void onFailure(Throwable t)
       // We need to copy sequences here, because the success callback in publishAndRegisterHandoff removes items from
       // the sequence list. If a publish finishes before we finish iterating through the sequence list, we can
       // end up skipping some sequences.
-      List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> sequencesSnapshot = new ArrayList<>(sequences);
-      for (int i = 0; i < sequencesSnapshot.size(); i++) {
-        final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata = sequencesSnapshot.get(i);
-        if (!publishingSequences.contains(sequenceMetadata.getSequenceName())
-            && !publishedSequences.contains(sequenceMetadata.getSequenceName())) {
-          final boolean isLast = i == (sequencesSnapshot.size() - 1);
-          if (isLast) {
-            // Shorten endOffsets of the last sequence to match currOffsets.
-            sequenceMetadata.setEndOffsets(currOffsets);
-          }
-
-          // Update assignments of the sequence, which should clear them. (This will be checked later, when the
-          // Committer is built.)
-          sequenceMetadata.updateAssignments(currOffsets, this::isMoreToReadAfterReadingRecord);
-          publishingSequences.add(sequenceMetadata.getSequenceName());
-          // persist already done in finally, so directly add to publishQueue
-          publishAndRegisterHandoff(sequenceMetadata);
-        }
-      }

Review Comment:
   Has this code only been moved or has the logic changed too?



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -599,15 +605,16 @@ public void run()
       // restart publishing of sequences (if any)
       maybePersistAndPublishSequences(committerSupplier);
 
-      Set<StreamPartition<PartitionIdType>> assignment = assignPartitions(recordSupplier);
-      possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment);
-      seekToStartingSequence(recordSupplier, assignment);
+      assignment = assignPartitions(this.recordSupplier);
+      possiblyResetDataSourceMetadata(toolbox, this.recordSupplier, assignment);
+      seekToStartingSequence(this.recordSupplier, assignment);
 
       ingestionState = IngestionState.BUILD_SEGMENTS;
 
       // Main loop.
       // Could eventually support leader/follower mode (for keeping replicas more in sync)
-      boolean stillReading = !assignment.isEmpty();
+      log.info("Task perpetuallyRunning: %s", task.isPerpetuallyRunning());

Review Comment:
   Please remove these extra log lines.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -647,12 +656,12 @@ public void run()
          // calling getRecord() ensures that exceptions specific to kafka/kinesis like OffsetOutOfRangeException
          // are handled in the subclasses.
          List<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType>> records = getRecords(
-              recordSupplier,
+              this.recordSupplier,
               toolbox
           );
 
           // note: getRecords() also updates assignment
-          stillReading = !assignment.isEmpty();
+          stillReading = !assignment.isEmpty() || task.isPerpetuallyRunning();

Review Comment:
   Since this same logic is repeated in multiple places, put this in a method.
   
   ```java
   private boolean isStillReading()
   {
     return !assignment.isEmpty() || task.isPerpetuallyRunning();
   }
   ```



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java:
##########
@@ -175,6 +181,74 @@ public void test_ingest10kRows_ofSelfClusterMetrics_andVerifyValues()
     cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
   }
 
+  @Test
+  @Timeout(60)
+  public void test_ingest20kRows_ofSelfClusterMetricsWithScaleOuts_andVerifyValues()

Review Comment:
   I think it would make more sense to put these test methods in a new test 
class which is more focused on scaling, task duration etc. This test class 
`KafkaClusterMetricsTest` was mostly about ingesting metrics of a cluster.
   
   The new test class can be called something like `KafkaTaskScalingTest` and 
it can use data directly published to Kafka topic (similar to 
`KafkaSupervisorTest`) rather than self cluster metrics.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1749,135 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }
 
+  @POST
+  @Path("/updateConfig")

Review Comment:
   ```suggestion
     @Path("/config")
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1600,6 +1623,15 @@ public Map<PartitionIdType, SequenceOffsetType> getEndOffsets()
     return endOffsets;
   }
 
+  @GET
+  @Path("/config")
+  @Produces(MediaType.APPLICATION_JSON)
+  public SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> getIOConfigHTTP(@Context final HttpServletRequest req)
+  {
+    authorizationCheck(req);
+    return ioConfig;

Review Comment:
   Return a response object like `TaskConfigResponse` which contains an `ioConfig` field, so that this API mirrors
   the update config API and also allows us to add extra fields later.
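   A minimal sketch (the class name and shape are just a proposal):
   ```java
   public class TaskConfigResponse
   {
     private final SeekableStreamIndexTaskIOConfig ioConfig;
   
     @JsonCreator
     public TaskConfigResponse(@JsonProperty("ioConfig") SeekableStreamIndexTaskIOConfig ioConfig)
     {
       this.ioConfig = ioConfig;
     }
   
     @JsonProperty
     public SeekableStreamIndexTaskIOConfig getIoConfig()
     {
       return ioConfig;
     }
   }
   ```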



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1895,6 +2067,22 @@ public Response pauseHTTP(
     return pause();
   }
 
+  @POST
+  @Path("/pauseAndCheckpoint")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response pauseAndCheckpointHTTP(
+      @Context final HttpServletRequest req
+  ) throws InterruptedException, JsonProcessingException
+  {
+    authorizationCheck(req);
+    if (!waitForConfigUpdate.compareAndSet(false, true)) {
+      return Response.ok().entity("Task is already paused for checkpoint 
completion").build();
+    }
+    pause();
+    checkpointSequences();
+    return 
Response.ok().entity(toolbox.getJsonMapper().writeValueAsString(getCurrentOffsets())).build();

Review Comment:
   ```suggestion
       return Response.ok().entity(getCurrentOffsets()).build();
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1738,7 +1899,10 @@ public Response setEndOffsets(
                      .build();
     } else {
       try {
-        pauseLock.lockInterruptibly();
+        // Don't acquire a lock if the task is already paused for checkpoint completion, avoiding deadlock
+        if (!waitForConfigUpdate.get()) {
+          pauseLock.lockInterruptibly();
+        }

Review Comment:
   Instead of putting all of the pause/resume operations inside an `if`, we 
should put this condition in a new method.
   
   ```java
   private void ensureNotWaitingForConfigUpdate() {...}
   ```
   
   This method should throw an exception if the state is invalid rather than 
skipping silently.
   Call this method right before the relevant operation.
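   For example (sketch; the exception type is illustrative):
   ```java
   private void ensureNotWaitingForConfigUpdate()
   {
     if (waitForConfigUpdate.get()) {
       throw new ISE("Task is paused for a config update, cannot pause/resume right now");
     }
   }
   ```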



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1749,135 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }
 
+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest request, @Context final HttpServletRequest req)
+      throws InterruptedException
+  {
+    authorizationCheck(req);
+    if (!waitForConfigUpdate.get()) {
+      return Response.status(409).entity("Task must be paused for checkpoint completion before updating config").build();
+    }
+    try {
+      log.info("Attempting to update config to [%s]", request.getIoConfig());
+
+      SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> newIoConfig = (SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType>)
+          toolbox.getJsonMapper().convertValue(request.getIoConfig(), SeekableStreamIndexTaskIOConfig.class);
+      setIOConfig(newIoConfig);
+      createNewSequenceFromIoConfig(newIoConfig);
+
+      assignment = assignPartitions(recordSupplier);
+      boolean shouldResume = true;
+      if (!assignment.isEmpty()) {
+        possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment);
+        seekToStartingSequence(recordSupplier, assignment);
+      } else {
+        // if there is no assignment, It means that there was no partition assigned to this task after scaling down.
+        pause();
+        shouldResume = false;
+      }
+
+      log.info("Config updated to [%s]", this.ioConfig);
+      toolbox.getEmitter().emit(ServiceMetricEvent.builder()
+                                    .setDimension(DruidMetrics.TASK_ID, task.getId())
+                                    .setDimension(DruidMetrics.TASK_TYPE, task.getType())
+                                    .setDimension(DruidMetrics.DATASOURCE, task.getDataSource())
+                                    .setMetric("task/config/update/success", 1)
+                                    .build(ImmutableMap.of()));
+      if (shouldResume) {
+        resume();
+      }
+      waitForConfigUpdate.set(false);
+      return Response.ok().build();
+    }
+    catch (Exception e) {
+      log.makeAlert(e, "Failed to update task config");
+      waitForConfigUpdate.set(false);
+      return Response.serverError().entity(e.getMessage()).build();
+    }
+  }
+
+  private void setIOConfig(
+      SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig
+  )
+  {
+    this.ioConfig = ioConfig;
+    this.stream = ioConfig.getStartSequenceNumbers().getStream();
+    this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
+    this.minMessageTime = Configs.valueOrDefault(ioConfig.getMinimumMessageTime(), DateTimes.MIN);
+    this.maxMessageTime = Configs.valueOrDefault(ioConfig.getMaximumMessageTime(), DateTimes.MAX);
+  }
+
+  /**
+   * Creates new sequences for the ingestion process. It currently accepts the ioConfig given by the request as the correct offsets
+   * and ignores the offsets it may have stored in currOffsets and endOffsets.
+   */
+  private void createNewSequenceFromIoConfig(SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig)
+      throws IOException
+  {
+    Map<PartitionIdType, SequenceOffsetType> partitionStartOffsets = ioConfig.getStartSequenceNumbers()
+                                                                             .getPartitionSequenceNumberMap();
+    Map<PartitionIdType, SequenceOffsetType> partitionEndSequences = ioConfig.getEndSequenceNumbers()
+                                                                             .getPartitionSequenceNumberMap();
+
+    final Set<PartitionIdType> exclusiveStartPartitions = computeExclusiveStartPartitionsForSequence(
+        partitionStartOffsets);
+    final SequenceMetadata<PartitionIdType, SequenceOffsetType> newSequence = new SequenceMetadata<>(
+        sequences.isEmpty() ? 0 : getLastSequenceMetadata().getSequenceId() + 1,
+        StringUtils.format(
+            "%s_%d",
+            ioConfig.getBaseSequenceName(),
+            sequences.isEmpty() ? 0 : getLastSequenceMetadata().getSequenceId() + 1
+        ),
+        partitionStartOffsets,
+        partitionEndSequences,
+        false,
+        exclusiveStartPartitions,
+        getTaskLockType()
+    );
+    log.info("Attempting adding new sequence [%s]", newSequence);
+
+    currOffsets.clear();
+    currOffsets.putAll(partitionStartOffsets);
+    endOffsets.clear();
+    endOffsets.putAll(partitionEndSequences);
+
+    addSequence(newSequence);
+    persistSequences();
+    log.info(
+        "Created new sequence [%s] with start offsets [%s]",
+        newSequence.getSequenceName(), partitionStartOffsets
+    );
+  }
+
+  private void checkpointSequences()
+  {
+    try {
+      final SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence = getLastSequenceMetadata();
+      if (!latestSequence.isCheckpointed()) {

Review Comment:
   We should throw an exception if already checkpointed, same as the existing 
codeflow.
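   e.g. (sketch):
   ```java
   if (latestSequence.isCheckpointed()) {
     throw new ISE("Sequence [%s] has already been checkpointed", latestSequence.getSequenceName());
   }
   ```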



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1275,7 +1298,7 @@ private void addSequence(final SequenceMetadata<PartitionIdType, SequenceOffsetT
       if (!sequences.isEmpty()) {
        final SequenceOffsetType priorOffset = getLastSequenceMetadata().endOffsets.get(partition);
 
-        if (!startOffset.equals(priorOffset)) {
+        if (priorOffset != null && !startOffset.equals(priorOffset)) {

Review Comment:
   Shouldn't the null check be on `startOffset`?



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1749,135 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }
 
+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest request, @Context final HttpServletRequest req)
+      throws InterruptedException
+  {
+    authorizationCheck(req);
+    if (!waitForConfigUpdate.get()) {
+      return Response.status(409).entity("Task must be paused for checkpoint completion before updating config").build();

Review Comment:
   ```suggestion
         return Response.status(409).entity("Task must have been paused and checkpointed before updating config.").build();
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1895,6 +2067,22 @@ public Response pauseHTTP(
     return pause();
   }
 
+  @POST
+  @Path("/pauseAndCheckpoint")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response pauseAndCheckpointHTTP(
+      @Context final HttpServletRequest req
+  ) throws InterruptedException, JsonProcessingException
+  {
+    authorizationCheck(req);
+    if (!waitForConfigUpdate.compareAndSet(false, true)) {
+      return Response.ok().entity("Task is already paused for checkpoint 
completion").build();
+    }
+    pause();
+    checkpointSequences();

Review Comment:
   If `pause` fails, we should reset the `waitForConfigUpdate` to `false`.
   If `checkpointSequences` fails, we should do the same. But in this case, I wonder if we should also un-pause
   the task? We wouldn't want to keep it in a paused state doing nothing forever.
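   Roughly (a sketch of the failure handling, not the exact code):
   ```java
   try {
     pause();
     checkpointSequences();
   }
   catch (InterruptedException | RuntimeException e) {
     waitForConfigUpdate.set(false);
     resume();  // don't leave the task paused doing nothing if checkpointing failed
     throw e;
   }
   ```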
   



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/TaskConfigUpdateRequest.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Request object for updating the configuration of a running {@link SeekableStreamIndexTask}.
+ */
+public class TaskConfigUpdateRequest
+{
+  private final SeekableStreamIndexTaskIOConfig ioConfig;

Review Comment:
   Should this field and the class have generic args for partition id type and 
offset type?
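   Something like (sketch):
   ```java
   public class TaskConfigUpdateRequest<PartitionIdType, SequenceOffsetType>
   {
     private final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
     // constructor and getter parameterized the same way
   }
   ```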



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1749,135 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }
 
+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest request, @Context final HttpServletRequest req)
+      throws InterruptedException
+  {
+    authorizationCheck(req);
+    if (!waitForConfigUpdate.get()) {
+      return Response.status(409).entity("Task must be paused for checkpoint completion before updating config").build();
+    }
+    try {
+      log.info("Attempting to update config to [%s]", request.getIoConfig());

Review Comment:
   Please move the logic inside the `try` into a separate private method, and 
add a short javadoc to it outlining the steps involved.
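   e.g. (sketch; the method name `updateConfigInternal` is just a placeholder):
   ```java
   /**
    * Applies the given ioConfig to this runner:
    * 1. Updates the runner state (stream, endOffsets, message times).
    * 2. Creates and persists a new sequence from the new start/end offsets.
    * 3. Reassigns partitions and seeks to the new starting sequences,
    *    or stays paused if no partition is assigned.
    */
   private void updateConfigInternal(TaskConfigUpdateRequest request) throws IOException, InterruptedException
   {
     // ... body currently inside the try block ...
   }
   ```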



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
