samarthjain opened a new issue #10099:
URL: https://github.com/apache/druid/issues/10099


   ### Description
   It would be good to add an option in the Druid Hadoop Indexer job to wait 
for segments to be loaded on the target cluster before the job is terminated.
   
   ### Motivation
   The Druid Hadoop indexer invokes a MapReduce job that reads data from an input source (Hive/Iceberg/Druid datasources, etc.) and writes Druid segments to deep storage. The final step of the process then writes the segment metadata for the newly created segments to the relational metadata store (RDS). The coordinator running on the Druid cluster polls the metadata store at a fixed interval for new segments that need to be loaded on the cluster.
   
   This segment load is asynchronous from the perspective of the client that ran the Druid indexer, which poses a hassle for users running the indexer in a workflow-like setup. With no definitive way of knowing whether the segments were loaded on the cluster before kicking off dependent downstream jobs, users either have to introduce an artificial wait, during which it is “hoped” that the segments will get loaded on the cluster, or come up with their own solutions using the Druid Coordinator REST endpoints.
   
   The proposal is to introduce a new configurable option in the Druid Hadoop 
Indexer that lets the indexer process complete only after the segments are 
loaded on the cluster. Since the Indexer already knows about the segments that 
were created, the client running the job can poll the coordinator to check 
their load status by issuing a `GET` request to
   ```
   /druid/coordinator/v1/datasources/{dataSourceName}/segments/{segmentId}
   ``` 
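   As a sketch of what that check could look like on the client side, the snippet below builds the status URL and maps the coordinator's response code to a loaded/not-loaded answer (200 vs. 204, per the endpoint behavior described later in this proposal). The class and method names here are illustrative, not existing Druid APIs:

   ```java
   // Hypothetical helper sketching the client-side status check; names are
   // illustrative, not Druid classes.
   public class SegmentLoadStatusClient {
       private final String coordinatorUri;

       public SegmentLoadStatusClient(String coordinatorUri) {
           // Strip a trailing slash so path concatenation stays predictable.
           this.coordinatorUri = coordinatorUri.endsWith("/")
               ? coordinatorUri.substring(0, coordinatorUri.length() - 1)
               : coordinatorUri;
       }

       // Build the coordinator status endpoint for one segment.
       public String statusUrl(String dataSource, String segmentId) {
           return String.format(
               "%s/druid/coordinator/v1/datasources/%s/segments/%s",
               coordinatorUri, dataSource, segmentId
           );
       }

       // Per the proposal: 200 means the segment is loaded, 204 means not yet.
       public static boolean isLoaded(int httpStatus) {
           return httpStatus == 200;
       }
   }
   ```

   A real implementation would also need to URL-encode the segment ID, since Druid segment IDs contain characters such as `:` from the interval portion.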
   
   Some details of the proposed implementation:
   
   Add a new config, `CoordinatorWaitForSegmentLoadSpec`, whose instance will be an optional field in `HadoopIOConfig`:
   
   ```
   public class HadoopIOConfig implements IOConfig {
   ...
   @Nullable
   private final MetadataStorageUpdaterJobSpec metadataUpdateSpec;
   ...
   @Nullable
   private final CoordinatorWaitForSegmentLoadSpec waitForLoadSpec;
   ...
   }
   ```
   
   
   ```
   CoordinatorWaitForSegmentLoadSpec
   
   Fields:
    @NotNull
    // URI of the coordinator to hit for checking segment load status
    String coordinatorURI

    @Nullable
    // How often to poll the coordinator
    int pollIntervalSeconds = 60
   
   @Nullable
   // Total duration to wait before bailing out. 
   // If the segment load doesn't complete within this duration, fail the job.
   int loadTimeoutSeconds = 3600 
   ```
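   A minimal sketch of what this spec class could look like, with the defaults above applied when the optional fields are omitted (the Jackson annotations a real Druid spec class would carry are left out to keep the sketch self-contained):

   ```java
   // Sketch of the proposed spec class; defaults match the field list above.
   public class CoordinatorWaitForSegmentLoadSpec {
       private final String coordinatorUri;   // required
       private final int pollIntervalSeconds; // defaults to 60
       private final int loadTimeoutSeconds;  // defaults to 3600

       public CoordinatorWaitForSegmentLoadSpec(
           String coordinatorUri,
           Integer pollIntervalSeconds,
           Integer loadTimeoutSeconds
       ) {
           if (coordinatorUri == null) {
               throw new IllegalArgumentException("coordinatorURI is required");
           }
           this.coordinatorUri = coordinatorUri;
           this.pollIntervalSeconds = pollIntervalSeconds == null ? 60 : pollIntervalSeconds;
           this.loadTimeoutSeconds = loadTimeoutSeconds == null ? 3600 : loadTimeoutSeconds;
       }

       public String getCoordinatorUri() { return coordinatorUri; }
       public int getPollIntervalSeconds() { return pollIntervalSeconds; }
       public int getLoadTimeoutSeconds() { return loadTimeoutSeconds; }
   }
   ```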
   
   Sample spec:
   ```
   "ioConfig" : {
     "type" : "hadoop",
     "inputSpec" : {
       ...
       },
      "waitForSegmentLoadSpec" : {
          "coordinatorURI" : "http://localhost:8081";,
          "pollIntervalSeconds" : 30
          "loadTimeoutSeconds" : 1800
       }
     },
     ...
   }
   ```
   The actual job which would do the coordinator polling will be an instance of 
WaitForSegmentLoadJob. 
   
   ```
    public class WaitForSegmentLoadJob implements Jobby {
        @NotNull
        private final CoordinatorWaitForSegmentLoadSpec waitSpec;

        public boolean loadComplete(List<DataSegment> segments) {
            ...
            // Schedule the following using a scheduled executor service
            // with an overall timeout.
            for (DataSegment segment : segments) {
                // Make a GET request to the coordinator. If the segment isn't
                // loaded, it will return a 204 response code; else it will
                // return a 200 response code.
            }
            ...
        }

        // Not static: it reads the instance's waitSpec.
        private String getSegmentStatusURI(DataSegment segment) {
            String coordinatorURI = waitSpec.getCoordinatorURI();
            SegmentId segmentId = segment.getId();
            String dataSource = segment.getDataSource();
            return coordinatorURI + String.format(
                "/druid/coordinator/v1/datasources/%s/segments/%s",
                dataSource,
                segmentId
            );
        }
    }
   
   ```
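   The core of `loadComplete` is a poll-until-loaded-or-deadline loop. The sketch below isolates that timing logic, with the per-segment check injected as a predicate so it can be exercised without a live coordinator; in the real job the predicate would issue the `GET` request above. The names are illustrative:

   ```java
   import java.util.List;
   import java.util.function.Predicate;

   // Sketch of the poll loop loadComplete() would run. A timeout or an
   // interruption yields false, which per the proposal should fail the job.
   public class SegmentLoadWaiter {
       private final long pollIntervalMillis;
       private final long timeoutMillis;

       public SegmentLoadWaiter(long pollIntervalMillis, long timeoutMillis) {
           this.pollIntervalMillis = pollIntervalMillis;
           this.timeoutMillis = timeoutMillis;
       }

       public boolean waitForLoad(List<String> segmentIds, Predicate<String> isLoaded) {
           long deadline = System.currentTimeMillis() + timeoutMillis;
           while (System.currentTimeMillis() < deadline) {
               // A real job would only re-check segments still pending, but
               // re-checking all of them keeps the sketch simple.
               if (segmentIds.stream().allMatch(isLoaded)) {
                   return true;
               }
               try {
                   Thread.sleep(pollIntervalMillis);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   return false; // treat interruption as failure
               }
           }
           return false; // deadline passed without all segments loading
       }
   }
   ```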
   
   The `WaitForSegmentLoadJob` will be instantiated in `HadoopDruidIndexerJob` if the `CoordinatorWaitForSegmentLoadSpec` is not null.
   
   ```
   @Inject
   public HadoopDruidIndexerJob(
       HadoopDruidIndexerConfig config,
       MetadataStorageUpdaterJobHandler handler
   )
   {
     ...
     if (config.isWaitForSegmentLoadSpecSet()) {
         waitForSegmentLoadJob = new WaitForSegmentLoadJob(
             config.waitForSegmentLoadSpec()
         );
       } else {
         waitForSegmentLoadJob = null;
       }
   }
   
   @Override
   public boolean run()
   {
     ...
      jobs.add(
          () -> {
            // Propagate the result so a load timeout fails the indexer job.
            return waitForSegmentLoadJob.loadComplete(segments);
          }
      );
      ...
   }
   ```
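   For context, the `Jobby` chaining in `run()` can be pictured as executing each job in order and stopping at the first failure, which is what lets a failed wait-for-load step fail the overall indexer run. This is an illustrative mirror of that behavior, not Druid's actual job-runner code:

   ```java
   import java.util.List;

   // Hypothetical mirror of Jobby chaining: run jobs in order, stop on the
   // first failure so it propagates to the overall result.
   interface Jobby {
       boolean run();
   }

   public class JobChain {
       public static boolean runJobs(List<Jobby> jobs) {
           for (Jobby job : jobs) {
               if (!job.run()) {
                   return false; // first failure fails the whole chain
               }
           }
           return true;
       }
   }
   ```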
   
   
   

