kfaraz commented on code in PR #18082:
URL: https://github.com/apache/druid/pull/18082#discussion_r2151208541


##########
docs/ingestion/supervisor.md:
##########
@@ -411,6 +413,10 @@ This value is for the ideal situation in which there is at 
most one set of tasks
 In some circumstances, it is possible to have multiple sets of tasks 
publishing simultaneously. This would happen if the
 time-to-publish (generate segment, push to deep storage, load on Historical) 
is greater than `taskDuration`. This is a valid and correct scenario but 
requires additional worker capacity to support. In general, it is a good idea 
to have `taskDuration` be large enough that the previous set of tasks finishes 
publishing before the current set begins.
 
+## Multi-Supervisor Support

Review Comment:
   Let's mark this as experimental for now.



##########
docs/querying/sql-metadata-tables.md:
##########
@@ -299,6 +299,7 @@ The supervisors table provides information about 
supervisors.
 |Column|Type|Notes|
 |------|-----|-----|
 |supervisor_id|VARCHAR|Supervisor task identifier|
+|datasource|VARCHAR|Datasource the supervisor operates on|

Review Comment:
   Nice!



##########
docs/ingestion/supervisor.md:
##########
@@ -411,6 +413,10 @@ This value is for the ideal situation in which there is at 
most one set of tasks
 In some circumstances, it is possible to have multiple sets of tasks 
publishing simultaneously. This would happen if the
 time-to-publish (generate segment, push to deep storage, load on Historical) 
is greater than `taskDuration`. This is a valid and correct scenario but 
requires additional worker capacity to support. In general, it is a good idea 
to have `taskDuration` be large enough that the previous set of tasks finishes 
publishing before the current set begins.
 
+## Multi-Supervisor Support
+Druid supports multiple stream supervisors ingesting into the same datasource. 
This means you can have any number of the configured stream supervisors (Kafka, 
Kinesis, etc.) ingesting into the same datasource at the same time.

Review Comment:
   ```suggestion
   Druid supports multiple stream supervisors ingesting into the same 
datasource. This means you can have any number of stream supervisors (Kafka, 
Kinesis, etc.) ingesting into the same datasource at the same time.
   ```



##########
docs/ingestion/supervisor.md:
##########
@@ -411,6 +413,10 @@ This value is for the ideal situation in which there is at 
most one set of tasks
 In some circumstances, it is possible to have multiple sets of tasks 
publishing simultaneously. This would happen if the
 time-to-publish (generate segment, push to deep storage, load on Historical) 
is greater than `taskDuration`. This is a valid and correct scenario but 
requires additional worker capacity to support. In general, it is a good idea 
to have `taskDuration` be large enough that the previous set of tasks finishes 
publishing before the current set begins.
 
+## Multi-Supervisor Support
+Druid supports multiple stream supervisors ingesting into the same datasource. 
This means you can have any number of the configured stream supervisors (Kafka, 
Kinesis, etc.) ingesting into the same datasource at the same time.
+In order to ensure proper synchronization between ingestion tasks with 
multiple supervisors, it's important to set `useConcurrentLocks=true` in the 
`context` field of the supervisor spec.

Review Comment:
   Please link to the page in the docs that describes concurrent locks.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java:
##########
@@ -226,7 +226,8 @@ public Response specGetAll(
                   if (includeFull) {
                     Optional<SupervisorSpec> theSpec = 
manager.getSupervisorSpec(x);
                     if (theSpec.isPresent()) {
-                      theBuilder.withSpec(manager.getSupervisorSpec(x).get());
+                      theBuilder.withSpec(theSpec.get())
+                          
.withDataSource(theSpec.get().getDataSources().stream().findFirst().orElse(null));

Review Comment:
   formatting



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -312,6 +312,20 @@ private Set<PartitionIdType> 
computeExclusiveStartPartitionsForSequence(
     }
   }
 
+  /**
+   * Returns the supervisorId for the task this runner is executing.
+   * Backwards compatibility: if task spec from metadata has a null 
supervisorId field, falls back to dataSource
+  */
+  public String getSupervisorId()
+  {
+    @Nullable
+    final String supervisorId = task.getSupervisorId();
+    if (supervisorId != null) {
+      return supervisorId;
+    }
+    return task.getDataSource();

Review Comment:
   The task always returns a non-null supervisor ID anyway.
   ```suggestion
       return task.getSupervisorId();
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java:
##########
@@ -126,6 +130,12 @@ public DataSchema getDataSchema()
     return dataSchema;
   }
 
+  @JsonProperty
+  public String getSupervisorId()

Review Comment:
   Please add a short javadoc explaining how this is computed and the default 
value.
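
One possible shape for that javadoc, as a sketch only. It assumes the ID falls back to the datasource name when none is given, which matches the `Configs.valueOrDefault(supervisorId, dataSource)` pattern used elsewhere in this PR. The class `StreamTaskSketch` is hypothetical and merely stands in for the real task class.

```java
import java.util.Objects;

/** Hypothetical stand-in for the task class; not the actual Druid implementation. */
final class StreamTaskSketch
{
  private final String dataSource;
  private final String supervisorId;

  StreamTaskSketch(String dataSource, String supervisorId)
  {
    this.dataSource = Objects.requireNonNull(dataSource, "dataSource");
    // Assumption: a missing supervisorId falls back to the datasource name.
    this.supervisorId = supervisorId == null ? dataSource : supervisorId;
  }

  String getDataSource()
  {
    return dataSource;
  }

  /**
   * Returns the ID of the supervisor that manages this task. Never null.
   * Defaults to the datasource name if no ID was specified explicitly.
   */
  String getSupervisorId()
  {
    return supervisorId;
  }
}
```

With the fallback resolved once in the constructor, callers would not need their own null check.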



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java:
##########
@@ -122,7 +131,15 @@ private SegmentTransactionalInsertAction(
     this.startMetadata = startMetadata;
     this.endMetadata = endMetadata;
     this.dataSource = dataSource;
+    this.supervisorId = Configs.valueOrDefault(supervisorId, dataSource);
     this.segmentSchemaMapping = segmentSchemaMapping;
+
+    if ((startMetadata == null && endMetadata != null)
+        || (startMetadata != null && endMetadata == null)) {
+      throw InvalidInput.exception("startMetadata and endMetadata must either 
be both null or both non-null.");

Review Comment:
   Follow-up: move this validation into a shared helper.
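
A minimal sketch of what such a shared check could look like. The class name `MetadataValidation` and the method name are hypothetical, and `IllegalArgumentException` is used only to keep the sketch free of Druid dependencies; the PR itself throws `InvalidInput.exception(...)`.

```java
/** Hypothetical helper; the name and location are assumptions, not part of the PR. */
final class MetadataValidation
{
  private MetadataValidation()
  {
    // Static utility, not meant to be instantiated.
  }

  /**
   * Throws if exactly one of the two metadata objects is null.
   * Both null and both non-null are accepted.
   */
  static void validateStartAndEndMetadata(Object startMetadata, Object endMetadata)
  {
    // The two null checks differ only when exactly one argument is null.
    if ((startMetadata == null) != (endMetadata == null)) {
      throw new IllegalArgumentException(
          "startMetadata and endMetadata must either be both null or both non-null."
      );
    }
  }
}
```

Both task actions could then call the same method from their constructors instead of repeating the condition.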



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java:
##########
@@ -95,13 +99,28 @@ private SegmentTransactionalAppendAction(
     this.startMetadata = startMetadata;
     this.endMetadata = endMetadata;
 
+    if (supervisorId == null && !segments.isEmpty()) {

Review Comment:
   In the follow up, we can also add a validation that `segments` is always 
non-empty.
   This task action doesn't make sense with an empty set of segments.



##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -405,47 +409,47 @@ SegmentPublishResult commitReplaceSegments(
   );
 
   /**
-   * Retrieves data source's metadata from the metadata store. Returns null if 
there is no metadata.
+   * Retrieves {@link DataSourceMetadata} entry for {@code supervisorId} from 
the metadata store. Returns null if there is no metadata.
    */
-  @Nullable DataSourceMetadata retrieveDataSourceMetadata(String dataSource);
+  @Nullable DataSourceMetadata retrieveDataSourceMetadata(String supervisorId);

Review Comment:
   We should consider either renaming the `DataSourceMetadata` class (and the 
methods that perform CRUD on it) or at least updating the javadoc for 
`DataSourceMetadata` to indicate that it is stored against a supervisor.



##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -315,6 +315,8 @@ SegmentIdWithShardSpec allocatePendingSegment(
    * If segmentsToDrop is not null and not empty, this insertion will be 
atomic with a insert-and-drop on inserting
    * {@param segments} and dropping {@param segmentsToDrop}.
    *
+   * @param supervisorId   supervisorID which is committing the segments. 
Cannot be null if `startMetadata`
+   *                       and endMetadata` are both non-null.

Review Comment:
   Backquotes are not rendered in javadocs. Use `{@code startMetadata}` 
instead.
   ```suggestion
      * @param supervisorId   supervisorID which is committing the segments. 
Cannot be null if {@code startMetadata}
      *                       and {@code endMetadata} are both non-null.
   ```



##########
integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java:
##########
@@ -947,6 +949,186 @@ private void doMethodTeardown(GeneratedTestConfig 
generatedTestConfig)
     }
   }
 
+  /**
+   * Test ingestion with multiple supervisors writing to the same datasource.
+   * This test creates multiple supervisors (specified by supervisorCount) 
that all write to the same datasource.
+   * Each supervisor reads from its own stream and processes a distinct subset 
of events.
+   * The total number of events across all streams equals the standard test 
event count.
+   *
+   * @param transactionEnabled Whether to enable transactions (null for 
streams that don't support transactions)
+   * @param numSupervisors     Number of supervisors to create
+   * @throws Exception if an error occurs
+   */
+  protected void doTestMultiSupervisorIndexDataStableState(
+      @Nullable Boolean transactionEnabled,
+      int numSupervisors
+  ) throws Exception
+  {
+
+    final String dataSource = getTestNamePrefix() + "_test_" + 
UUID.randomUUID();
+    final String fullDatasourceName = dataSource + 
config.getExtraDatasourceNameSuffix();
+
+    final List<GeneratedTestConfig> testConfigs = new 
ArrayList<>(numSupervisors);
+    final List<StreamEventWriter> streamEventWriters = new 
ArrayList<>(numSupervisors);
+    final List<Closeable> resourceClosers = new ArrayList<>(numSupervisors);
+
+    try {
+      for (int i = 0; i < numSupervisors; ++i) {
+        final String supervisorId = fullDatasourceName + "_supervisor_" + i;
+        GeneratedTestConfig testConfig = new GeneratedTestConfig(
+            INPUT_FORMAT,
+            getResourceAsString(JSON_INPUT_FORMAT_PATH),
+            fullDatasourceName
+        );
+        testConfig.setSupervisorId(supervisorId);
+
+        testConfigs.add(testConfig);
+        Closeable closer = createResourceCloser(testConfig);
+        resourceClosers.add(closer);
+
+        StreamEventWriter writer = createStreamEventWriter(config, 
transactionEnabled);
+        streamEventWriters.add(writer);
+
+        final String taskSpec = testConfig.getStreamIngestionPropsTransform()
+                                          
.apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
+        LOG.info("supervisorSpec for stream [%s]: [%s]", 
testConfig.getStreamName(), taskSpec);
+
+        indexer.submitSupervisor(taskSpec);
+        LOG.info("Submitted supervisor [%s] for stream [%s]", supervisorId, 
testConfig.getStreamName());
+      }
+
+      for (GeneratedTestConfig testConfig : testConfigs) {
+        ITRetryUtil.retryUntil(
+            () -> SupervisorStateManager.BasicState.RUNNING.equals(
+                indexer.getSupervisorStatus(testConfig.getSupervisorId())
+            ),
+            true,
+            10_000,
+            30,
+            "Waiting for supervisor [" + testConfig.getSupervisorId() + "] to 
be running"
+        );

Review Comment:
   Use `retryUntilEquals` here and in other similar places for cleaner syntax.
   
   ```suggestion
           ITRetryUtil.retryUntilEquals(
               () -> indexer.getSupervisorStatus(testConfig.getSupervisorId()),
               SupervisorStateManager.BasicState.RUNNING,
               10_000,
               30,
               "State of supervisor[" + testConfig.getSupervisorId() + "]"
           );
   ```



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1220,7 +1238,13 @@ private SegmentPublishResult 
commitAppendSegmentsAndMetadataInTransaction(
             // Try to update datasource metadata first
             if (startMetadata != null) {
               final SegmentPublishResult metadataUpdateResult
-                  = updateDataSourceMetadataInTransaction(transaction, 
dataSource, startMetadata, endMetadata);
+                  = updateDataSourceMetadataInTransaction(
+                  transaction,
+                  supervisorId,
+                  dataSource,
+                  startMetadata,
+                  endMetadata
+              );

Review Comment:
   formatting



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -311,6 +311,12 @@ tableName, getPayloadType(), getQuoteString(), 
getCollation()
     alterPendingSegmentsTable(tableName);
   }
 
+  /**
+   * Creates the table for storing datasource metadata for supervisors.
+   * Due to backwards compatibility reasons, the `dataSource` column will 
always uniquely identify a supervisor.
+   * For certain types of supervisors which support N:1 supervisor:datasource 
relationship, the `dataSource` column will store the supervisor ID.
+   * Otherwise, it will store the legacy supervisor ID – the `dataSource` 
itself.

Review Comment:
   I think it is simpler to say that:
   
   ```
   The {@code dataSource} column stores the supervisor ID.
   But it has not been renamed to retain backwards compatibility.
   Supervisors created without an explicit supervisor id default to using the 
datasource name.
   ```



##########
docs/operations/metrics.md:
##########
@@ -225,11 +225,11 @@ These metrics apply to the [Kafka indexing 
service](../ingestion/kafka-ingestion
 
 |Metric|Description|Dimensions|Normal value|
 |------|-----------|----------|------------|
-|`ingest/kafka/lag`|Total lag between the offsets consumed by the Kafka 
indexing tasks and latest offsets in Kafka brokers across all partitions. 
Minimum emission period for this metric is a minute.|`dataSource`, `stream`, 
`tags`|Greater than 0, should not be a very high number. |
-|`ingest/kafka/maxLag`|Max lag between the offsets consumed by the Kafka 
indexing tasks and latest offsets in Kafka brokers across all partitions. 
Minimum emission period for this metric is a minute.|`dataSource`, `stream`, 
`tags`|Greater than 0, should not be a very high number. |
-|`ingest/kafka/avgLag`|Average lag between the offsets consumed by the Kafka 
indexing tasks and latest offsets in Kafka brokers across all partitions. 
Minimum emission period for this metric is a minute.|`dataSource`, `stream`, 
`tags`|Greater than 0, should not be a very high number. |
-|`ingest/kafka/partitionLag`|Partition-wise lag between the offsets consumed 
by the Kafka indexing tasks and latest offsets in Kafka brokers. Minimum 
emission period for this metric is a minute.|`dataSource`, `stream`, 
`partition`, `tags`|Greater than 0, should not be a very high number. |
-|`ingest/kafka/fetchOffsets/time`|Total time (in milliseconds) taken to fetch 
and update the latest offsets from Kafka stream and the ingestion 
tasks.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Generally a few 
seconds at most.|
+|`ingest/kafka/lag`|Total lag between the offsets consumed by the Kafka 
indexing tasks and latest offsets in Kafka brokers across all partitions. 
Minimum emission period for this metric is a minute.|`supervisorId`, 
`dataSource`, `stream`, `tags`|Greater than 0, should not be a very high 
number. |

Review Comment:
   I wonder if we should emit the `supervisorId` dimension only if it is 
different from `dataSource`,
   at least while this feature is still experimental.
   Otherwise, we are emitting a lot of redundant data that the majority of 
users do not need.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1245,22 +1257,22 @@ public void tryInit()
                     if (log.isDebugEnabled()) {
                       log.debug(
                           "Handled notice[%s] from notices queue in [%d] ms, "
-                              + "current notices queue size [%d] for 
datasource[%s].",
-                          noticeType, noticeHandleTime.millisElapsed(), 
getNoticesQueueSize(), dataSource
+                              + "current notices queue size [%d] for 
supervisor[%s] for datasource[%s].",
+                          noticeType, noticeHandleTime.millisElapsed(), 
getNoticesQueueSize(), supervisorId, dataSource
                       );
                     }
                   }
                   catch (Throwable e) {
                     stateManager.recordThrowableEvent(e);
-                    log.makeAlert(e, "SeekableStreamSupervisor[%s] failed to 
handle notice", dataSource)
+                    log.makeAlert(e, "SeekableStreamSupervisor[%s] for 
datasource=[%s] failed to handle notice", supervisorId, dataSource)

Review Comment:
   ```suggestion
                       log.makeAlert(e, "Supervisor[%s] for datasource[%s] 
failed to handle notice", supervisorId, dataSource)
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4520,6 +4532,7 @@ protected void emitNoticeProcessTime(String noticeType, 
long timeInMillis)
       emitter.emit(
           ServiceMetricEvent.builder()
                             .setDimension("noticeType", noticeType)
+                            .setDimension(DruidMetrics.SUPERVISOR_ID, 
supervisorId)

Review Comment:
   Across all the metrics, we should set this dimension only if it is not equal 
to `dataSource`.
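
A rough sketch of that rule, using a plain map in place of the real metric builder. The class `SupervisorDimensions` is hypothetical, and the literal key `supervisorId` is assumed from the dimension name shown in the metrics docs in this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that stands in for the metric builder calls. */
final class SupervisorDimensions
{
  private SupervisorDimensions()
  {
    // Static utility, not meant to be instantiated.
  }

  /**
   * Builds the dimension map for a supervisor metric. The supervisorId
   * dimension is added only when it differs from the datasource name.
   */
  static Map<String, String> build(String supervisorId, String dataSource)
  {
    final Map<String, String> dims = new LinkedHashMap<>();
    dims.put("dataSource", dataSource);
    // Skip the redundant dimension for supervisors that use the default ID.
    if (supervisorId != null && !supervisorId.equals(dataSource)) {
      dims.put("supervisorId", supervisorId);
    }
    return dims;
  }
}
```

Keeping the condition in one place would let every emit site apply it consistently.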



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2263,35 +2289,35 @@ protected SegmentPublishResult 
updateDataSourceMetadataInTransaction(
 
     if (publishResult.isSuccess()) {
       log.info(
-          "Updated metadata for datasource[%s] from[%s] to[%s].",
-          dataSource, oldCommitMetadataFromDb, newCommitMetadata
+          "Updated metadata for supervisor[%s] for datasource[%s] from[%s] 
to[%s].",
+          supervisorId, dataSource, oldCommitMetadataFromDb, newCommitMetadata
       );
     } else {
       log.info(
-          "Failed to update metadata for datasource[%s] due to reason[%s].",
-          dataSource, publishResult.getErrorMsg()
+          "Failed to update metadata for supervisor[%s] for datasource[%s] due 
to reason[%s].",

Review Comment:
   ```suggestion
             "Failed to update metadata for supervisor[%s], datasource[%s] due 
to reason[%s].",
   ```
   
   Please make a similar fix to the wording of the log lines in the 
`SeekableStreamSupervisor` class.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -151,9 +163,10 @@ public ServiceEmitter getEmitter()
   }
 
   @Override
+  @JsonProperty
   public String getId()

Review Comment:
   Please add a short javadoc for this.



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2263,35 +2289,35 @@ protected SegmentPublishResult 
updateDataSourceMetadataInTransaction(
 
     if (publishResult.isSuccess()) {
       log.info(
-          "Updated metadata for datasource[%s] from[%s] to[%s].",
-          dataSource, oldCommitMetadataFromDb, newCommitMetadata
+          "Updated metadata for supervisor[%s] for datasource[%s] from[%s] 
to[%s].",

Review Comment:
   ```suggestion
             "Updated metadata for supervisor[%s], datasource[%s] from[%s] 
to[%s].",
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -849,6 +855,11 @@ public String getType()
   private final SeekableStreamSupervisorTuningConfig tuningConfig;
   private final SeekableStreamIndexTaskTuningConfig taskTuningConfig;
   private final String supervisorId;
+
+  /**
+   * Type-verbose id for identifying this supervisor in thread-names, 
listeners, etc.
+  */

Review Comment:
   ```suggestion
     /**
      * Tag for identifying this supervisor in thread-names, listeners, etc. 
tag = (type + supervisorId).
      */
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

