gargvishesh commented on code in PR #16291:
URL: https://github.com/apache/druid/pull/16291#discussion_r1615453283


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -464,144 +472,49 @@ void emitCompactIngestionModeMetrics(
   @Override
   public TaskStatus runTask(TaskToolbox toolbox) throws Exception
   {
-
     // emit metric for compact ingestion mode:
     emitCompactIngestionModeMetrics(toolbox.getEmitter(), 
ioConfig.isDropExisting());
 
-    final List<ParallelIndexIngestionSpec> ingestionSpecs = 
createIngestionSchema(
+    final List<NonnullPair<Interval, DataSchema>> intervalDataSchemas = 
createDataSchemasForIntervals(
         UTC_CLOCK,
         toolbox,
         getTaskLockHelper().getLockGranularityToUse(),
-        ioConfig,
         segmentProvider,
-        partitionConfigurationManager,
         dimensionsSpec,
         transformSpec,
         metricsSpec,
         granularitySpec,
-        toolbox.getCoordinatorClient(),
-        segmentCacheManagerFactory,
         getMetricBuilder()
     );
-    final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
-        .range(0, ingestionSpecs.size())
-        .mapToObj(i -> {
-          // The ID of SubtaskSpecs is used as the base sequenceName in 
segment allocation protocol.
-          // The indexing tasks generated by the compaction task should use 
different sequenceNames
-          // so that they can allocate valid segment IDs with no duplication.
-          ParallelIndexIngestionSpec ingestionSpec = ingestionSpecs.get(i);
-          final String baseSequenceName = createIndexTaskSpecId(i);
-          return newTask(baseSequenceName, ingestionSpec);
-        })
-        .collect(Collectors.toList());
-
-    if (indexTaskSpecs.isEmpty()) {
-      String msg = StringUtils.format(
-          "Can't find segments from inputSpec[%s], nothing to do.",
-          ioConfig.getInputSpec()
-      );
-      log.warn(msg);
-      return TaskStatus.failure(getId(), msg);
-    } else {
-      registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
-      final int totalNumSpecs = indexTaskSpecs.size();
-      log.info("Generated [%d] compaction task specs", totalNumSpecs);
-
-      int failCnt = 0;
-      final TaskReport.ReportMap completionReports = new 
TaskReport.ReportMap();
-      for (int i = 0; i < indexTaskSpecs.size(); i++) {
-        ParallelIndexSupervisorTask eachSpec = indexTaskSpecs.get(i);
-        final String json = 
toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
-        if (!currentSubTaskHolder.setTask(eachSpec)) {
-          String errMsg = "Task was asked to stop. Finish as failed.";
-          log.info(errMsg);
-          return TaskStatus.failure(getId(), errMsg);
-        }
-        try {
-          if (eachSpec.isReady(toolbox.getTaskActionClient())) {
-            log.info("Running indexSpec: " + json);
-            final TaskStatus eachResult = eachSpec.run(toolbox);
-            if (!eachResult.isSuccess()) {
-              failCnt++;
-              log.warn("Failed to run indexSpec: [%s].\nTrying the next 
indexSpec.", json);
-            }
-
-            String reportKeySuffix = "_" + i;
-            Optional.ofNullable(eachSpec.getCompletionReports()).ifPresent(
-                reports -> completionReports.putAll(
-                    CollectionUtils.mapKeys(reports, key -> key + 
reportKeySuffix)
-                )
-            );
-          } else {
-            failCnt++;
-            log.warn("indexSpec is not ready: [%s].\nTrying the next 
indexSpec.", json);
-          }
-        }
-        catch (Exception e) {
-          failCnt++;
-          log.warn(e, "Failed to run indexSpec: [%s].\nTrying the next 
indexSpec.", json);
-        }
-      }
-
-      String msg = StringUtils.format("Ran [%d] specs, [%d] succeeded, [%d] 
failed",
-                                      totalNumSpecs, totalNumSpecs - failCnt, 
failCnt
-      );
 
-      toolbox.getTaskReportFileWriter().write(getId(), completionReports);
-      log.info(msg);
-      return failCnt == 0 ? TaskStatus.success(getId()) : 
TaskStatus.failure(getId(), msg);
+    if (compactionStrategy == null) {
+      // Can only happen for MSQ engine, when the json subtype reqd for 
deserialization isn't available due to
+      // missing extn.
+      throw DruidException.forPersona(DruidException.Persona.ADMIN)

Review Comment:
   The check itself is removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to