gargvishesh commented on code in PR #16291:
URL: https://github.com/apache/druid/pull/16291#discussion_r1615453024
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -464,144 +472,49 @@ void emitCompactIngestionModeMetrics(
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
-
// emit metric for compact ingestion mode:
emitCompactIngestionModeMetrics(toolbox.getEmitter(),
ioConfig.isDropExisting());
- final List<ParallelIndexIngestionSpec> ingestionSpecs =
createIngestionSchema(
+ final List<NonnullPair<Interval, DataSchema>> intervalDataSchemas =
createDataSchemasForIntervals(
UTC_CLOCK,
toolbox,
getTaskLockHelper().getLockGranularityToUse(),
- ioConfig,
segmentProvider,
- partitionConfigurationManager,
dimensionsSpec,
transformSpec,
metricsSpec,
granularitySpec,
- toolbox.getCoordinatorClient(),
- segmentCacheManagerFactory,
getMetricBuilder()
);
- final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
- .range(0, ingestionSpecs.size())
- .mapToObj(i -> {
- // The ID of SubtaskSpecs is used as the base sequenceName in
segment allocation protocol.
- // The indexing tasks generated by the compaction task should use
different sequenceNames
- // so that they can allocate valid segment IDs with no duplication.
- ParallelIndexIngestionSpec ingestionSpec = ingestionSpecs.get(i);
- final String baseSequenceName = createIndexTaskSpecId(i);
- return newTask(baseSequenceName, ingestionSpec);
- })
- .collect(Collectors.toList());
-
- if (indexTaskSpecs.isEmpty()) {
- String msg = StringUtils.format(
- "Can't find segments from inputSpec[%s], nothing to do.",
- ioConfig.getInputSpec()
- );
- log.warn(msg);
- return TaskStatus.failure(getId(), msg);
- } else {
- registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
- final int totalNumSpecs = indexTaskSpecs.size();
- log.info("Generated [%d] compaction task specs", totalNumSpecs);
-
- int failCnt = 0;
- final TaskReport.ReportMap completionReports = new
TaskReport.ReportMap();
- for (int i = 0; i < indexTaskSpecs.size(); i++) {
- ParallelIndexSupervisorTask eachSpec = indexTaskSpecs.get(i);
- final String json =
toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
- if (!currentSubTaskHolder.setTask(eachSpec)) {
- String errMsg = "Task was asked to stop. Finish as failed.";
- log.info(errMsg);
- return TaskStatus.failure(getId(), errMsg);
- }
- try {
- if (eachSpec.isReady(toolbox.getTaskActionClient())) {
- log.info("Running indexSpec: " + json);
- final TaskStatus eachResult = eachSpec.run(toolbox);
- if (!eachResult.isSuccess()) {
- failCnt++;
- log.warn("Failed to run indexSpec: [%s].\nTrying the next
indexSpec.", json);
- }
-
- String reportKeySuffix = "_" + i;
- Optional.ofNullable(eachSpec.getCompletionReports()).ifPresent(
- reports -> completionReports.putAll(
- CollectionUtils.mapKeys(reports, key -> key +
reportKeySuffix)
- )
- );
- } else {
- failCnt++;
- log.warn("indexSpec is not ready: [%s].\nTrying the next
indexSpec.", json);
- }
- }
- catch (Exception e) {
- failCnt++;
- log.warn(e, "Failed to run indexSpec: [%s].\nTrying the next
indexSpec.", json);
- }
- }
-
- String msg = StringUtils.format("Ran [%d] specs, [%d] succeeded, [%d]
failed",
- totalNumSpecs, totalNumSpecs - failCnt,
failCnt
- );
- toolbox.getTaskReportFileWriter().write(getId(), completionReports);
- log.info(msg);
- return failCnt == 0 ? TaskStatus.success(getId()) :
TaskStatus.failure(getId(), msg);
+ if (compactionStrategy == null) {
+ // Can only happen for MSQ engine, when the JSON subtype required for
deserialization isn't available due to
Review Comment:
Yeah, you are right. Removed this check.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]