gianm commented on a change in pull request #6095: Add support 
'keepSegmentGranularity' for compactionTask
URL: https://github.com/apache/incubator-druid/pull/6095#discussion_r207743558
 
 

 ##########
 File path: 
indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
 ##########
 @@ -205,38 +216,46 @@ public boolean isReady(TaskActionClient 
taskActionClient) throws Exception
   @Override
   public TaskStatus run(final TaskToolbox toolbox) throws Exception
   {
-    if (indexTaskSpec == null) {
-      final IndexIngestionSpec ingestionSpec = createIngestionSchema(
+    if (indexTaskSpecs == null) {
+      indexTaskSpecs = createIngestionSchema(
           toolbox,
           segmentProvider,
           dimensionsSpec,
+          keepSegmentGranularity,
           tuningConfig,
           jsonMapper
-      );
-
-      if (ingestionSpec != null) {
-        indexTaskSpec = new IndexTask(
-            getId(),
-            getGroupId(),
-            getTaskResource(),
-            getDataSource(),
-            ingestionSpec,
-            getContext(),
-            authorizerMapper,
-            chatHandlerProvider,
-            rowIngestionMetersFactory
-        );
-      }
+      ).stream()
+      .map(spec -> new IndexTask(
+          getId(),
+          getGroupId(),
+          getTaskResource(),
+          getDataSource(),
+          spec,
+          getContext(),
+          authorizerMapper,
+          chatHandlerProvider,
+          rowIngestionMetersFactory
+      ))
+      .collect(Collectors.toList());
     }
 
-    if (indexTaskSpec == null) {
+    if (indexTaskSpecs.isEmpty()) {
       log.warn("Interval[%s] has no segments, nothing to do.", interval);
       return TaskStatus.failure(getId());
     } else {
-      final String json = 
jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(indexTaskSpec);
-      log.info("Generated compaction task details: " + json);
+      log.info("Generated [%d] compaction task specs: ", 
indexTaskSpecs.size());
+
+      for (IndexTask eachSpec : indexTaskSpecs) {
+        final String json = 
jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
+        log.info("Running indexSpec: " + json);
 
-      return indexTaskSpec.run(toolbox);
+        final TaskStatus eachResult = eachSpec.run(toolbox);
+        if (!eachResult.isSuccess()) {
+          log.warn("Running indexSpec failed. Trying to the next indexSpec.");
 
 Review comment:
   What if it throws an exception?
   
   Also, "Trying to the next indexSpec" isn't grammatical, it should be "Trying 
the next indexSpec".

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to