abhishekrb19 commented on code in PR #18082:
URL: https://github.com/apache/druid/pull/18082#discussion_r2152613361
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1018,7 +1030,7 @@ public void start()
if (!started) {
log.warn(
"First initialization attempt failed for SeekableStreamSupervisor[%s], starting retries...",
- dataSource
+ supervisorId
Review Comment:
Same here, please include `dataSource` in the warn log as well.
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2147,17 +2171,19 @@ private Map<String, String>
getAppendSegmentsCommittedDuringTask(
*/
protected SegmentPublishResult updateDataSourceMetadataInTransaction(
final SegmentMetadataTransaction transaction,
+ final String supervisorId,
Review Comment:
Javadoc is missing for the new `supervisorId` parameter.
##########
docs/ingestion/supervisor.md:
##########
@@ -37,11 +37,13 @@ The following table outlines the high-level configuration
options for a supervis
|Property|Type|Description|Required|
|--------|----|-----------|--------|
+|`id`|String|The supervisor id. This should be a unique ID that will identify the supervisor.|Yes|
|`type`|String|The supervisor type. For streaming ingestion, this can be either `kafka`, `kinesis`, or `rabbit`. For automatic compaction, set the type to `autocompact`. |Yes|
Review Comment:
Does this also require a web-console change to populate the new field in the
spec?
Relatedly, I think it would be good to have a dedicated Supervisor ID column
instead of `Supervisor ID (datasource)`, which is slightly misleading. This
would also be useful for compaction and scheduled batch supervisors, which can
also have a many-to-one relationship from supervisor to datasource.

cc: @vogievetsky
##########
docs/ingestion/supervisor.md:
##########
@@ -37,11 +37,13 @@ The following table outlines the high-level configuration
options for a supervis
|Property|Type|Description|Required|
|--------|----|-----------|--------|
+|`id`|String|The supervisor id. This should be a unique ID that will identify the supervisor.|Yes|
|`type`|String|The supervisor type. For streaming ingestion, this can be either `kafka`, `kinesis`, or `rabbit`. For automatic compaction, set the type to `autocompact`. |Yes|
|`spec`|Object|The container object for the supervisor configuration. For automatic compaction, this is the same as the compaction configuration. |Yes|
Review Comment:
One more thing I noticed: when I click on "Spec" in the web-console, it
doesn’t show the `id`, even though the backend appears to return `"id": "one"`
in the "Raw" spec. Perhaps a web-console change is needed here? The downloaded
spec appears to be based on the formatted spec rather than the raw one, so this
could cause some confusion. Please see the screenshot below:

##########
docs/querying/sql-metadata-tables.md:
##########
@@ -299,6 +299,7 @@ The supervisors table provides information about
supervisors.
|Column|Type|Notes|
|------|-----|-----|
|supervisor_id|VARCHAR|Supervisor task identifier|
+|datasource|VARCHAR|Datasource the supervisor operates on|
Review Comment:
I just tried querying this table and noticed that this new column
`datasource` is always null:

##########
docs/ingestion/supervisor.md:
##########
@@ -37,11 +37,13 @@ The following table outlines the high-level configuration
options for a supervis
|Property|Type|Description|Required|
|--------|----|-----------|--------|
+|`id`|String|The supervisor id. This should be a unique ID that will identify the supervisor.|Yes|
Review Comment:
Is `id` actually required now for creating a supervisor? Looks like we
default to the `datasource` name for compatibility reasons?
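To make the question concrete, here is a minimal sketch of the fallback I think we are relying on. `SupervisorIdResolver` and `resolve` are hypothetical names for illustration, not the PR's actual code, and treating an empty string the same as a missing `id` is my assumption:

```java
import java.util.Objects;

// Hypothetical helper for illustration only; not Druid's actual implementation.
final class SupervisorIdResolver
{
  private SupervisorIdResolver() {}

  // Returns the explicit id when one is given; otherwise falls back to the
  // datasource name so that specs written before the `id` field keep working.
  // Assumption: an empty id is treated the same as a missing one.
  static String resolve(String id, String dataSource)
  {
    Objects.requireNonNull(dataSource, "dataSource");
    return (id == null || id.isEmpty()) ? dataSource : id;
  }

  public static void main(String[] args)
  {
    System.out.println(resolve(null, "wikipedia"));           // falls back to the datasource
    System.out.println(resolve("wiki-kafka-1", "wikipedia")); // keeps the explicit id
  }
}
```

If this matches the intended behavior, the docs table should probably mark `id` as not required and mention the default.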
##########
server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java:
##########
@@ -100,6 +100,7 @@
@RunWith(Parameterized.class)
public class IndexerSQLMetadataStorageCoordinatorTest extends
IndexerSqlMetadataStorageCoordinatorTestBase
{
+ private static final String SUPERVISOR_ID = "supervisor";
Review Comment:
`supervisorId` could be null in some flows, right?
`commitSegmentsAndMetadata` and a few other methods have nullable
`supervisorId`, so I think it would be good to have some tests that cover these
cases to catch any bugs and also for backward compatibility. Either
parameterizing `SUPERVISOR_ID` or just adding a few tests with a null
supervisor ID should be fine (instead of always passing a non-null id).
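As a rough sketch of the parameterized shape I have in mind, the same round-trip check can run once per candidate id, including null. Everything here is hypothetical: `NullableSupervisorIdCheck` is an in-memory stand-in rather than the real coordinator, and the assumption that a null `supervisorId` falls back to the datasource as the key is mine:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the metadata store; for illustration only.
final class NullableSupervisorIdCheck
{
  private final Map<String, String> committed = new HashMap<>();

  // Assumption: a null supervisorId falls back to the datasource as the key.
  private static String key(String supervisorId, String dataSource)
  {
    return supervisorId == null ? dataSource : supervisorId;
  }

  void commitMetadata(String supervisorId, String dataSource, String metadata)
  {
    committed.put(key(supervisorId, dataSource), metadata);
  }

  String retrieveMetadata(String supervisorId, String dataSource)
  {
    return committed.get(key(supervisorId, dataSource));
  }

  // Runs the same commit-then-read check for every candidate id and
  // returns how many of them round-tripped correctly.
  static int runForAllIds(List<String> supervisorIds)
  {
    int passed = 0;
    for (String id : supervisorIds) {
      NullableSupervisorIdCheck store = new NullableSupervisorIdCheck();
      store.commitMetadata(id, "wiki", "offsets-v1");
      if ("offsets-v1".equals(store.retrieveMetadata(id, "wiki"))) {
        passed++;
      }
    }
    return passed;
  }

  public static void main(String[] args)
  {
    // Arrays.asList permits null elements, unlike List.of.
    System.out.println(runForAllIds(Arrays.asList((String) null, "supervisor")));
  }
}
```

In the real test class, the equivalent would be adding the supervisor ID (null and non-null) to the existing `Parameterized` inputs.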
##########
docs/ingestion/supervisor.md:
##########
@@ -411,6 +413,10 @@ This value is for the ideal situation in which there is at
most one set of tasks
In some circumstances, it is possible to have multiple sets of tasks
publishing simultaneously. This would happen if the
time-to-publish (generate segment, push to deep storage, load on Historical)
is greater than `taskDuration`. This is a valid and correct scenario but
requires additional worker capacity to support. In general, it is a good idea
to have `taskDuration` be large enough that the previous set of tasks finishes
publishing before the current set begins.
+## Multi-Supervisor Support
+Druid supports multiple stream supervisors ingesting into the same datasource.
This means you can have any number of the configured stream supervisors (Kafka,
Kinesis, etc.) ingesting into the same datasource at the same time.
Review Comment:
Should we also link this topic to `multiTopic`? I'm also wondering whether,
with support for multiple supervisors, anyone would still prefer to use
`multiTopic`, even if all the input topics are on the same Kafka brokers.
`multiTopic` is limited to Kafka and only works when all input topics reside
on the same Kafka brokers. Multiple supervisors, on the other hand, have
added benefits for scaling and management. If there's value in keeping both,
we could call out the differences. Otherwise, we could consider deprecating
`multiTopic` in favor of this new feature once it graduates from experimental
to GA.
Thoughts, @jtuglu-netflix @kfaraz?
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -453,24 +453,26 @@ public void handle()
if (spec.isSuspended()) {
log.info(
"Skipping DynamicAllocationTasksNotice execution because [%s] supervisor is suspended",
- dataSource
+ supervisorId
);
return;
}
if (SupervisorStateManager.BasicState.IDLE == getState()) {
log.info(
"Skipping DynamicAllocationTasksNotice execution because [%s] supervisor is idle",
- dataSource
+ supervisorId
);
Review Comment:
For consistency (similar to the logs below), should we add both
`supervisor[%s], dataSource[%s]` to the logs here as well?
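Something along these lines is what I mean. This is only a sketch: `SupervisorLogMessages` and `skipNotice` are hypothetical names, and the exact wording is a suggestion. It uses `String.format` so it runs standalone, but the same format string and arguments could be passed to `log.info(...)` directly:

```java
// Hypothetical helper showing the suggested message shape; for illustration only.
final class SupervisorLogMessages
{
  private SupervisorLogMessages() {}

  // Includes both identifiers so the line stays searchable by either one.
  static String skipNotice(String supervisorId, String dataSource, String state)
  {
    return String.format(
        "Skipping DynamicAllocationTasksNotice execution because supervisor[%s] for dataSource[%s] is %s",
        supervisorId,
        dataSource,
        state
    );
  }

  public static void main(String[] args)
  {
    System.out.println(skipNotice("wiki-kafka-1", "wikipedia", "suspended"));
  }
}
```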
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]