This is an automated email from the ASF dual-hosted git repository.

tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new adf42216050 Add follow-ups to multi stream supervisor, single 
datasource ingests (#18149)
adf42216050 is described below

commit adf4221605068f0190c83bf0db76f587744a362b
Author: jtuglu-netflix <[email protected]>
AuthorDate: Tue Jun 17 20:34:00 2025 -0700

    Add follow-ups to multi stream supervisor, single datasource ingests 
(#18149)
    
    - Adds follow-up doc touch-ups and fixes to multi stream supervisor, single 
datasource ingest feature
---
 docs/ingestion/supervisor.md                       |  8 +++----
 .../kafka/supervisor/KafkaSupervisorTest.java      |  2 +-
 .../kinesis/supervisor/KinesisSupervisorTest.java  |  2 +-
 .../actions/SegmentTransactionalAppendAction.java  | 10 +++------
 .../actions/SegmentTransactionalInsertAction.java  |  9 ++------
 .../overlord/supervisor/SupervisorResource.java    | 24 +++++++++-----------
 .../seekablestream/SeekableStreamIndexTask.java    |  4 ++++
 .../SeekableStreamIndexTaskRunner.java             |  7 +-----
 .../supervisor/SeekableStreamSupervisor.java       |  9 ++++----
 .../supervisor/SeekableStreamSupervisorSpec.java   |  4 ++++
 .../supervisor/SupervisorResourceTest.java         | 14 +++++++-----
 .../SeekableStreamIndexTaskRunnerTest.java         |  8 +++----
 .../tests/indexer/AbstractStreamIndexingTest.java  | 12 +++++-----
 .../IndexerMetadataStorageCoordinator.java         | 22 ++++++++++++++++--
 .../IndexerSQLMetadataStorageCoordinator.java      | 26 ++++++----------------
 .../druid/metadata/SQLMetadataConnector.java       |  7 +++---
 .../overlord/supervisor/SupervisorStatusTest.java  |  6 +++--
 17 files changed, 86 insertions(+), 88 deletions(-)

diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index 5db3189c066..b5b38df66dc 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -37,7 +37,7 @@ The following table outlines the high-level configuration 
options for a supervis
 
 |Property|Type|Description|Required|
 |--------|----|-----------|--------|
-|`id`|String|The supervisor id. This should be a unique ID that will identify 
the supervisor.|Yes|
+|`id`|String|The supervisor id. This should be a unique ID that will identify 
the supervisor. If unspecified, defaults to `spec.dataSchema.dataSource`.|No|
 |`type`|String|The supervisor type. For streaming ingestion, this can be 
either `kafka`, `kinesis`, or `rabbit`. For automatic compaction, set the type 
to `autocompact`. |Yes|
 |`spec`|Object|The container object for the supervisor configuration. For 
automatic compaction, this is the same as the compaction configuration. |Yes|
 |`spec.dataSchema`|Object|The schema for the indexing task to use during 
ingestion. See [`dataSchema`](../ingestion/ingestion-spec.md#dataschema) for 
more information.|Yes|
@@ -413,9 +413,9 @@ This value is for the ideal situation in which there is at 
most one set of tasks
 In some circumstances, it is possible to have multiple sets of tasks 
publishing simultaneously. This would happen if the
 time-to-publish (generate segment, push to deep storage, load on Historical) 
is greater than `taskDuration`. This is a valid and correct scenario but 
requires additional worker capacity to support. In general, it is a good idea 
to have `taskDuration` be large enough that the previous set of tasks finishes 
publishing before the current set begins.
 
-## Multi-Supervisor Support
-Druid supports multiple stream supervisors ingesting into the same datasource. 
This means you can have any number of the configured stream supervisors (Kafka, 
Kinesis, etc.) ingesting into the same datasource at the same time.
-In order to ensure proper synchronization between ingestion tasks with 
multiple supervisors, it's important to set `useConcurrentLocks=true` in the 
`context` field of the supervisor spec.
+## Multi-Supervisor Support (Experimental)
+Druid supports multiple stream supervisors ingesting into the same datasource. 
This means you can have any number of stream supervisors (Kafka, Kinesis, etc.) 
ingesting into the same datasource at the same time.
+In order to ensure proper synchronization between ingestion tasks with 
multiple supervisors, it's important to set `useConcurrentLocks=true` in the 
`context` field of the supervisor spec. Read more 
[here](concurrent-append-replace.md).
 
 ## Learn more
 
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index ed7dda03f48..bf07a2dec64 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -4071,7 +4071,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     AlertEvent alert = serviceEmitter.getAlerts().get(0);
     Assert.assertEquals(
-        "SeekableStreamSupervisor[testDS] for datasource=[testDS] failed to 
handle notice",
+        "Supervisor[testDS] for datasource[testDS] failed to handle notice",
         alert.getDescription()
     );
     Assert.assertEquals(
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 0181443c2c7..ed20a3f3ec3 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -3489,7 +3489,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
 
     final AlertEvent alert = serviceEmitter.getAlerts().get(0);
     Assert.assertEquals(
-        "SeekableStreamSupervisor[testDS] for datasource=[testDS] failed to 
handle notice",
+        "Supervisor[testDS] for datasource[testDS] failed to handle notice",
         alert.getDescription()
     );
     Assert.assertEquals(
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
index b21a0f994c2..3833c4be6b0 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
@@ -31,6 +31,7 @@ import 
org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.CriticalAction;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
 import org.apache.druid.metadata.ReplaceTaskLock;
 import org.apache.druid.segment.SegmentSchemaMapping;
@@ -104,14 +105,9 @@ public class SegmentTransactionalAppendAction implements 
TaskAction<SegmentPubli
     } else {
       this.supervisorId = supervisorId;
     }
-
-    if ((startMetadata == null && endMetadata != null)
-        || (startMetadata != null && endMetadata == null)) {
-      throw InvalidInput.exception("startMetadata and endMetadata must either 
be both null or both non-null.");
-    } else if (startMetadata != null && supervisorId == null) {
-      throw InvalidInput.exception("supervisorId cannot be null if 
startMetadata and endMetadata are both non-null.");
-    }
     this.segmentSchemaMapping = segmentSchemaMapping;
+
+    IndexerMetadataStorageCoordinator.validateDataSourceMetadata(supervisorId, 
startMetadata, endMetadata);
   }
 
   @Nullable
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index 533e6f4b877..ab84c38cc6a 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -25,7 +25,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.common.config.Configs;
-import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
@@ -33,6 +32,7 @@ import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskLockHelper;
 import org.apache.druid.indexing.overlord.CriticalAction;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.segment.SegmentSchemaMapping;
@@ -134,12 +134,7 @@ public class SegmentTransactionalInsertAction implements 
TaskAction<SegmentPubli
     this.supervisorId = Configs.valueOrDefault(supervisorId, dataSource);
     this.segmentSchemaMapping = segmentSchemaMapping;
 
-    if ((startMetadata == null && endMetadata != null)
-        || (startMetadata != null && endMetadata == null)) {
-      throw InvalidInput.exception("startMetadata and endMetadata must either 
be both null or both non-null.");
-    } else if (startMetadata != null && supervisorId == null) {
-      throw InvalidInput.exception("supervisorId cannot be null if 
startMetadata and endMetadata are both non-null.");
-    }
+    IndexerMetadataStorageCoordinator.validateDataSourceMetadata(supervisorId, 
startMetadata, endMetadata);
   }
 
   @JsonProperty
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 51f667e0c3e..c8b0ecff037 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -223,30 +223,28 @@ public class SupervisorResource
                               .withDetailedState(theState.get().toString())
                               .withHealthy(theState.get().isHealthy());
                   }
-                  if (includeFull) {
-                    Optional<SupervisorSpec> theSpec = 
manager.getSupervisorSpec(x);
-                    if (theSpec.isPresent()) {
-                      theBuilder.withSpec(theSpec.get())
-                          
.withDataSource(theSpec.get().getDataSources().stream().findFirst().orElse(null));
+                  Optional<SupervisorSpec> theSpec = 
manager.getSupervisorSpec(x);
+                  if (theSpec.isPresent()) {
+                    final SupervisorSpec spec = theSpec.get();
+                    
theBuilder.withDataSource(spec.getDataSources().stream().findFirst().orElse(null));
+                    if (includeFull) {
+                      theBuilder.withSpec(spec);
                     }
-                  }
-                  if (includeSystem) {
-                    Optional<SupervisorSpec> theSpec = 
manager.getSupervisorSpec(x);
-                    if (theSpec.isPresent()) {
+                    if (includeSystem) {
                       try {
                         // serializing SupervisorSpec here, so that callers of 
`druid/indexer/v1/supervisor?system`
                         // which are outside the overlord process can 
deserialize the response and get a json
                         // payload of SupervisorSpec object when they don't 
have guice bindings for all the fields
                         // for example, broker does not have bindings for all 
fields of `KafkaSupervisorSpec` or
                         // `KinesisSupervisorSpec`
-                        
theBuilder.withSpecString(objectMapper.writeValueAsString(manager.getSupervisorSpec(x).get()));
+                        
theBuilder.withSpecString(objectMapper.writeValueAsString(spec));
                       }
                       catch (JsonProcessingException e) {
                         throw new RuntimeException(e);
                       }
-                      
theBuilder.withType(manager.getSupervisorSpec(x).get().getType())
-                                
.withSource(manager.getSupervisorSpec(x).get().getSource())
-                                
.withSuspended(manager.getSupervisorSpec(x).get().isSuspended());
+                      theBuilder.withType(spec.getType())
+                                .withSource(spec.getSource())
+                                .withSuspended(spec.isSuspended());
                     }
                   }
                   return theBuilder.build();
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 72f3d321c94..04f44b716e7 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -130,6 +130,10 @@ public abstract class 
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
     return dataSchema;
   }
 
+  /**
+   * Returns the supervisor ID of the supervisor this task belongs to.
+   * If null/unspecified, this defaults to the datasource name.
+   */
   @JsonProperty
   public String getSupervisorId()
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index bed663b7a1e..15ec70df08c 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -318,12 +318,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   */
   public String getSupervisorId()
   {
-    @Nullable
-    final String supervisorId = task.getSupervisorId();
-    if (supervisorId != null) {
-      return supervisorId;
-    }
-    return task.getDataSource();
+    return task.getSupervisorId();
   }
 
   @VisibleForTesting
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 191edf4a735..01acc9de306 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -857,7 +857,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   private final String supervisorId;
 
   /**
-   * Type-verbose id for identifying this supervisor in thread-names, 
listeners, etc.
+   * Tag for identifying this supervisor in thread-names, listeners, etc. tag 
= (type + supervisorId).
   */
   private final String supervisorTag;
   private final TaskInfoProvider taskInfoProvider;
@@ -1029,8 +1029,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       catch (Exception e) {
         if (!started) {
           log.warn(
-              "First initialization attempt failed for 
SeekableStreamSupervisor[%s], starting retries...",
-              supervisorId
+              "First initialization attempt failed for supervisor[%s], 
dataSource[%s], starting retries...",
+              supervisorId,
+              dataSource
           );
 
           exec.submit(
@@ -1264,7 +1265,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                   }
                   catch (Throwable e) {
                     stateManager.recordThrowableEvent(e);
-                    log.makeAlert(e, "SeekableStreamSupervisor[%s] for 
datasource=[%s] failed to handle notice", supervisorId, dataSource)
+                    log.makeAlert(e, "Supervisor[%s] for datasource[%s] failed 
to handle notice", supervisorId, dataSource)
                        .addData("noticeClass", 
notice.getClass().getSimpleName())
                        .emit();
                   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index f09e3bb9e8c..967652673f7 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -162,6 +162,10 @@ public abstract class SeekableStreamSupervisorSpec 
implements SupervisorSpec
     return emitter;
   }
 
+  /**
+   * Returns the identifier for this supervisor.
+   * If unspecified, defaults to the dataSource being written to.
+   */
   @Override
   @JsonProperty
   public String getId()
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index febc959b0d1..49101b34b1b 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -351,8 +351,8 @@ public class SupervisorResourceTest extends EasyMockSupport
     Assert.assertTrue(
         specs.stream()
              .allMatch(spec ->
-                           ("id1".equals(spec.getId()) && 
SPEC1.equals(spec.getSpec())) ||
-                           ("id2".equals(spec.getId()) && 
SPEC2.equals(spec.getSpec()))
+                           ("id1".equals(spec.getId()) && 
spec.getDataSource().equals("datasource1") && SPEC1.equals(spec.getSpec())) ||
+                           ("id2".equals(spec.getId()) && 
spec.getDataSource().equals("datasource2") && SPEC2.equals(spec.getSpec()))
              )
     );
   }
@@ -398,8 +398,8 @@ public class SupervisorResourceTest extends EasyMockSupport
 
     
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
     
EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(SUPERVISOR_IDS).atLeastOnce();
-    
EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(SPEC1)).times(1);
-    
EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(SPEC2)).times(1);
+    
EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(SPEC1)).times(2);
+    
EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(SPEC2)).times(2);
     
EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).times(1);
     
EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).times(1);
     setupMockRequest();
@@ -417,11 +417,13 @@ public class SupervisorResourceTest extends 
EasyMockSupport
                 if ("id1".equals(id)) {
                   return state1.toString().equals(state.getState())
                          && state1.toString().equals(state.getDetailedState())
-                         && (Boolean) state.isHealthy() == state1.isHealthy();
+                         && (Boolean) state.isHealthy() == state1.isHealthy()
+                         && state.getDataSource().equals("datasource1");
                 } else if ("id2".equals(id)) {
                   return state2.toString().equals(state.getState())
                          && state2.toString().equals(state.getDetailedState())
-                         && (Boolean) state.isHealthy() == state2.isHealthy();
+                         && (Boolean) state.isHealthy() == state2.isHealthy()
+                         && state.getDataSource().equals("datasource2");
                 }
                 return false;
               })
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
index cdfe1fa6f01..7518bb7f172 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
@@ -167,7 +167,7 @@ public class SeekableStreamIndexTaskRunnerTest
   }
 
   @Test
-  public void testIfSupervisorIdIsNullThenUsesDatasource()
+  public void testGetSupervisorId()
   {
     DimensionsSpec dimensionsSpec = new DimensionsSpec(
         Arrays.asList(
@@ -202,12 +202,10 @@ public class SeekableStreamIndexTaskRunnerTest
     Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
     Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
 
-    // Return null supervisorId
-    Mockito.when(task.getSupervisorId()).thenReturn(null);
-    Mockito.when(task.getDataSource()).thenReturn("dataSource");
+    Mockito.when(task.getSupervisorId()).thenReturn("supervisorId");
     TestasbleSeekableStreamIndexTaskRunner runner = new 
TestasbleSeekableStreamIndexTaskRunner(task, null,
                                                                                
                LockGranularity.TIME_CHUNK);
-    Assert.assertEquals("dataSource", runner.getSupervisorId());
+    Assert.assertEquals("supervisorId", runner.getSupervisorId());
   }
 
   static class TestasbleSeekableStreamIndexTaskRunner extends 
SeekableStreamIndexTaskRunner
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index f4628707a21..7954cac21a9 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -73,7 +73,7 @@ public abstract class AbstractStreamIndexingTest extends 
AbstractIndexerTest
   static final int TOTAL_NUMBER_OF_SECOND = 10;
 
   private static final Logger LOG = new 
Logger(AbstractStreamIndexingTest.class);
-  // Since this integration test can terminates or be killed un-expectedly, 
this tag is added to all streams created
+  // Since this integration test can be terminated or be killed un-expectedly, 
this tag is added to all streams created
  // to help make stream clean up easier. (Normally, streams should be cleaned 
up automatically by the teardown method)
   // The value to this tag is a timestamp that can be used by a lambda 
function to remove unused stream.
   private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
@@ -998,14 +998,12 @@ public abstract class AbstractStreamIndexingTest extends 
AbstractIndexerTest
       }
 
       for (GeneratedTestConfig testConfig : testConfigs) {
-        ITRetryUtil.retryUntil(
-            () -> SupervisorStateManager.BasicState.RUNNING.equals(
-                indexer.getSupervisorStatus(testConfig.getSupervisorId())
-            ),
-            true,
+        ITRetryUtil.retryUntilEquals(
+            () -> indexer.getSupervisorStatus(testConfig.getSupervisorId()),
+            SupervisorStateManager.BasicState.RUNNING,
             10_000,
             30,
-            "Waiting for supervisor [" + testConfig.getSupervisorId() + "] to 
be running"
+            "State of supervisor[" + testConfig.getSupervisorId() + "]"
         );
 
         ITRetryUtil.retryUntil(
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 0e818bd7a17..c577bd3af14 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.overlord;
 
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.metadata.PendingSegmentRecord;
 import org.apache.druid.metadata.ReplaceTaskLock;
@@ -315,8 +316,8 @@ public interface IndexerMetadataStorageCoordinator
    * If segmentsToDrop is not null and not empty, this insertion will be 
atomic with a insert-and-drop on inserting
    * {@param segments} and dropping {@param segmentsToDrop}.
    *
-   * @param supervisorId   supervisorID which is committing the segments. 
Cannot be null if `startMetadata`
-   *                       and endMetadata` are both non-null.
+   * @param supervisorId   supervisorID which is committing the segments. 
Cannot be null if {@code startMetadata}
+   *                       and {@code endMetadata} are both non-null.
    * @param segments       set of segments to add, must all be from the same 
dataSource
    * @param startMetadata  dataSource metadata pre-insert must match this 
startMetadata according to
    *                       {@link 
DataSourceMetadata#matches(DataSourceMetadata)}. If null, this insert will
@@ -632,4 +633,21 @@ public interface IndexerMetadataStorageCoordinator
    */
   boolean markSegmentAsUsed(SegmentId segmentId);
 
+  /**
+   * Validates the given supervisorId and given metadata to ensure
+   * that start/end metadata non-null implies supervisor ID is non-null.
+   */
+  static void validateDataSourceMetadata(
+      @Nullable final String supervisorId,
+      @Nullable final DataSourceMetadata startMetadata,
+      @Nullable final DataSourceMetadata endMetadata
+  )
+  {
+    if ((startMetadata == null && endMetadata != null) || (startMetadata != 
null && endMetadata == null)) {
+      throw InvalidInput.exception("'startMetadata' and 'endMetadata' must 
either both be null or both non-null");
+    } else if (startMetadata != null && supervisorId == null) {
+      throw InvalidInput.exception(
+          "'supervisorId' cannot be null if 'startMetadata' and 'endMetadata' 
are both non-null.");
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 2d7d9ced976..5f6f817313e 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -433,14 +433,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   )
   {
     verifySegmentsToCommit(segments);
-
-    if ((startMetadata == null && endMetadata != null) || (startMetadata != 
null && endMetadata == null)) {
-      throw new IllegalArgumentException("start/end metadata pair must be 
either null or non-null");
-    } else if (startMetadata != null && supervisorId == null) {
-      throw new IllegalArgumentException(
-          "supervisorId cannot be null if startMetadata and endMetadata are 
both non-null.");
-    }
-
+    IndexerMetadataStorageCoordinator.validateDataSourceMetadata(supervisorId, 
startMetadata, endMetadata);
     final String dataSource = segments.iterator().next().getDataSource();
 
     try {
@@ -1189,13 +1182,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   )
   {
     final String dataSource = verifySegmentsToCommit(appendSegments);
-    if ((startMetadata == null && endMetadata != null)
-        || (startMetadata != null && endMetadata == null)) {
-      throw new IllegalArgumentException("start/end metadata pair must be 
either null or non-null");
-    } else if (startMetadata != null && supervisorId == null) {
-      throw new IllegalArgumentException(
-          "supervisorId cannot be null if startMetadata and endMetadata are 
both non-null.");
-    }
+    IndexerMetadataStorageCoordinator.validateDataSourceMetadata(supervisorId, 
startMetadata, endMetadata);
 
     final List<PendingSegmentRecord> segmentIdsForNewVersions = 
inReadOnlyDatasourceTransaction(
         dataSource,
@@ -1237,8 +1224,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
           transaction -> {
             // Try to update datasource metadata first
             if (startMetadata != null) {
-              final SegmentPublishResult metadataUpdateResult
-                  = updateDataSourceMetadataInTransaction(
+              final SegmentPublishResult metadataUpdateResult = 
updateDataSourceMetadataInTransaction(
                   transaction,
                   supervisorId,
                   dataSource,
@@ -2158,6 +2144,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
    * {@link DataSourceMetadata#matches matches} the {@code endMetadata}, this
    * method returns immediately with success.
    *
+   * @param supervisorId The supervisor ID. Used as the PK for the 
corresponding metadata entry in the DB.
+   * @param dataSource The dataSource. Currently used only for logging 
purposes.
    * @param startMetadata Current entry in the DB must
    *                      {@link DataSourceMetadata#matches match} this value.
    * @param endMetadata   The updated entry will be equal to the current entry
@@ -2289,12 +2277,12 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
 
     if (publishResult.isSuccess()) {
       log.info(
-          "Updated metadata for supervisor[%s] for datasource[%s] from[%s] 
to[%s].",
+          "Updated metadata for supervisor[%s], datasource[%s] from[%s] 
to[%s].",
           supervisorId, dataSource, oldCommitMetadataFromDb, newCommitMetadata
       );
     } else {
       log.info(
-          "Failed to update metadata for supervisor[%s] for datasource[%s] due 
to reason[%s].",
+          "Failed to update metadata for supervisor[%s], datasource[%s] due to 
reason[%s].",
           supervisorId, dataSource, publishResult.getErrorMsg()
       );
     }
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java 
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
index 6d6926ffaac..42b13ab11c8 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
@@ -312,10 +312,9 @@ public abstract class SQLMetadataConnector implements 
MetadataStorageConnector
   }
 
   /**
-   * Creates the table for storing datasource metadata for supervisors.
-   * Due to backwards compatibility reasons, the `dataSource` column will 
always uniquely identify a supervisor.
-   * For certain types of supervisors which support N:1 supervisor:datasource 
relationship, the `dataSource` column will store the supervisor ID.
-   * Otherwise, it will store the legacy supervisor ID – the `dataSource` 
itself.
+   * The {@code dataSource} column stores the supervisor ID.
+   * It has not been renamed to retain backwards compatibility.
+   * Supervisors created without an explicit supervisor id default to using 
the datasource name.
    */
   public void createDataSourceTable(final String tableName)
   {
diff --git 
a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatusTest.java
 
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatusTest.java
index 4c370453a0b..f8ef902c2fa 100644
--- 
a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatusTest.java
+++ 
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatusTest.java
@@ -50,7 +50,8 @@ public class SupervisorStatusTest
   public void testJsonAttr() throws IOException
   {
     String json = "{"
-                  + "\"id\":\"wikipedia\","
+                  + "\"id\":\"wikipedia_supervisor\","
+                  + "\"dataSource\":\"wikipedia\","
                   + "\"state\":\"UNHEALTHY_SUPERVISOR\","
                   + "\"detailedState\":\"UNHEALTHY_SUPERVISOR\","
                   + "\"healthy\":false,"
@@ -61,7 +62,8 @@ public class SupervisorStatusTest
     final ObjectMapper mapper = new ObjectMapper();
     final SupervisorStatus deserialized = mapper.readValue(json, 
SupervisorStatus.class);
     Assert.assertNotNull(deserialized);
-    Assert.assertEquals("wikipedia", deserialized.getId());
+    Assert.assertEquals("wikipedia_supervisor", deserialized.getId());
+    Assert.assertEquals("wikipedia", deserialized.getDataSource());
     final String serialized = mapper.writeValueAsString(deserialized);
     Assert.assertTrue(serialized.contains("\"source\""));
     Assert.assertEquals(json, serialized);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to