This is an automated email from the ASF dual-hosted git repository.
tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new adf42216050 Add follow-ups to multi stream supervisor, single
datasource ingests (#18149)
adf42216050 is described below
commit adf4221605068f0190c83bf0db76f587744a362b
Author: jtuglu-netflix <[email protected]>
AuthorDate: Tue Jun 17 20:34:00 2025 -0700
Add follow-ups to multi stream supervisor, single datasource ingests
(#18149)
- Adds follow-up doc touch-ups and fixes to multi stream supervisor, single
datasource ingest feature
---
docs/ingestion/supervisor.md | 8 +++----
.../kafka/supervisor/KafkaSupervisorTest.java | 2 +-
.../kinesis/supervisor/KinesisSupervisorTest.java | 2 +-
.../actions/SegmentTransactionalAppendAction.java | 10 +++------
.../actions/SegmentTransactionalInsertAction.java | 9 ++------
.../overlord/supervisor/SupervisorResource.java | 24 +++++++++-----------
.../seekablestream/SeekableStreamIndexTask.java | 4 ++++
.../SeekableStreamIndexTaskRunner.java | 7 +-----
.../supervisor/SeekableStreamSupervisor.java | 9 ++++----
.../supervisor/SeekableStreamSupervisorSpec.java | 4 ++++
.../supervisor/SupervisorResourceTest.java | 14 +++++++-----
.../SeekableStreamIndexTaskRunnerTest.java | 8 +++----
.../tests/indexer/AbstractStreamIndexingTest.java | 12 +++++-----
.../IndexerMetadataStorageCoordinator.java | 22 ++++++++++++++++--
.../IndexerSQLMetadataStorageCoordinator.java | 26 ++++++----------------
.../druid/metadata/SQLMetadataConnector.java | 7 +++---
.../overlord/supervisor/SupervisorStatusTest.java | 6 +++--
17 files changed, 86 insertions(+), 88 deletions(-)
diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index 5db3189c066..b5b38df66dc 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -37,7 +37,7 @@ The following table outlines the high-level configuration
options for a supervis
|Property|Type|Description|Required|
|--------|----|-----------|--------|
-|`id`|String|The supervisor id. This should be a unique ID that will identify
the supervisor.|Yes|
+|`id`|String|The supervisor id. This should be a unique ID that will identify
the supervisor. If unspecified, defaults to `spec.dataSchema.dataSource`.|No|
|`type`|String|The supervisor type. For streaming ingestion, this can be
either `kafka`, `kinesis`, or `rabbit`. For automatic compaction, set the type
to `autocompact`. |Yes|
|`spec`|Object|The container object for the supervisor configuration. For
automatic compaction, this is the same as the compaction configuration. |Yes|
|`spec.dataSchema`|Object|The schema for the indexing task to use during
ingestion. See [`dataSchema`](../ingestion/ingestion-spec.md#dataschema) for
more information.|Yes|
@@ -413,9 +413,9 @@ This value is for the ideal situation in which there is at
most one set of tasks
In some circumstances, it is possible to have multiple sets of tasks
publishing simultaneously. This would happen if the
time-to-publish (generate segment, push to deep storage, load on Historical)
is greater than `taskDuration`. This is a valid and correct scenario but
requires additional worker capacity to support. In general, it is a good idea
to have `taskDuration` be large enough that the previous set of tasks finishes
publishing before the current set begins.
-## Multi-Supervisor Support
-Druid supports multiple stream supervisors ingesting into the same datasource.
This means you can have any number of the configured stream supervisors (Kafka,
Kinesis, etc.) ingesting into the same datasource at the same time.
-In order to ensure proper synchronization between ingestion tasks with
multiple supervisors, it's important to set `useConcurrentLocks=true` in the
`context` field of the supervisor spec.
+## Multi-Supervisor Support (Experimental)
+Druid supports multiple stream supervisors ingesting into the same datasource.
This means you can have any number of stream supervisors (Kafka, Kinesis, etc.)
ingesting into the same datasource at the same time.
+In order to ensure proper synchronization between ingestion tasks with
multiple supervisors, it's important to set `useConcurrentLocks=true` in the
`context` field of the supervisor spec. Read more
[here](concurrent-append-replace.md).
## Learn more
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index ed7dda03f48..bf07a2dec64 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -4071,7 +4071,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
AlertEvent alert = serviceEmitter.getAlerts().get(0);
Assert.assertEquals(
- "SeekableStreamSupervisor[testDS] for datasource=[testDS] failed to
handle notice",
+ "Supervisor[testDS] for datasource[testDS] failed to handle notice",
alert.getDescription()
);
Assert.assertEquals(
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 0181443c2c7..ed20a3f3ec3 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -3489,7 +3489,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
final AlertEvent alert = serviceEmitter.getAlerts().get(0);
Assert.assertEquals(
- "SeekableStreamSupervisor[testDS] for datasource=[testDS] failed to
handle notice",
+ "Supervisor[testDS] for datasource[testDS] failed to handle notice",
alert.getDescription()
);
Assert.assertEquals(
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
index b21a0f994c2..3833c4be6b0 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
@@ -31,6 +31,7 @@ import
org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentSchemaMapping;
@@ -104,14 +105,9 @@ public class SegmentTransactionalAppendAction implements
TaskAction<SegmentPubli
} else {
this.supervisorId = supervisorId;
}
-
- if ((startMetadata == null && endMetadata != null)
- || (startMetadata != null && endMetadata == null)) {
- throw InvalidInput.exception("startMetadata and endMetadata must either
be both null or both non-null.");
- } else if (startMetadata != null && supervisorId == null) {
- throw InvalidInput.exception("supervisorId cannot be null if
startMetadata and endMetadata are both non-null.");
- }
this.segmentSchemaMapping = segmentSchemaMapping;
+
+ IndexerMetadataStorageCoordinator.validateDataSourceMetadata(supervisorId,
startMetadata, endMetadata);
}
@Nullable
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index 533e6f4b877..ab84c38cc6a 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -25,7 +25,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.Configs;
-import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
@@ -33,6 +32,7 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskLockHelper;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.SegmentSchemaMapping;
@@ -134,12 +134,7 @@ public class SegmentTransactionalInsertAction implements
TaskAction<SegmentPubli
this.supervisorId = Configs.valueOrDefault(supervisorId, dataSource);
this.segmentSchemaMapping = segmentSchemaMapping;
- if ((startMetadata == null && endMetadata != null)
- || (startMetadata != null && endMetadata == null)) {
- throw InvalidInput.exception("startMetadata and endMetadata must either
be both null or both non-null.");
- } else if (startMetadata != null && supervisorId == null) {
- throw InvalidInput.exception("supervisorId cannot be null if
startMetadata and endMetadata are both non-null.");
- }
+ IndexerMetadataStorageCoordinator.validateDataSourceMetadata(supervisorId,
startMetadata, endMetadata);
}
@JsonProperty
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 51f667e0c3e..c8b0ecff037 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -223,30 +223,28 @@ public class SupervisorResource
.withDetailedState(theState.get().toString())
.withHealthy(theState.get().isHealthy());
}
- if (includeFull) {
- Optional<SupervisorSpec> theSpec =
manager.getSupervisorSpec(x);
- if (theSpec.isPresent()) {
- theBuilder.withSpec(theSpec.get())
-
.withDataSource(theSpec.get().getDataSources().stream().findFirst().orElse(null));
+ Optional<SupervisorSpec> theSpec =
manager.getSupervisorSpec(x);
+ if (theSpec.isPresent()) {
+ final SupervisorSpec spec = theSpec.get();
+
theBuilder.withDataSource(spec.getDataSources().stream().findFirst().orElse(null));
+ if (includeFull) {
+ theBuilder.withSpec(spec);
}
- }
- if (includeSystem) {
- Optional<SupervisorSpec> theSpec =
manager.getSupervisorSpec(x);
- if (theSpec.isPresent()) {
+ if (includeSystem) {
try {
// serializing SupervisorSpec here, so that callers of
`druid/indexer/v1/supervisor?system`
// which are outside the overlord process can
deserialize the response and get a json
// payload of SupervisorSpec object when they don't
have guice bindings for all the fields
// for example, broker does not have bindings for all
fields of `KafkaSupervisorSpec` or
// `KinesisSupervisorSpec`
-
theBuilder.withSpecString(objectMapper.writeValueAsString(manager.getSupervisorSpec(x).get()));
+
theBuilder.withSpecString(objectMapper.writeValueAsString(spec));
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
-
theBuilder.withType(manager.getSupervisorSpec(x).get().getType())
-
.withSource(manager.getSupervisorSpec(x).get().getSource())
-
.withSuspended(manager.getSupervisorSpec(x).get().isSuspended());
+ theBuilder.withType(spec.getType())
+ .withSource(spec.getSource())
+ .withSuspended(spec.isSuspended());
}
}
return theBuilder.build();
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 72f3d321c94..04f44b716e7 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -130,6 +130,10 @@ public abstract class
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
return dataSchema;
}
+ /**
+ * Returns the supervisor ID of the supervisor this task belongs to.
+ * If null/unspecified, this defaults to the datasource name.
+ */
@JsonProperty
public String getSupervisorId()
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index bed663b7a1e..15ec70df08c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -318,12 +318,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
*/
public String getSupervisorId()
{
- @Nullable
- final String supervisorId = task.getSupervisorId();
- if (supervisorId != null) {
- return supervisorId;
- }
- return task.getDataSource();
+ return task.getSupervisorId();
}
@VisibleForTesting
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 191edf4a735..01acc9de306 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -857,7 +857,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private final String supervisorId;
/**
- * Type-verbose id for identifying this supervisor in thread-names,
listeners, etc.
+ * Tag for identifying this supervisor in thread-names, listeners, etc.
The tag is formed as (type + supervisorId).
*/
private final String supervisorTag;
private final TaskInfoProvider taskInfoProvider;
@@ -1029,8 +1029,9 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
catch (Exception e) {
if (!started) {
log.warn(
- "First initialization attempt failed for
SeekableStreamSupervisor[%s], starting retries...",
- supervisorId
+ "First initialization attempt failed for supervisor[%s],
dataSource[%s], starting retries...",
+ supervisorId,
+ dataSource
);
exec.submit(
@@ -1264,7 +1265,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
catch (Throwable e) {
stateManager.recordThrowableEvent(e);
- log.makeAlert(e, "SeekableStreamSupervisor[%s] for
datasource=[%s] failed to handle notice", supervisorId, dataSource)
+ log.makeAlert(e, "Supervisor[%s] for datasource[%s] failed
to handle notice", supervisorId, dataSource)
.addData("noticeClass",
notice.getClass().getSimpleName())
.emit();
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index f09e3bb9e8c..967652673f7 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -162,6 +162,10 @@ public abstract class SeekableStreamSupervisorSpec
implements SupervisorSpec
return emitter;
}
+ /**
+ * Returns the identifier for this supervisor.
+ * If unspecified, defaults to the dataSource being written to.
+ */
@Override
@JsonProperty
public String getId()
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index febc959b0d1..49101b34b1b 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -351,8 +351,8 @@ public class SupervisorResourceTest extends EasyMockSupport
Assert.assertTrue(
specs.stream()
.allMatch(spec ->
- ("id1".equals(spec.getId()) &&
SPEC1.equals(spec.getSpec())) ||
- ("id2".equals(spec.getId()) &&
SPEC2.equals(spec.getSpec()))
+ ("id1".equals(spec.getId()) &&
spec.getDataSource().equals("datasource1") && SPEC1.equals(spec.getSpec())) ||
+ ("id2".equals(spec.getId()) &&
spec.getDataSource().equals("datasource2") && SPEC2.equals(spec.getSpec()))
)
);
}
@@ -398,8 +398,8 @@ public class SupervisorResourceTest extends EasyMockSupport
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(SUPERVISOR_IDS).atLeastOnce();
-
EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(SPEC1)).times(1);
-
EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(SPEC2)).times(1);
+
EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(SPEC1)).times(2);
+
EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(SPEC2)).times(2);
EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).times(1);
EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).times(1);
setupMockRequest();
@@ -417,11 +417,13 @@ public class SupervisorResourceTest extends
EasyMockSupport
if ("id1".equals(id)) {
return state1.toString().equals(state.getState())
&& state1.toString().equals(state.getDetailedState())
- && (Boolean) state.isHealthy() == state1.isHealthy();
+ && (Boolean) state.isHealthy() == state1.isHealthy()
+ && state.getDataSource().equals("datasource1");
} else if ("id2".equals(id)) {
return state2.toString().equals(state.getState())
&& state2.toString().equals(state.getDetailedState())
- && (Boolean) state.isHealthy() == state2.isHealthy();
+ && (Boolean) state.isHealthy() == state2.isHealthy()
+ && state.getDataSource().equals("datasource2");
}
return false;
})
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
index cdfe1fa6f01..7518bb7f172 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
@@ -167,7 +167,7 @@ public class SeekableStreamIndexTaskRunnerTest
}
@Test
- public void testIfSupervisorIdIsNullThenUsesDatasource()
+ public void testGetSupervisorId()
{
DimensionsSpec dimensionsSpec = new DimensionsSpec(
Arrays.asList(
@@ -202,12 +202,10 @@ public class SeekableStreamIndexTaskRunnerTest
Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
- // Return null supervisorId
- Mockito.when(task.getSupervisorId()).thenReturn(null);
- Mockito.when(task.getDataSource()).thenReturn("dataSource");
+ Mockito.when(task.getSupervisorId()).thenReturn("supervisorId");
TestasbleSeekableStreamIndexTaskRunner runner = new
TestasbleSeekableStreamIndexTaskRunner(task, null,
LockGranularity.TIME_CHUNK);
- Assert.assertEquals("dataSource", runner.getSupervisorId());
+ Assert.assertEquals("supervisorId", runner.getSupervisorId());
}
static class TestasbleSeekableStreamIndexTaskRunner extends
SeekableStreamIndexTaskRunner
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index f4628707a21..7954cac21a9 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -73,7 +73,7 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
static final int TOTAL_NUMBER_OF_SECOND = 10;
private static final Logger LOG = new
Logger(AbstractStreamIndexingTest.class);
- // Since this integration test can terminates or be killed un-expectedly,
this tag is added to all streams created
+ // Since this integration test can be terminated or killed unexpectedly,
this tag is added to all streams created
// to help make stream clean up easier. (Normally, streams should be cleaned up
automatically by the teardown method)
// The value to this tag is a timestamp that can be used by a lambda
function to remove unused stream.
private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
@@ -998,14 +998,12 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
}
for (GeneratedTestConfig testConfig : testConfigs) {
- ITRetryUtil.retryUntil(
- () -> SupervisorStateManager.BasicState.RUNNING.equals(
- indexer.getSupervisorStatus(testConfig.getSupervisorId())
- ),
- true,
+ ITRetryUtil.retryUntilEquals(
+ () -> indexer.getSupervisorStatus(testConfig.getSupervisorId()),
+ SupervisorStateManager.BasicState.RUNNING,
10_000,
30,
- "Waiting for supervisor [" + testConfig.getSupervisorId() + "] to
be running"
+ "State of supervisor[" + testConfig.getSupervisorId() + "]"
);
ITRetryUtil.retryUntil(
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 0e818bd7a17..c577bd3af14 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.overlord;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.metadata.ReplaceTaskLock;
@@ -315,8 +316,8 @@ public interface IndexerMetadataStorageCoordinator
* If segmentsToDrop is not null and not empty, this insertion will be
atomic with a insert-and-drop on inserting
* {@param segments} and dropping {@param segmentsToDrop}.
*
- * @param supervisorId supervisorID which is committing the segments.
Cannot be null if `startMetadata`
- * and endMetadata` are both non-null.
+ * @param supervisorId supervisorID which is committing the segments.
Cannot be null if {@code startMetadata}
+ * and {@code endMetadata} are both non-null.
* @param segments set of segments to add, must all be from the same
dataSource
* @param startMetadata dataSource metadata pre-insert must match this
startMetadata according to
* {@link
DataSourceMetadata#matches(DataSourceMetadata)}. If null, this insert will
@@ -632,4 +633,21 @@ public interface IndexerMetadataStorageCoordinator
*/
boolean markSegmentAsUsed(SegmentId segmentId);
+ /**
+ * Validates the given supervisorId and metadata to ensure
+ * that non-null start/end metadata implies a non-null supervisor ID.
+ */
+ static void validateDataSourceMetadata(
+ @Nullable final String supervisorId,
+ @Nullable final DataSourceMetadata startMetadata,
+ @Nullable final DataSourceMetadata endMetadata
+ )
+ {
+ if ((startMetadata == null && endMetadata != null) || (startMetadata !=
null && endMetadata == null)) {
+ throw InvalidInput.exception("'startMetadata' and 'endMetadata' must
either both be null or both non-null");
+ } else if (startMetadata != null && supervisorId == null) {
+ throw InvalidInput.exception(
+ "'supervisorId' cannot be null if 'startMetadata' and 'endMetadata'
are both non-null.");
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 2d7d9ced976..5f6f817313e 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -433,14 +433,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
)
{
verifySegmentsToCommit(segments);
-
- if ((startMetadata == null && endMetadata != null) || (startMetadata !=
null && endMetadata == null)) {
- throw new IllegalArgumentException("start/end metadata pair must be
either null or non-null");
- } else if (startMetadata != null && supervisorId == null) {
- throw new IllegalArgumentException(
- "supervisorId cannot be null if startMetadata and endMetadata are
both non-null.");
- }
-
+ IndexerMetadataStorageCoordinator.validateDataSourceMetadata(supervisorId,
startMetadata, endMetadata);
final String dataSource = segments.iterator().next().getDataSource();
try {
@@ -1189,13 +1182,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
)
{
final String dataSource = verifySegmentsToCommit(appendSegments);
- if ((startMetadata == null && endMetadata != null)
- || (startMetadata != null && endMetadata == null)) {
- throw new IllegalArgumentException("start/end metadata pair must be
either null or non-null");
- } else if (startMetadata != null && supervisorId == null) {
- throw new IllegalArgumentException(
- "supervisorId cannot be null if startMetadata and endMetadata are
both non-null.");
- }
+ IndexerMetadataStorageCoordinator.validateDataSourceMetadata(supervisorId,
startMetadata, endMetadata);
final List<PendingSegmentRecord> segmentIdsForNewVersions =
inReadOnlyDatasourceTransaction(
dataSource,
@@ -1237,8 +1224,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
transaction -> {
// Try to update datasource metadata first
if (startMetadata != null) {
- final SegmentPublishResult metadataUpdateResult
- = updateDataSourceMetadataInTransaction(
+ final SegmentPublishResult metadataUpdateResult =
updateDataSourceMetadataInTransaction(
transaction,
supervisorId,
dataSource,
@@ -2158,6 +2144,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
* {@link DataSourceMetadata#matches matches} the {@code endMetadata}, this
* method returns immediately with success.
*
+ * @param supervisorId The supervisor ID. Used as the PK for the
corresponding metadata entry in the DB.
+ * @param dataSource The dataSource. Currently used only for logging
purposes.
* @param startMetadata Current entry in the DB must
* {@link DataSourceMetadata#matches match} this value.
* @param endMetadata The updated entry will be equal to the current entry
@@ -2289,12 +2277,12 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
if (publishResult.isSuccess()) {
log.info(
- "Updated metadata for supervisor[%s] for datasource[%s] from[%s]
to[%s].",
+ "Updated metadata for supervisor[%s], datasource[%s] from[%s]
to[%s].",
supervisorId, dataSource, oldCommitMetadataFromDb, newCommitMetadata
);
} else {
log.info(
- "Failed to update metadata for supervisor[%s] for datasource[%s] due
to reason[%s].",
+ "Failed to update metadata for supervisor[%s], datasource[%s] due to
reason[%s].",
supervisorId, dataSource, publishResult.getErrorMsg()
);
}
diff --git
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
index 6d6926ffaac..42b13ab11c8 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
@@ -312,10 +312,9 @@ public abstract class SQLMetadataConnector implements
MetadataStorageConnector
}
/**
- * Creates the table for storing datasource metadata for supervisors.
- * Due to backwards compatibility reasons, the `dataSource` column will
always uniquely identify a supervisor.
- * For certain types of supervisors which support N:1 supervisor:datasource
relationship, the `dataSource` column will store the supervisor ID.
- * Otherwise, it will store the legacy supervisor ID – the `dataSource`
itself.
+ * The {@code dataSource} column stores the supervisor ID.
+ * It has not been renamed to retain backwards compatibility.
+ * Supervisors created without an explicit supervisor ID default to using
the datasource name.
*/
public void createDataSourceTable(final String tableName)
{
diff --git
a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatusTest.java
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatusTest.java
index 4c370453a0b..f8ef902c2fa 100644
---
a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatusTest.java
+++
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatusTest.java
@@ -50,7 +50,8 @@ public class SupervisorStatusTest
public void testJsonAttr() throws IOException
{
String json = "{"
- + "\"id\":\"wikipedia\","
+ + "\"id\":\"wikipedia_supervisor\","
+ + "\"dataSource\":\"wikipedia\","
+ "\"state\":\"UNHEALTHY_SUPERVISOR\","
+ "\"detailedState\":\"UNHEALTHY_SUPERVISOR\","
+ "\"healthy\":false,"
@@ -61,7 +62,8 @@ public class SupervisorStatusTest
final ObjectMapper mapper = new ObjectMapper();
final SupervisorStatus deserialized = mapper.readValue(json,
SupervisorStatus.class);
Assert.assertNotNull(deserialized);
- Assert.assertEquals("wikipedia", deserialized.getId());
+ Assert.assertEquals("wikipedia_supervisor", deserialized.getId());
+ Assert.assertEquals("wikipedia", deserialized.getDataSource());
final String serialized = mapper.writeValueAsString(deserialized);
Assert.assertTrue(serialized.contains("\"source\""));
Assert.assertEquals(json, serialized);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]