kfaraz commented on code in PR #18466:
URL: https://github.com/apache/druid/pull/18466#discussion_r2368319551
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -251,6 +254,8 @@ public enum Status
   private volatile DateTime minMessageTime;
   private volatile DateTime maxMessageTime;
   private final ScheduledExecutorService rejectionPeriodUpdaterExec;
+  private AtomicBoolean waitForConfigUpdate = new AtomicBoolean(false);

Review Comment:
```suggestion
  private final AtomicBoolean waitForConfigUpdate = new AtomicBoolean(false);
```


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumber.java:
##########
@@ -98,7 +98,18 @@ public boolean isAvailableWithEarliest(OrderedSequenceNumber<SequenceOffsetType>
   public boolean isMoreToReadBeforeReadingRecord(OrderedSequenceNumber<SequenceOffsetType> end, boolean isEndOffsetExclusive)
   {
-    final int compareToEnd = this.compareTo(end);
-    return isEndOffsetExclusive ? compareToEnd < 0 : compareToEnd <= 0;
+    // This happens in the situations where earlier sequences had a different partition mapping and has now been updated.
+    // Since the end is not defined, we can't really say if there is more to read or not.
+    try {
+      if (end.sequenceNumber == null) {
+        return false;
+      }
+      final int compareToEnd = this.compareTo(end);
+      return isEndOffsetExclusive ? compareToEnd < 0 : compareToEnd <= 0;
+    }
+    catch (Exception e) {

Review Comment:
What kind of exception can happen here? I don't think we should be catching it.


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -206,14 +207,16 @@ public enum Status
   protected final Condition isAwaitingRetry = pollRetryLock.newCondition();

   private final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> task;
-  private final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
+  private volatile Set<StreamPartition<PartitionIdType>> assignment;
+  private volatile RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier;
+  private SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
   private final SeekableStreamIndexTaskTuningConfig tuningConfig;
   private final InputRowSchema inputRowSchema;
   @Nullable
   private final InputFormat inputFormat;
   @Nullable
   private final InputRowParser<ByteBuffer> parser;
-  private final String stream;
+  private String stream;

Review Comment:
The input source stream never changes for a supervisor. See `SeekableStreamSupervisorSpec.validateSpecUpdateTo()`.


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -617,13 +624,15 @@ public void run()
           // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign
           // partitions upon resuming. Don't call "seekToStartingSequence" after "assignPartitions", because there's
           // no need to re-seek here. All we're going to be doing is dropping partitions.
-          assignment = assignPartitions(recordSupplier);
-          possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment);
+          assignment = assignPartitions(this.recordSupplier);
+          possiblyResetDataSourceMetadata(toolbox, this.recordSupplier, assignment);

           if (assignment.isEmpty()) {
-            log.debug("All partitions have been fully read.");
-            publishOnStop.set(true);
-            stopRequested.set(true);
+            if (!task.isPerpetuallyRunning()) {

Review Comment:
Combine the two nested ifs using an `&&`.
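For illustration, the combined form would look something like this (a sketch over the lines quoted above):

```java
if (assignment.isEmpty() && !task.isPerpetuallyRunning()) {
  // Nothing left to read and the task is not a long-running one, so stop.
  log.debug("All partitions have been fully read.");
  publishOnStop.set(true);
  stopRequested.set(true);
}
```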
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -647,12 +656,12 @@ public void run()
           // calling getRecord() ensures that exceptions specific to kafka/kinesis like OffsetOutOfRangeException
           // are handled in the subclasses.
           List<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType>> records = getRecords(
-              recordSupplier,
+              this.recordSupplier,

Review Comment:
Nit: Please remove all the instances of `this.recordSupplier` to shorten the diff.


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -599,15 +605,16 @@ public void run()
       // restart publishing of sequences (if any)
       maybePersistAndPublishSequences(committerSupplier);

-      Set<StreamPartition<PartitionIdType>> assignment = assignPartitions(recordSupplier);
-      possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment);
-      seekToStartingSequence(recordSupplier, assignment);
+      assignment = assignPartitions(this.recordSupplier);
+      possiblyResetDataSourceMetadata(toolbox, this.recordSupplier, assignment);
+      seekToStartingSequence(this.recordSupplier, assignment);

Review Comment:
Nit: these changes can be avoided since the `this.` is not really necessary.


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -452,9 +457,10 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
     //milliseconds waited for created segments to be handed off
     long handoffWaitMs = 0L;
-
+    log.info("Task perpetually running: %s", task.isPerpetuallyRunning());

Review Comment:
Move this log line to `runInternal`.


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -848,66 +857,15 @@ public void onFailure(Throwable t)
       // We need to copy sequences here, because the success callback in publishAndRegisterHandoff removes items from
      // the sequence list. If a publish finishes before we finish iterating through the sequence list, we can
      // end up skipping some sequences.
-      List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> sequencesSnapshot = new ArrayList<>(sequences);
-      for (int i = 0; i < sequencesSnapshot.size(); i++) {
-        final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata = sequencesSnapshot.get(i);
-        if (!publishingSequences.contains(sequenceMetadata.getSequenceName())
-            && !publishedSequences.contains(sequenceMetadata.getSequenceName())) {
-          final boolean isLast = i == (sequencesSnapshot.size() - 1);
-          if (isLast) {
-            // Shorten endOffsets of the last sequence to match currOffsets.
-            sequenceMetadata.setEndOffsets(currOffsets);
-          }
-
-          // Update assignments of the sequence, which should clear them. (This will be checked later, when the
-          // Committer is built.)
-          sequenceMetadata.updateAssignments(currOffsets, this::isMoreToReadAfterReadingRecord);
-          publishingSequences.add(sequenceMetadata.getSequenceName());
-          // persist already done in finally, so directly add to publishQueue
-          publishAndRegisterHandoff(sequenceMetadata);
-        }
-      }
+      populateSequencesToPublish();

       if (backgroundThreadException != null) {
         throw new RuntimeException(backgroundThreadException);
       }

-      // Wait for publish futures to complete.
-      Futures.allAsList(publishWaitList).get();
-
-      // Wait for handoff futures to complete.
-      // Note that every publishing task (created by calling AppenderatorDriver.publish()) has a corresponding
-      // handoffFuture. handoffFuture can throw an exception if 1) the corresponding publishFuture failed or 2) it
-      // failed to persist sequences. It might also return null if handoff failed, but was recoverable.
-      // See publishAndRegisterHandoff() for details.
-      List<SegmentsAndCommitMetadata> handedOffList = Collections.emptyList();
-      ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
-      if (tuningConfig.getHandoffConditionTimeout() == 0) {
-        handedOffList = Futures.allAsList(handOffWaitList).get();
-      } else {
-        final long start = System.nanoTime();
-        try {
-          handedOffList = Futures.allAsList(handOffWaitList)
-                                 .get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
-        }
-        catch (TimeoutException e) {
-          // Handoff timeout is not an indexing failure, but coordination failure. We simply ignore timeout exception
-          // here.
-          log.makeAlert("Timeout waiting for handoff")
-             .addData("taskId", task.getId())
-             .addData("handoffConditionTimeout", tuningConfig.getHandoffConditionTimeout())
-             .emit();
-        }
-        finally {
-          handoffWaitMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-        }
-      }
-
-      for (SegmentsAndCommitMetadata handedOff : handedOffList) {
-        log.info(
-            "Handoff complete for segments: %s",
-            String.join(", ", Lists.transform(handedOff.getSegments(), DataSegment::toString))
-        );

Review Comment:
Has the code only been moved or has the code changed too? Even if there are changes, we should keep the code here so that the changes are easy to see. We can do the refactor in a separate PR.


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1749,135 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }

+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest request, @Context final HttpServletRequest req)
+      throws InterruptedException
+  {
+    authorizationCheck(req);
+    if (!waitForConfigUpdate.get()) {
+      return Response.status(409).entity("Task must be paused for checkpoint completion before updating config").build();
+    }
+    try {
+      log.info("Attempting to update config to [%s]", request.getIoConfig());
+
+      SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> newIoConfig = (SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType>)
+          toolbox.getJsonMapper().convertValue(request.getIoConfig(), SeekableStreamIndexTaskIOConfig.class);
+      setIOConfig(newIoConfig);
+      createNewSequenceFromIoConfig(newIoConfig);
+
+      assignment = assignPartitions(recordSupplier);
+      boolean shouldResume = true;
+      if (!assignment.isEmpty()) {
+        possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment);
+        seekToStartingSequence(recordSupplier, assignment);
+      } else {
+        // if there is no assignment, It means that there was no partition assigned to this task after scaling down.
+        pause();
+        shouldResume = false;
+      }
+
+      log.info("Config updated to [%s]", this.ioConfig);
+      toolbox.getEmitter().emit(ServiceMetricEvent.builder()
+                                                  .setDimension(DruidMetrics.TASK_ID, task.getId())
+                                                  .setDimension(DruidMetrics.TASK_TYPE, task.getType())
+                                                  .setDimension(DruidMetrics.DATASOURCE, task.getDataSource())
+                                                  .setMetric("task/config/update/success", 1)
+                                                  .build(ImmutableMap.of()));
+      if (shouldResume) {
+        resume();
+      }
+      waitForConfigUpdate.set(false);
+      return Response.ok().build();
+    }
+    catch (Exception e) {
+      log.makeAlert(e, "Failed to update task config");
+      waitForConfigUpdate.set(false);
+      return Response.serverError().entity(e.getMessage()).build();
+    }
+  }
+
+  private void setIOConfig(
+      SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig
+  )
+  {
+    this.ioConfig = ioConfig;
+    this.stream = ioConfig.getStartSequenceNumbers().getStream();
+    this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
+    this.minMessageTime = Configs.valueOrDefault(ioConfig.getMinimumMessageTime(), DateTimes.MIN);
+    this.maxMessageTime = Configs.valueOrDefault(ioConfig.getMaximumMessageTime(), DateTimes.MAX);
+  }
+
+  /**
+   * Creates new sequences for the ingestion process. It currently accepts the ioConfig given by the request as the correct offsets
+   * and ignores the offsets it may have stored in currOffsets and endOffsets.
+   */
+  private void createNewSequenceFromIoConfig(SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig)
+      throws IOException
+  {
+    Map<PartitionIdType, SequenceOffsetType> partitionStartOffsets = ioConfig.getStartSequenceNumbers()
+                                                                             .getPartitionSequenceNumberMap();
+    Map<PartitionIdType, SequenceOffsetType> partitionEndSequences = ioConfig.getEndSequenceNumbers()
+                                                                             .getPartitionSequenceNumberMap();
+
+    final Set<PartitionIdType> exclusiveStartPartitions = computeExclusiveStartPartitionsForSequence(
+        partitionStartOffsets);
+    final SequenceMetadata<PartitionIdType, SequenceOffsetType> newSequence = new SequenceMetadata<>(
+        sequences.isEmpty() ? 0 : getLastSequenceMetadata().getSequenceId() + 1,
+        StringUtils.format(
+            "%s_%d",
+            ioConfig.getBaseSequenceName(),
+            sequences.isEmpty() ? 0 : getLastSequenceMetadata().getSequenceId() + 1
+        ),
+        partitionStartOffsets,
+        partitionEndSequences,
+        false,
+        exclusiveStartPartitions,
+        getTaskLockType()
+    );
+    log.info("Attempting adding new sequence [%s]", newSequence);
+
+    currOffsets.clear();
+    currOffsets.putAll(partitionStartOffsets);
+    endOffsets.clear();
+    endOffsets.putAll(partitionEndSequences);
+
+    addSequence(newSequence);
+    persistSequences();
+    log.info(
+        "Created new sequence [%s] with start offsets [%s]",
+        newSequence.getSequenceName(), partitionStartOffsets
+    );
+  }
+
+  private void checkpointSequences()
+  {
+    try {
+      final SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence = getLastSequenceMetadata();
+      if (!latestSequence.isCheckpointed()) {
+        final CheckPointDataSourceMetadataAction checkpointAciton = new CheckPointDataSourceMetadataAction(
+            getSupervisorId(),
+            ioConfig.getTaskGroupId(),
+            null,
+            createDataSourceMetadata(
+                new SeekableStreamStartSequenceNumbers<>(
+                    stream,
+                    latestSequence.getStartOffsets(),
+                    latestSequence.getExclusiveStartPartitions()
+                )
+            )
+        );
+        toolbox.getTaskActionClient().submit(checkpointAciton);

Review Comment:
We should throw an exception if this returns `false`, same as existing codeflow.
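For reference, a rough sketch of that check, mirroring the existing checkpoint flow (`ISE` is Druid's `org.apache.druid.java.util.common.ISE`; the message text is mine):

```java
// Sketch only: treat a false response from the supervisor as a hard failure
// instead of silently ignoring it.
if (!toolbox.getTaskActionClient().submit(checkpointAciton)) {
  throw new ISE(
      "Checkpoint request with sequence [%s] failed, dying",
      latestSequence.getSequenceName()
  );
}
```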
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1749,135 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }

+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest request, @Context final HttpServletRequest req)
+      throws InterruptedException
+  {
+    authorizationCheck(req);
+    if (!waitForConfigUpdate.get()) {
+      return Response.status(409).entity("Task must be paused for checkpoint completion before updating config").build();
+    }
+    try {
+      log.info("Attempting to update config to [%s]", request.getIoConfig());
+
+      SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> newIoConfig = (SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType>)
+          toolbox.getJsonMapper().convertValue(request.getIoConfig(), SeekableStreamIndexTaskIOConfig.class);

Review Comment:
Why convert? Isn't the payload already a `SeekableStreamIndexTaskIOConfig`?


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1749,135 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }

+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest request, @Context final HttpServletRequest req)
+      throws InterruptedException
+  {
+    authorizationCheck(req);
+    if (!waitForConfigUpdate.get()) {
+      return Response.status(409).entity("Task must be paused for checkpoint completion before updating config").build();
+    }
+    try {
+      log.info("Attempting to update config to [%s]", request.getIoConfig());
+
+      SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> newIoConfig = (SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType>)
+          toolbox.getJsonMapper().convertValue(request.getIoConfig(), SeekableStreamIndexTaskIOConfig.class);
+      setIOConfig(newIoConfig);
+      createNewSequenceFromIoConfig(newIoConfig);
+
+      assignment = assignPartitions(recordSupplier);
+      boolean shouldResume = true;

Review Comment:
Rather than this boolean, just call `resume()` inside the `if`.
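Something along these lines (a sketch; the intervening log/metric emission is elided for brevity):

```java
assignment = assignPartitions(recordSupplier);
if (!assignment.isEmpty()) {
  possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment);
  seekToStartingSequence(recordSupplier, assignment);
  resume();
} else {
  // No partition was assigned to this task after scaling down, so stay paused.
  pause();
}
```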
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1749,135 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }

+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest request, @Context final HttpServletRequest req)
+      throws InterruptedException
+  {
+    authorizationCheck(req);
+    if (!waitForConfigUpdate.get()) {
+      return Response.status(409).entity("Task must be paused for checkpoint completion before updating config").build();
+    }
+    try {
+      log.info("Attempting to update config to [%s]", request.getIoConfig());
+
+      SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> newIoConfig = (SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType>)
+          toolbox.getJsonMapper().convertValue(request.getIoConfig(), SeekableStreamIndexTaskIOConfig.class);
+      setIOConfig(newIoConfig);
+      createNewSequenceFromIoConfig(newIoConfig);
+
+      assignment = assignPartitions(recordSupplier);
+      boolean shouldResume = true;
+      if (!assignment.isEmpty()) {
+        possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment);
+        seekToStartingSequence(recordSupplier, assignment);
+      } else {
+        // if there is no assignment, It means that there was no partition assigned to this task after scaling down.
+        pause();

Review Comment:
Wouldn't the task already be in a paused state?


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -848,66 +857,15 @@ public void onFailure(Throwable t)
       // We need to copy sequences here, because the success callback in publishAndRegisterHandoff removes items from
       // the sequence list. If a publish finishes before we finish iterating through the sequence list, we can
       // end up skipping some sequences.
-      List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> sequencesSnapshot = new ArrayList<>(sequences);
-      for (int i = 0; i < sequencesSnapshot.size(); i++) {
-        final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata = sequencesSnapshot.get(i);
-        if (!publishingSequences.contains(sequenceMetadata.getSequenceName())
-            && !publishedSequences.contains(sequenceMetadata.getSequenceName())) {
-          final boolean isLast = i == (sequencesSnapshot.size() - 1);
-          if (isLast) {
-            // Shorten endOffsets of the last sequence to match currOffsets.
-            sequenceMetadata.setEndOffsets(currOffsets);
-          }
-
-          // Update assignments of the sequence, which should clear them. (This will be checked later, when the
-          // Committer is built.)
-          sequenceMetadata.updateAssignments(currOffsets, this::isMoreToReadAfterReadingRecord);
-          publishingSequences.add(sequenceMetadata.getSequenceName());
-          // persist already done in finally, so directly add to publishQueue
-          publishAndRegisterHandoff(sequenceMetadata);
-        }
-      }

Review Comment:
Has this code only been moved or has the logic changed too?
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -599,15 +605,16 @@ public void run()
       // restart publishing of sequences (if any)
       maybePersistAndPublishSequences(committerSupplier);

-      Set<StreamPartition<PartitionIdType>> assignment = assignPartitions(recordSupplier);
-      possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment);
-      seekToStartingSequence(recordSupplier, assignment);
+      assignment = assignPartitions(this.recordSupplier);
+      possiblyResetDataSourceMetadata(toolbox, this.recordSupplier, assignment);
+      seekToStartingSequence(this.recordSupplier, assignment);

       ingestionState = IngestionState.BUILD_SEGMENTS;

       // Main loop.
       // Could eventually support leader/follower mode (for keeping replicas more in sync)
-      boolean stillReading = !assignment.isEmpty();
+      log.info("Task perpetuallyRunning: %s", task.isPerpetuallyRunning());

Review Comment:
Please remove these extra log lines.


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -647,12 +656,12 @@ public void run()
           // calling getRecord() ensures that exceptions specific to kafka/kinesis like OffsetOutOfRangeException
           // are handled in the subclasses.
           List<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType>> records = getRecords(
-              recordSupplier,
+              this.recordSupplier,
               toolbox
           );

           // note: getRecords() also updates assignment
-          stillReading = !assignment.isEmpty();
+          stillReading = !assignment.isEmpty() || task.isPerpetuallyRunning();

Review Comment:
Since this same logic is repeated in multiple places, put this in a method.
```
private boolean isStillReading()
{
  return !assignment.isEmpty() || task.isPerpetuallyRunning();
}
```


##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java:
##########
@@ -175,6 +181,74 @@ public void test_ingest10kRows_ofSelfClusterMetrics_andVerifyValues()
     cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
   }

+  @Test
+  @Timeout(60)
+  public void test_ingest20kRows_ofSelfClusterMetricsWithScaleOuts_andVerifyValues()

Review Comment:
I think it would make more sense to put these test methods in a new test class which is more focused on scaling, task duration etc. This test class `KafkaClusterMetricsTest` was mostly about ingesting metrics of a cluster.

The new test class can be called something like `KafkaTaskScalingTest` and it can use data directly published to a Kafka topic (similar to `KafkaSupervisorTest`) rather than self cluster metrics.
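A bare-bones shape for such a class might be the following (everything here is illustrative; the class, method, and scaffolding names are suggestions, not existing code):

```java
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

// Hypothetical skeleton: reuse the embedded Kafka test scaffolding from this
// module, but drive the topic directly instead of using self cluster metrics.
public class KafkaTaskScalingTest
{
  @Test
  @Timeout(60)
  public void test_scaleTaskCount_whileIngestingFromTopic()
  {
    // 1. Publish a known number of records directly to the Kafka topic,
    //    similar to KafkaSupervisorTest.
    // 2. Post a supervisor spec with a small taskCount and wait for ingestion.
    // 3. Bump taskCount and verify that running tasks pick up the new
    //    partition assignment without being restarted.
    // 4. Verify the total ingested row count.
  }
}
```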
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1749,135 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }

+  @POST
+  @Path("/updateConfig")

Review Comment:
```suggestion
  @Path("/config")
```


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1600,6 +1623,15 @@ public Map<PartitionIdType, SequenceOffsetType> getEndOffsets()
     return endOffsets;
   }

+  @GET
+  @Path("/config")
+  @Produces(MediaType.APPLICATION_JSON)
+  public SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> getIOConfigHTTP(@Context final HttpServletRequest req)
+  {
+    authorizationCheck(req);
+    return ioConfig;

Review Comment:
Return a response object like `TaskConfigResponse` which contains an `ioConfig` field, so that this API mirrors the update config API, also allowing us to add any extra fields later.


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1895,6 +2067,22 @@ public Response pauseHTTP(
     return pause();
   }

+  @POST
+  @Path("/pauseAndCheckpoint")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response pauseAndCheckpointHTTP(
+      @Context final HttpServletRequest req
+  ) throws InterruptedException, JsonProcessingException
+  {
+    authorizationCheck(req);
+    if (!waitForConfigUpdate.compareAndSet(false, true)) {
+      return Response.ok().entity("Task is already paused for checkpoint completion").build();
+    }
+    pause();
+    checkpointSequences();
+    return Response.ok().entity(toolbox.getJsonMapper().writeValueAsString(getCurrentOffsets())).build();

Review Comment:
```suggestion
    return Response.ok().entity(getCurrentOffsets()).build();
```


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1738,7 +1899,10 @@ public Response setEndOffsets(
                      .build();
     } else {
       try {
-        pauseLock.lockInterruptibly();
+        // Don't acquire a lock if the task is already paused for checkpoint completion, avoiding deadlock
+        if (!waitForConfigUpdate.get()) {
+          pauseLock.lockInterruptibly();
+        }

Review Comment:
Instead of putting all of the pause/resume operations inside an `if`, we should put this condition in a new method.
```java
private void ensureNotWaitingForConfigUpdate() {...}
```
This method should throw an exception if the state is invalid rather than skipping silently. Call this method right before the relevant operation.
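A possible shape for that guard (sketch only; the exception type and message are placeholders):

```java
// Fail fast instead of silently skipping the lock.
private void ensureNotWaitingForConfigUpdate()
{
  if (waitForConfigUpdate.get()) {
    throw new ISE("Task is awaiting a config update; cannot modify pause/resume state right now");
  }
}
```

Callers such as `setEndOffsets` would then invoke `ensureNotWaitingForConfigUpdate()` right before `pauseLock.lockInterruptibly()`.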
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1749,135 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }

+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest request, @Context final HttpServletRequest req)
+      throws InterruptedException
+  {
+    authorizationCheck(req);
+    if (!waitForConfigUpdate.get()) {
+      return Response.status(409).entity("Task must be paused for checkpoint completion before updating config").build();
+    }
+    try {
+      log.info("Attempting to update config to [%s]", request.getIoConfig());
+
+      SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> newIoConfig = (SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType>)
+          toolbox.getJsonMapper().convertValue(request.getIoConfig(), SeekableStreamIndexTaskIOConfig.class);
+      setIOConfig(newIoConfig);
+      createNewSequenceFromIoConfig(newIoConfig);
+
+      assignment = assignPartitions(recordSupplier);
+      boolean shouldResume = true;
+      if (!assignment.isEmpty()) {
+        possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment);
+        seekToStartingSequence(recordSupplier, assignment);
+      } else {
+        // if there is no assignment, It means that there was no partition assigned to this task after scaling down.
+        pause();
+        shouldResume = false;
+      }
+
+      log.info("Config updated to [%s]", this.ioConfig);
+      toolbox.getEmitter().emit(ServiceMetricEvent.builder()
+                                                  .setDimension(DruidMetrics.TASK_ID, task.getId())
+                                                  .setDimension(DruidMetrics.TASK_TYPE, task.getType())
+                                                  .setDimension(DruidMetrics.DATASOURCE, task.getDataSource())
+                                                  .setMetric("task/config/update/success", 1)
+                                                  .build(ImmutableMap.of()));
+      if (shouldResume) {
+        resume();
+      }
+      waitForConfigUpdate.set(false);
+      return Response.ok().build();
+    }
+    catch (Exception e) {
+      log.makeAlert(e, "Failed to update task config");
+      waitForConfigUpdate.set(false);
+      return Response.serverError().entity(e.getMessage()).build();
+    }
+  }
+
+  private void setIOConfig(
+      SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig
+  )
+  {
+    this.ioConfig = ioConfig;
+    this.stream = ioConfig.getStartSequenceNumbers().getStream();
+    this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
+    this.minMessageTime = Configs.valueOrDefault(ioConfig.getMinimumMessageTime(), DateTimes.MIN);
+    this.maxMessageTime = Configs.valueOrDefault(ioConfig.getMaximumMessageTime(), DateTimes.MAX);
+  }
+
+  /**
+   * Creates new sequences for the ingestion process. It currently accepts the ioConfig given by the request as the correct offsets
+   * and ignores the offsets it may have stored in currOffsets and endOffsets.
+   */
+  private void createNewSequenceFromIoConfig(SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig)
+      throws IOException
+  {
+    Map<PartitionIdType, SequenceOffsetType> partitionStartOffsets = ioConfig.getStartSequenceNumbers()
+                                                                             .getPartitionSequenceNumberMap();
+    Map<PartitionIdType, SequenceOffsetType> partitionEndSequences = ioConfig.getEndSequenceNumbers()
+                                                                             .getPartitionSequenceNumberMap();
+
+    final Set<PartitionIdType> exclusiveStartPartitions = computeExclusiveStartPartitionsForSequence(
+        partitionStartOffsets);
+    final SequenceMetadata<PartitionIdType, SequenceOffsetType> newSequence = new SequenceMetadata<>(
+        sequences.isEmpty() ? 0 : getLastSequenceMetadata().getSequenceId() + 1,
+        StringUtils.format(
+            "%s_%d",
+            ioConfig.getBaseSequenceName(),
+            sequences.isEmpty() ? 0 : getLastSequenceMetadata().getSequenceId() + 1
+        ),
+        partitionStartOffsets,
+        partitionEndSequences,
+        false,
+        exclusiveStartPartitions,
+        getTaskLockType()
+    );
+    log.info("Attempting adding new sequence [%s]", newSequence);
+
+    currOffsets.clear();
+    currOffsets.putAll(partitionStartOffsets);
+    endOffsets.clear();
+    endOffsets.putAll(partitionEndSequences);
+
+    addSequence(newSequence);
+    persistSequences();
+    log.info(
+        "Created new sequence [%s] with start offsets [%s]",
+        newSequence.getSequenceName(), partitionStartOffsets
+    );
+  }
+
+  private void checkpointSequences()
+  {
+    try {
+      final SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence = getLastSequenceMetadata();
+      if (!latestSequence.isCheckpointed()) {

Review Comment:
We should throw an exception if already checkpointed, same as the existing codeflow.


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1275,7 +1298,7 @@ private void addSequence(final SequenceMetadata<PartitionIdType, SequenceOffsetT
       if (!sequences.isEmpty()) {
        final SequenceOffsetType priorOffset = getLastSequenceMetadata().endOffsets.get(partition);

-        if (!startOffset.equals(priorOffset)) {
+        if (priorOffset != null && !startOffset.equals(priorOffset)) {

Review Comment:
Shouldn't the null check be on `startOffset`?
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1749,135 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }

+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest request, @Context final HttpServletRequest req)
+      throws InterruptedException
+  {
+    authorizationCheck(req);
+    if (!waitForConfigUpdate.get()) {
+      return Response.status(409).entity("Task must be paused for checkpoint completion before updating config").build();

Review Comment:
```suggestion
      return Response.status(409).entity("Task must have been paused and checkpointed before updating config.").build();
```


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1895,6 +2067,22 @@ public Response pauseHTTP(
     return pause();
   }

+  @POST
+  @Path("/pauseAndCheckpoint")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response pauseAndCheckpointHTTP(
+      @Context final HttpServletRequest req
+  ) throws InterruptedException, JsonProcessingException
+  {
+    authorizationCheck(req);
+    if (!waitForConfigUpdate.compareAndSet(false, true)) {
+      return Response.ok().entity("Task is already paused for checkpoint completion").build();
+    }
+    pause();
+    checkpointSequences();

Review Comment:
If `pause` fails, we should reset `waitForConfigUpdate` to `false`. If `checkpointSequences` fails, we should do the same. But in this case, I wonder if we should also un-pause the task? We wouldn't want to keep it in a paused state doing nothing forever.


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/TaskConfigUpdateRequest.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Request object for updating the configuration of a running {@link SeekableStreamIndexTask}.
+ */
+public class TaskConfigUpdateRequest
+{
+  private final SeekableStreamIndexTaskIOConfig ioConfig;

Review Comment:
Should this field and the class have generic args for partition id type and offset type?
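For illustration, the generified version might look roughly like this (a sketch; equals/hashCode omitted):

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;

public class TaskConfigUpdateRequest<PartitionIdType, SequenceOffsetType>
{
  @Nullable
  private final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;

  @JsonCreator
  public TaskConfigUpdateRequest(
      @JsonProperty("ioConfig") @Nullable SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig
  )
  {
    this.ioConfig = ioConfig;
  }

  @Nullable
  @JsonProperty
  public SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> getIoConfig()
  {
    return ioConfig;
  }
}
```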
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1717,6 +1749,135 @@ public Response getUnparseableEvents(
     return Response.ok(parseExceptionHandler.getSavedParseExceptionReports()).build();
   }

+  @POST
+  @Path("/updateConfig")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateConfig(TaskConfigUpdateRequest request, @Context final HttpServletRequest req)
+      throws InterruptedException
+  {
+    authorizationCheck(req);
+    if (!waitForConfigUpdate.get()) {
+      return Response.status(409).entity("Task must be paused for checkpoint completion before updating config").build();
+    }
+    try {
+      log.info("Attempting to update config to [%s]", request.getIoConfig());

Review Comment:
Please move the logic inside the `try` into a separate private method, and add a short javadoc to it outlining the steps involved.
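As a rough illustration of the suggested extraction (the method name and javadoc wording are mine; the calls are those already in the diff):

```java
/**
 * Applies a new IO config to this runner:
 * 1. swaps in the new ioConfig and the fields derived from it (stream, offset bounds),
 * 2. starts a new sequence from the offsets supplied in the request,
 * 3. reassigns partitions, seeking and resuming only when the new assignment is non-empty.
 */
private void applyConfigUpdate(SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> newIoConfig)
    throws IOException, InterruptedException
{
  setIOConfig(newIoConfig);
  createNewSequenceFromIoConfig(newIoConfig);

  assignment = assignPartitions(recordSupplier);
  if (!assignment.isEmpty()) {
    possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment);
    seekToStartingSequence(recordSupplier, assignment);
    resume();
  } else {
    // No partition was assigned to this task after scaling down; stay paused.
    pause();
  }
}
```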