samarthjain opened a new issue #10099:
URL: https://github.com/apache/druid/issues/10099
### Description
It would be good to add an option in the Druid Hadoop Indexer job to wait
for segments to be loaded on the target cluster before the job is terminated.
### Motivation
The Druid Hadoop indexer works by invoking a MapReduce job that reads data from an input source (Hive, Iceberg, Druid datasources, etc.) and writes Druid segments to deep storage. The final step of the process then writes the metadata for the newly created segments to a relational metadata store (RDS). The coordinator running on the Druid cluster polls the metadata store at a fixed interval for new segments that need to be loaded on the cluster.
This segment load is asynchronous from the perspective of the client that ran the Druid indexer, which poses a hassle for users running the indexer in a workflow-like setup. With no definitive way of knowing whether the segments were loaded on the cluster before kicking off dependent downstream jobs, users either have to induce an artificial wait during which it is "hoped" that the segments will get loaded, or they have to come up with their own solutions using the Druid Coordinator REST endpoints.
The proposal is to introduce a new configurable option in the Druid Hadoop
Indexer that lets the indexer process complete only after the segments are
loaded on the cluster. Since the Indexer already knows about the segments that
were created, the client running the job can poll the coordinator to check
their load status by issuing a `GET` request to
```
/druid/coordinator/v1/datasources/{dataSourceName}/segments/{segmentId}
```
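As a sketch of the client-side check, the status URL can be built from the coordinator base URI and the response code interpreted per this proposal (200 when the segment is loaded, 204 when it isn't, as noted in the implementation notes below). Class and method names here are illustrative, not part of any existing Druid API:

```java
// Illustrative sketch only: builds the coordinator status URL for one
// segment and interprets the HTTP response code. Names are hypothetical.
public class SegmentStatusCheck {
  // Build /druid/coordinator/v1/datasources/{dataSourceName}/segments/{segmentId}
  static String statusUrl(String coordinatorUri, String dataSourceName, String segmentId) {
    return String.format(
        "%s/druid/coordinator/v1/datasources/%s/segments/%s",
        coordinatorUri, dataSourceName, segmentId);
  }

  // Assumption from this proposal: 200 = segment loaded, 204 = not loaded yet.
  static boolean isLoaded(int httpStatus) {
    return httpStatus == 200;
  }
}
```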
Some details of the proposed implementation:

Add a new config class, `CoordinatorWaitForSegmentLoadSpec`, an instance of which will be an optional field in `HadoopIOConfig`:
```
public class HadoopIOConfig implements IOConfig {
  ...
  @Nullable
  private final MetadataStorageUpdaterJobSpec metadataUpdateSpec;
  ...
  @Nullable
  private final CoordinatorWaitForSegmentLoadSpec waitForLoadSpec;
  ...
}
```
```
CoordinatorWaitForSegmentLoadSpec
Fields:

@NotNull
// URI of the coordinator to hit for checking segment load status
String coordinatorURI

@Nullable
// How often to poll the coordinator
int coordinatorPollIntervalSeconds = 60

@Nullable
// Total duration to wait before bailing out.
// If the segment load doesn't complete within this duration, fail the job.
int loadTimeoutSeconds = 3600
```
Sample spec:
```
"ioConfig" : {
  "type" : "hadoop",
  "inputSpec" : {
    ...
  },
  "waitForSegmentLoadSpec" : {
    "coordinatorURI" : "http://localhost:8081",
    "pollIntervalSeconds" : 30,
    "loadTimeoutSeconds" : 1800
  }
},
...
}
```
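Since both interval fields are nullable, the spec would presumably fall back to the defaults listed above when they are omitted from the JSON. A minimal sketch of that defaulting logic (the helper class and method names are hypothetical):

```java
// Hypothetical helpers mirroring the defaults in the spec above:
// 60s poll interval and 3600s load timeout when the field is omitted.
public class WaitSpecDefaults {
  static final int DEFAULT_POLL_INTERVAL_SECONDS = 60;
  static final int DEFAULT_LOAD_TIMEOUT_SECONDS = 3600;

  static int pollIntervalSeconds(Integer configured) {
    return configured != null ? configured : DEFAULT_POLL_INTERVAL_SECONDS;
  }

  static int loadTimeoutSeconds(Integer configured) {
    return configured != null ? configured : DEFAULT_LOAD_TIMEOUT_SECONDS;
  }
}
```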
The actual job which would do the coordinator polling will be an instance of `WaitForSegmentLoadJob`:
```
public class WaitForSegmentLoadJob implements Jobby {
  @NotNull
  private final CoordinatorWaitForSegmentLoadSpec waitSpec;

  public boolean loadComplete(List<DataSegment> segments) {
    ...
    // Schedule the following using a scheduled executor service with
    // an overall timeout.
    for (DataSegment segment : segments) {
      // Make a GET request to the coordinator. If the segment isn't loaded,
      // it will return a 204 response code; else it will return a 200.
    }
    ...
  }

  private String getSegmentStatusURI(DataSegment segment) {
    String coordinatorURI = waitSpec.getCoordinatorURI();
    SegmentId segmentId = segment.getId();
    String dataSource = segment.getDataSource();
    return coordinatorURI + String.format(
        "/druid/coordinator/v1/datasources/%s/segments/%s",
        dataSource,
        segmentId
    );
  }
}
```
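The "scheduled executor service with an overall timeout" step above could also be expressed as a simple deadline-bounded loop. A sketch under that assumption, with the per-segment HTTP check abstracted behind a `BooleanSupplier` (all names here are illustrative, not Druid's):

```java
import java.util.function.BooleanSupplier;

// Illustrative sketch of a deadline-bounded wait: poll until the check
// reports all segments loaded, or give up once the timeout elapses.
// The real job would back the supplier with coordinator GET requests.
public class PollUntilLoaded {
  static boolean waitFor(BooleanSupplier allLoaded, long pollMillis, long timeoutMillis)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (true) {
      if (allLoaded.getAsBoolean()) {
        return true;  // every segment reported loaded
      }
      if (System.currentTimeMillis() + pollMillis > deadline) {
        return false; // timed out: the caller should fail the job
      }
      Thread.sleep(pollMillis);
    }
  }
}
```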
The `WaitForSegmentLoadJob` will be instantiated in `HadoopDruidIndexerJob` if the `CoordinatorWaitForSegmentLoadSpec` is not null:
```
@Inject
public HadoopDruidIndexerJob(
    HadoopDruidIndexerConfig config,
    MetadataStorageUpdaterJobHandler handler
)
{
  ...
  if (config.isWaitForSegmentLoadSpecSet()) {
    waitForSegmentLoadJob = new WaitForSegmentLoadJob(
        config.waitForSegmentLoadSpec()
    );
  } else {
    waitForSegmentLoadJob = null;
  }
}

@Override
public boolean run()
{
  ...
  jobs.add(
      () -> {
        // Block until the published segments are loaded (or the timeout fires);
        // a timeout fails the overall indexing run.
        return waitForSegmentLoadJob.loadComplete(publishedSegments);
      }
  );
  ...
}
```
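For context, `Jobby#run` returning `false` is what signals failure in the chained Hadoop jobs, so a timed-out wait job would fail the overall indexing run. A minimal sketch of that run-in-order, stop-on-first-failure behavior (the interface and runner below are simplified stand-ins, not Druid's actual classes):

```java
import java.util.List;

// Simplified stand-ins for the job chaining: run each job in order
// and report failure as soon as one job returns false.
public class JobChain {
  interface Jobby {
    boolean run();
  }

  static boolean runAll(List<Jobby> jobs) {
    for (Jobby job : jobs) {
      if (!job.run()) {
        return false; // e.g. the wait-for-load job timed out
      }
    }
    return true;
  }
}
```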
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]