This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 22b41dd Task reports for parallel task: single phase and sequential
mode (#11688)
22b41dd is described below
commit 22b41ddbbfe2b07b085e295ba171bcdc07e04900
Author: Jonathan Wei <[email protected]>
AuthorDate: Thu Sep 16 13:58:11 2021 -0500
Task reports for parallel task: single phase and sequential mode (#11688)
* Task reports for parallel task: single phase and sequential mode
* Address comments
* Add null check for currentSubTaskHolder
---
.../druid/indexing/common/task/IndexTask.java | 17 +-
.../parallel/ParallelIndexSupervisorTask.java | 222 ++++++++++++++-
.../batch/parallel/PartialSegmentMergeTask.java | 6 +-
.../task/batch/parallel/PushedSegmentsReport.java | 22 +-
.../task/batch/parallel/SinglePhaseSubTask.java | 315 ++++++++++++++++++---
.../AbstractParallelIndexSupervisorTaskTest.java | 19 +-
.../ParallelIndexSupervisorTaskResourceTest.java | 3 +-
.../batch/parallel/PushedSegmentsReportTest.java | 32 +++
.../parallel/SinglePhaseParallelIndexingTest.java | 169 ++++++++++-
.../incremental/RowIngestionMetersTotals.java | 35 +++
.../incremental/RowIngestionMetersTotalsTest.java | 32 +++
.../client/indexing/HttpIndexingServiceClient.java | 26 ++
.../client/indexing/IndexingServiceClient.java | 3 +
.../indexing/HttpIndexingServiceClientTest.java | 67 +++++
.../client/indexing/NoopIndexingServiceClient.java | 7 +
15 files changed, 903 insertions(+), 72 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index a798522..f22c2a0 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -286,7 +286,12 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
- Map<String, List<String>> events = new HashMap<>();
+ return Response.ok(doGetUnparseableEvents(full)).build();
+ }
+
+ public Map<String, Object> doGetUnparseableEvents(String full)
+ {
+ Map<String, Object> events = new HashMap<>();
boolean needsDeterminePartitions = false;
boolean needsBuildSegments = false;
@@ -325,11 +330,10 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
)
);
}
-
- return Response.ok(events).build();
+ return events;
}
- private Map<String, Object> doGetRowStats(String full)
+ public Map<String, Object> doGetRowStats(String full)
{
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> totalsMap = new HashMap<>();
@@ -784,6 +788,11 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
return hllCollectors;
}
+ public IngestionState getIngestionState()
+ {
+ return ingestionState;
+ }
+
/**
* This method reads input data row by row and adds the read row to a proper
segment using {@link BaseAppenderatorDriver}.
* If there is no segment for the row, a new one is created. Segments can
be published in the middle of reading inputs
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index b66f49f..19b965c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
@@ -66,6 +67,8 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
@@ -92,6 +95,7 @@ import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@@ -169,6 +173,8 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@MonotonicNonNull
private volatile TaskToolbox toolbox;
+ private IngestionState ingestionState;
+
@JsonCreator
public ParallelIndexSupervisorTask(
@JsonProperty("id") String id,
@@ -218,6 +224,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
}
awaitSegmentAvailabilityTimeoutMillis =
ingestionSchema.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis();
+ this.ingestionState = IngestionState.NOT_STARTED;
}
private static void
checkPartitionsSpecForForceGuaranteedRollup(PartitionsSpec partitionsSpec)
@@ -484,6 +491,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
}
}
finally {
+ ingestionState = IngestionState.COMPLETED;
toolbox.getChatHandlerProvider().unregister(getId());
}
}
@@ -553,18 +561,19 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
*/
private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws
Exception
{
- final ParallelIndexTaskRunner<SinglePhaseSubTask, PushedSegmentsReport>
runner = createRunner(
+ ingestionState = IngestionState.BUILD_SEGMENTS;
+ ParallelIndexTaskRunner<SinglePhaseSubTask, PushedSegmentsReport>
parallelSinglePhaseRunner = createRunner(
toolbox,
this::createSinglePhaseTaskRunner
);
- final TaskState state = runNextPhase(runner);
+ final TaskState state = runNextPhase(parallelSinglePhaseRunner);
TaskStatus taskStatus;
if (state.isSuccess()) {
//noinspection ConstantConditions
- publishSegments(toolbox, runner.getReports());
+ publishSegments(toolbox, parallelSinglePhaseRunner.getReports());
if (awaitSegmentAvailabilityTimeoutMillis > 0) {
- waitForSegmentAvailability(runner.getReports());
+ waitForSegmentAvailability(parallelSinglePhaseRunner.getReports());
}
taskStatus = TaskStatus.success(getId());
} else {
@@ -572,7 +581,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
Preconditions.checkState(state.isFailure(), "Unrecognized state after
task is complete[%s]", state);
final String errorMessage = StringUtils.format(
TASK_PHASE_FAILURE_MSG,
- runner.getName()
+ parallelSinglePhaseRunner.getName()
);
taskStatus = TaskStatus.failure(getId(), errorMessage);
}
@@ -1068,7 +1077,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
{
- final IndexTask indexTask = new IndexTask(
+ IndexTask sequentialIndexTask = new IndexTask(
getId(),
getGroupId(),
getTaskResource(),
@@ -1082,8 +1091,9 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
getContext()
);
- if (currentSubTaskHolder.setTask(indexTask) &&
indexTask.isReady(toolbox.getTaskActionClient())) {
- return indexTask.run(toolbox);
+ if (currentSubTaskHolder.setTask(sequentialIndexTask)
+ && sequentialIndexTask.isReady(toolbox.getTaskActionClient())) {
+ return sequentialIndexTask.run(toolbox);
} else {
String msg = "Task was asked to stop. Finish as failed";
LOG.info(msg);
@@ -1101,13 +1111,17 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
*/
private Map<String, TaskReport> getTaskCompletionReports(TaskStatus
taskStatus, boolean segmentAvailabilityConfirmed)
{
+ Pair<Map<String, Object>, Map<String, Object>>
rowStatsAndUnparseableEvents = doGetRowStatsAndUnparseableEvents(
+ "true",
+ true
+ );
return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport(
getId(),
new IngestionStatsAndErrorsTaskReportData(
IngestionState.COMPLETED,
- new HashMap<>(),
- new HashMap<>(),
+ rowStatsAndUnparseableEvents.rhs,
+ rowStatsAndUnparseableEvents.lhs,
taskStatus.getErrorMsg(),
segmentAvailabilityConfirmed
)
@@ -1415,4 +1429,192 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
}
}
}
+
+ private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object
buildSegmentsRowStats)
+ {
+ if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
+ // This case is for unit tests. Normally when deserialized the row stats
will appear as a Map<String, Object>.
+ return (RowIngestionMetersTotals) buildSegmentsRowStats;
+ } else if (buildSegmentsRowStats instanceof Map) {
+ Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>)
buildSegmentsRowStats;
+ return new RowIngestionMetersTotals(
+ ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
+ ((Number)
buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
+ ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
+ ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
+ );
+ } else {
+ // should never happen
+ throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " +
buildSegmentsRowStats.getClass());
+ }
+ }
+
+ private Pair<Map<String, Object>, Map<String, Object>>
doGetRowStatsAndUnparseableEventsParallelSinglePhase(
+ SinglePhaseParallelIndexTaskRunner parallelSinglePhaseRunner,
+ boolean includeUnparseable
+ )
+ {
+ long processed = 0L;
+ long processedWithError = 0L;
+ long thrownAway = 0L;
+ long unparseable = 0L;
+
+ List<String> unparseableEvents = new ArrayList<>();
+
+ // Get stats from completed tasks
+ Map<String, PushedSegmentsReport> completedSubtaskReports =
parallelSinglePhaseRunner.getReports();
+ for (PushedSegmentsReport pushedSegmentsReport :
completedSubtaskReports.values()) {
+ Map<String, TaskReport> taskReport =
pushedSegmentsReport.getTaskReport();
+ if (taskReport == null || taskReport.isEmpty()) {
+ LOG.warn("Got an empty task report from subtask: " +
pushedSegmentsReport.getTaskId());
+ continue;
+ }
+ IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport =
(IngestionStatsAndErrorsTaskReport) taskReport.get(
+ IngestionStatsAndErrorsTaskReport.REPORT_KEY);
+ IngestionStatsAndErrorsTaskReportData reportData =
+ (IngestionStatsAndErrorsTaskReportData)
ingestionStatsAndErrorsReport.getPayload();
+ RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
+ reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
+ );
+
+ if (includeUnparseable) {
+ List<String> taskUnparsebleEvents = (List<String>)
reportData.getUnparseableEvents()
+
.get(RowIngestionMeters.BUILD_SEGMENTS);
+ unparseableEvents.addAll(taskUnparsebleEvents);
+ }
+
+ processed += totals.getProcessed();
+ processedWithError += totals.getProcessedWithError();
+ thrownAway += totals.getThrownAway();
+ unparseable += totals.getUnparseable();
+ }
+
+ // Get stats from running tasks
+ Set<String> runningTaskIds = parallelSinglePhaseRunner.getRunningTaskIds();
+ for (String runningTaskId : runningTaskIds) {
+ try {
+ Map<String, Object> report =
toolbox.getIndexingServiceClient().getTaskReport(runningTaskId);
+ if (report == null || report.isEmpty()) {
+ // task does not have a running report yet
+ continue;
+ }
+ Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>)
report.get("ingestionStatsAndErrors");
+ Map<String, Object> payload = (Map<String, Object>)
ingestionStatsAndErrors.get("payload");
+ Map<String, Object> rowStats = (Map<String, Object>)
payload.get("rowStats");
+ Map<String, Object> totals = (Map<String, Object>)
rowStats.get("totals");
+ Map<String, Object> buildSegments = (Map<String, Object>)
totals.get(RowIngestionMeters.BUILD_SEGMENTS);
+
+ if (includeUnparseable) {
+ Map<String, Object> taskUnparseableEvents = (Map<String, Object>)
payload.get("unparseableEvents");
+ List<String> buildSegmentsUnparseableEvents = (List<String>)
taskUnparseableEvents.get(
+ RowIngestionMeters.BUILD_SEGMENTS
+ );
+ unparseableEvents.addAll(buildSegmentsUnparseableEvents);
+ }
+
+ processed += ((Number) buildSegments.get("processed")).longValue();
+ processedWithError += ((Number)
buildSegments.get("processedWithError")).longValue();
+ thrownAway += ((Number) buildSegments.get("thrownAway")).longValue();
+ unparseable += ((Number) buildSegments.get("unparseable")).longValue();
+ }
+ catch (Exception e) {
+ LOG.warn(e, "Encountered exception when getting live subtask report
for task: " + runningTaskId);
+ }
+ }
+
+ Map<String, Object> rowStatsMap = new HashMap<>();
+ Map<String, Object> totalsMap = new HashMap<>();
+ totalsMap.put(
+ RowIngestionMeters.BUILD_SEGMENTS,
+ new RowIngestionMetersTotals(processed, processedWithError,
thrownAway, unparseable)
+ );
+ rowStatsMap.put("totals", totalsMap);
+
+ return Pair.of(rowStatsMap,
ImmutableMap.of(RowIngestionMeters.BUILD_SEGMENTS, unparseableEvents));
+ }
+
+ private Pair<Map<String, Object>, Map<String, Object>>
doGetRowStatsAndUnparseableEvents(String full, boolean includeUnparseable)
+ {
+ if (currentSubTaskHolder == null) {
+ return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+ }
+
+ Object currentRunner = currentSubTaskHolder.getTask();
+ if (currentRunner == null) {
+ return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+ }
+
+ if (isParallelMode()) {
+ if (isGuaranteedRollup(ingestionSchema.getIOConfig(),
ingestionSchema.getTuningConfig())) {
+ // multiphase is not supported yet
+ return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+ } else {
+ return doGetRowStatsAndUnparseableEventsParallelSinglePhase(
+ (SinglePhaseParallelIndexTaskRunner) currentRunner,
+ includeUnparseable
+ );
+ }
+ } else {
+ IndexTask currentSequentialTask = (IndexTask) currentRunner;
+ return Pair.of(currentSequentialTask.doGetRowStats(full),
currentSequentialTask.doGetUnparseableEvents(full));
+ }
+ }
+
+ @GET
+ @Path("/rowStats")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getRowStats(
+ @Context final HttpServletRequest req,
+ @QueryParam("full") String full
+ )
+ {
+ IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ return Response.ok(doGetRowStatsAndUnparseableEvents(full,
false).lhs).build();
+ }
+
+ @VisibleForTesting
+ public Map<String, Object> doGetLiveReports(String full)
+ {
+ Map<String, Object> returnMap = new HashMap<>();
+ Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
+ Map<String, Object> payload = new HashMap<>();
+
+ Pair<Map<String, Object>, Map<String, Object>> rowStatsAndUnparsebleEvents
=
+ doGetRowStatsAndUnparseableEvents(full, true);
+
+ // use the sequential task's ingestion state if we were running that mode
+ IngestionState ingestionStateForReport;
+ if (isParallelMode()) {
+ ingestionStateForReport = ingestionState;
+ } else {
+ IndexTask currentSequentialTask = (IndexTask)
currentSubTaskHolder.getTask();
+ ingestionStateForReport = currentSequentialTask == null
+ ? ingestionState
+ : currentSequentialTask.getIngestionState();
+ }
+
+ payload.put("ingestionState", ingestionStateForReport);
+ payload.put("unparseableEvents", rowStatsAndUnparsebleEvents.rhs);
+ payload.put("rowStats", rowStatsAndUnparsebleEvents.lhs);
+
+ ingestionStatsAndErrors.put("taskId", getId());
+ ingestionStatsAndErrors.put("payload", payload);
+ ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
+
+ returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
+ return returnMap;
+ }
+
+ @GET
+ @Path("/liveReports")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getLiveReports(
+ @Context final HttpServletRequest req,
+ @QueryParam("full") String full
+ )
+ {
+ IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+
+ return Response.ok(doGetLiveReports(full)).build();
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
index 38a7261..60814fd 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
@@ -197,7 +198,10 @@ abstract class PartialSegmentMergeTask<S extends
ShardSpec> extends PerfectRollu
intervalToUnzippedFiles
);
- taskClient.report(supervisorTaskId, new PushedSegmentsReport(getId(),
Collections.emptySet(), pushedSegments));
+ taskClient.report(
+ supervisorTaskId,
+ new PushedSegmentsReport(getId(), Collections.emptySet(),
pushedSegments, ImmutableMap.of())
+ );
return TaskStatus.success(getId());
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java
index 8654172..8ed373c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java
@@ -22,8 +22,10 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.timeline.DataSegment;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -39,17 +41,20 @@ public class PushedSegmentsReport implements SubTaskReport
private final String taskId;
private final Set<DataSegment> oldSegments;
private final Set<DataSegment> newSegments;
+ private final Map<String, TaskReport> taskReport;
@JsonCreator
public PushedSegmentsReport(
@JsonProperty("taskId") String taskId,
@JsonProperty("oldSegments") Set<DataSegment> oldSegments,
- @JsonProperty("segments") Set<DataSegment> newSegments
+ @JsonProperty("segments") Set<DataSegment> newSegments,
+ @JsonProperty("taskReport") Map<String, TaskReport> taskReport
)
{
this.taskId = Preconditions.checkNotNull(taskId, "taskId");
this.oldSegments = Preconditions.checkNotNull(oldSegments, "oldSegments");
this.newSegments = Preconditions.checkNotNull(newSegments, "newSegments");
+ this.taskReport = taskReport;
}
@Override
@@ -71,6 +76,12 @@ public class PushedSegmentsReport implements SubTaskReport
return newSegments;
}
+ @JsonProperty("taskReport")
+ public Map<String, TaskReport> getTaskReport()
+ {
+ return taskReport;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -81,14 +92,15 @@ public class PushedSegmentsReport implements SubTaskReport
return false;
}
PushedSegmentsReport that = (PushedSegmentsReport) o;
- return Objects.equals(taskId, that.taskId) &&
- Objects.equals(oldSegments, that.oldSegments) &&
- Objects.equals(newSegments, that.newSegments);
+ return Objects.equals(taskId, that.taskId)
+ && Objects.equals(oldSegments, that.oldSegments)
+ && Objects.equals(newSegments, that.newSegments)
+ && Objects.equals(taskReport, that.taskReport);
}
@Override
public int hashCode()
{
- return Objects.hash(taskId, oldSegments, newSegments);
+ return Objects.hash(taskId, oldSegments, newSegments, taskReport);
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index af885e7..08cba47 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -21,13 +21,20 @@ package
org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSource;
+import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -35,6 +42,7 @@ import
org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.BatchAppenderators;
import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SegmentAllocators;
import org.apache.druid.indexing.common.task.TaskResource;
@@ -58,16 +66,29 @@ import
org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResul
import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
import
org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
+import org.apache.druid.segment.realtime.firehose.ChatHandler;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.Interval;
import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -80,7 +101,7 @@ import java.util.concurrent.TimeoutException;
* generates and pushes segments, and reports them to the {@link
SinglePhaseParallelIndexTaskRunner} instead of
* publishing on its own.
*/
-public class SinglePhaseSubTask extends AbstractBatchSubtask
+public class SinglePhaseSubTask extends AbstractBatchSubtask implements
ChatHandler
{
public static final String TYPE = "single_phase_sub_task";
public static final String OLD_TYPE_NAME = "index_sub";
@@ -106,6 +127,20 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask
*/
private final boolean missingIntervalsInOverwriteMode;
+ @MonotonicNonNull
+ private AuthorizerMapper authorizerMapper;
+
+ @MonotonicNonNull
+ private RowIngestionMeters rowIngestionMeters;
+
+ @MonotonicNonNull
+ private ParseExceptionHandler parseExceptionHandler;
+
+ @Nullable
+ private String errorMsg;
+
+ private IngestionState ingestionState;
+
@JsonCreator
public SinglePhaseSubTask(
// id shouldn't be null except when this task is created by
ParallelIndexSupervisorTask
@@ -144,6 +179,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask
if (missingIntervalsInOverwriteMode) {
addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
}
+ this.ingestionState = IngestionState.NOT_STARTED;
}
@Override
@@ -187,43 +223,74 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask
}
@Override
- public TaskStatus runTask(final TaskToolbox toolbox) throws Exception
+ public TaskStatus runTask(final TaskToolbox toolbox)
{
- if (missingIntervalsInOverwriteMode) {
- LOG.warn(
- "Intervals are missing in granularitySpec while this task is
potentially overwriting existing segments. "
- + "Forced to use timeChunk lock."
+ try {
+ if (missingIntervalsInOverwriteMode) {
+ LOG.warn(
+ "Intervals are missing in granularitySpec while this task is
potentially overwriting existing segments. "
+ + "Forced to use timeChunk lock."
+ );
+ }
+ this.authorizerMapper = toolbox.getAuthorizerMapper();
+
+ toolbox.getChatHandlerProvider().register(getId(), this, false);
+
+ rowIngestionMeters =
toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+ parseExceptionHandler = new ParseExceptionHandler(
+ rowIngestionMeters,
+ ingestionSchema.getTuningConfig().isLogParseExceptions(),
+ ingestionSchema.getTuningConfig().getMaxParseExceptions(),
+ ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
);
- }
- final InputSource inputSource =
ingestionSchema.getIOConfig().getNonNullInputSource(
- ingestionSchema.getDataSchema().getParser()
- );
- final ParallelIndexSupervisorTaskClient taskClient =
toolbox.getSupervisorTaskClientFactory().build(
- new ClientBasedTaskInfoProvider(toolbox.getIndexingServiceClient()),
- getId(),
- 1, // always use a single http thread
- ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
- ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
- );
- final Set<DataSegment> pushedSegments = generateAndPushSegments(
- toolbox,
- taskClient,
- inputSource,
- toolbox.getIndexingTmpDir()
- );
+ final InputSource inputSource =
ingestionSchema.getIOConfig().getNonNullInputSource(
+ ingestionSchema.getDataSchema().getParser()
+ );
- // Find inputSegments overshadowed by pushedSegments
- final Set<DataSegment> allSegments = new
HashSet<>(getTaskLockHelper().getLockedExistingSegments());
- allSegments.addAll(pushedSegments);
- final VersionedIntervalTimeline<String, DataSegment> timeline =
VersionedIntervalTimeline.forSegments(allSegments);
- final Set<DataSegment> oldSegments =
FluentIterable.from(timeline.findFullyOvershadowed())
-
.transformAndConcat(TimelineObjectHolder::getObject)
-
.transform(PartitionChunk::getObject)
- .toSet();
- taskClient.report(supervisorTaskId, new PushedSegmentsReport(getId(),
oldSegments, pushedSegments));
-
- return TaskStatus.success(getId());
+ final ParallelIndexSupervisorTaskClient taskClient =
toolbox.getSupervisorTaskClientFactory().build(
+ new ClientBasedTaskInfoProvider(toolbox.getIndexingServiceClient()),
+ getId(),
+ 1, // always use a single http thread
+ ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
+ ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
+ );
+ ingestionState = IngestionState.BUILD_SEGMENTS;
+ final Set<DataSegment> pushedSegments = generateAndPushSegments(
+ toolbox,
+ taskClient,
+ inputSource,
+ toolbox.getIndexingTmpDir()
+ );
+
+ // Find inputSegments overshadowed by pushedSegments
+ final Set<DataSegment> allSegments = new
HashSet<>(getTaskLockHelper().getLockedExistingSegments());
+ allSegments.addAll(pushedSegments);
+ final VersionedIntervalTimeline<String, DataSegment> timeline =
VersionedIntervalTimeline.forSegments(allSegments);
+ final Set<DataSegment> oldSegments =
FluentIterable.from(timeline.findFullyOvershadowed())
+
.transformAndConcat(TimelineObjectHolder::getObject)
+
.transform(PartitionChunk::getObject)
+ .toSet();
+
+ Map<String, TaskReport> taskReport = getTaskCompletionReports();
+ taskClient.report(supervisorTaskId, new PushedSegmentsReport(getId(),
oldSegments, pushedSegments, taskReport));
+
+ toolbox.getTaskReportFileWriter().write(getId(), taskReport);
+
+ return TaskStatus.success(getId());
+ }
+ catch (Exception e) {
+ LOG.error(e, "Encountered exception in parallel sub task.");
+ errorMsg = Throwables.getStackTraceAsString(e);
+ toolbox.getTaskReportFileWriter().write(getId(),
getTaskCompletionReports());
+ return TaskStatus.failure(
+ getId(),
+ errorMsg
+ );
+ }
+ finally {
+ toolbox.getChatHandlerProvider().unregister(getId());
+ }
}
@Override
@@ -324,13 +391,6 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask
useLineageBasedSegmentAllocation
);
- final RowIngestionMeters rowIngestionMeters =
toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
- final ParseExceptionHandler parseExceptionHandler = new
ParseExceptionHandler(
- rowIngestionMeters,
- tuningConfig.isLogParseExceptions(),
- tuningConfig.getMaxParseExceptions(),
- tuningConfig.getMaxSavedParseExceptions()
- );
final Appenderator appenderator = BatchAppenderators.newAppenderator(
getId(),
toolbox.getAppenderatorsManager(),
@@ -339,12 +399,7 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask
dataSchema,
tuningConfig,
rowIngestionMeters,
- new ParseExceptionHandler(
- rowIngestionMeters,
- tuningConfig.isLogParseExceptions(),
- tuningConfig.getMaxParseExceptions(),
- tuningConfig.getMaxSavedParseExceptions()
- )
+ parseExceptionHandler
);
boolean exceptionOccurred = false;
try (
@@ -424,4 +479,170 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask
}
}
}
+
+ @GET
+ @Path("/unparseableEvents")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getUnparseableEvents(
+ @Context final HttpServletRequest req,
+ @QueryParam("full") String full
+ )
+ {
+ IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ Map<String, List<String>> events = new HashMap<>();
+
+ boolean needsBuildSegments = false;
+
+ if (full != null) {
+ needsBuildSegments = true;
+ } else {
+ switch (ingestionState) {
+ case BUILD_SEGMENTS:
+ case COMPLETED:
+ needsBuildSegments = true;
+ break;
+ default:
+ break;
+ }
+ }
+
+ if (needsBuildSegments) {
+ events.put(
+ RowIngestionMeters.BUILD_SEGMENTS,
+ IndexTaskUtils.getMessagesFromSavedParseExceptions(
+ parseExceptionHandler.getSavedParseExceptions()
+ )
+ );
+ }
+
+ return Response.ok(events).build();
+ }
+
+ private Map<String, Object> doGetRowStats(String full)
+ {
+ Map<String, Object> returnMap = new HashMap<>();
+ Map<String, Object> totalsMap = new HashMap<>();
+ Map<String, Object> averagesMap = new HashMap<>();
+
+ boolean needsBuildSegments = false;
+
+ if (full != null) {
+ needsBuildSegments = true;
+ } else {
+ switch (ingestionState) {
+ case BUILD_SEGMENTS:
+ case COMPLETED:
+ needsBuildSegments = true;
+ break;
+ default:
+ break;
+ }
+ }
+
+ if (needsBuildSegments) {
+ totalsMap.put(
+ RowIngestionMeters.BUILD_SEGMENTS,
+ rowIngestionMeters.getTotals()
+ );
+ averagesMap.put(
+ RowIngestionMeters.BUILD_SEGMENTS,
+ rowIngestionMeters.getMovingAverages()
+ );
+ }
+
+ returnMap.put("totals", totalsMap);
+ returnMap.put("movingAverages", averagesMap);
+ return returnMap;
+ }
+
+ @GET
+ @Path("/rowStats")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getRowStats(
+ @Context final HttpServletRequest req,
+ @QueryParam("full") String full
+ )
+ {
+ IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ return Response.ok(doGetRowStats(full)).build();
+ }
+
+ @VisibleForTesting
+ public Map<String, Object> doGetLiveReports(String full)
+ {
+ Map<String, Object> returnMap = new HashMap<>();
+ Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
+ Map<String, Object> payload = new HashMap<>();
+ Map<String, Object> events = getTaskCompletionUnparseableEvents();
+
+ payload.put("ingestionState", ingestionState);
+ payload.put("unparseableEvents", events);
+ payload.put("rowStats", doGetRowStats(full));
+
+ ingestionStatsAndErrors.put("taskId", getId());
+ ingestionStatsAndErrors.put("payload", payload);
+ ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
+
+ returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
+ return returnMap;
+ }
+
+ @GET
+ @Path("/liveReports")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getLiveReports(
+ @Context final HttpServletRequest req,
+ @QueryParam("full") String full
+ )
+ {
+ IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
+ return Response.ok(doGetLiveReports(full)).build();
+ }
+
+ private Map<String, Object> getTaskCompletionRowStats()
+ {
+ Map<String, Object> metrics = new HashMap<>();
+ metrics.put(
+ RowIngestionMeters.BUILD_SEGMENTS,
+ rowIngestionMeters.getTotals()
+ );
+ return metrics;
+ }
+
+ /**
+ * Generate an IngestionStatsAndErrorsTaskReport for the task.
+ **
+ * @return
+ */
+ private Map<String, TaskReport> getTaskCompletionReports()
+ {
+ return TaskReport.buildTaskReports(
+ new IngestionStatsAndErrorsTaskReport(
+ getId(),
+ new IngestionStatsAndErrorsTaskReportData(
+ IngestionState.COMPLETED,
+ getTaskCompletionUnparseableEvents(),
+ getTaskCompletionRowStats(),
+ errorMsg,
+ false // not applicable for parallel subtask
+ )
+ )
+ );
+ }
+
+ private Map<String, Object> getTaskCompletionUnparseableEvents()
+ {
+ Map<String, Object> unparseableEventsMap = new HashMap<>();
+ List<String> parseExceptionMessages =
IndexTaskUtils.getMessagesFromSavedParseExceptions(
+ parseExceptionHandler.getSavedParseExceptions()
+ );
+
+ if (parseExceptionMessages != null) {
+ unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS,
parseExceptionMessages);
+ } else {
+ unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS,
ImmutableList.of());
+ }
+
+ return unparseableEventsMap;
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index fc1328b..9f5e4d6 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -177,7 +177,7 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
null,
null,
null,
- null,
+ 5,
null,
null
);
@@ -312,7 +312,7 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
return coordinatorClient;
}
- private static class TaskContainer
+ protected static class TaskContainer
{
private final Task task;
@MonotonicNonNull
@@ -325,6 +325,11 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
this.task = task;
}
+ public Task getTask()
+ {
+ return task;
+ }
+
private void setStatusFuture(Future<TaskStatus> statusFuture)
{
this.statusFuture = statusFuture;
@@ -421,6 +426,11 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
}
}
+ private TaskContainer getTaskContainer(String taskId)
+ {
+ return tasks.get(taskId);
+ }
+
private Future<TaskStatus> runTask(Task task)
{
final TaskContainer taskContainer = new TaskContainer(task);
@@ -531,6 +541,11 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
return taskRunner.run(injectIfNeeded(task));
}
+ public TaskContainer getTaskContainer(String taskId)
+ {
+ return taskRunner.getTaskContainer(taskId);
+ }
+
public TaskStatus runAndWait(Task task)
{
return taskRunner.runAndWait(injectIfNeeded(task));
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index ecb910b..184ff84 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSplit;
@@ -708,7 +709,7 @@ public class ParallelIndexSupervisorTaskResourceTest
extends AbstractParallelInd
taskClient.report(
getSupervisorTaskId(),
- new PushedSegmentsReport(getId(), Collections.emptySet(),
Collections.singleton(segment))
+ new PushedSegmentsReport(getId(), Collections.emptySet(),
Collections.singleton(segment), ImmutableMap.of())
);
return TaskStatus.fromCode(getId(), state);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java
new file mode 100644
index 0000000..05d0a1f
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Test;
+
+public class PushedSegmentsReportTest
+{
+ @Test
+ public void testEquals()
+ {
+
EqualsVerifier.forClass(PushedSegmentsReport.class).usingGetClass().verify();
+ }
+}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index f04559f..fdcab96 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -22,8 +22,10 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
@@ -37,6 +39,7 @@ import
org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
@@ -61,6 +64,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -105,6 +109,14 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
Files.newBufferedWriter(new File(inputDir, "test_" +
i).toPath(), StandardCharsets.UTF_8)) {
writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 24 +
i, i));
writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 25 +
i, i));
+ if (i == 0) {
+ // thrown away due to timestamp outside interval
+ writer.write(StringUtils.format("2012-12-%d,%d th test file\n", 25 +
i, i));
+ // unparseable metric value
+ writer.write(StringUtils.format("2017-12-%d,%d th test
file,badval\n", 25 + i, i));
+ // unparseable row
+ writer.write(StringUtils.format("2017unparseable\n"));
+ }
}
}
@@ -114,6 +126,7 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 25 +
i, i));
}
}
+
getObjectMapper().registerSubtypes(SettableSplittableLocalInputSource.class);
}
@@ -153,7 +166,7 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
}
}
- private void runTestTask(
+ private ParallelIndexSupervisorTask runTestTask(
@Nullable Interval interval,
Granularity segmentGranularity,
boolean appendToExisting,
@@ -170,9 +183,11 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
appendToExisting,
originalSegmentsIfAppend
);
+ TaskContainer taskContainer =
getIndexingServiceClient().getTaskContainer(task.getId());
+ return (ParallelIndexSupervisorTask) taskContainer.getTask();
}
- private void runOverwriteTask(
+ private ParallelIndexSupervisorTask runOverwriteTask(
@Nullable Interval interval,
Granularity segmentGranularity,
LockGranularity actualLockGranularity
@@ -182,6 +197,8 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity ==
LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS,
getIndexingServiceClient().runAndWait(task).getStatusCode());
assertShardSpecAfterOverwrite(task, actualLockGranularity);
+ TaskContainer taskContainer =
getIndexingServiceClient().getTaskContainer(task.getId());
+ return (ParallelIndexSupervisorTask) taskContainer.getTask();
}
private void testRunAndOverwrite(@Nullable Interval inputInterval,
Granularity secondSegmentGranularity)
@@ -296,6 +313,58 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
testRunAndOverwrite(Intervals.of("2017-12/P1M"), Granularities.DAY);
}
+ @Test()
+ public void testRunInParallelTaskReports()
+ {
+ ParallelIndexSupervisorTask task = runTestTask(
+ Intervals.of("2017-12/P1M"),
+ Granularities.DAY,
+ false,
+ Collections.emptyList()
+ );
+ Map<String, Object> actualReports = task.doGetLiveReports("full");
+ Map<String, Object> expectedReports = getExpectedTaskReportParallel(
+ task.getId(),
+ ImmutableList.of(
+ "Timestamp[2017unparseable] is unparseable! Event:
{ts=2017unparseable}",
+ "Found unparseable columns in row:
[MapBasedInputRow{timestamp=2017-12-25T00:00:00.000Z,"
+ + " event={ts=2017-12-25, dim=0 th test file, val=badval},
dimensions=[ts, dim]}], "
+ + "exceptions: [Unable to parse value[badval] for field[val]]"
+ ),
+ new RowIngestionMetersTotals(
+ 10,
+ 1,
+ 1,
+ 1)
+ );
+ Assert.assertEquals(expectedReports, actualReports);
+ }
+
+ private Map<String, Object> getExpectedTaskReportParallel(
+ String taskId,
+ List<String> expectedUnparseableEvents,
+ RowIngestionMetersTotals expectedTotals
+ )
+ {
+ Map<String, Object> returnMap = new HashMap<>();
+ Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
+ Map<String, Object> payload = new HashMap<>();
+
+ payload.put("ingestionState", IngestionState.COMPLETED);
+ payload.put("unparseableEvents", ImmutableMap.of("buildSegments",
expectedUnparseableEvents));
+ payload.put("rowStats", ImmutableMap.of("totals",
ImmutableMap.of("buildSegments", expectedTotals)));
+
+ ingestionStatsAndErrors.put("taskId", taskId);
+ ingestionStatsAndErrors.put("payload", payload);
+ ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
+
+ returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
+ return returnMap;
+ }
+
+ //
+ // Ingest all data.
+
@Test
public void testWithoutIntervalWithDifferentSegmentGranularity()
{
@@ -318,6 +387,102 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity ==
LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS,
getIndexingServiceClient().runAndWait(task).getStatusCode());
assertShardSpec(task, lockGranularity, appendToExisting,
Collections.emptyList());
+
+ TaskContainer taskContainer =
getIndexingServiceClient().getTaskContainer(task.getId());
+ final ParallelIndexSupervisorTask executedTask =
(ParallelIndexSupervisorTask) taskContainer.getTask();
+ Map<String, Object> actualReports = executedTask.doGetLiveReports("full");
+
+ RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(
+ 10,
+ 1,
+ 1,
+ 1
+ );
+ List<String> expectedUnparseableEvents = ImmutableList.of(
+ "Timestamp[2017unparseable] is unparseable! Event:
{ts=2017unparseable}",
+ "Found unparseable columns in row:
[MapBasedInputRow{timestamp=2017-12-25T00:00:00.000Z,"
+ + " event={ts=2017-12-25, dim=0 th test file, val=badval},
dimensions=[ts, dim]}], "
+ + "exceptions: [Unable to parse value[badval] for field[val]]"
+ );
+
+ Map<String, Object> expectedReports;
+ if (useInputFormatApi) {
+ expectedReports = getExpectedTaskReportSequential(
+ task.getId(),
+ expectedUnparseableEvents,
+ expectedTotals
+ );
+ } else {
+ // when useInputFormatApi is false, maxConcurrentSubTasks=2 and it uses
the single phase runner
+ // instead of sequential runner
+ expectedReports = getExpectedTaskReportParallel(
+ task.getId(),
+ expectedUnparseableEvents,
+ expectedTotals
+ );
+ }
+
+ Assert.assertEquals(expectedReports, actualReports);
+ System.out.println(actualReports);
+ }
+
+ private Map<String, Object> getExpectedTaskReportSequential(
+ String taskId,
+ List<String> expectedUnparseableEvents,
+ RowIngestionMetersTotals expectedTotals
+ )
+ {
+ Map<String, Object> returnMap = new HashMap<>();
+ Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
+ Map<String, Object> payload = new HashMap<>();
+
+ payload.put("ingestionState", IngestionState.COMPLETED);
+ payload.put(
+ "unparseableEvents",
+ ImmutableMap.of(
+ "determinePartitions", ImmutableList.of(),
+ "buildSegments", expectedUnparseableEvents
+ )
+ );
+ Map<String, Object> emptyAverageMinuteMap = ImmutableMap.of(
+ "processed", 0.0,
+ "unparseable", 0.0,
+ "thrownAway", 0.0,
+ "processedWithError", 0.0
+ );
+
+ Map<String, Object> emptyAverages = ImmutableMap.of(
+ "1m", emptyAverageMinuteMap,
+ "5m", emptyAverageMinuteMap,
+ "15m", emptyAverageMinuteMap
+ );
+
+ payload.put(
+ "rowStats",
+ ImmutableMap.of(
+ "movingAverages",
+ ImmutableMap.of(
+ "determinePartitions",
+ emptyAverages,
+ "buildSegments",
+ emptyAverages
+ ),
+ "totals",
+ ImmutableMap.of(
+ "determinePartitions",
+ new RowIngestionMetersTotals(0, 0, 0, 0),
+ "buildSegments",
+ expectedTotals
+ )
+ )
+ );
+
+ ingestionStatsAndErrors.put("taskId", taskId);
+ ingestionStatsAndErrors.put("payload", payload);
+ ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
+
+ returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
+ return returnMap;
}
@Test
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
index d9c6ce5..89bac75 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
@@ -22,6 +22,8 @@ package org.apache.druid.segment.incremental;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Objects;
+
public class RowIngestionMetersTotals
{
private final long processed;
@@ -66,4 +68,37 @@ public class RowIngestionMetersTotals
{
return unparseable;
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RowIngestionMetersTotals that = (RowIngestionMetersTotals) o;
+ return processed == that.processed
+ && processedWithError == that.processedWithError
+ && thrownAway == that.thrownAway
+ && unparseable == that.unparseable;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(processed, processedWithError, thrownAway,
unparseable);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "RowIngestionMetersTotals{" +
+ "processed=" + processed +
+ ", processedWithError=" + processedWithError +
+ ", thrownAway=" + thrownAway +
+ ", unparseable=" + unparseable +
+ '}';
+ }
}
diff --git
a/processing/src/test/java/org/apache/druid/segment/incremental/RowIngestionMetersTotalsTest.java
b/processing/src/test/java/org/apache/druid/segment/incremental/RowIngestionMetersTotalsTest.java
new file mode 100644
index 0000000..197b984
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/segment/incremental/RowIngestionMetersTotalsTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Test;
+
+public class RowIngestionMetersTotalsTest
+{
+ @Test
+ public void testEquals()
+ {
+
EqualsVerifier.forClass(RowIngestionMetersTotals.class).usingGetClass().verify();
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index 60bbae6..afab3b5 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -336,6 +336,32 @@ public class HttpIndexingServiceClient implements
IndexingServiceClient
}
}
+ @Nullable
+ @Override
+ public Map<String, Object> getTaskReport(String taskId)
+ {
+ try {
+ final StringFullResponseHolder responseHolder = druidLeaderClient.go(
+ druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ StringUtils.format("/druid/indexer/v1/task/%s/reports",
StringUtils.urlEncode(taskId))
+ )
+ );
+
+ if (responseHolder.getContent().length() == 0) {
+ return null;
+ }
+
+ return jsonMapper.readValue(
+ responseHolder.getContent(),
+ JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+ );
+ }
+ catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override
public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer>
minTaskPriority)
{
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index c379077..84f9f55 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -66,6 +66,9 @@ public interface IndexingServiceClient
@Nullable
TaskPayloadResponse getTaskPayload(String taskId);
+ @Nullable
+ Map<String, Object> getTaskReport(String taskId);
+
/**
* Gets a List of Intervals locked by higher priority tasks for each
datasource.
*
diff --git
a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
index ff860ee..67b99b9 100644
---
a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
+++
b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
@@ -39,6 +39,7 @@ import org.junit.rules.ExpectedException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
+import java.util.Map;
public class HttpIndexingServiceClientTest
{
@@ -161,4 +162,70 @@ public class HttpIndexingServiceClientTest
httpIndexingServiceClient.sample(samplerSpec);
EasyMock.verify(druidLeaderClient, response);
}
+
+ @Test
+ public void testGetTaskReport() throws Exception
+ {
+ String taskId = "testTaskId";
+ HttpResponse response = EasyMock.createMock(HttpResponse.class);
+ EasyMock.expect(response.getContent()).andReturn(new
BigEndianHeapChannelBuffer(0));
+ EasyMock.replay(response);
+
+ Map<String, Object> dummyResponse = ImmutableMap.of("test", "value");
+
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ HttpResponseStatus.OK,
+ response,
+ StandardCharsets.UTF_8
+ ).addChunk(jsonMapper.writeValueAsString(dummyResponse));
+
+ EasyMock.expect(druidLeaderClient.go(EasyMock.anyObject(Request.class)))
+ .andReturn(responseHolder)
+ .anyTimes();
+
+ EasyMock.expect(druidLeaderClient.makeRequest(HttpMethod.GET,
"/druid/indexer/v1/task/testTaskId/reports"))
+ .andReturn(new Request(
+ HttpMethod.GET,
+ new
URL("http://localhost:8090/druid/indexer/v1/task/testTaskId/reports")
+ ))
+ .anyTimes();
+ EasyMock.replay(druidLeaderClient);
+
+ final Map<String, Object> actualResponse =
httpIndexingServiceClient.getTaskReport(taskId);
+ Assert.assertEquals(dummyResponse, actualResponse);
+
+ EasyMock.verify(druidLeaderClient, response);
+ }
+
+ @Test
+ public void testGetTaskReportEmpty() throws Exception
+ {
+ String taskId = "testTaskId";
+ HttpResponse response = EasyMock.createMock(HttpResponse.class);
+ EasyMock.expect(response.getContent()).andReturn(new
BigEndianHeapChannelBuffer(0));
+ EasyMock.replay(response);
+
+ StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+ HttpResponseStatus.OK,
+ response,
+ StandardCharsets.UTF_8
+ ).addChunk("");
+
+ EasyMock.expect(druidLeaderClient.go(EasyMock.anyObject(Request.class)))
+ .andReturn(responseHolder)
+ .anyTimes();
+
+ EasyMock.expect(druidLeaderClient.makeRequest(HttpMethod.GET,
"/druid/indexer/v1/task/testTaskId/reports"))
+ .andReturn(new Request(
+ HttpMethod.GET,
+ new
URL("http://localhost:8090/druid/indexer/v1/task/testTaskId/reports")
+ ))
+ .anyTimes();
+ EasyMock.replay(druidLeaderClient);
+
+ final Map<String, Object> actualResponse =
httpIndexingServiceClient.getTaskReport(taskId);
+ Assert.assertNull(actualResponse);
+
+ EasyMock.verify(druidLeaderClient, response);
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index af307a4..2035dba 100644
---
a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++
b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -107,6 +107,13 @@ public class NoopIndexingServiceClient implements
IndexingServiceClient
return null;
}
+ @Nullable
+ @Override
+ public Map<String, Object> getTaskReport(String taskId)
+ {
+ return null;
+ }
+
@Override
public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer>
minTaskPriority)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]