This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0de44d91f1b Cleanup serialiazation of TaskReportMap (#16217)
0de44d91f1b is described below
commit 0de44d91f1b8aeded7bcc6c04e3058ae86215bb1
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Apr 2 00:23:24 2024 +0530
Cleanup serialiazation of TaskReportMap (#16217)
* Build task reports in AbstractBatchIndexTask
* Minor cleanup
* Apply suggestions from code review by @abhishekrb
Co-authored-by: Abhishek Radhakrishnan <[email protected]>
* Cleanup IndexTaskTest
* Fix formatting
* Fix coverage
* Cleanup serialization of TaskReport map
* Replace occurrences of Map<String, TaskReport>
* Return TaskReport.ReportMap for live reports, fix test comparisons
* Address test failures
---------
Co-authored-by: Abhishek Radhakrishnan <[email protected]>
---
.../java/org/apache/druid/msq/exec/Controller.java | 3 +-
.../apache/druid/msq/exec/ControllerContext.java | 4 +-
.../org/apache/druid/msq/exec/ControllerImpl.java | 2 +-
.../msq/indexing/IndexerControllerContext.java | 4 +-
.../msq/indexing/client/ControllerChatHandler.java | 3 +-
.../druid/msq/indexing/WorkerChatHandlerTest.java | 3 +-
.../indexing/client/ControllerChatHandlerTest.java | 69 ++++++++++++
.../msq/indexing/report/MSQTaskReportTest.java | 5 +-
.../druid/msq/test/MSQTestControllerContext.java | 6 +-
.../msq/test/MSQTestOverlordServiceClient.java | 4 +-
.../druid/msq/test/MSQTestWorkerContext.java | 2 +-
.../druid/indexing/common/KillTaskReport.java | 19 ++++
.../common/MultipleFileTaskReportFileWriter.java | 2 +-
.../common/SingleFileTaskReportFileWriter.java | 21 +---
.../apache/druid/indexing/common/TaskReport.java | 29 ++++-
.../indexing/common/TaskReportFileWriter.java | 4 +-
.../common/task/AbstractBatchIndexTask.java | 26 ++++-
.../task/AppenderatorDriverRealtimeIndexTask.java | 2 +-
.../druid/indexing/common/task/CompactionTask.java | 10 +-
.../indexing/common/task/HadoopIndexTask.java | 2 +-
.../druid/indexing/common/task/IndexTask.java | 24 ++--
.../GeneratedPartitionsMetadataReport.java | 3 +-
.../batch/parallel/GeneratedPartitionsReport.java | 7 +-
.../parallel/ParallelIndexSupervisorTask.java | 33 ++----
.../parallel/PartialHashSegmentGenerateTask.java | 2 +-
.../parallel/PartialRangeSegmentGenerateTask.java | 2 +-
.../batch/parallel/PartialSegmentGenerateTask.java | 6 +-
.../batch/parallel/PartialSegmentMergeTask.java | 6 +-
.../task/batch/parallel/PushedSegmentsReport.java | 7 +-
.../task/batch/parallel/SinglePhaseSubTask.java | 26 ++---
.../SeekableStreamIndexTaskRunner.java | 2 +-
.../AppenderatorDriverRealtimeIndexTaskTest.java | 4 +-
.../druid/indexing/common/task/IndexTaskTest.java | 4 +-
.../indexing/common/task/IngestionTestBase.java | 10 +-
.../common/task/KillUnusedSegmentsTaskTest.java | 2 +-
.../common/task/NoopTestTaskReportFileWriter.java | 4 +-
.../indexing/common/task/TaskReportSerdeTest.java | 94 ++++++++++------
.../AbstractMultiPhaseParallelIndexingTest.java | 7 +-
.../AbstractParallelIndexSupervisorTaskTest.java | 125 ++++++++++++---------
.../MultiPhaseParallelIndexingRowStatsTest.java | 14 +--
.../ParallelIndexSupervisorTaskResourceTest.java | 9 +-
.../batch/parallel/PushedSegmentsReportTest.java | 11 +-
.../parallel/SinglePhaseParallelIndexingTest.java | 15 ++-
.../SeekableStreamIndexTaskTestBase.java | 4 +-
44 files changed, 387 insertions(+), 254 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java
index b7a93de5ed0..1fa23e3d1b7 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java
@@ -32,7 +32,6 @@ import
org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;
import javax.annotation.Nullable;
import java.util.List;
-import java.util.Map;
/**
* Interface for the controller of a multi-stage query.
@@ -123,6 +122,6 @@ public interface Controller
List<String> getTaskIds();
@Nullable
- Map<String, TaskReport> liveReports();
+ TaskReport.ReportMap liveReports();
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
index 0a32158cf9b..35aa1c79ef0 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
@@ -28,8 +28,6 @@ import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.server.DruidNode;
-import java.util.Map;
-
/**
* Context used by multi-stage query controllers.
*
@@ -80,5 +78,5 @@ public interface ControllerContext
/**
* Writes controller task report.
*/
- void writeReports(String controllerTaskId, Map<String, TaskReport> reports);
+ void writeReports(String controllerTaskId, TaskReport.ReportMap reports);
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index aeba1be947c..6db998bfef2 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -916,7 +916,7 @@ public class ControllerImpl implements Controller
@Override
@Nullable
- public Map<String, TaskReport> liveReports()
+ public TaskReport.ReportMap liveReports()
{
final QueryDefinition queryDef = queryDefRef.get();
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 401d6af7072..1426726d592 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -41,8 +41,6 @@ import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.server.DruidNode;
-import java.util.Map;
-
/**
* Implementation for {@link ControllerContext} required to run multi-stage
queries as indexing tasks.
*/
@@ -126,7 +124,7 @@ public class IndexerControllerContext implements
ControllerContext
}
@Override
- public void writeReports(String controllerTaskId, Map<String, TaskReport>
reports)
+ public void writeReports(String controllerTaskId, TaskReport.ReportMap
reports)
{
toolbox.getTaskReportFileWriter().write(controllerTaskId, reports);
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java
index 22d3b31cf10..2d979fb8bda 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java
@@ -45,7 +45,6 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.List;
-import java.util.Map;
public class ControllerChatHandler implements ChatHandler
{
@@ -189,7 +188,7 @@ public class ControllerChatHandler implements ChatHandler
public Response httpGetLiveReports(@Context final HttpServletRequest req)
{
ChatHandlers.authorizationCheck(req, Action.WRITE, task.getDataSource(),
toolbox.getAuthorizerMapper());
- final Map<String, TaskReport> reports = controller.liveReports();
+ final TaskReport.ReportMap reports = controller.liveReports();
if (reports == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java
index c88f22d1b74..70ed5651300 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java
@@ -53,7 +53,6 @@ import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.io.InputStream;
import java.util.HashMap;
-import java.util.Map;
public class WorkerChatHandlerTest
{
@@ -88,7 +87,7 @@ public class WorkerChatHandlerTest
new TaskReportFileWriter()
{
@Override
- public void write(String taskId, Map<String,
TaskReport> reports)
+ public void write(String taskId,
TaskReport.ReportMap reports)
{
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/ControllerChatHandlerTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/ControllerChatHandlerTest.java
new file mode 100644
index 00000000000..81da98d6fa7
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/ControllerChatHandlerTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.client;
+
+import org.apache.druid.indexing.common.KillTaskReport;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.msq.exec.Controller;
+import org.apache.druid.msq.indexing.MSQControllerTask;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.Response;
+
+public class ControllerChatHandlerTest
+{
+ @Test
+ public void testHttpGetLiveReports()
+ {
+ final Controller controller = Mockito.mock(Controller.class);
+
+ TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
+ reportMap.put("killUnusedSegments", new KillTaskReport("kill_1", new
KillTaskReport.Stats(1, 2, 3)));
+
+ Mockito.when(controller.liveReports())
+ .thenReturn(reportMap);
+
+ MSQControllerTask task = Mockito.mock(MSQControllerTask.class);
+ Mockito.when(task.getDataSource())
+ .thenReturn("wiki");
+ Mockito.when(controller.task())
+ .thenReturn(task);
+
+ TaskToolbox toolbox = Mockito.mock(TaskToolbox.class);
+ Mockito.when(toolbox.getAuthorizerMapper())
+ .thenReturn(new AuthorizerMapper(null));
+
+ ControllerChatHandler chatHandler = new ControllerChatHandler(toolbox,
controller);
+
+ HttpServletRequest httpRequest = Mockito.mock(HttpServletRequest.class);
+ Mockito.when(httpRequest.getAttribute(ArgumentMatchers.anyString()))
+ .thenReturn("allow-all");
+ Response response = chatHandler.httpGetLiveReports(httpRequest);
+
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(reportMap, response.getEntity());
+ }
+}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
index 4bc3d1075c1..df492a70056 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
@@ -57,7 +57,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
public class MSQTaskReportTest
{
@@ -242,9 +241,9 @@ public class MSQTaskReportTest
writer.setObjectMapper(mapper);
writer.write(TASK_ID, TaskReport.buildTaskReports(report));
- final Map<String, TaskReport> reportMap = mapper.readValue(
+ final TaskReport.ReportMap reportMap = mapper.readValue(
reportFile,
- new TypeReference<Map<String, TaskReport>>()
+ new TypeReference<TaskReport.ReportMap>()
{
}
);
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 5ab8932de3e..e0b0a837de4 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -88,7 +88,7 @@ public class MSQTestControllerContext implements
ControllerContext
private final ServiceEmitter emitter = new NoopServiceEmitter();
private Controller controller;
- private Map<String, TaskReport> report = null;
+ private TaskReport.ReportMap report = null;
private final WorkerMemoryParameters workerMemoryParameters;
public MSQTestControllerContext(
@@ -273,14 +273,14 @@ public class MSQTestControllerContext implements
ControllerContext
}
@Override
- public void writeReports(String controllerTaskId, Map<String, TaskReport>
taskReport)
+ public void writeReports(String controllerTaskId, TaskReport.ReportMap
taskReport)
{
if (controller != null && controller.id().equals(controllerTaskId)) {
report = taskReport;
}
}
- public Map<String, TaskReport> getAllReports()
+ public TaskReport.ReportMap getAllReports()
{
return report;
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
index c5f601d875e..db878d70024 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
@@ -55,7 +55,7 @@ public class MSQTestOverlordServiceClient extends
NoopOverlordClient
private final WorkerMemoryParameters workerMemoryParameters;
private final List<ImmutableSegmentLoadInfo> loadedSegmentMetadata;
private final Map<String, Controller> inMemoryControllers = new HashMap<>();
- private final Map<String, Map<String, TaskReport>> reports = new HashMap<>();
+ private final Map<String, TaskReport.ReportMap> reports = new HashMap<>();
private final Map<String, MSQControllerTask> inMemoryControllerTask = new
HashMap<>();
private final Map<String, TaskStatus> inMemoryTaskStatus = new HashMap<>();
@@ -171,7 +171,7 @@ public class MSQTestOverlordServiceClient extends
NoopOverlordClient
// hooks to pull stuff out for testing
@Nullable
- public Map<String, TaskReport> getReportForTask(String id)
+ public TaskReport.ReportMap getReportForTask(String id)
{
return reports.get(id);
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
index e80f8e2d31f..eb4976701eb 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
@@ -126,7 +126,7 @@ public class MSQTestWorkerContext implements WorkerContext
final TaskReportFileWriter reportFileWriter = new TaskReportFileWriter()
{
@Override
- public void write(String taskId, Map<String, TaskReport> reports)
+ public void write(String taskId, TaskReport.ReportMap reports)
{
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java
index f97f761166a..f78b3d1d162 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java
@@ -62,6 +62,25 @@ public class KillTaskReport implements TaskReport
return stats;
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ KillTaskReport that = (KillTaskReport) o;
+ return Objects.equals(taskId, that.taskId) && Objects.equals(stats,
that.stats);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(taskId, stats);
+ }
+
public static class Stats
{
private final int numSegmentsKilled;
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java
index 18313ccd79b..e6bc5e67f3a 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java
@@ -37,7 +37,7 @@ public class MultipleFileTaskReportFileWriter implements
TaskReportFileWriter
private ObjectMapper objectMapper;
@Override
- public void write(String taskId, Map<String, TaskReport> reports)
+ public void write(String taskId, TaskReport.ReportMap reports)
{
final File reportsFile = taskReportFiles.get(taskId);
if (reportsFile == null) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java
index 4d55dd64963..79f880d3076 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java
@@ -19,17 +19,13 @@
package org.apache.druid.indexing.common;
-import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializerProvider;
import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
-import java.util.Map;
public class SingleFileTaskReportFileWriter implements TaskReportFileWriter
{
@@ -44,7 +40,7 @@ public class SingleFileTaskReportFileWriter implements
TaskReportFileWriter
}
@Override
- public void write(String taskId, Map<String, TaskReport> reports)
+ public void write(String taskId, TaskReport.ReportMap reports)
{
try {
final File reportsFileParent = reportsFile.getParentFile();
@@ -70,20 +66,9 @@ public class SingleFileTaskReportFileWriter implements
TaskReportFileWriter
public static void writeReportToStream(
final ObjectMapper objectMapper,
final OutputStream outputStream,
- final Map<String, TaskReport> reports
+ final TaskReport.ReportMap reports
) throws Exception
{
- final SerializerProvider serializers =
objectMapper.getSerializerProviderInstance();
-
- try (final JsonGenerator jg =
objectMapper.getFactory().createGenerator(outputStream)) {
- jg.writeStartObject();
-
- for (final Map.Entry<String, TaskReport> entry : reports.entrySet()) {
- jg.writeFieldName(entry.getKey());
- JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers,
entry.getValue());
- }
-
- jg.writeEndObject();
- }
+ objectMapper.writeValue(outputStream, reports);
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
index 2ac260d72fd..86ed562112f 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
@@ -21,9 +21,9 @@ package org.apache.druid.indexing.common;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.google.common.base.Optional;
import java.util.LinkedHashMap;
-import java.util.Map;
/**
* TaskReport objects contain additional information about an indexing task,
such as row statistics, errors, and
@@ -31,7 +31,10 @@ import java.util.Map;
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
- @JsonSubTypes.Type(name = "ingestionStatsAndErrors", value =
IngestionStatsAndErrorsTaskReport.class),
+ @JsonSubTypes.Type(
+ name = IngestionStatsAndErrorsTaskReport.REPORT_KEY,
+ value = IngestionStatsAndErrorsTaskReport.class
+ ),
@JsonSubTypes.Type(name = KillTaskReport.REPORT_KEY, value =
KillTaskReport.class)
})
public interface TaskReport
@@ -48,13 +51,29 @@ public interface TaskReport
/**
* Returns an order-preserving map that is suitable for passing into {@link
TaskReportFileWriter#write}.
*/
- static Map<String, TaskReport> buildTaskReports(TaskReport... taskReports)
+ static ReportMap buildTaskReports(TaskReport... taskReports)
{
- // Use LinkedHashMap to preserve order of the reports.
- Map<String, TaskReport> taskReportMap = new LinkedHashMap<>();
+ ReportMap taskReportMap = new ReportMap();
for (TaskReport taskReport : taskReports) {
taskReportMap.put(taskReport.getReportKey(), taskReport);
}
return taskReportMap;
}
+
+ /**
+ * Represents an ordered map from report key to a TaskReport that is
compatible
+ * for writing out reports to files or serving over HTTP.
+ * <p>
+ * This class is needed for Jackson serde to work correctly. Without this
class,
+ * a TaskReport is serialized without the type information and cannot be
+ * deserialized back into a concrete implementation.
+ */
+ class ReportMap extends LinkedHashMap<String, TaskReport>
+ {
+ @SuppressWarnings("unchecked")
+ public <T extends TaskReport> Optional<T> findReport(String reportKey)
+ {
+ return Optional.fromNullable((T) get(reportKey));
+ }
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java
index 908efe6e1a4..972f0b010f3 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java
@@ -21,11 +21,9 @@ package org.apache.druid.indexing.common;
import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.Map;
-
public interface TaskReportFileWriter
{
- void write(String taskId, Map<String, TaskReport> reports);
+ void write(String taskId, TaskReport.ReportMap reports);
void setObjectMapper(ObjectMapper objectMapper);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index a5acadf704e..4a093cde2ce 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -903,11 +903,35 @@ public abstract class AbstractBatchIndexTask extends
AbstractTask
return null;
}
+ protected TaskReport.ReportMap buildLiveIngestionStatsReport(
+ IngestionState ingestionState,
+ Map<String, Object> unparseableEvents,
+ Map<String, Object> rowStats
+ )
+ {
+ return TaskReport.buildTaskReports(
+ new IngestionStatsAndErrorsTaskReport(
+ getId(),
+ new IngestionStatsAndErrors(
+ ingestionState,
+ unparseableEvents,
+ rowStats,
+ null,
+ false,
+ 0L,
+ null,
+ null,
+ null
+ )
+ )
+ );
+ }
+
/**
* Builds a singleton map with {@link
IngestionStatsAndErrorsTaskReport#REPORT_KEY}
* as key and an {@link IngestionStatsAndErrorsTaskReport} for this task as
value.
*/
- protected Map<String, TaskReport> buildIngestionStatsReport(
+ protected TaskReport.ReportMap buildIngestionStatsReport(
IngestionState ingestionState,
String errorMessage,
Long segmentsRead,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index c00f219c777..2e4710d43cd 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -609,7 +609,7 @@ public class AppenderatorDriverRealtimeIndexTask extends
AbstractTask implements
*
* @return Map of reports for the task.
*/
- private Map<String, TaskReport> getTaskCompletionReports()
+ private TaskReport.ReportMap getTaskCompletionReports()
{
return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport(
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 59a0a499f91..eabb9e06236 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -502,7 +502,7 @@ public class CompactionTask extends AbstractBatchIndexTask
log.info("Generated [%d] compaction task specs", totalNumSpecs);
int failCnt = 0;
- Map<String, TaskReport> completionReports = new HashMap<>();
+ final TaskReport.ReportMap completionReports = new
TaskReport.ReportMap();
for (int i = 0; i < indexTaskSpecs.size(); i++) {
ParallelIndexSupervisorTask eachSpec = indexTaskSpecs.get(i);
final String json =
toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
@@ -521,9 +521,11 @@ public class CompactionTask extends AbstractBatchIndexTask
}
String reportKeySuffix = "_" + i;
- Optional.ofNullable(eachSpec.getCompletionReports())
- .ifPresent(reports -> completionReports.putAll(
- CollectionUtils.mapKeys(reports, key -> key +
reportKeySuffix)));
+ Optional.ofNullable(eachSpec.getCompletionReports()).ifPresent(
+ reports -> completionReports.putAll(
+ CollectionUtils.mapKeys(reports, key -> key +
reportKeySuffix)
+ )
+ );
} else {
failCnt++;
log.warn("indexSpec is not ready: [%s].\nTrying the next
indexSpec.", json);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 5fd3572f9e2..31a4b9c6665 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -680,7 +680,7 @@ public class HadoopIndexTask extends HadoopTask implements
ChatHandler
return Response.ok(returnMap).build();
}
- private Map<String, TaskReport> getTaskCompletionReports()
+ private TaskReport.ReportMap getTaskCompletionReports()
{
return buildIngestionStatsReport(ingestionState, errorMsg, null, null);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 5c538d4a0fc..532529ecfc1 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -187,7 +187,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
@Nullable
private String errorMsg;
- private Map<String, TaskReport> completionReports;
+ private TaskReport.ReportMap completionReports;
@JsonCreator
public IndexTask(
@@ -320,7 +320,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
@Nullable
@JsonIgnore
- public Map<String, TaskReport> getCompletionReports()
+ public TaskReport.ReportMap getCompletionReports()
{
return completionReports;
}
@@ -415,21 +415,13 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ,
getDataSource(), authorizerMapper);
- Map<String, Object> returnMap = new HashMap<>();
- Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
- Map<String, Object> payload = new HashMap<>();
- Map<String, Object> events = getTaskCompletionUnparseableEvents();
-
- payload.put("ingestionState", ingestionState);
- payload.put("unparseableEvents", events);
- payload.put("rowStats", doGetRowStats(full != null));
- ingestionStatsAndErrors.put("taskId", getId());
- ingestionStatsAndErrors.put("payload", payload);
- ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
-
- returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
- return Response.ok(returnMap).build();
+ final TaskReport.ReportMap liveReports = buildLiveIngestionStatsReport(
+ ingestionState,
+ getTaskCompletionUnparseableEvents(),
+ doGetRowStats(full != null)
+ );
+ return Response.ok(liveReports).build();
}
@JsonProperty("spec")
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java
index a83c3cc7412..7c98b3ee579 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.TaskReport;
import java.util.List;
-import java.util.Map;
/**
* Report containing the {@link GenericPartitionStat}s created by a {@link
PartialSegmentGenerateTask}. This report is
@@ -38,7 +37,7 @@ class GeneratedPartitionsMetadataReport extends
GeneratedPartitionsReport
GeneratedPartitionsMetadataReport(
@JsonProperty("taskId") String taskId,
@JsonProperty("partitionStats") List<PartitionStat> partitionStats,
- @JsonProperty("taskReport") Map<String, TaskReport> taskReport
+ @JsonProperty("taskReport") TaskReport.ReportMap taskReport
)
{
super(taskId, partitionStats, taskReport);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java
index 1fa025d1c91..8fedae5a4f1 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.TaskReport;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
/**
@@ -35,9 +34,9 @@ public class GeneratedPartitionsReport implements
SubTaskReport
{
private final String taskId;
private final List<PartitionStat> partitionStats;
- private final Map<String, TaskReport> taskReport;
+ private final TaskReport.ReportMap taskReport;
- GeneratedPartitionsReport(String taskId, List<PartitionStat> partitionStats,
Map<String, TaskReport> taskReport)
+ GeneratedPartitionsReport(String taskId, List<PartitionStat> partitionStats,
TaskReport.ReportMap taskReport)
{
this.taskId = taskId;
this.partitionStats = partitionStats;
@@ -52,7 +51,7 @@ public class GeneratedPartitionsReport implements
SubTaskReport
}
@JsonProperty
- public Map<String, TaskReport> getTaskReport()
+ public TaskReport.ReportMap getTaskReport()
{
return taskReport;
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 4d5b9717ca9..27694360ed9 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -202,7 +202,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
private volatile Pair<Map<String, Object>, Map<String, Object>>
indexGenerateRowStats;
private IngestionState ingestionState;
- private Map<String, TaskReport> completionReports;
+ private TaskReport.ReportMap completionReports;
private Long segmentsRead;
private Long segmentsPublished;
private final boolean isCompactionTask;
@@ -300,7 +300,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@Nullable
@JsonIgnore
- public Map<String, TaskReport> getCompletionReports()
+ public TaskReport.ReportMap getCompletionReports()
{
return completionReports;
}
@@ -1238,7 +1238,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
/**
* Generate an IngestionStatsAndErrorsTaskReport for the task.
*/
- private Map<String, TaskReport> getTaskCompletionReports(TaskStatus
taskStatus)
+ private TaskReport.ReportMap getTaskCompletionReports(TaskStatus taskStatus)
{
return buildIngestionStatsReport(
IngestionState.COMPLETED,
@@ -1602,7 +1602,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
// Get stats from completed tasks
Map<String, PushedSegmentsReport> completedSubtaskReports =
parallelSinglePhaseRunner.getReports();
for (PushedSegmentsReport pushedSegmentsReport :
completedSubtaskReports.values()) {
- Map<String, TaskReport> taskReport =
pushedSegmentsReport.getTaskReport();
+ TaskReport.ReportMap taskReport = pushedSegmentsReport.getTaskReport();
if (taskReport == null || taskReport.isEmpty()) {
LOG.warn("Received an empty report from subtask[%s]" +
pushedSegmentsReport.getTaskId());
continue;
@@ -1642,7 +1642,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
long totalSegmentsRead = 0L;
for (GeneratedPartitionsReport generatedPartitionsReport :
completedSubtaskReports.values()) {
- Map<String, TaskReport> taskReport =
generatedPartitionsReport.getTaskReport();
+ TaskReport.ReportMap taskReport =
generatedPartitionsReport.getTaskReport();
if (taskReport == null || taskReport.isEmpty()) {
LOG.warn("Received an empty report from subtask[%s]",
generatedPartitionsReport.getTaskId());
continue;
@@ -1726,7 +1726,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
}
private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
- Map<String, TaskReport> taskReport,
+ TaskReport.ReportMap taskReport,
List<ParseExceptionReport> unparseableEvents
)
{
@@ -1804,12 +1804,8 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
}
@VisibleForTesting
- public Map<String, Object> doGetLiveReports(boolean isFullReport)
+ public TaskReport.ReportMap doGetLiveReports(boolean isFullReport)
{
- Map<String, Object> returnMap = new HashMap<>();
- Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
- Map<String, Object> payload = new HashMap<>();
-
Pair<Map<String, Object>, Map<String, Object>> rowStatsAndUnparsebleEvents
=
doGetRowStatsAndUnparseableEvents(isFullReport, true);
@@ -1824,16 +1820,11 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
: currentSequentialTask.getIngestionState();
}
- payload.put("ingestionState", ingestionStateForReport);
- payload.put("unparseableEvents", rowStatsAndUnparsebleEvents.rhs);
- payload.put("rowStats", rowStatsAndUnparsebleEvents.lhs);
-
- ingestionStatsAndErrors.put("taskId", getId());
- ingestionStatsAndErrors.put("payload", payload);
- ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
-
- returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
- return returnMap;
+ return buildLiveIngestionStatsReport(
+ ingestionStateForReport,
+ rowStatsAndUnparsebleEvents.rhs,
+ rowStatsAndUnparsebleEvents.lhs
+ );
}
@GET
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
index 49e3591ff18..b6d382472b3 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
@@ -177,7 +177,7 @@ public class PartialHashSegmentGenerateTask extends
PartialSegmentGenerateTask<G
}
@Override
- GeneratedPartitionsMetadataReport
createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment>
segments, Map<String, TaskReport> taskReport)
+ GeneratedPartitionsMetadataReport
createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment>
segments, TaskReport.ReportMap taskReport)
{
List<PartitionStat> partitionStats = segments.stream()
.map(segment ->
toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment))
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
index 27604eb7e77..9f36ed63541 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
@@ -192,7 +192,7 @@ public class PartialRangeSegmentGenerateTask extends
PartialSegmentGenerateTask<
}
@Override
- GeneratedPartitionsMetadataReport
createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment>
segments, Map<String, TaskReport> taskReport)
+ GeneratedPartitionsMetadataReport
createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment>
segments, TaskReport.ReportMap taskReport)
{
List<PartitionStat> partitionStats = segments.stream()
.map(segment ->
toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment))
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index bbeb00aa845..0fd6b0916b8 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -125,7 +125,7 @@ abstract class PartialSegmentGenerateTask<T extends
GeneratedPartitionsReport> e
toolbox.getIndexingTmpDir()
);
- Map<String, TaskReport> taskReport =
getTaskCompletionReports(getNumSegmentsRead(inputSource));
+ TaskReport.ReportMap taskReport =
getTaskCompletionReports(getNumSegmentsRead(inputSource));
taskClient.report(createGeneratedPartitionsReport(toolbox, segments,
taskReport));
@@ -146,7 +146,7 @@ abstract class PartialSegmentGenerateTask<T extends
GeneratedPartitionsReport> e
abstract T createGeneratedPartitionsReport(
TaskToolbox toolbox,
List<DataSegment> segments,
- Map<String, TaskReport> taskReport
+ TaskReport.ReportMap taskReport
);
private Long getNumSegmentsRead(InputSource inputSource)
@@ -249,7 +249,7 @@ abstract class PartialSegmentGenerateTask<T extends
GeneratedPartitionsReport> e
/**
* Generate an IngestionStatsAndErrorsTaskReport for the task.
*/
- private Map<String, TaskReport> getTaskCompletionReports(Long segmentsRead)
+ private TaskReport.ReportMap getTaskCompletionReports(Long segmentsRead)
{
return buildIngestionStatsReport(
IngestionState.COMPLETED,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
index b3467bb6f20..d4c13ca3370 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
@@ -22,12 +22,12 @@ package
org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.SurrogateAction;
@@ -189,7 +189,9 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec>
extends PerfectRollu
intervalToUnzippedFiles
);
- taskClient.report(new PushedSegmentsReport(getId(),
Collections.emptySet(), pushedSegments, ImmutableMap.of()));
+ taskClient.report(
+ new PushedSegmentsReport(getId(), Collections.emptySet(),
pushedSegments, new TaskReport.ReportMap())
+ );
return TaskStatus.success(getId());
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java
index 8ed373c7d95..9fad997ef78 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java
@@ -25,7 +25,6 @@ import com.google.common.base.Preconditions;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.timeline.DataSegment;
-import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -41,14 +40,14 @@ public class PushedSegmentsReport implements SubTaskReport
private final String taskId;
private final Set<DataSegment> oldSegments;
private final Set<DataSegment> newSegments;
- private final Map<String, TaskReport> taskReport;
+ private final TaskReport.ReportMap taskReport;
@JsonCreator
public PushedSegmentsReport(
@JsonProperty("taskId") String taskId,
@JsonProperty("oldSegments") Set<DataSegment> oldSegments,
@JsonProperty("segments") Set<DataSegment> newSegments,
- @JsonProperty("taskReport") Map<String, TaskReport> taskReport
+ @JsonProperty("taskReport") TaskReport.ReportMap taskReport
)
{
this.taskId = Preconditions.checkNotNull(taskId, "taskId");
@@ -77,7 +76,7 @@ public class PushedSegmentsReport implements SubTaskReport
}
@JsonProperty("taskReport")
- public Map<String, TaskReport> getTaskReport()
+ public TaskReport.ReportMap getTaskReport()
{
return taskReport;
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 465750bad03..6af09461c40 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -282,7 +282,7 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
.transform(PartitionChunk::getObject)
.toSet();
- Map<String, TaskReport> taskReport = getTaskCompletionReports();
+ TaskReport.ReportMap taskReport = getTaskCompletionReports();
taskClient.report(new PushedSegmentsReport(getId(), oldSegments,
pushedSegments, taskReport));
toolbox.getTaskReportFileWriter().write(getId(), taskReport);
@@ -542,23 +542,13 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
return Response.ok(doGetRowStats(full != null)).build();
}
- private Map<String, Object> doGetLiveReports(boolean isFullReport)
+ private TaskReport.ReportMap doGetLiveReports(boolean isFullReport)
{
- Map<String, Object> returnMap = new HashMap<>();
- Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
- Map<String, Object> payload = new HashMap<>();
- Map<String, Object> events = getTaskCompletionUnparseableEvents();
-
- payload.put("ingestionState", ingestionState);
- payload.put("unparseableEvents", events);
- payload.put("rowStats", doGetRowStats(isFullReport));
-
- ingestionStatsAndErrors.put("taskId", getId());
- ingestionStatsAndErrors.put("payload", payload);
- ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
-
- returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
- return returnMap;
+ return buildLiveIngestionStatsReport(
+ ingestionState,
+ getTaskCompletionUnparseableEvents(),
+ doGetRowStats(isFullReport)
+ );
}
@GET
@@ -585,7 +575,7 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
/**
* Generate an IngestionStatsAndErrorsTaskReport for the task.
*/
- private Map<String, TaskReport> getTaskCompletionReports()
+ private TaskReport.ReportMap getTaskCompletionReports()
{
return buildIngestionStatsReport(IngestionState.COMPLETED, errorMsg, null,
null);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index f3e1e4a06d2..e5a1958a1fa 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1118,7 +1118,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
* @param handoffWaitMs Milliseconds waited for segments to be handed off.
* @return Map of reports for the task.
*/
- private Map<String, TaskReport> getTaskCompletionReports(@Nullable String
errorMsg, long handoffWaitMs)
+ private TaskReport.ReportMap getTaskCompletionReports(@Nullable String
errorMsg, long handoffWaitMs)
{
return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport(
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index e91d25c0b03..fe6c39b17ed 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -1678,9 +1678,9 @@ public class AppenderatorDriverRealtimeIndexTaskTest
extends InitializedNullHand
private IngestionStatsAndErrors getTaskReportData() throws IOException
{
- Map<String, TaskReport> taskReports = OBJECT_MAPPER.readValue(
+ TaskReport.ReportMap taskReports = OBJECT_MAPPER.readValue(
reportsFile,
- new TypeReference<Map<String, TaskReport>>()
+ new TypeReference<TaskReport.ReportMap>()
{
}
);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 2619a23ba33..4e5dadf4dbf 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -2548,9 +2548,9 @@ public class IndexTaskTest extends IngestionTestBase
private IngestionStatsAndErrors getTaskReportData() throws IOException
{
- Map<String, TaskReport> taskReports = jsonMapper.readValue(
+ TaskReport.ReportMap taskReports = jsonMapper.readValue(
taskRunner.getTaskReportsFile(),
- new TypeReference<Map<String, TaskReport>>()
+ new TypeReference<TaskReport.ReportMap>()
{
}
);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 2fe8790fb9c..f0f7e329992 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -19,7 +19,6 @@
package org.apache.druid.indexing.common.task;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
@@ -270,9 +269,6 @@ public abstract class IngestionTestBase extends
InitializedNullHandlingTest
/**
* Converts ParseSpec to InputFormat for indexing tests. To be used until
{@link FirehoseFactory}
* & {@link InputRowParser} is deprecated and removed.
- *
- * @param parseSpec
- * @return
*/
public static InputFormat createInputFormatFromParseSpec(ParseSpec parseSpec)
{
@@ -510,11 +506,9 @@ public abstract class IngestionTestBase extends
InitializedNullHandlingTest
}
}
- public Map<String, TaskReport> getReports() throws IOException
+ public TaskReport.ReportMap getReports() throws IOException
{
- return objectMapper.readValue(reportsFile, new TypeReference<Map<String,
TaskReport>>()
- {
- });
+ return objectMapper.readValue(reportsFile, TaskReport.ReportMap.class);
}
public List<IngestionStatsAndErrors> getIngestionReports() throws IOException
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
index 20de427508f..488f8ce98dd 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
@@ -1134,7 +1134,7 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
try {
Object payload = getObjectMapper().readValue(
taskRunner.getTaskReportsFile(),
- new TypeReference<Map<String, TaskReport>>()
+ new TypeReference<TaskReport.ReportMap>()
{
}
).get(KillTaskReport.REPORT_KEY).getPayload();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskReportFileWriter.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskReportFileWriter.java
index 0398a219516..dddbe1bd7c3 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskReportFileWriter.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskReportFileWriter.java
@@ -23,12 +23,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskReportFileWriter;
-import java.util.Map;
-
public class NoopTestTaskReportFileWriter implements TaskReportFileWriter
{
@Override
- public void write(String id, Map<String, TaskReport> reports)
+ public void write(String id, TaskReport.ReportMap reports)
{
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
index aece6edb3f4..b0fced552b3 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
@@ -28,6 +28,7 @@ import com.google.common.io.Files;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.KillTaskReport;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TestUtils;
@@ -38,7 +39,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.nio.charset.StandardCharsets;
-import java.util.Map;
+import java.util.Collections;
public class TaskReportSerdeTest
{
@@ -55,47 +56,56 @@ public class TaskReportSerdeTest
}
@Test
- public void testSerde() throws Exception
+ public void testSerdeOfIngestionReport() throws Exception
{
- IngestionStatsAndErrorsTaskReport report1 = new
IngestionStatsAndErrorsTaskReport(
- "testID",
- new IngestionStatsAndErrors(
- IngestionState.BUILD_SEGMENTS,
- ImmutableMap.of(
- "hello", "world"
- ),
- ImmutableMap.of(
- "number", 1234
- ),
- "an error message",
- true,
- 1000L,
- ImmutableMap.of("PartitionA", 5000L),
- 5L,
- 10L
- )
- );
- String report1serialized = jsonMapper.writeValueAsString(report1);
- IngestionStatsAndErrorsTaskReport report2 =
(IngestionStatsAndErrorsTaskReport) jsonMapper.readValue(
- report1serialized,
- TaskReport.class
- );
- Assert.assertEquals(report1, report2);
- Assert.assertEquals(report1.hashCode(), report2.hashCode());
+ IngestionStatsAndErrorsTaskReport originalReport =
buildTestIngestionReport();
+ String reportJson = jsonMapper.writeValueAsString(originalReport);
+ TaskReport deserialized = jsonMapper.readValue(reportJson,
TaskReport.class);
+
+ Assert.assertTrue(deserialized instanceof
IngestionStatsAndErrorsTaskReport);
+
+ IngestionStatsAndErrorsTaskReport deserializedReport =
(IngestionStatsAndErrorsTaskReport) deserialized;
+ Assert.assertEquals(originalReport, deserializedReport);
+ }
+
+ @Test
+ public void testSerdeOfKillTaskReport() throws Exception
+ {
+ KillTaskReport originalReport = new KillTaskReport("taskId", new
KillTaskReport.Stats(1, 2, 3));
+ String reportJson = jsonMapper.writeValueAsString(originalReport);
+ TaskReport deserialized = jsonMapper.readValue(reportJson,
TaskReport.class);
+
+ Assert.assertTrue(deserialized instanceof KillTaskReport);
+ KillTaskReport deserializedReport = (KillTaskReport) deserialized;
+ Assert.assertEquals(originalReport, deserializedReport);
+ }
+
+ @Test
+ public void testWriteReportMapToFileAndRead() throws Exception
+ {
+ IngestionStatsAndErrorsTaskReport report1 = buildTestIngestionReport();
final File reportFile = temporaryFolder.newFile();
final SingleFileTaskReportFileWriter writer = new
SingleFileTaskReportFileWriter(reportFile);
writer.setObjectMapper(jsonMapper);
- Map<String, TaskReport> reportMap1 = TaskReport.buildTaskReports(report1);
+ TaskReport.ReportMap reportMap1 = TaskReport.buildTaskReports(report1);
writer.write("testID", reportMap1);
- Map<String, TaskReport> reportMap2 = jsonMapper.readValue(
- reportFile,
- new TypeReference<Map<String, TaskReport>>() {}
- );
+ TaskReport.ReportMap reportMap2 = jsonMapper.readValue(reportFile,
TaskReport.ReportMap.class);
Assert.assertEquals(reportMap1, reportMap2);
}
+ @Test
+ public void testWriteReportMapToStringAndRead() throws Exception
+ {
+ IngestionStatsAndErrorsTaskReport ingestionReport =
buildTestIngestionReport();
+ TaskReport.ReportMap reportMap =
TaskReport.buildTaskReports(ingestionReport);
+ String json = jsonMapper.writeValueAsString(reportMap);
+
+ TaskReport.ReportMap deserializedReportMap = jsonMapper.readValue(json,
TaskReport.ReportMap.class);
+ Assert.assertEquals(reportMap, deserializedReportMap);
+ }
+
@Test
public void testSerializationOnMissingPartitionStats() throws Exception
{
@@ -150,7 +160,7 @@ public class TaskReportSerdeTest
final File reportFile = temporaryFolder.newFile();
final SingleFileTaskReportFileWriter writer = new
SingleFileTaskReportFileWriter(reportFile);
writer.setObjectMapper(jsonMapper);
- writer.write("theTask", ImmutableMap.of("report", new
ExceptionalTaskReport()));
+ writer.write("theTask", TaskReport.buildTaskReports(new
ExceptionalTaskReport()));
// Read the file, ensure it's incomplete and not valid JSON. This allows
callers to determine the report was
// not complete when written.
@@ -160,6 +170,24 @@ public class TaskReportSerdeTest
);
}
+ private IngestionStatsAndErrorsTaskReport buildTestIngestionReport()
+ {
+ return new IngestionStatsAndErrorsTaskReport(
+ "testID",
+ new IngestionStatsAndErrors(
+ IngestionState.BUILD_SEGMENTS,
+ Collections.singletonMap("hello", "world"),
+ Collections.singletonMap("number", 1234),
+ "an error message",
+ true,
+ 1000L,
+ Collections.singletonMap("PartitionA", 5000L),
+ 5L,
+ 10L
+ )
+ );
+ }
+
/**
* Task report that throws an exception while being serialized.
*/
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
index db971e1ed86..cb6aa39d5c6 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
@@ -20,7 +20,6 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.google.common.base.Preconditions;
-import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LocalInputSource;
@@ -31,6 +30,7 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.input.DruidInputSource;
@@ -66,7 +66,6 @@ import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Set;
@SuppressWarnings("SameParameterValue")
@@ -183,10 +182,10 @@ abstract class AbstractMultiPhaseParallelIndexingTest
extends AbstractParallelIn
return getIndexingServiceClient().getPublishedSegments(task);
}
- Map<String, Object> runTaskAndGetReports(Task task, TaskState
expectedTaskStatus)
+ TaskReport.ReportMap runTaskAndGetReports(Task task, TaskState
expectedTaskStatus)
{
runTaskAndVerifyStatus(task, expectedTaskStatus);
- return
FutureUtils.getUnchecked(getIndexingServiceClient().taskReportAsMap(task.getId()),
true);
+ return getIndexingServiceClient().getLiveReportsForTask(task.getId());
}
protected ParallelIndexSupervisorTask createTask(
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index a6a9f6ccdd7..84fac9b82d8 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -49,10 +49,13 @@ import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -118,7 +121,6 @@ import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -546,12 +548,17 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
@Override
public ListenableFuture<Map<String, Object>> taskReportAsMap(String taskId)
+ {
+ return Futures.immediateFuture(null);
+ }
+
+ public TaskReport.ReportMap getLiveReportsForTask(String taskId)
{
final Optional<Task> task = getTaskStorage().getTask(taskId);
if (!task.isPresent()) {
return null;
}
- return Futures.immediateFuture(((ParallelIndexSupervisorTask)
task.get()).doGetLiveReports(true));
+ return ((ParallelIndexSupervisorTask) task.get()).doGetLiveReports(true);
}
public TaskContainer getTaskContainer(String taskId)
@@ -773,20 +780,16 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
}
}
- protected Map<String, Object> buildExpectedTaskReportSequential(
+ protected TaskReport.ReportMap buildExpectedTaskReportSequential(
String taskId,
List<ParseExceptionReport> expectedUnparseableEvents,
RowIngestionMetersTotals expectedDeterminePartitions,
RowIngestionMetersTotals expectedTotals
)
{
- final Map<String, Object> payload = new HashMap<>();
+ final Map<String, Object> unparseableEvents =
+ ImmutableMap.of("determinePartitions", ImmutableList.of(),
"buildSegments", expectedUnparseableEvents);
- payload.put("ingestionState", IngestionState.COMPLETED);
- payload.put(
- "unparseableEvents",
- ImmutableMap.of("determinePartitions", ImmutableList.of(),
"buildSegments", expectedUnparseableEvents)
- );
Map<String, Object> emptyAverageMinuteMap = ImmutableMap.of(
"processed", 0.0,
"processedBytes", 0.0,
@@ -801,72 +804,90 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
"15m", emptyAverageMinuteMap
);
- payload.put(
- "rowStats",
- ImmutableMap.of(
- "movingAverages",
- ImmutableMap.of("determinePartitions", emptyAverages,
"buildSegments", emptyAverages),
- "totals",
- ImmutableMap.of("determinePartitions",
expectedDeterminePartitions, "buildSegments", expectedTotals)
- )
+ final Map<String, Object> rowStats = ImmutableMap.of(
+ "movingAverages",
+ ImmutableMap.of("determinePartitions", emptyAverages, "buildSegments",
emptyAverages),
+ "totals",
+ ImmutableMap.of("determinePartitions", expectedDeterminePartitions,
"buildSegments", expectedTotals)
);
- final Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
- ingestionStatsAndErrors.put("taskId", taskId);
- ingestionStatsAndErrors.put("payload", payload);
- ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
-
- return Collections.singletonMap("ingestionStatsAndErrors",
ingestionStatsAndErrors);
+ return TaskReport.buildTaskReports(
+ new IngestionStatsAndErrorsTaskReport(
+ taskId,
+ new IngestionStatsAndErrors(
+ IngestionState.COMPLETED,
+ unparseableEvents,
+ rowStats,
+ null,
+ false,
+ 0L,
+ null,
+ null,
+ null
+ )
+ )
+ );
}
- protected Map<String, Object> buildExpectedTaskReportParallel(
+ protected TaskReport.ReportMap buildExpectedTaskReportParallel(
String taskId,
List<ParseExceptionReport> expectedUnparseableEvents,
RowIngestionMetersTotals expectedTotals
)
{
- Map<String, Object> returnMap = new HashMap<>();
- Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
- Map<String, Object> payload = new HashMap<>();
-
- payload.put("ingestionState", IngestionState.COMPLETED);
- payload.put("unparseableEvents", ImmutableMap.of("buildSegments",
expectedUnparseableEvents));
- payload.put("rowStats", ImmutableMap.of("totals",
ImmutableMap.of("buildSegments", expectedTotals)));
+ Map<String, Object> unparseableEvents = ImmutableMap.of("buildSegments",
expectedUnparseableEvents);
+ Map<String, Object> rowStats = ImmutableMap.of("totals",
ImmutableMap.of("buildSegments", expectedTotals));
- ingestionStatsAndErrors.put("taskId", taskId);
- ingestionStatsAndErrors.put("payload", payload);
- ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
-
- returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
- return returnMap;
+ return TaskReport.buildTaskReports(
+ new IngestionStatsAndErrorsTaskReport(
+ taskId,
+ new IngestionStatsAndErrors(
+ IngestionState.COMPLETED,
+ unparseableEvents,
+ rowStats,
+ null,
+ false,
+ 0L,
+ null,
+ null,
+ null
+ )
+ )
+ );
}
protected void compareTaskReports(
- Map<String, Object> expectedReports,
- Map<String, Object> actualReports
+ TaskReport.ReportMap expectedReports,
+ TaskReport.ReportMap actualReports
)
{
- expectedReports = (Map<String, Object>)
expectedReports.get("ingestionStatsAndErrors");
- actualReports = (Map<String, Object>)
actualReports.get("ingestionStatsAndErrors");
+ final Optional<IngestionStatsAndErrorsTaskReport> expectedReportOptional
+ = expectedReports.findReport("ingestionStatsAndErrors");
+ final Optional<IngestionStatsAndErrorsTaskReport> actualReportOptional
+ = actualReports.findReport("ingestionStatsAndErrors");
+
+ Assert.assertTrue(expectedReportOptional.isPresent());
+ Assert.assertTrue(actualReportOptional.isPresent());
+
+ final IngestionStatsAndErrorsTaskReport expectedReport =
expectedReportOptional.get();
+ final IngestionStatsAndErrorsTaskReport actualReport =
actualReportOptional.get();
- Assert.assertEquals(expectedReports.get("taskId"),
actualReports.get("taskId"));
- Assert.assertEquals(expectedReports.get("type"),
actualReports.get("type"));
+ Assert.assertEquals(expectedReport.getTaskId(), actualReport.getTaskId());
+ Assert.assertEquals(expectedReport.getReportKey(),
actualReport.getReportKey());
- Map<String, Object> expectedPayload = (Map<String, Object>)
expectedReports.get("payload");
- Map<String, Object> actualPayload = (Map<String, Object>)
actualReports.get("payload");
- Assert.assertEquals(expectedPayload.get("ingestionState"),
actualPayload.get("ingestionState"));
+ final IngestionStatsAndErrors expectedPayload =
expectedReport.getPayload();
+ final IngestionStatsAndErrors actualPayload = actualReport.getPayload();
+ Assert.assertEquals(expectedPayload.getIngestionState(),
actualPayload.getIngestionState());
- Map<String, Object> expectedTotals = (Map<String, Object>)
expectedPayload.get("totals");
- Map<String, Object> actualTotals = (Map<String, Object>)
actualReports.get("totals");
+ Map<String, Object> expectedTotals = expectedPayload.getRowStats();
+ Map<String, Object> actualTotals = actualPayload.getRowStats();
Assert.assertEquals(expectedTotals, actualTotals);
List<ParseExceptionReport> expectedParseExceptionReports =
- (List<ParseExceptionReport>) ((Map<String, Object>)
- expectedPayload.get("unparseableEvents")).get("buildSegments");
+ (List<ParseExceptionReport>)
(expectedPayload.getUnparseableEvents()).get("buildSegments");
List<ParseExceptionReport> actualParseExceptionReports =
- (List<ParseExceptionReport>) ((Map<String, Object>)
- actualPayload.get("unparseableEvents")).get("buildSegments");
+ (List<ParseExceptionReport>)
(actualPayload.getUnparseableEvents()).get("buildSegments");
List<String> expectedMessages = expectedParseExceptionReports
.stream().map(r -> r.getDetails().get(0)).collect(Collectors.toList());
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingRowStatsTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingRowStatsTest.java
index 0cf7fdd147b..b020c360ba8 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingRowStatsTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingRowStatsTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
@@ -43,7 +44,6 @@ import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;
-import java.util.Map;
public class MultiPhaseParallelIndexingRowStatsTest extends
AbstractMultiPhaseParallelIndexingTest
{
@@ -133,8 +133,8 @@ public class MultiPhaseParallelIndexingRowStatsTest extends
AbstractMultiPhasePa
false
);
- final RowIngestionMetersTotals expectedTotals =
RowMeters.with().totalProcessed(200);
- final Map<String, Object> expectedReports =
+ final RowIngestionMetersTotals expectedTotals =
RowMeters.with().bytes(5630).totalProcessed(200);
+ final TaskReport.ReportMap expectedReports =
maxNumConcurrentSubTasks <= 1
? buildExpectedTaskReportSequential(
task.getId(),
@@ -148,7 +148,7 @@ public class MultiPhaseParallelIndexingRowStatsTest extends
AbstractMultiPhasePa
expectedTotals
);
- Map<String, Object> actualReports = runTaskAndGetReports(task,
TaskState.SUCCESS);
+ TaskReport.ReportMap actualReports = runTaskAndGetReports(task,
TaskState.SUCCESS);
compareTaskReports(expectedReports, actualReports);
}
@@ -169,12 +169,12 @@ public class MultiPhaseParallelIndexingRowStatsTest
extends AbstractMultiPhasePa
false,
false
);
- Map<String, Object> expectedReports = buildExpectedTaskReportParallel(
+ TaskReport.ReportMap expectedReports = buildExpectedTaskReportParallel(
task.getId(),
ImmutableList.of(),
- new RowIngestionMetersTotals(200, 0, 0, 0, 0)
+ new RowIngestionMetersTotals(200, 5630, 0, 0, 0)
);
- Map<String, Object> actualReports = runTaskAndGetReports(task,
TaskState.SUCCESS);
+ TaskReport.ReportMap actualReports = runTaskAndGetReports(task,
TaskState.SUCCESS);
compareTaskReports(expectedReports, actualReports);
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index 266604a8542..6a4e5f90c64 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSplit;
@@ -35,6 +34,7 @@ import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.SegmentAllocators;
@@ -713,7 +713,12 @@ public class ParallelIndexSupervisorTaskResourceTest
extends AbstractParallelInd
);
taskClient.report(
- new PushedSegmentsReport(getId(), Collections.emptySet(),
Collections.singleton(segment), ImmutableMap.of())
+ new PushedSegmentsReport(
+ getId(),
+ Collections.emptySet(),
+ Collections.singleton(segment),
+ new TaskReport.ReportMap()
+ )
);
return TaskStatus.fromCode(getId(), state);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java
index 05d0a1fe9de..3edce650688 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java
@@ -20,6 +20,8 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.indexing.common.KillTaskReport;
+import org.apache.druid.indexing.common.TaskReport;
import org.junit.Test;
public class PushedSegmentsReportTest
@@ -27,6 +29,13 @@ public class PushedSegmentsReportTest
@Test
public void testEquals()
{
-
EqualsVerifier.forClass(PushedSegmentsReport.class).usingGetClass().verify();
+ TaskReport.ReportMap map1 = new TaskReport.ReportMap();
+ TaskReport.ReportMap map2 = new TaskReport.ReportMap();
+ map2.put("killTaskReport", new KillTaskReport("taskId", new
KillTaskReport.Stats(1, 2, 3)));
+
+ EqualsVerifier.forClass(PushedSegmentsReport.class)
+ .usingGetClass()
+ .withPrefabValues(TaskReport.ReportMap.class, map1, map2)
+ .verify();
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index 7ad9f3c9464..bc2e38f798e 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -29,6 +29,7 @@ import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.Tasks;
@@ -446,9 +447,8 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
false,
Collections.emptyList()
);
- Map<String, Object> actualReports = task.doGetLiveReports(true);
- final long processedBytes = useInputFormatApi ? 335 : 0;
- Map<String, Object> expectedReports = buildExpectedTaskReportParallel(
+ TaskReport.ReportMap actualReports = task.doGetLiveReports(true);
+ TaskReport.ReportMap expectedReports = buildExpectedTaskReportParallel(
task.getId(),
ImmutableList.of(
new ParseExceptionReport(
@@ -464,7 +464,7 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
1L
)
),
- new RowIngestionMetersTotals(10, processedBytes, 1, 1, 1)
+ new RowIngestionMetersTotals(10, 335, 1, 1, 1)
);
compareTaskReports(expectedReports, actualReports);
}
@@ -497,10 +497,9 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
TaskContainer taskContainer =
getIndexingServiceClient().getTaskContainer(task.getId());
final ParallelIndexSupervisorTask executedTask =
(ParallelIndexSupervisorTask) taskContainer.getTask();
- Map<String, Object> actualReports = executedTask.doGetLiveReports(true);
+ TaskReport.ReportMap actualReports = executedTask.doGetLiveReports(true);
- final long processedBytes = useInputFormatApi ? 335 : 0;
- RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(10,
processedBytes, 1, 1, 1);
+ final RowIngestionMetersTotals expectedTotals = new
RowIngestionMetersTotals(10, 335, 1, 1, 1);
List<ParseExceptionReport> expectedUnparseableEvents = ImmutableList.of(
new ParseExceptionReport(
"{ts=2017unparseable}",
@@ -516,7 +515,7 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
)
);
- Map<String, Object> expectedReports;
+ TaskReport.ReportMap expectedReports;
if (useInputFormatApi) {
expectedReports = buildExpectedTaskReportSequential(
task.getId(),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index fb454acafec..1f7bd929d8a 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -461,9 +461,9 @@ public abstract class SeekableStreamIndexTaskTestBase
extends EasyMockSupport
protected IngestionStatsAndErrors getTaskReportData() throws IOException
{
- Map<String, TaskReport> taskReports = OBJECT_MAPPER.readValue(
+ TaskReport.ReportMap taskReports = OBJECT_MAPPER.readValue(
reportsFile,
- new TypeReference<Map<String, TaskReport>>()
+ new TypeReference<TaskReport.ReportMap>()
{
}
);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]