This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4df4896674f Refactor: Add common method in AbstractBatchIndexTask to create ingestion stats report (#16202)
4df4896674f is described below
commit 4df4896674f57ef49e2736aa613f271112a2a111
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Mar 28 23:07:00 2024 +0530
Refactor: Add common method in AbstractBatchIndexTask to create ingestion stats report (#16202)
Changes
- No functional changes
- Add method `AbstractBatchIndexTask.buildIngestionStatsReport()` used in several batch tasks
- Add utility method `AbstractBatchIndexTask.addBuildSegmentStatsToReport()`
- Use boolean argument to represent a full report instead of the String `full` in internal methods. (REST API remains unchanged.)
- Rename `IngestionStatsAndErrorsTaskReportData` to `IngestionStatsAndErrors`
- Clean up some of the methods
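
For illustration only (not part of this commit): a minimal sketch of how a batch task subclass can now produce its completion report through the new helper. The class name `MyBatchTask` and its meter field are hypothetical, and unrelated Task overrides are omitted.

    // Hypothetical subclass; only the report-related pieces are shown.
    public class MyBatchTask extends AbstractBatchIndexTask
    {
      private RowIngestionMeters buildSegmentsMeters; // assumed field

      // Hook consumed by buildIngestionStatsReport()
      @Override
      protected Map<String, Object> getTaskCompletionRowStats()
      {
        return Collections.singletonMap(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsMeters.getTotals());
      }

      // Delegates to the shared helper instead of constructing an
      // IngestionStatsAndErrorsTaskReport by hand.
      private Map<String, TaskReport> getTaskCompletionReports(String errorMsg)
      {
        return buildIngestionStatsReport(IngestionState.COMPLETED, errorMsg, null, null);
      }
    }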
---
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 12 +-
.../indexing/kinesis/KinesisIndexTaskTest.java | 6 +-
...eportData.java => IngestionStatsAndErrors.java} | 49 +-
.../common/IngestionStatsAndErrorsTaskReport.java | 6 +-
.../common/task/AbstractBatchIndexTask.java | 128 ++--
.../task/AppenderatorDriverRealtimeIndexTask.java | 4 +-
.../indexing/common/task/HadoopIndexTask.java | 22 +-
.../druid/indexing/common/task/IndexTask.java | 97 +--
.../parallel/ParallelIndexSupervisorTask.java | 96 +--
.../batch/parallel/PartialSegmentGenerateTask.java | 32 +-
.../task/batch/parallel/SinglePhaseSubTask.java | 77 +--
.../SeekableStreamIndexTaskRunner.java | 4 +-
.../AppenderatorDriverRealtimeIndexTaskTest.java | 16 +-
.../common/task/CompactionTaskParallelRunTest.java | 8 +-
.../common/task/CompactionTaskRunTest.java | 6 +-
.../druid/indexing/common/task/IndexTaskTest.java | 735 ++++++---------------
.../indexing/common/task/ParseExceptionReport.java | 4 +-
.../indexing/common/task/TaskReportSerdeTest.java | 6 +-
.../AbstractParallelIndexSupervisorTaskTest.java | 8 +-
.../parallel/SinglePhaseParallelIndexingTest.java | 4 +-
.../SeekableStreamIndexTaskTestBase.java | 6 +-
.../testsEx/indexer/AbstractITBatchIndexTest.java | 4 +-
.../clients/OverlordResourceTestClient.java | 7 +-
.../tests/indexer/AbstractITBatchIndexTest.java | 4 +-
.../druid/tests/indexer/ITCompactionTaskTest.java | 6 +-
25 files changed, 470 insertions(+), 877 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index c3d1d213383..85a9fdb6254 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -54,7 +54,7 @@ import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.IndexTaskTest;
@@ -1615,7 +1615,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
newDataSchemaMetadata()
);
- IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+ IngestionStatsAndErrors reportData = getTaskReportData();
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
@@ -1695,7 +1695,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
Assert.assertEquals(ImmutableList.of(), publishedDescriptors());
Assert.assertNull(newDataSchemaMetadata());
- IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+ IngestionStatsAndErrors reportData = getTaskReportData();
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
@@ -3061,7 +3061,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
);
// Verify unparseable data
- IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+ IngestionStatsAndErrors reportData = getTaskReportData();
ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS);
@@ -3233,7 +3233,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
TaskStatus status = future.get();
Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
- IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+ IngestionStatsAndErrors reportData = getTaskReportData();
Assert.assertEquals(reportData.getRecordsProcessed().size(), 1);
Assert.assertEquals(reportData.getRecordsProcessed().values().iterator().next(), (Long) 6L);
}
@@ -3281,7 +3281,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
TaskStatus status = future.get();
Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
- IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+ IngestionStatsAndErrors reportData = getTaskReportData();
Assert.assertEquals(reportData.getRecordsProcessed().size(), 2);
Assert.assertTrue(reportData.getRecordsProcessed().values().containsAll(ImmutableSet.of(6L, 2L)));
}
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index e14b6679b09..ad8964d712c 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -43,7 +43,7 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
@@ -1182,7 +1182,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
newDataSchemaMetadata()
);
- IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+ IngestionStatsAndErrors reportData = getTaskReportData();
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
@@ -1268,7 +1268,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
Assert.assertEquals(ImmutableList.of(), publishedDescriptors());
Assert.assertNull(newDataSchemaMetadata());
- IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+ IngestionStatsAndErrors reportData = getTaskReportData();
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrors.java
similarity index 83%
rename from indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java
rename to indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrors.java
index 97ea58e1c5c..d55f5ed0d19 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrors.java
@@ -27,36 +27,19 @@ import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;
-public class IngestionStatsAndErrorsTaskReportData
+public class IngestionStatsAndErrors
{
- @JsonProperty
- private IngestionState ingestionState;
-
- @JsonProperty
- private Map<String, Object> unparseableEvents;
-
- @JsonProperty
- private Map<String, Object> rowStats;
-
- @JsonProperty
- @Nullable
- private String errorMsg;
-
- @JsonProperty
- private boolean segmentAvailabilityConfirmed;
-
- @JsonProperty
- private long segmentAvailabilityWaitTimeMs;
-
- @JsonProperty
- private Map<String, Long> recordsProcessed;
-
- @JsonProperty
- private Long segmentsRead;
- @JsonProperty
- private Long segmentsPublished;
-
- public IngestionStatsAndErrorsTaskReportData(
+ private final IngestionState ingestionState;
+ private final Map<String, Object> unparseableEvents;
+ private final Map<String, Object> rowStats;
+ private final String errorMsg;
+ private final boolean segmentAvailabilityConfirmed;
+ private final long segmentAvailabilityWaitTimeMs;
+ private final Map<String, Long> recordsProcessed;
+ private final Long segmentsRead;
+ private final Long segmentsPublished;
+
+ public IngestionStatsAndErrors(
@JsonProperty("ingestionState") IngestionState ingestionState,
@JsonProperty("unparseableEvents") Map<String, Object> unparseableEvents,
@JsonProperty("rowStats") Map<String, Object> rowStats,
@@ -139,12 +122,12 @@ public class IngestionStatsAndErrorsTaskReportData
return segmentsPublished;
}
- public static IngestionStatsAndErrorsTaskReportData getPayloadFromTaskReports(
+ public static IngestionStatsAndErrors getPayloadFromTaskReports(
Map<String, TaskReport> taskReports
)
{
- return (IngestionStatsAndErrorsTaskReportData) taskReports.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
- .getPayload();
+ return (IngestionStatsAndErrors) taskReports.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
+ .getPayload();
}
@Override
@@ -156,7 +139,7 @@ public class IngestionStatsAndErrorsTaskReportData
if (o == null || getClass() != o.getClass()) {
return false;
}
- IngestionStatsAndErrorsTaskReportData that = (IngestionStatsAndErrorsTaskReportData) o;
+ IngestionStatsAndErrors that = (IngestionStatsAndErrors) o;
return getIngestionState() == that.getIngestionState() &&
Objects.equals(getUnparseableEvents(), that.getUnparseableEvents()) &&
Objects.equals(getRowStats(), that.getRowStats()) &&
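
(Usage sketch, assuming a deserialized `Map<String, TaskReport> taskReports` is already in hand; callers read the renamed payload the same way as before:)

    // Sketch: extracting the renamed payload from a task-report map.
    IngestionStatsAndErrors stats = IngestionStatsAndErrors.getPayloadFromTaskReports(taskReports);
    Long segmentsRead = stats.getSegmentsRead();           // may be null for tasks that do not track it
    Long segmentsPublished = stats.getSegmentsPublished();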
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
index 7518523b1de..0122a8d3ffb 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
@@ -32,12 +32,12 @@ public class IngestionStatsAndErrorsTaskReport implements TaskReport
private final String taskId;
@JsonProperty
- private final IngestionStatsAndErrorsTaskReportData payload;
+ private final IngestionStatsAndErrors payload;
@JsonCreator
public IngestionStatsAndErrorsTaskReport(
@JsonProperty("taskId") String taskId,
- @JsonProperty("payload") IngestionStatsAndErrorsTaskReportData payload
+ @JsonProperty("payload") IngestionStatsAndErrors payload
)
{
this.taskId = taskId;
@@ -57,7 +57,7 @@ public class IngestionStatsAndErrorsTaskReport implements TaskReport
}
@Override
- public IngestionStatsAndErrorsTaskReportData getPayload()
+ public IngestionStatsAndErrors getPayload()
{
return payload;
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index b1d3e992696..a5acadf704e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -30,9 +30,13 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexer.IngestionState;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
@@ -53,6 +57,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularity;
@@ -110,8 +115,8 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
{
private static final Logger log = new Logger(AbstractBatchIndexTask.class);
- protected boolean segmentAvailabilityConfirmationCompleted = false;
- protected long segmentAvailabilityWaitTimeMs = 0L;
+ private boolean segmentAvailabilityConfirmationCompleted = false;
+ private long segmentAvailabilityWaitTimeMs = 0L;
@GuardedBy("this")
private final TaskResourceCleaner resourceCloserOnAbnormalExit = new TaskResourceCleaner();
@@ -123,8 +128,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
private final int maxAllowedLockCount;
- // Store lock versions
- Map<Interval, String> intervalToVersion = new HashMap<>();
+ private final Map<Interval, String> intervalToLockVersion = new HashMap<>();
protected AbstractBatchIndexTask(String id, String dataSource, Map<String, Object> context, IngestionMode ingestionMode)
{
@@ -481,7 +485,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", cur));
}
locksAcquired++;
- intervalToVersion.put(cur, lock.getVersion());
+ intervalToLockVersion.put(cur, lock.getVersion());
}
return true;
}
@@ -685,10 +689,8 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
* the cluster. Doing so gives an end user assurance that a Successful task status means their data is available
* for querying.
*
- * @param toolbox {@link TaskToolbox} object with for assisting with task work.
- * @param segmentsToWaitFor {@link List} of segments to wait for availability.
- * @param waitTimeout Millis to wait before giving up
- * @return True if all segments became available, otherwise False.
+ * @return True if all segments became available before the {@code waitTimeoutMillis} elapsed, otherwise false.
*/
protected boolean waitForSegmentAvailability(
TaskToolbox toolbox,
@@ -697,22 +699,22 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
)
{
if (segmentsToWaitFor.isEmpty()) {
- log.info("Asked to wait for segments to be available, but I wasn't
provided with any segments.");
+ log.info("No segments to wait for availability.");
return true;
} else if (waitTimeout < 0) {
- log.warn("Asked to wait for availability for < 0 seconds?! Requested
waitTimeout: [%s]", waitTimeout);
+ log.warn("Not waiting for segment availability as waitTimeout[%s] is
less than zero.", waitTimeout);
return false;
}
log.info("Waiting for [%d] segments to be loaded by the cluster...",
segmentsToWaitFor.size());
- final long start = System.nanoTime();
+ final Stopwatch stopwatch = Stopwatch.createStarted();
try (
SegmentHandoffNotifier notifier =
toolbox.getSegmentHandoffNotifierFactory()
.createSegmentHandoffNotifier(segmentsToWaitFor.get(0).getDataSource())
) {
- ExecutorService exec = Execs.directExecutor();
- CountDownLatch doneSignal = new CountDownLatch(segmentsToWaitFor.size());
+ final ExecutorService exec = Execs.directExecutor();
+ final CountDownLatch doneSignal = new CountDownLatch(segmentsToWaitFor.size());
notifier.start();
for (DataSegment s : segmentsToWaitFor) {
@@ -720,11 +722,11 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
new SegmentDescriptor(s.getInterval(), s.getVersion(), s.getShardSpec().getPartitionNum()),
exec,
() -> {
+ doneSignal.countDown();
log.debug(
- "Confirmed availability for [%s]. Removing from list of
segments to wait for",
- s.getId()
+ "Segment[%s] is now available, [%d] segments remaining.",
+ s.getId(), doneSignal.getCount()
);
- doneSignal.countDown();
}
);
}
@@ -737,7 +739,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
return false;
}
finally {
- segmentAvailabilityWaitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ segmentAvailabilityWaitTimeMs = stopwatch.millisElapsed();
toolbox.getEmitter().emit(
new ServiceMetricEvent.Builder()
.setDimension("dataSource", getDataSource())
@@ -782,12 +784,12 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
if (revokedLock != null) {
throw new ISE("Lock revoked: [%s]", revokedLock);
}
- final Map<Interval, String> versions = locks
- .stream()
- .collect(Collectors.toMap(TaskLock::getInterval, TaskLock::getVersion));
+ final Map<Interval, String> versions = locks.stream().collect(
+ Collectors.toMap(TaskLock::getInterval, TaskLock::getVersion)
+ );
- Interval interval;
- String version;
+ final Interval interval;
+ final String version;
if (!materializedBucketIntervals.isEmpty()) {
// If granularity spec has explicit intervals, we just need to find the version associated to the interval.
// This is because we should have gotten all required locks up front when the task starts up.
@@ -809,8 +811,8 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
// We don't have explicit intervals. We can use the segment granularity to figure out what
// interval we need, but we might not have already locked it.
interval = granularitySpec.getSegmentGranularity().bucket(timestamp);
- version = AbstractBatchIndexTask.findVersion(versions, interval);
- if (version == null) {
+ final String existingLockVersion = AbstractBatchIndexTask.findVersion(versions, interval);
+ if (existingLockVersion == null) {
if (ingestionSpec.getTuningConfig() instanceof ParallelIndexTuningConfig) {
final int maxAllowedLockCount = ((ParallelIndexTuningConfig) ingestionSpec.getTuningConfig())
.getMaxAllowedLockCount();
@@ -830,6 +832,8 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval));
}
version = lock.getVersion();
+ } else {
+ version = existingLockVersion;
}
}
return new NonnullPair<>(interval, version);
@@ -856,23 +860,19 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
}
/**
- * Get the version from the locks for a given timestamp. This will work if the locks were acquired upfront
- * @param timestamp
- * @return The interval andversion if n interval that contains an interval was found or null otherwise
+ * @return The interval and version containing the given timestamp if one exists, otherwise null.
*/
@Nullable
- Pair<Interval, String> lookupVersion(DateTime timestamp)
- {
- java.util.Optional<Map.Entry<Interval, String>> intervalAndVersion = intervalToVersion.entrySet()
- .stream()
- .filter(e -> e.getKey().contains(timestamp))
- .findFirst();
- if (!intervalAndVersion.isPresent()) {
- return null;
- }
- return new Pair(intervalAndVersion.get().getKey(), intervalAndVersion.get().getValue());
+ private Pair<Interval, String> lookupVersion(DateTime timestamp)
+ {
+ java.util.Optional<Map.Entry<Interval, String>> intervalAndVersion
+ = intervalToLockVersion.entrySet()
+ .stream()
+ .filter(e -> e.getKey().contains(timestamp))
+ .findFirst();
+ return intervalAndVersion.map(
+ entry -> new Pair<>(entry.getKey(), entry.getValue())
+ ).orElse(null);
}
protected SegmentIdWithShardSpec allocateNewSegmentForTombstone(
@@ -891,4 +891,52 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
);
}
+ @Nullable
+ protected Map<String, Object> getTaskCompletionRowStats()
+ {
+ return null;
+ }
+
+ @Nullable
+ protected Map<String, Object> getTaskCompletionUnparseableEvents()
+ {
+ return null;
+ }
+
+ /**
+ * Builds a singleton map with {@link IngestionStatsAndErrorsTaskReport#REPORT_KEY}
+ * as key and an {@link IngestionStatsAndErrorsTaskReport} for this task as value.
+ */
+ protected Map<String, TaskReport> buildIngestionStatsReport(
+ IngestionState ingestionState,
+ String errorMessage,
+ Long segmentsRead,
+ Long segmentsPublished
+ )
+ {
+ return TaskReport.buildTaskReports(
+ new IngestionStatsAndErrorsTaskReport(
+ getId(),
+ new IngestionStatsAndErrors(
+ ingestionState,
+ getTaskCompletionUnparseableEvents(),
+ getTaskCompletionRowStats(),
+ errorMessage,
+ segmentAvailabilityConfirmationCompleted,
+ segmentAvailabilityWaitTimeMs,
+ Collections.emptyMap(),
+ segmentsRead,
+ segmentsPublished
+ )
+ )
+ );
+ }
+
+ protected static boolean addBuildSegmentStatsToReport(boolean isFullReport, IngestionState ingestionState)
+ {
+ return isFullReport
+ || ingestionState == IngestionState.BUILD_SEGMENTS
+ || ingestionState == IngestionState.COMPLETED;
+ }
+
}
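
(A sketch of the resulting call pattern, abridged for illustration: the REST layer keeps `full` as a String query parameter, so the API is unchanged, and only the internal methods take a boolean. The handler shape below is illustrative, not the exact code of any one task; annotations such as @GET and the authorization check are omitted.)

    // REST boundary: `full` stays a String query parameter (null when absent).
    public Response getRowStats(@QueryParam("full") String full)
    {
      return Response.ok(doGetRowStats(full != null)).build();
    }

    private Map<String, Object> doGetRowStats(boolean isFullReport)
    {
      final Map<String, Object> totalsMap = new HashMap<>();
      if (addBuildSegmentStatsToReport(isFullReport, ingestionState)) {
        totalsMap.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsMeters.getTotals());
      }
      return totalsMap;
    }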
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index e0c7f9bc934..c00f219c777 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -42,8 +42,8 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
@@ -614,7 +614,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport(
getId(),
- new IngestionStatsAndErrorsTaskReportData(
+ new IngestionStatsAndErrors(
ingestionState,
getTaskCompletionUnparseableEvents(),
getTaskCompletionRowStats(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 019b63520e7..5fd3572f9e2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -43,8 +43,6 @@ import org.apache.druid.indexer.MetadataStorageUpdaterJobHandler;
import org.apache.druid.indexer.TaskMetricsGetter;
import org.apache.druid.indexer.TaskMetricsUtils;
import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskReport;
@@ -684,25 +682,11 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
private Map<String, TaskReport> getTaskCompletionReports()
{
- return TaskReport.buildTaskReports(
- new IngestionStatsAndErrorsTaskReport(
- getId(),
- new IngestionStatsAndErrorsTaskReportData(
- ingestionState,
- null,
- getTaskCompletionRowStats(),
- errorMsg,
- segmentAvailabilityConfirmationCompleted,
- segmentAvailabilityWaitTimeMs,
- Collections.emptyMap(),
- null,
- null
- )
- )
- );
+ return buildIngestionStatsReport(ingestionState, errorMsg, null, null);
}
- private Map<String, Object> getTaskCompletionRowStats()
+ @Override
+ protected Map<String, Object> getTaskCompletionRowStats()
{
Map<String, Object> metrics = new HashMap<>();
if (determineConfigStatus != null) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 950515d7348..49041ce4d70 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -48,8 +48,6 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SecondaryPartitionType;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
@@ -335,34 +333,14 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
- return Response.ok(doGetUnparseableEvents(full)).build();
+ return Response.ok(doGetUnparseableEvents(full != null)).build();
}
- public Map<String, Object> doGetUnparseableEvents(String full)
+ public Map<String, Object> doGetUnparseableEvents(boolean isFullReport)
{
- Map<String, Object> events = new HashMap<>();
+ final Map<String, Object> events = new HashMap<>();
- boolean needsDeterminePartitions = false;
- boolean needsBuildSegments = false;
-
- if (full != null) {
- needsDeterminePartitions = true;
- needsBuildSegments = true;
- } else {
- switch (ingestionState) {
- case DETERMINE_PARTITIONS:
- needsDeterminePartitions = true;
- break;
- case BUILD_SEGMENTS:
- case COMPLETED:
- needsBuildSegments = true;
- break;
- default:
- break;
- }
- }
-
- if (needsDeterminePartitions) {
+ if (addDeterminePartitionStatsToReport(isFullReport, ingestionState)) {
events.put(
RowIngestionMeters.DETERMINE_PARTITIONS,
IndexTaskUtils.getReportListFromSavedParseExceptions(
@@ -371,7 +349,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
);
}
- if (needsBuildSegments) {
+ if (addBuildSegmentStatsToReport(isFullReport, ingestionState)) {
events.put(
RowIngestionMeters.BUILD_SEGMENTS,
IndexTaskUtils.getReportListFromSavedParseExceptions(
@@ -382,33 +360,13 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
return events;
}
- public Map<String, Object> doGetRowStats(String full)
+ public Map<String, Object> doGetRowStats(boolean isFullReport)
{
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> totalsMap = new HashMap<>();
Map<String, Object> averagesMap = new HashMap<>();
- boolean needsDeterminePartitions = false;
- boolean needsBuildSegments = false;
-
- if (full != null) {
- needsDeterminePartitions = true;
- needsBuildSegments = true;
- } else {
- switch (ingestionState) {
- case DETERMINE_PARTITIONS:
- needsDeterminePartitions = true;
- break;
- case BUILD_SEGMENTS:
- case COMPLETED:
- needsBuildSegments = true;
- break;
- default:
- break;
- }
- }
-
- if (needsDeterminePartitions) {
+ if (addDeterminePartitionStatsToReport(isFullReport, ingestionState)) {
totalsMap.put(
RowIngestionMeters.DETERMINE_PARTITIONS,
determinePartitionsMeters.getTotals()
@@ -419,7 +377,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
);
}
- if (needsBuildSegments) {
+ if (addBuildSegmentStatsToReport(isFullReport, ingestionState)) {
totalsMap.put(
RowIngestionMeters.BUILD_SEGMENTS,
buildSegmentsMeters.getTotals()
@@ -444,7 +402,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
- return Response.ok(doGetRowStats(full)).build();
+ return Response.ok(doGetRowStats(full != null)).build();
}
@GET
@@ -463,7 +421,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
payload.put("ingestionState", ingestionState);
payload.put("unparseableEvents", events);
- payload.put("rowStats", doGetRowStats(full));
+ payload.put("rowStats", doGetRowStats(full != null));
ingestionStatsAndErrors.put("taskId", getId());
ingestionStatsAndErrors.put("payload", payload);
@@ -580,36 +538,16 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
}
}
-
private void updateAndWriteCompletionReports(TaskToolbox toolbox)
{
- completionReports = getTaskCompletionReports();
+ completionReports = buildIngestionStatsReport(ingestionState, errorMsg, null, null);
if (isStandAloneTask) {
toolbox.getTaskReportFileWriter().write(getId(), completionReports);
}
}
- private Map<String, TaskReport> getTaskCompletionReports()
- {
- return TaskReport.buildTaskReports(
- new IngestionStatsAndErrorsTaskReport(
- getId(),
- new IngestionStatsAndErrorsTaskReportData(
- ingestionState,
- getTaskCompletionUnparseableEvents(),
- getTaskCompletionRowStats(),
- errorMsg,
- segmentAvailabilityConfirmationCompleted,
- segmentAvailabilityWaitTimeMs,
- Collections.emptyMap(),
- null,
- null
- )
- )
- );
- }
-
- private Map<String, Object> getTaskCompletionUnparseableEvents()
+ @Override
+ protected Map<String, Object> getTaskCompletionUnparseableEvents()
{
Map<String, Object> unparseableEventsMap = new HashMap<>();
CircularBuffer<ParseExceptionReport> determinePartitionsParseExceptionReports =
@@ -631,7 +569,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
return unparseableEventsMap;
}
- private Map<String, Object> getTaskCompletionRowStats()
+ @Override
+ protected Map<String, Object> getTaskCompletionRowStats()
{
Map<String, Object> metrics = new HashMap<>();
metrics.put(
@@ -709,6 +648,12 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
}
}
+ private static boolean addDeterminePartitionStatsToReport(boolean isFullReport, IngestionState ingestionState)
+ {
+ return isFullReport
+ || ingestionState == IngestionState.DETERMINE_PARTITIONS;
+ }
+
private static LinearPartitionAnalysis createLinearPartitionAnalysis(
GranularitySpec granularitySpec,
@Nonnull DynamicPartitionsSpec partitionsSpec
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index db497dff5ec..4d5b9717ca9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -40,8 +40,8 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
@@ -1237,37 +1237,32 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
/**
* Generate an IngestionStatsAndErrorsTaskReport for the task.
- *
- * @param taskStatus {@link TaskStatus}
- * @param segmentAvailabilityConfirmed Whether or not the segments were confirmed to be available for query when
- * when the task completed.
- * @return
*/
- private Map<String, TaskReport> getTaskCompletionReports(TaskStatus taskStatus, boolean segmentAvailabilityConfirmed)
- {
- Pair<Map<String, Object>, Map<String, Object>> rowStatsAndUnparseableEvents =
- doGetRowStatsAndUnparseableEvents("true", true);
- return TaskReport.buildTaskReports(
- new IngestionStatsAndErrorsTaskReport(
- getId(),
- new IngestionStatsAndErrorsTaskReportData(
- IngestionState.COMPLETED,
- rowStatsAndUnparseableEvents.rhs,
- rowStatsAndUnparseableEvents.lhs,
- taskStatus.getErrorMsg(),
- segmentAvailabilityConfirmed,
- segmentAvailabilityWaitTimeMs,
- Collections.emptyMap(),
- segmentsRead,
- segmentsPublished
- )
- )
+ private Map<String, TaskReport> getTaskCompletionReports(TaskStatus taskStatus)
+ {
+ return buildIngestionStatsReport(
+ IngestionState.COMPLETED,
+ taskStatus.getErrorMsg(),
+ segmentsRead,
+ segmentsPublished
);
}
+ @Override
+ protected Map<String, Object> getTaskCompletionRowStats()
+ {
+ return doGetRowStatsAndUnparseableEvents(true, false).lhs;
+ }
+
+ @Override
+ protected Map<String, Object> getTaskCompletionUnparseableEvents()
+ {
+ return doGetRowStatsAndUnparseableEvents(true, true).rhs;
+ }
+
private void updateAndWriteCompletionReports(TaskStatus status)
{
- completionReports = getTaskCompletionReports(status, segmentAvailabilityConfirmationCompleted);
+ completionReports = getTaskCompletionReports(status);
writeCompletionReports();
}
@@ -1609,11 +1604,13 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
for (PushedSegmentsReport pushedSegmentsReport : completedSubtaskReports.values()) {
Map<String, TaskReport> taskReport = pushedSegmentsReport.getTaskReport();
if (taskReport == null || taskReport.isEmpty()) {
- LOG.warn("Got an empty task report from subtask: " + pushedSegmentsReport.getTaskId());
+ LOG.warn("Received an empty report from subtask[%s]", pushedSegmentsReport.getTaskId());
continue;
}
- RowIngestionMetersTotals rowIngestionMetersTotals =
- getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable, unparseableEvents);
+ RowIngestionMetersTotals rowIngestionMetersTotals = getBuildSegmentsStatsFromTaskReport(
+ taskReport,
+ includeUnparseable ? unparseableEvents : null
+ );
buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
}
@@ -1647,11 +1644,11 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
if (taskReport == null || taskReport.isEmpty()) {
- LOG.warn("Got an empty task report from subtask: " + generatedPartitionsReport.getTaskId());
+ LOG.warn("Received an empty report from subtask[%s]", generatedPartitionsReport.getTaskId());
continue;
}
RowIngestionMetersTotals rowStatsForCompletedTask =
- getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);
+ getBuildSegmentsStatsFromTaskReport(taskReport, unparseableEvents);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
@@ -1730,26 +1727,26 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
Map<String, TaskReport> taskReport,
- boolean includeUnparseable,
- List<ParseExceptionReport> unparseableEvents)
+ List<ParseExceptionReport> unparseableEvents
+ )
{
- IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport) taskReport.get(
- IngestionStatsAndErrorsTaskReport.REPORT_KEY);
- IngestionStatsAndErrorsTaskReportData reportData =
- (IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload();
+ IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport)
+ taskReport.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY);
+ IngestionStatsAndErrors reportData = ingestionStatsAndErrorsReport.getPayload();
RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
);
- if (includeUnparseable) {
- List<ParseExceptionReport> taskUnparsebleEvents = (List<ParseExceptionReport>) reportData.getUnparseableEvents()
- .get(RowIngestionMeters.BUILD_SEGMENTS);
- unparseableEvents.addAll(taskUnparsebleEvents);
+ if (unparseableEvents != null) {
+ unparseableEvents.addAll(
+ (List<ParseExceptionReport>) reportData.getUnparseableEvents().get(RowIngestionMeters.BUILD_SEGMENTS)
+ );
}
return totals;
}
private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEvents(
- String full,
+ boolean isFullReport,
boolean includeUnparseable
)
{
@@ -1779,7 +1776,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
}
} else {
IndexTask currentSequentialTask = (IndexTask) currentRunner;
- return Pair.of(currentSequentialTask.doGetRowStats(full), currentSequentialTask.doGetUnparseableEvents(full));
+ return Pair.of(
+ currentSequentialTask.doGetRowStats(isFullReport),
+ currentSequentialTask.doGetUnparseableEvents(isFullReport)
+ );
}
}
@@ -1800,25 +1800,25 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
- return Response.ok(doGetRowStatsAndUnparseableEvents(full, false).lhs).build();
+ return Response.ok(doGetRowStatsAndUnparseableEvents(full != null, false).lhs).build();
}
@VisibleForTesting
- public Map<String, Object> doGetLiveReports(String full)
+ public Map<String, Object> doGetLiveReports(boolean isFullReport)
{
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
Map<String, Object> payload = new HashMap<>();
Pair<Map<String, Object>, Map<String, Object>> rowStatsAndUnparsebleEvents =
- doGetRowStatsAndUnparseableEvents(full, true);
+ doGetRowStatsAndUnparseableEvents(isFullReport, true);
// use the sequential task's ingestion state if we were running that mode
IngestionState ingestionStateForReport;
if (isParallelMode()) {
ingestionStateForReport = ingestionState;
} else {
- IndexTask currentSequentialTask = (IndexTask) currentSubTaskHolder.getTask();
+ IndexTask currentSequentialTask = currentSubTaskHolder.getTask();
ingestionStateForReport = currentSequentialTask == null
? ingestionState
: currentSequentialTask.getIngestionState();
@@ -1846,7 +1846,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
- return Response.ok(doGetLiveReports(full)).build();
+ return Response.ok(doGetLiveReports(full != null)).build();
}
/**
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index 46be219d878..bbeb00aa845 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -25,8 +25,6 @@ import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
@@ -253,25 +251,16 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
*/
private Map<String, TaskReport> getTaskCompletionReports(Long segmentsRead)
{
- return TaskReport.buildTaskReports(
- new IngestionStatsAndErrorsTaskReport(
- getId(),
- new IngestionStatsAndErrorsTaskReportData(
- IngestionState.COMPLETED,
- getTaskCompletionUnparseableEvents(),
- getTaskCompletionRowStats(),
- "",
- false, // not applicable for parallel subtask
- segmentAvailabilityWaitTimeMs,
- Collections.emptyMap(),
- segmentsRead,
- null
- )
- )
+ return buildIngestionStatsReport(
+ IngestionState.COMPLETED,
+ "",
+ segmentsRead,
+ null
);
}
- private Map<String, Object> getTaskCompletionUnparseableEvents()
+ @Override
+ protected Map<String, Object> getTaskCompletionUnparseableEvents()
{
Map<String, Object> unparseableEventsMap = new HashMap<>();
List<ParseExceptionReport> parseExceptionMessages = IndexTaskUtils.getReportListFromSavedParseExceptions(
@@ -287,13 +276,12 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
return unparseableEventsMap;
}
- private Map<String, Object> getTaskCompletionRowStats()
+ @Override
+ protected Map<String, Object> getTaskCompletionRowStats()
{
- Map<String, Object> metrics = new HashMap<>();
- metrics.put(
+ return Collections.singletonMap(
RowIngestionMeters.BUILD_SEGMENTS,
buildSegmentsMeters.getTotals()
);
- return metrics;
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 2ce427f7c84..465750bad03 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.FluentIterable;
@@ -33,8 +32,6 @@ import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
@@ -499,22 +496,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
Map<String, List<ParseExceptionReport>> events = new HashMap<>();
- boolean needsBuildSegments = false;
-
- if (full != null) {
- needsBuildSegments = true;
- } else {
- switch (ingestionState) {
- case BUILD_SEGMENTS:
- case COMPLETED:
- needsBuildSegments = true;
- break;
- default:
- break;
- }
- }
-
- if (needsBuildSegments) {
+ if (addBuildSegmentStatsToReport(full != null, ingestionState)) {
events.put(
RowIngestionMeters.BUILD_SEGMENTS,
IndexTaskUtils.getReportListFromSavedParseExceptions(
@@ -526,28 +508,13 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
return Response.ok(events).build();
}
- private Map<String, Object> doGetRowStats(String full)
+ private Map<String, Object> doGetRowStats(boolean isFullReport)
{
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> totalsMap = new HashMap<>();
Map<String, Object> averagesMap = new HashMap<>();
- boolean needsBuildSegments = false;
-
- if (full != null) {
- needsBuildSegments = true;
- } else {
- switch (ingestionState) {
- case BUILD_SEGMENTS:
- case COMPLETED:
- needsBuildSegments = true;
- break;
- default:
- break;
- }
- }
-
- if (needsBuildSegments) {
+ if (addBuildSegmentStatsToReport(isFullReport, ingestionState)) {
totalsMap.put(
RowIngestionMeters.BUILD_SEGMENTS,
rowIngestionMeters.getTotals()
@@ -572,11 +539,10 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
- return Response.ok(doGetRowStats(full)).build();
+ return Response.ok(doGetRowStats(full != null)).build();
}
- @VisibleForTesting
- public Map<String, Object> doGetLiveReports(String full)
+ private Map<String, Object> doGetLiveReports(boolean isFullReport)
{
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
@@ -585,7 +551,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
payload.put("ingestionState", ingestionState);
payload.put("unparseableEvents", events);
- payload.put("rowStats", doGetRowStats(full));
+ payload.put("rowStats", doGetRowStats(isFullReport));
ingestionStatsAndErrors.put("taskId", getId());
ingestionStatsAndErrors.put("payload", payload);
@@ -604,45 +570,28 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
- return Response.ok(doGetLiveReports(full)).build();
+ return Response.ok(doGetLiveReports(full != null)).build();
}
- private Map<String, Object> getTaskCompletionRowStats()
+ @Override
+ protected Map<String, Object> getTaskCompletionRowStats()
{
- Map<String, Object> metrics = new HashMap<>();
- metrics.put(
+ return Collections.singletonMap(
RowIngestionMeters.BUILD_SEGMENTS,
rowIngestionMeters.getTotals()
);
- return metrics;
}
/**
* Generate an IngestionStatsAndErrorsTaskReport for the task.
- **
- * @return
*/
private Map<String, TaskReport> getTaskCompletionReports()
{
- return TaskReport.buildTaskReports(
- new IngestionStatsAndErrorsTaskReport(
- getId(),
- new IngestionStatsAndErrorsTaskReportData(
- IngestionState.COMPLETED,
- getTaskCompletionUnparseableEvents(),
- getTaskCompletionRowStats(),
- errorMsg,
- false, // not applicable for parallel subtask
- segmentAvailabilityWaitTimeMs,
- Collections.emptyMap(),
- null,
- null
- )
- )
- );
+ return buildIngestionStatsReport(IngestionState.COMPLETED, errorMsg, null, null);
}
- private Map<String, Object> getTaskCompletionUnparseableEvents()
+ @Override
+ protected Map<String, Object> getTaskCompletionUnparseableEvents()
{
Map<String, Object> unparseableEventsMap = new HashMap<>();
List<ParseExceptionReport> parseExceptionMessages = IndexTaskUtils.getReportListFromSavedParseExceptions(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 09e89b1d601..f3e1e4a06d2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -51,8 +51,8 @@ import org.apache.druid.error.ErrorResponse;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
@@ -1123,7 +1123,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport(
task.getId(),
- new IngestionStatsAndErrorsTaskReportData(
+ new IngestionStatsAndErrors(
ingestionState,
getTaskCompletionUnparseableEvents(),
getTaskCompletionRowStats(),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index ef769d73006..e91d25c0b03 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -52,7 +52,7 @@ import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReport;
@@ -690,7 +690,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
TaskStatus status = statusFuture.get();
Assert.assertTrue(status.getErrorMsg().contains("org.apache.druid.java.util.common.RE: Max parse exceptions[0] exceeded"));
- IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+ IngestionStatsAndErrors reportData = getTaskReportData();
ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS);
@@ -798,7 +798,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
final TaskStatus taskStatus = statusFuture.get();
Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
- IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+ IngestionStatsAndErrors reportData = getTaskReportData();
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
}
@@ -901,7 +901,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
final TaskStatus taskStatus = statusFuture.get();
Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
- IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+ IngestionStatsAndErrors reportData = getTaskReportData();
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
ParseExceptionReport parseExceptionReport =
@@ -981,7 +981,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode());
Assert.assertTrue(taskStatus.getErrorMsg().contains("Max parse exceptions[3] exceeded"));
- IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+ IngestionStatsAndErrors reportData = getTaskReportData();
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
@@ -1257,7 +1257,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
)
);
- IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+ IngestionStatsAndErrors reportData = getTaskReportData();
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
Pattern errorPattern = Pattern.compile(
@@ -1676,7 +1676,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
}
}
- private IngestionStatsAndErrorsTaskReportData getTaskReportData() throws IOException
+ private IngestionStatsAndErrors getTaskReportData() throws IOException
{
Map<String, TaskReport> taskReports = OBJECT_MAPPER.readValue(
reportsFile,
@@ -1684,7 +1684,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
{
}
);
- return IngestionStatsAndErrorsTaskReportData.getPayloadFromTaskReports(
+ return IngestionStatsAndErrors.getPayloadFromTaskReports(
taskReports
);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 9b68cebe9ee..5c42ea4da4d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -39,7 +39,7 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
@@ -248,18 +248,18 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
Assert.assertEquals("Compaction state for " + segment.getId(), expectedState, segment.getLastCompactionState());
}
- List<IngestionStatsAndErrorsTaskReportData> reports = getIngestionReports();
+ List<IngestionStatsAndErrors> reports = getIngestionReports();
Assert.assertEquals(reports.size(), 3); // since three index tasks are run by single compaction task
// this test reads 3 segments and publishes 6 segments
Assert.assertEquals(
3,
- reports.stream().mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsRead).sum()
+ reports.stream().mapToLong(IngestionStatsAndErrors::getSegmentsRead).sum()
);
Assert.assertEquals(
6,
reports.stream()
- .mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsPublished)
+ .mapToLong(IngestionStatsAndErrors::getSegmentsPublished)
.sum()
);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 03994e35e51..6f6b3e8ef48 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -557,7 +557,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
Granularities.MINUTE,
null
),
- IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, false, true),
+ IndexTaskTest.createTuningConfig(2, 2, 2L, null, false, true),
false,
false
),
@@ -1900,7 +1900,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
Granularities.MINUTE,
null
),
- IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, false, true),
+ IndexTaskTest.createTuningConfig(2, 2, 2L, null, false, true),
appendToExisting,
false
),
@@ -1940,7 +1940,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
Granularities.MINUTE,
null
),
- IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, false, true),
+ IndexTaskTest.createTuningConfig(2, 2, 2L, null, false, true),
appendToExisting,
false
),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index b049b472d35..2619a23ba33 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -45,7 +45,7 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
@@ -77,7 +77,6 @@ import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.incremental.RowIngestionMeters;
-import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
@@ -86,7 +85,6 @@ import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLocalCacheManager;
import org.apache.druid.segment.loading.StorageLocationConfig;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
import org.apache.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFactory;
import org.apache.druid.segment.transform.ExpressionTransform;
@@ -106,17 +104,19 @@ import org.apache.druid.timeline.partition.PartitionIds;
import org.apache.druid.timeline.partition.ShardSpec;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.Response;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
@@ -129,6 +129,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
@RunWith(Parameterized.class)
public class IndexTaskTest extends IngestionTestBase
@@ -136,9 +137,6 @@ public class IndexTaskTest extends IngestionTestBase
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
private static final String DATASOURCE = "test";
private static final TimestampSpec DEFAULT_TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
private static final DimensionsSpec DEFAULT_DIMENSIONS_SPEC = new DimensionsSpec(
@@ -173,19 +171,17 @@ public class IndexTaskTest extends IngestionTestBase
private static final IndexSpec INDEX_SPEC = IndexSpec.DEFAULT;
private final ObjectMapper jsonMapper;
private final IndexIO indexIO;
- private final RowIngestionMetersFactory rowIngestionMetersFactory;
private final LockGranularity lockGranularity;
private final boolean useInputFormatApi;
- private AppenderatorsManager appenderatorsManager;
private SegmentCacheManager segmentCacheManager;
private TestTaskRunner taskRunner;
+ private File tmpDir;
public IndexTaskTest(LockGranularity lockGranularity, boolean useInputFormatApi)
{
this.jsonMapper = getObjectMapper();
this.indexIO = getIndexIO();
- this.rowIngestionMetersFactory = getRowIngestionMetersFactory();
this.lockGranularity = lockGranularity;
this.useInputFormatApi = useInputFormatApi;
}
@@ -193,9 +189,8 @@ public class IndexTaskTest extends IngestionTestBase
@Before
public void setup() throws IOException
{
- appenderatorsManager = new TestAppenderatorsManager();
-
final File cacheDir = temporaryFolder.newFolder();
+ tmpDir = temporaryFolder.newFolder();
segmentCacheManager = new SegmentLocalCacheManager(
new SegmentLoaderConfig()
{
@@ -213,12 +208,9 @@ public class IndexTaskTest extends IngestionTestBase
}
@Test
- public void testCorrectInputSourceResources() throws IOException
+ public void testCorrectInputSourceResources()
{
- File tmpDir = temporaryFolder.newFolder();
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
new IndexIngestionSpec(
new DataSchema(
"test-json",
@@ -263,19 +255,13 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testIngestNullOnlyColumns() throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
-
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,,\n");
writer.write("2014-01-01T01:00:20Z,,\n");
writer.write("2014-01-01T02:00:30Z,,\n");
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
new IndexIngestionSpec(
new DataSchema(
"test-json",
@@ -318,19 +304,13 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testIngestNullOnlyColumns_storeEmptyColumnsOff_shouldNotStoreEmptyColumns() throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
-
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,,\n");
writer.write("2014-01-01T01:00:20Z,,\n");
writer.write("2014-01-01T02:00:30Z,,\n");
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
new IndexIngestionSpec(
new DataSchema(
"test-json",
@@ -374,23 +354,14 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testDeterminePartitions() throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
-
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,1\n");
writer.write("2014-01-01T02:00:30Z,c,1\n");
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
- null,
null,
createTuningConfigWithMaxRowsPerSegment(2, true),
false,
@@ -429,11 +400,7 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testTransformSpec() throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
-
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,an|array,1|2|3,1\n");
writer.write("2014-01-01T01:00:20Z,b,another|array,3|4,1\n");
writer.write("2014-01-01T02:00:30Z,c,and|another,0|1,1\n");
@@ -472,8 +439,6 @@ public class IndexTaskTest extends IngestionTestBase
final IndexIngestionSpec indexIngestionSpec;
if (useInputFormatApi) {
indexIngestionSpec = createIngestionSpec(
- jsonMapper,
- tmpDir,
DEFAULT_TIMESTAMP_SPEC,
dimensionsSpec,
new CsvInputFormat(columns, listDelimiter, null, false, 0),
@@ -496,12 +461,7 @@ public class IndexTaskTest extends IngestionTestBase
);
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
- indexIngestionSpec,
- null
- );
+ IndexTask indexTask = createIndexTask(indexIngestionSpec, null);
Assert.assertEquals(indexTask.getId(), indexTask.getGroupId());
@@ -568,27 +528,18 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testWithArbitraryGranularity() throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
-
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,1\n");
writer.write("2014-01-01T02:00:30Z,c,1\n");
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new ArbitraryGranularitySpec(
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
),
- null,
createTuningConfigWithMaxRowsPerSegment(10, true),
false,
false
@@ -597,34 +548,29 @@ public class IndexTaskTest extends IngestionTestBase
);
final List<DataSegment> segments = runSuccessfulTask(indexTask);
-
Assert.assertEquals(1, segments.size());
+
+ invokeApi(req -> indexTask.getLiveReports(req, null));
+ invokeApi(req -> indexTask.getLiveReports(req, "full"));
+ invokeApi(req -> indexTask.getRowStats(req, null));
+ invokeApi(req -> indexTask.getRowStats(req, "full"));
}
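[Editor's note: the four invokeApi calls above are smoke tests for the task's live-report REST handlers. A condensed sketch of what that helper (defined near the end of this file's diff) does, assuming the "allow-all" attribute value satisfies the test auth filter:]

    HttpServletRequest request = EasyMock.mock(HttpServletRequest.class);
    EasyMock.expect(request.getAttribute(EasyMock.anyString())).andReturn("allow-all");
    EasyMock.replay(request);
    // Exercise both the summary and the "full" variants of each endpoint.
    Object liveReports = indexTask.getLiveReports(request, "full").getEntity();
    Object rowStats = indexTask.getRowStats(request, "full").getEntity();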
@Test
public void testIntervalBucketing() throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
-
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("2014-01-01T07:59:59.977Z,a,1\n");
writer.write("2014-01-01T08:00:00.000Z,b,1\n");
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.HOUR,
Collections.singletonList(Intervals.of("2014-01-01T08:00:00Z/2014-01-01T09:00:00Z"))
),
- null,
createTuningConfigWithMaxRowsPerSegment(50, true),
false,
false
@@ -640,24 +586,16 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testNumShardsProvided() throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,1\n");
writer.write("2014-01-01T02:00:30Z,c,1\n");
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
null,
- null,
- createTuningConfigWithPartitionsSpec(new HashedPartitionsSpec(null, 1, null), true),
+ createTuningConfigWithPartitionsSpec(new HashedPartitionsSpec(null, 1, null)),
false,
false
),
@@ -681,25 +619,17 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testNumShardsAndHashPartitionFunctionProvided() throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,1\n");
writer.write("2014-01-01T02:00:30Z,c,1\n");
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
- null,
null,
createTuningConfigWithPartitionsSpec(
- new HashedPartitionsSpec(null, 1, null, HashPartitionFunction.MURMUR3_32_ABS), true
+ new HashedPartitionsSpec(null, 1, null, HashPartitionFunction.MURMUR3_32_ABS)
),
false,
false
@@ -724,24 +654,16 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testNumShardsAndPartitionDimensionsProvided() throws Exception
{
- final File tmpDir = temporaryFolder.newFolder();
- final File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,1\n");
writer.write("2014-01-01T02:00:30Z,c,1\n");
}
- final IndexTask indexTask = new IndexTask(
- null,
- null,
+ final IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
- null,
null,
- createTuningConfigWithPartitionsSpec(new HashedPartitionsSpec(null, 2, ImmutableList.of("dim")), true),
+ createTuningConfigWithPartitionsSpec(new HashedPartitionsSpec(null, 2, ImmutableList.of("dim"))),
false,
false
),
@@ -797,22 +719,14 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testWriteNewSegmentsWithAppendToExistingWithLinearPartitioningSuccessfullyAppend() throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,1\n");
writer.write("2014-01-01T02:00:30Z,c,1\n");
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
- null,
null,
createTuningConfigWithMaxRowsPerSegment(2, false),
true,
@@ -842,27 +756,19 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testIntervalNotSpecified() throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,1\n");
writer.write("2014-01-01T02:00:30Z,c,1\n");
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
null
),
- null,
createTuningConfigWithMaxRowsPerSegment(2, true),
false,
false
@@ -893,49 +799,39 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testIntervalNotSpecifiedWithReplace() throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,1\n");
writer.write("2014-01-01T02:00:30Z,c,1\n");
}
// Expect exception if reingest with dropExisting and null intervals is attempted
- expectedException.expect(IAE.class);
- expectedException.expectMessage(
- "GranularitySpec's intervals cannot be empty for replace."
- );
- IndexTask indexTask = new IndexTask(
- null,
- null,
- createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
- new UniformGranularitySpec(
- Granularities.HOUR,
- Granularities.MINUTE,
- null
+ Exception exception = Assert.assertThrows(
+ IAE.class,
+ () -> createIndexTask(
+ createDefaultIngestionSpec(
+ new UniformGranularitySpec(
+ Granularities.HOUR,
+ Granularities.MINUTE,
+ null
+ ),
+ createTuningConfigWithMaxRowsPerSegment(2, true),
+ false,
+ true
),
- null,
- createTuningConfigWithMaxRowsPerSegment(2, true),
- false,
- true
- ),
- null
+ null
+ )
+ );
+ Assert.assertEquals(
+ "GranularitySpec's intervals cannot be empty for replace.",
+ exception.getMessage()
);
-
}
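[Editor's note: this is the mechanical migration applied throughout the file: the class-level ExpectedException rule is replaced by a local Assert.assertThrows, whose returned exception permits an exact message check. Schematically, with codeUnderTest as a placeholder:]

    // old: expectedException.expect(IAE.class); expectedException.expectMessage("..."); codeUnderTest();
    Exception exception = Assert.assertThrows(IAE.class, () -> codeUnderTest());
    Assert.assertEquals("GranularitySpec's intervals cannot be empty for replace.", exception.getMessage());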
@Test
public void testCSVFileWithHeader() throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
-
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("time,d,val\n");
writer.write("2014-01-01T00:00:10Z,a,1\n");
}
@@ -956,8 +852,6 @@ public class IndexTaskTest extends IngestionTestBase
);
} else {
ingestionSpec = createIngestionSpec(
- jsonMapper,
- tmpDir,
timestampSpec,
DimensionsSpec.EMPTY,
new CsvInputFormat(null, null, null, true, 0),
@@ -969,9 +863,7 @@ public class IndexTaskTest extends IngestionTestBase
);
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
ingestionSpec,
null
);
@@ -988,11 +880,7 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testCSVFileWithHeaderColumnOverride() throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
-
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("time,d,val\n");
writer.write("2014-01-01T00:00:10Z,a,1\n");
}
@@ -1003,8 +891,6 @@ public class IndexTaskTest extends IngestionTestBase
final IndexIngestionSpec ingestionSpec;
if (useInputFormatApi) {
ingestionSpec = createIngestionSpec(
- jsonMapper,
- tmpDir,
timestampSpec,
DimensionsSpec.EMPTY,
new CsvInputFormat(columns, null, null, true, 0),
@@ -1027,9 +913,7 @@ public class IndexTaskTest extends IngestionTestBase
);
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
ingestionSpec,
null
);
@@ -1046,10 +930,7 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testWithSmallMaxTotalRows() throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T00:00:10Z,b,2\n");
writer.write("2014-01-01T00:00:10Z,c,3\n");
@@ -1061,19 +942,14 @@ public class IndexTaskTest extends IngestionTestBase
writer.write("2014-01-01T02:00:30Z,c,3\n");
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
null
),
- null,
- createTuningConfig(2, 2, null, 2L, null, false, true),
+ createTuningConfig(2, 2, 2L, null, false, true),
false,
false
),
@@ -1099,25 +975,17 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testPerfectRollup() throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- populateRollupTestData(tmpFile);
+ populateRollupTestData(createTempFile());
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.DAY,
true,
null
),
- null,
- createTuningConfig(3, 2, null, 2L, null, true, true),
+ createTuningConfig(3, 2, 2L, null, true, true),
false,
false
),
@@ -1134,7 +1002,7 @@ public class IndexTaskTest extends IngestionTestBase
Assert.assertEquals(DATASOURCE, segment.getDataSource());
Assert.assertEquals(expectedInterval, segment.getInterval());
- Assert.assertTrue(segment.getShardSpec().getClass().equals(HashBasedNumberedShardSpec.class));
+ Assert.assertEquals(segment.getShardSpec().getClass(), HashBasedNumberedShardSpec.class);
Assert.assertEquals(i, segment.getShardSpec().getPartitionNum());
}
}
@@ -1142,25 +1010,17 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testBestEffortRollup() throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- populateRollupTestData(tmpFile);
+ populateRollupTestData(createTempFile());
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.DAY,
true,
null
),
- null,
- createTuningConfig(3, 2, null, 2L, null, false, true),
+ createTuningConfig(3, 2, 2L, null, false, true),
false,
false
),
@@ -1183,24 +1043,17 @@ public class IndexTaskTest extends IngestionTestBase
}
@Test
- public void testWaitForSegmentAvailabilityNoSegments() throws IOException
+ public void testWaitForSegmentAvailabilityNoSegments()
{
- final File tmpDir = temporaryFolder.newFolder();
-
TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class);
List<DataSegment> segmentsToWaitFor = new ArrayList<>();
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
null
),
- null,
createTuningConfigWithMaxRowsPerSegment(2, true),
false,
false
@@ -1214,25 +1067,18 @@ public class IndexTaskTest extends IngestionTestBase
}
@Test
- public void testWaitForSegmentAvailabilityInvalidWaitTimeout() throws IOException
+ public void testWaitForSegmentAvailabilityInvalidWaitTimeout()
{
- final File tmpDir = temporaryFolder.newFolder();
-
TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class);
List<DataSegment> segmentsToWaitFor = new ArrayList<>();
segmentsToWaitFor.add(EasyMock.createMock(DataSegment.class));
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
null
),
- null,
createTuningConfigWithMaxRowsPerSegment(2, true),
false,
false
@@ -1246,10 +1092,8 @@ public class IndexTaskTest extends IngestionTestBase
}
@Test
- public void testWaitForSegmentAvailabilityMultipleSegmentsTimeout() throws IOException
+ public void testWaitForSegmentAvailabilityMultipleSegmentsTimeout()
{
- final File tmpDir = temporaryFolder.newFolder();
-
TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class);
SegmentHandoffNotifierFactory mockFactory = EasyMock.createMock(SegmentHandoffNotifierFactory.class);
SegmentHandoffNotifier mockNotifier = EasyMock.createMock(SegmentHandoffNotifier.class);
@@ -1260,18 +1104,13 @@ public class IndexTaskTest extends IngestionTestBase
segmentsToWaitFor.add(mockDataSegment1);
segmentsToWaitFor.add(mockDataSegment2);
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
null
),
- null,
createTuningConfigWithMaxRowsPerSegment(2, true),
false,
false
@@ -1309,10 +1148,8 @@ public class IndexTaskTest extends IngestionTestBase
}
@Test
- public void testWaitForSegmentAvailabilityMultipleSegmentsSuccess() throws IOException
+ public void testWaitForSegmentAvailabilityMultipleSegmentsSuccess()
{
- final File tmpDir = temporaryFolder.newFolder();
-
TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class);
DataSegment mockDataSegment1 = EasyMock.createMock(DataSegment.class);
@@ -1321,18 +1158,13 @@ public class IndexTaskTest extends IngestionTestBase
segmentsToWaitFor.add(mockDataSegment1);
segmentsToWaitFor.add(mockDataSegment2);
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
null
),
- null,
createTuningConfigWithMaxRowsPerSegment(2, true),
false,
false
@@ -1365,10 +1197,8 @@ public class IndexTaskTest extends IngestionTestBase
}
@Test
- public void testWaitForSegmentAvailabilityEmitsExpectedMetric() throws IOException
+ public void testWaitForSegmentAvailabilityEmitsExpectedMetric()
{
- final File tmpDir = temporaryFolder.newFolder();
-
TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class);
DataSegment mockDataSegment1 = EasyMock.createMock(DataSegment.class);
@@ -1377,18 +1207,13 @@ public class IndexTaskTest extends IngestionTestBase
segmentsToWaitFor.add(mockDataSegment1);
segmentsToWaitFor.add(mockDataSegment2);
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
null
),
- null,
createTuningConfigWithMaxRowsPerSegment(2, true),
false,
false
@@ -1438,14 +1263,15 @@ public class IndexTaskTest extends IngestionTestBase
}
}
+ private File createTempFile() throws IOException
+ {
+ return File.createTempFile("druid", "index", tmpDir);
+ }
+
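[Editor's note: with tmpDir promoted to a field created in setup(), this helper collapses the former two-step folder-plus-file boilerplate. A representative call site (the data row is illustrative):]

    try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
      writer.write("2014-01-01T00:00:10Z,a,1\n");
    }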
@Test
public void testIgnoreParseException() throws Exception
{
- final File tmpDir = temporaryFolder.newFolder();
-
- final File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("time,d,val\n");
writer.write("unparseable,a,1\n");
writer.write("2014-01-01T00:00:10Z,a,1\n");
@@ -1454,15 +1280,13 @@ public class IndexTaskTest extends IngestionTestBase
final TimestampSpec timestampSpec = new TimestampSpec("time", "auto",
null);
final List<String> columns = Arrays.asList("time", "dim", "val");
// ignore parse exception
- final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, false);
+ final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, false, false);
// GranularitySpec.intervals and numShards must be null to verify reportParseException=false is respected both in
// IndexTask.determineShardSpecs() and IndexTask.generateAndPublishSegments()
final IndexIngestionSpec parseExceptionIgnoreSpec;
if (useInputFormatApi) {
parseExceptionIgnoreSpec = createIngestionSpec(
- jsonMapper,
- tmpDir,
timestampSpec,
DimensionsSpec.EMPTY,
new CsvInputFormat(columns, null, null, true, 0),
@@ -1485,12 +1309,7 @@ public class IndexTaskTest extends IngestionTestBase
);
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
- parseExceptionIgnoreSpec,
- null
- );
+ IndexTask indexTask = createIndexTask(parseExceptionIgnoreSpec, null);
final List<DataSegment> segments = runSuccessfulTask(indexTask);
@@ -1502,10 +1321,7 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testReportParseException() throws Exception
{
- final File tmpDir = temporaryFolder.newFolder();
-
- final File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
+ final File tmpFile = createTempFile();
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
writer.write("time,d,val\n");
writer.write("unparseable,a,1\n");
@@ -1515,13 +1331,11 @@ public class IndexTaskTest extends IngestionTestBase
final TimestampSpec timestampSpec = new TimestampSpec("time", "auto",
null);
final List<String> columns = Arrays.asList("time", "dim", "val");
// report parse exception
- final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, true);
+ final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, false, true);
final IndexIngestionSpec indexIngestionSpec;
List<String> expectedMessages;
if (useInputFormatApi) {
indexIngestionSpec = createIngestionSpec(
- jsonMapper,
- tmpDir,
timestampSpec,
DimensionsSpec.EMPTY,
new CsvInputFormat(columns, null, null, true, 0),
@@ -1550,18 +1364,13 @@ public class IndexTaskTest extends IngestionTestBase
tmpFile.toURI()
)
);
- IndexTask indexTask = new IndexTask(
- null,
- null,
- indexIngestionSpec,
- null
- );
+ IndexTask indexTask = createIndexTask(indexIngestionSpec, null);
TaskStatus status = runTask(indexTask).lhs;
Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
checkTaskStatusErrorMsgForParseExceptionsExceeded(status);
- IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+ IngestionStatsAndErrors reportData = getTaskReportData();
ParseExceptionReport parseExceptionReport = ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS);
@@ -1574,10 +1383,7 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testMultipleParseExceptionsSuccess() throws Exception
{
- final File tmpDir = temporaryFolder.newFolder();
-
- final File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
+ final File tmpFile = createTempFile();
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
writer.write("{\"time\":\"unparseable\",\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}\n"); // unparseable time
writer.write("{\"time\":\"2014-01-01T00:00:10Z\",\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}\n"); // valid row
@@ -1633,8 +1439,6 @@ public class IndexTaskTest extends IngestionTestBase
final IndexIngestionSpec ingestionSpec;
if (useInputFormatApi) {
ingestionSpec = createIngestionSpec(
- jsonMapper,
- tmpDir,
timestampSpec,
dimensionsSpec,
new JsonInputFormat(null, null, null, null, null),
@@ -1657,18 +1461,13 @@ public class IndexTaskTest extends IngestionTestBase
);
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
- ingestionSpec,
- null
- );
+ IndexTask indexTask = createIndexTask(ingestionSpec, null);
TaskStatus status = runTask(indexTask).lhs;
Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
Assert.assertNull(status.getErrorMsg());
- IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+ IngestionStatsAndErrors reportData = getTaskReportData();
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.DETERMINE_PARTITIONS,
@@ -1759,10 +1558,7 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testMultipleParseExceptionsFailure() throws Exception
{
- final File tmpDir = temporaryFolder.newFolder();
-
- final File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
+ final File tmpFile = createTempFile();
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
writer.write("time,dim,dimLong,dimFloat,val\n");
writer.write("unparseable,a,2,3.0,1\n"); // unparseable
@@ -1815,8 +1611,6 @@ public class IndexTaskTest extends IngestionTestBase
List<String> expectedMessages;
if (useInputFormatApi) {
ingestionSpec = createIngestionSpec(
- jsonMapper,
- tmpDir,
timestampSpec,
dimensionsSpec,
new CsvInputFormat(columns, null, null, true, 0),
@@ -1853,9 +1647,7 @@ public class IndexTaskTest extends IngestionTestBase
tmpFile.toURI()
)
);
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
ingestionSpec,
null
);
@@ -1864,7 +1656,7 @@ public class IndexTaskTest extends IngestionTestBase
Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
checkTaskStatusErrorMsgForParseExceptionsExceeded(status);
- IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+ IngestionStatsAndErrors reportData = getTaskReportData();
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.DETERMINE_PARTITIONS,
@@ -1902,10 +1694,7 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exception
{
- final File tmpDir = temporaryFolder.newFolder();
-
- final File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
+ final File tmpFile = createTempFile();
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
writer.write("time,dim,dimLong,dimFloat,val\n");
writer.write("unparseable,a,2,3.0,1\n"); // unparseable
@@ -1958,8 +1747,6 @@ public class IndexTaskTest extends IngestionTestBase
List<String> expectedMessages;
if (useInputFormatApi) {
ingestionSpec = createIngestionSpec(
- jsonMapper,
- tmpDir,
timestampSpec,
dimensionsSpec,
new CsvInputFormat(columns, null, null, true, 0),
@@ -1987,9 +1774,7 @@ public class IndexTaskTest extends IngestionTestBase
StringUtils.format("Timestamp[9.0] is unparseable! Event: {time=9.0,
dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 2, Line: 4)",
tmpFile.toURI()),
StringUtils.format("Timestamp[unparseable] is unparseable! Event:
{time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 1,
Line: 2)", tmpFile.toURI())
);
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
ingestionSpec,
null
);
@@ -1998,7 +1783,7 @@ public class IndexTaskTest extends IngestionTestBase
Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
checkTaskStatusErrorMsgForParseExceptionsExceeded(status);
- IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+ IngestionStatsAndErrors reportData = getTaskReportData();
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.DETERMINE_PARTITIONS,
@@ -2036,36 +1821,26 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testCsvWithHeaderOfEmptyColumns() throws Exception
{
- final File tmpDir = temporaryFolder.newFolder();
-
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("ts,,\n");
writer.write("2014-01-01T00:00:10Z,a,1\n");
}
- tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("ts,dim,\n");
writer.write("2014-01-01T00:00:10Z,a,1\n");
}
- tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("ts,,val\n");
writer.write("2014-01-01T00:00:10Z,a,1\n");
}
// report parse exception
- final IndexTuningConfig tuningConfig = createTuningConfig(2, 1, null, null, null, true, true);
+ final IndexTuningConfig tuningConfig = createTuningConfig(2, 1, null, null, true, true);
final IndexIngestionSpec ingestionSpec;
if (useInputFormatApi) {
ingestionSpec = createIngestionSpec(
- jsonMapper,
- tmpDir,
DEFAULT_TIMESTAMP_SPEC,
DimensionsSpec.EMPTY,
new CsvInputFormat(null, null, null, true, 0),
@@ -2088,9 +1863,7 @@ public class IndexTaskTest extends IngestionTestBase
);
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
ingestionSpec,
null
);
@@ -2123,10 +1896,7 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testCsvWithHeaderOfEmptyTimestamp() throws Exception
{
- final File tmpDir = temporaryFolder.newFolder();
-
- final File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
+ final File tmpFile = createTempFile();
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
writer.write(",,\n");
writer.write("2014-01-01T00:00:10Z,a,1\n");
@@ -2134,13 +1904,11 @@ public class IndexTaskTest extends IngestionTestBase
final List<String> columns = Arrays.asList("ts", "", "");
// report parse exception
- final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, true);
+ final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, false, true);
final IndexIngestionSpec ingestionSpec;
List<String> expectedMessages;
if (useInputFormatApi) {
ingestionSpec = createIngestionSpec(
- jsonMapper,
- tmpDir,
DEFAULT_TIMESTAMP_SPEC,
DimensionsSpec.EMPTY,
new CsvInputFormat(columns, null, null, true, 0),
@@ -2169,9 +1937,7 @@ public class IndexTaskTest extends IngestionTestBase
tmpFile.toURI()
)
);
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
ingestionSpec,
null
);
@@ -2181,7 +1947,7 @@ public class IndexTaskTest extends IngestionTestBase
checkTaskStatusErrorMsgForParseExceptionsExceeded(status);
- IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+ IngestionStatsAndErrors reportData = getTaskReportData();
ParseExceptionReport parseExceptionReport = ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS);
@@ -2196,26 +1962,18 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testOverwriteWithSameSegmentGranularity() throws Exception
{
- final File tmpDir = temporaryFolder.newFolder();
- final File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- populateRollupTestData(tmpFile);
+ populateRollupTestData(createTempFile());
for (int i = 0; i < 2; i++) {
- final IndexTask indexTask = new IndexTask(
- null,
- null,
+ final IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.DAY,
true,
null
),
- null,
- createTuningConfig(3, 2, null, 2L, null, false, true),
+ createTuningConfig(3, 2, 2L, null, false, true),
false,
false
),
@@ -2260,27 +2018,19 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void testOverwriteWithDifferentSegmentGranularity() throws Exception
{
- final File tmpDir = temporaryFolder.newFolder();
- final File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- populateRollupTestData(tmpFile);
+ populateRollupTestData(createTempFile());
for (int i = 0; i < 2; i++) {
final Granularity segmentGranularity = i == 0 ? Granularities.DAY : Granularities.MONTH;
- final IndexTask indexTask = new IndexTask(
- null,
- null,
+ final IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
segmentGranularity,
Granularities.DAY,
true,
null
),
- null,
- createTuningConfig(3, 2, null, 2L, null, false, true),
+ createTuningConfig(3, 2, 2L, null, false, true),
false,
false
),
@@ -2305,55 +2055,43 @@ public class IndexTaskTest extends IngestionTestBase
}
@Test
- public void testIndexTaskWithSingleDimPartitionsSpecThrowingException() throws Exception
+ public void testIndexTaskWithSingleDimPartitionsSpecThrowingException()
{
- final File tmpDir = temporaryFolder.newFolder();
- final IndexTask task = new IndexTask(
- null,
- null,
+ final IndexTask task = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
- null,
null,
- createTuningConfigWithPartitionsSpec(new SingleDimensionPartitionsSpec(null, 1, null, false), true),
+ createTuningConfigWithPartitionsSpec(new SingleDimensionPartitionsSpec(null, 1, null, false)),
false,
false
),
null
);
- expectedException.expect(UnsupportedOperationException.class);
- expectedException.expectMessage(
- "partitionsSpec[org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec] is not supported"
+ Exception exception = Assert.assertThrows(
+ UnsupportedOperationException.class,
+ () -> task.isReady(createActionClient(task))
+ );
+ Assert.assertEquals(
+ "partitionsSpec[org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec] is not supported",
+ exception.getMessage()
);
- task.isReady(createActionClient(task));
}
@Test
public void testOldSegmentNotReplacedWhenDropFlagFalse() throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
-
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,1\n");
writer.write("2014-01-01T02:00:30Z,c,1\n");
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
Granularities.YEAR,
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
),
- null,
createTuningConfigWithMaxRowsPerSegment(10, true),
false,
false
@@ -2365,24 +2103,19 @@ public class IndexTaskTest extends IngestionTestBase
List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size());
- Set<DataSegment> usedSegmentsBeforeOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get());
+ Set<DataSegment> usedSegmentsBeforeOverwrite = getAllUsedSegments();
Assert.assertEquals(1, usedSegmentsBeforeOverwrite.size());
for (DataSegment segment : usedSegmentsBeforeOverwrite) {
Assert.assertTrue(Granularities.YEAR.isAligned(segment.getInterval()));
}
- indexTask = new IndexTask(
- null,
- null,
+ indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
Granularities.MINUTE,
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
),
- null,
createTuningConfigWithMaxRowsPerSegment(10, true),
false,
false
@@ -2394,7 +2127,7 @@ public class IndexTaskTest extends IngestionTestBase
segments = runSuccessfulTask(indexTask);
Assert.assertEquals(3, segments.size());
- Set<DataSegment> usedSegmentsBeforeAfterOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get());
+ Set<DataSegment> usedSegmentsBeforeAfterOverwrite = getAllUsedSegments();
Assert.assertEquals(4, usedSegmentsBeforeAfterOverwrite.size());
int yearSegmentFound = 0;
int minuteSegmentFound = 0;
@@ -2414,30 +2147,22 @@ public class IndexTaskTest extends IngestionTestBase
}
@Test
- public void testOldSegmentNotCoveredByTombstonesWhenDropFlagTrueSinceIngestionIntervalDoesNotContainsOldSegment() throws Exception
+ public void testOldSegmentNotCoveredByTombstonesWhenDropFlagTrueSinceIngestionIntervalDoesNotContainsOldSegment()
+ throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
-
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("2014-01-01T01:00:10Z,a,1\n");
writer.write("2014-01-01T01:10:20Z,b,1\n");
writer.write("2014-01-01T01:20:30Z,c,1\n");
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2014-01-01T01:00:00Z/2014-01-01T02:00:00Z"))
),
- null,
createTuningConfigWithMaxRowsPerSegment(10, true),
false,
false
@@ -2449,24 +2174,19 @@ public class IndexTaskTest extends IngestionTestBase
List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size());
- Set<DataSegment> usedSegmentsBeforeOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get());
+ Set<DataSegment> usedSegmentsBeforeOverwrite = getAllUsedSegments();
Assert.assertEquals(1, usedSegmentsBeforeOverwrite.size());
for (DataSegment segment : usedSegmentsBeforeOverwrite) {
Assert.assertTrue(Granularities.DAY.isAligned(segment.getInterval()));
}
- indexTask = new IndexTask(
- null,
- null,
+ indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2014-01-01T01:10:00Z/2014-01-01T02:00:00Z"))
),
- null,
createTuningConfigWithMaxRowsPerSegment(10, true),
false,
true
@@ -2478,7 +2198,7 @@ public class IndexTaskTest extends IngestionTestBase
segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size());
- Set<DataSegment> usedSegmentsBeforeAfterOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get());
+ Set<DataSegment> usedSegmentsBeforeAfterOverwrite = getAllUsedSegments();
Assert.assertEquals(2, usedSegmentsBeforeAfterOverwrite.size());
int segmentFound = 0;
int tombstonesFound = 0;
@@ -2506,30 +2226,22 @@ public class IndexTaskTest extends IngestionTestBase
}
@Test
- public void testOldSegmentCoveredByTombstonesWhenDropFlagTrueSinceIngestionIntervalContainsOldSegment() throws Exception
+ public void testOldSegmentCoveredByTombstonesWhenDropFlagTrueSinceIngestionIntervalContainsOldSegment()
+ throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
-
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,1\n");
writer.write("2014-01-01T02:00:30Z,c,1\n");
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2014-01-01T01:00:00Z/2014-01-01T02:00:00Z"))
),
- null,
createTuningConfigWithMaxRowsPerSegment(10, true),
false,
false
@@ -2541,24 +2253,19 @@ public class IndexTaskTest extends IngestionTestBase
List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size());
- Set<DataSegment> usedSegmentsBeforeOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get());
+ Set<DataSegment> usedSegmentsBeforeOverwrite = getAllUsedSegments();
Assert.assertEquals(1, usedSegmentsBeforeOverwrite.size());
for (DataSegment segment : usedSegmentsBeforeOverwrite) {
Assert.assertTrue(Granularities.DAY.isAligned(segment.getInterval()));
}
- indexTask = new IndexTask(
- null,
- null,
+ indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
),
- null,
createTuningConfigWithMaxRowsPerSegment(10, true),
false,
true
@@ -2570,7 +2277,7 @@ public class IndexTaskTest extends IngestionTestBase
segments = runSuccessfulTask(indexTask);
Assert.assertEquals(24, segments.size());
- Set<DataSegment> usedSegmentsBeforeAfterOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get());
+ Set<DataSegment> usedSegmentsBeforeAfterOverwrite = getAllUsedSegments();
Assert.assertEquals(24, usedSegmentsBeforeAfterOverwrite.size());
for (DataSegment segment : usedSegmentsBeforeAfterOverwrite) {
// Used segments after overwrite and drop will contain only the
@@ -2586,28 +2293,19 @@ public class IndexTaskTest extends IngestionTestBase
@Test
public void verifyPublishingOnlyTombstones() throws Exception
{
- File tmpDir = temporaryFolder.newFolder();
-
- File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("2014-03-01T00:00:10Z,a,1\n");
writer.write("2014-03-01T01:00:20Z,b,1\n");
writer.write("2014-03-01T02:00:30Z,c,1\n");
}
- IndexTask indexTask = new IndexTask(
- null,
- null,
+ IndexTask indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2014-01-03/2014-04-01"))
),
- null,
createTuningConfigWithMaxRowsPerSegment(10, true),
false,
false
@@ -2619,7 +2317,7 @@ public class IndexTaskTest extends IngestionTestBase
List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size());
- Set<DataSegment> usedSegmentsBeforeOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get());
+ Set<DataSegment> usedSegmentsBeforeOverwrite = getAllUsedSegments();
Assert.assertEquals(1, usedSegmentsBeforeOverwrite.size());
for (DataSegment segment : usedSegmentsBeforeOverwrite) {
Assert.assertTrue(Granularities.DAY.isAligned(segment.getInterval()));
@@ -2628,25 +2326,19 @@ public class IndexTaskTest extends IngestionTestBase
// create new data but with an ingestion interval appropriate to filter it all out so that only tombstones
// are created:
tmpDir = temporaryFolder.newFolder();
- tmpFile = File.createTempFile("druid", "index", tmpDir);
- try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+ try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,1\n");
writer.write("2014-12-01T02:00:30Z,c,1\n");
}
- indexTask = new IndexTask(
- null,
- null,
+ indexTask = createIndexTask(
createDefaultIngestionSpec(
- jsonMapper,
- tmpDir,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2014-03-01/2014-04-01")) // filter out
all data
),
- null,
createTuningConfigWithMaxRowsPerSegment(10, true),
false,
true
@@ -2661,37 +2353,35 @@ public class IndexTaskTest extends IngestionTestBase
Assert.assertTrue(segments.get(0).isTombstone());
}
-
+
@Test
- public void testErrorWhenDropFlagTrueAndOverwriteFalse() throws Exception
+ public void testErrorWhenDropFlagTrueAndOverwriteFalse()
{
- expectedException.expect(IAE.class);
- expectedException.expectMessage(
- "Cannot simultaneously replace and append to existing segments. Either
dropExisting or appendToExisting should be set to false"
- );
- new IndexTask(
- null,
- null,
- createDefaultIngestionSpec(
- jsonMapper,
- temporaryFolder.newFolder(),
- new UniformGranularitySpec(
- Granularities.MINUTE,
- Granularities.MINUTE,
-
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
+ Exception exception = Assert.assertThrows(
+ IAE.class,
+ () -> createIndexTask(
+ createDefaultIngestionSpec(
+ new UniformGranularitySpec(
+ Granularities.MINUTE,
+ Granularities.MINUTE,
+ Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
+ ),
+ createTuningConfigWithMaxRowsPerSegment(10, true),
+ true,
+ true
),
- null,
- createTuningConfigWithMaxRowsPerSegment(10, true),
- true,
- true
- ),
- null
+ null
+ )
+ );
+ Assert.assertEquals(
+ "Cannot simultaneously replace and append to existing segments."
+ + " Either dropExisting or appendToExisting should be set to false",
+ exception.getMessage()
);
}
- // If isStandaloneTask is false, cleanup should be a no-op
@Test
- public void testCleanupIndexTask() throws Exception
+ public void testCleanupIsNoopIfNotStandaloneTask() throws Exception
{
new IndexTask(
null,
@@ -2700,14 +2390,11 @@ public class IndexTaskTest extends IngestionTestBase
"dataSource",
null,
createDefaultIngestionSpec(
- jsonMapper,
- temporaryFolder.newFolder(),
new UniformGranularitySpec(
Granularities.MINUTE,
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
),
- null,
createTuningConfigWithMaxRowsPerSegment(10, true),
false,
false
@@ -2718,11 +2405,8 @@ public class IndexTaskTest extends IngestionTestBase
).cleanUp(null, null);
}
- /* if shouldCleanup is true, we should fall back to AbstractTask.cleanup,
- * check isEncapsulatedTask=false, and then exit.
- */
@Test
- public void testCleanup() throws Exception
+ public void testCleanupIsDoneIfStandaloneTask() throws Exception
{
TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class);
TaskConfig taskConfig = EasyMock.createMock(TaskConfig.class);
@@ -2736,14 +2420,11 @@ public class IndexTaskTest extends IngestionTestBase
"dataSource",
null,
createDefaultIngestionSpec(
- jsonMapper,
- temporaryFolder.newFolder(),
new UniformGranularitySpec(
Granularities.MINUTE,
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
),
- null,
createTuningConfigWithMaxRowsPerSegment(10, true),
false,
false
@@ -2758,7 +2439,7 @@ public class IndexTaskTest extends IngestionTestBase
public static void checkTaskStatusErrorMsgForParseExceptionsExceeded(TaskStatus status)
{
// full stacktrace will be too long and make tests brittle (e.g. if line # changes), just match the main message
- Assert.assertThat(
+ MatcherAssert.assertThat(
status.getErrorMsg(),
CoreMatchers.containsString("Max parse exceptions")
);
@@ -2789,24 +2470,21 @@ public class IndexTaskTest extends IngestionTestBase
1,
null,
null,
- null,
forceGuaranteedRollup,
true
);
}
private static IndexTuningConfig createTuningConfigWithPartitionsSpec(
- PartitionsSpec partitionsSpec,
- boolean forceGuaranteedRollup
+ PartitionsSpec partitionsSpec
)
{
return createTuningConfig(
null,
1,
null,
- null,
partitionsSpec,
- forceGuaranteedRollup,
+ true,
true
);
}
@@ -2814,7 +2492,6 @@ public class IndexTaskTest extends IngestionTestBase
static IndexTuningConfig createTuningConfig(
@Nullable Integer maxRowsPerSegment,
@Nullable Integer maxRowsInMemory,
- @Nullable Long maxBytesInMemory,
@Nullable Long maxTotalRows,
@Nullable PartitionsSpec partitionsSpec,
boolean forceGuaranteedRollup,
@@ -2826,7 +2503,7 @@ public class IndexTaskTest extends IngestionTestBase
maxRowsPerSegment,
null,
maxRowsInMemory,
- maxBytesInMemory,
+ null,
null,
maxTotalRows,
null,
@@ -2850,7 +2527,26 @@ public class IndexTaskTest extends IngestionTestBase
);
}
- private IngestionStatsAndErrorsTaskReportData getTaskReportData() throws IOException
+ @SuppressWarnings("unchecked")
+ private <T> T invokeApi(Function<HttpServletRequest, Response> api)
+ {
+ final HttpServletRequest request = EasyMock.mock(HttpServletRequest.class);
+ EasyMock.expect(request.getAttribute(EasyMock.anyString()))
+ .andReturn("allow-all");
+ EasyMock.replay(request);
+ return (T) api.apply(request).getEntity();
+ }
+
+ private Set<DataSegment> getAllUsedSegments()
+ {
+ return Sets.newHashSet(
+ getSegmentsMetadataManager()
+ .iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true)
+ .get()
+ );
+ }
+
+ private IngestionStatsAndErrors getTaskReportData() throws IOException
{
Map<String, TaskReport> taskReports = jsonMapper.readValue(
taskRunner.getTaskReportsFile(),
@@ -2858,16 +2554,19 @@ public class IndexTaskTest extends IngestionTestBase
{
}
);
- return IngestionStatsAndErrorsTaskReportData.getPayloadFromTaskReports(
- taskReports
- );
+ return IngestionStatsAndErrors.getPayloadFromTaskReports(taskReports);
+ }
+
+ private IndexTask createIndexTask(
+ IndexIngestionSpec ingestionSchema,
+ Map<String, Object> context
+ )
+ {
+ return new IndexTask(null, null, ingestionSchema, context);
}
private IndexIngestionSpec createDefaultIngestionSpec(
- ObjectMapper objectMapper,
- File baseDir,
@Nullable GranularitySpec granularitySpec,
- @Nullable TransformSpec transformSpec,
IndexTuningConfig tuningConfig,
boolean appendToExisting,
Boolean dropExisting
@@ -2875,12 +2574,10 @@ public class IndexTaskTest extends IngestionTestBase
{
if (useInputFormatApi) {
return createIngestionSpec(
- objectMapper,
- baseDir,
DEFAULT_TIMESTAMP_SPEC,
DEFAULT_DIMENSIONS_SPEC,
DEFAULT_INPUT_FORMAT,
- transformSpec,
+ null,
granularitySpec,
tuningConfig,
appendToExisting,
@@ -2888,10 +2585,10 @@ public class IndexTaskTest extends IngestionTestBase
);
} else {
return createIngestionSpec(
- objectMapper,
- baseDir,
+ jsonMapper,
+ tmpDir,
DEFAULT_PARSE_SPEC,
- transformSpec,
+ null,
granularitySpec,
tuningConfig,
appendToExisting,
@@ -2926,9 +2623,7 @@ public class IndexTaskTest extends IngestionTestBase
);
}
- static IndexIngestionSpec createIngestionSpec(
- ObjectMapper objectMapper,
- File baseDir,
+ private IndexIngestionSpec createIngestionSpec(
TimestampSpec timestampSpec,
DimensionsSpec dimensionsSpec,
InputFormat inputFormat,
@@ -2940,8 +2635,8 @@ public class IndexTaskTest extends IngestionTestBase
)
{
return createIngestionSpec(
- objectMapper,
- baseDir,
+ jsonMapper,
+ tmpDir,
null,
timestampSpec,
dimensionsSpec,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java
index bc7148c5401..0c95773da92 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java
@@ -19,7 +19,7 @@
package org.apache.druid.indexing.common.task;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import java.util.ArrayList;
import java.util.LinkedHashMap;
@@ -41,7 +41,7 @@ public class ParseExceptionReport
@SuppressWarnings("unchecked")
public static ParseExceptionReport forPhase(
- IngestionStatsAndErrorsTaskReportData reportData,
+ IngestionStatsAndErrors reportData,
String phase
)
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
index 53b8ace317a..aece6edb3f4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
@@ -26,8 +26,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import org.apache.druid.indexer.IngestionState;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TestUtils;
@@ -59,7 +59,7 @@ public class TaskReportSerdeTest
{
IngestionStatsAndErrorsTaskReport report1 = new IngestionStatsAndErrorsTaskReport(
"testID",
- new IngestionStatsAndErrorsTaskReportData(
+ new IngestionStatsAndErrors(
IngestionState.BUILD_SEGMENTS,
ImmutableMap.of(
"hello", "world"
@@ -118,7 +118,7 @@ public class TaskReportSerdeTest
IngestionStatsAndErrorsTaskReport expected = new IngestionStatsAndErrorsTaskReport(
IngestionStatsAndErrorsTaskReport.REPORT_KEY,
- new IngestionStatsAndErrorsTaskReportData(
+ new IngestionStatsAndErrors(
IngestionState.COMPLETED,
ImmutableMap.of(
"hello", "world"
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 6b662e473b0..08b0c584f76 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -50,8 +50,8 @@ import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@@ -558,7 +558,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
if (!task.isPresent()) {
return null;
}
- return Futures.immediateFuture(((ParallelIndexSupervisorTask) task.get()).doGetLiveReports("full"));
+ return Futures.immediateFuture(((ParallelIndexSupervisorTask) task.get()).doGetLiveReports(true));
}
public TaskContainer getTaskContainer(String taskId)
@@ -1074,12 +1074,12 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
});
}
- public List<IngestionStatsAndErrorsTaskReportData> getIngestionReports() throws IOException
+ public List<IngestionStatsAndErrors> getIngestionReports() throws IOException
{
return getReports().entrySet()
.stream()
.filter(entry -> entry.getKey().contains(IngestionStatsAndErrorsTaskReport.REPORT_KEY))
- .map(entry -> (IngestionStatsAndErrorsTaskReportData) entry.getValue().getPayload())
+ .map(entry -> (IngestionStatsAndErrors) entry.getValue().getPayload())
.collect(Collectors.toList());
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index 0da7ed0934e..7ad9f3c9464 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -446,7 +446,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
false,
Collections.emptyList()
);
- Map<String, Object> actualReports = task.doGetLiveReports("full");
+ Map<String, Object> actualReports = task.doGetLiveReports(true);
final long processedBytes = useInputFormatApi ? 335 : 0;
Map<String, Object> expectedReports = buildExpectedTaskReportParallel(
task.getId(),
@@ -497,7 +497,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
TaskContainer taskContainer = getIndexingServiceClient().getTaskContainer(task.getId());
final ParallelIndexSupervisorTask executedTask = (ParallelIndexSupervisorTask) taskContainer.getTask();
- Map<String, Object> actualReports = executedTask.doGetLiveReports("full");
+ Map<String, Object> actualReports = executedTask.doGetLiveReports(true);
final long processedBytes = useInputFormatApi ? 335 : 0;
RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(10, processedBytes, 1, 1, 1);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index 2db5da143ed..fb454acafec 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -52,7 +52,7 @@ import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
@@ -459,7 +459,7 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport
return new SegmentDescriptorAndExpectedDim1Values(interval, partitionNum, expectedDim1Values);
}
- protected IngestionStatsAndErrorsTaskReportData getTaskReportData() throws IOException
+ protected IngestionStatsAndErrors getTaskReportData() throws IOException
{
Map<String, TaskReport> taskReports = OBJECT_MAPPER.readValue(
reportsFile,
@@ -467,7 +467,7 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport
{
}
);
- return IngestionStatsAndErrorsTaskReportData.getPayloadFromTaskReports(
+ return IngestionStatsAndErrors.getPayloadFromTaskReports(
taskReports
);
}
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
index fb02d8f8dad..8eadbdf8f11 100644
--- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
@@ -23,8 +23,8 @@ import com.google.common.collect.FluentIterable;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.partitions.SecondaryPartitionType;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionCardinalityTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
@@ -487,7 +487,7 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
if (segmentAvailabilityConfirmationPair.lhs != null && segmentAvailabilityConfirmationPair.lhs) {
TaskReport reportRaw = indexer.getTaskReport(taskID).get("ingestionStatsAndErrors");
IngestionStatsAndErrorsTaskReport report = (IngestionStatsAndErrorsTaskReport) reportRaw;
- IngestionStatsAndErrorsTaskReportData reportData = (IngestionStatsAndErrorsTaskReportData) report.getPayload();
+ IngestionStatsAndErrors reportData = (IngestionStatsAndErrors) report.getPayload();
// Confirm that the task waited longer than 0ms for the task to complete.
Assert.assertTrue(reportData.getSegmentAvailabilityWaitTimeMs() > 0);
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index ac4ef536b02..ceaeddea0dc 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -27,7 +27,6 @@ import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.overlord.http.TaskPayloadResponse;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.java.util.common.ISE;
@@ -263,13 +262,15 @@ public class OverlordResourceTestClient
public String getTaskErrorMessage(String taskId)
{
- return ((IngestionStatsAndErrorsTaskReportData) getTaskReport(taskId).get("ingestionStatsAndErrors").getPayload()).getErrorMsg();
+ return getTaskReport(taskId).get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
+ .getPayload().getErrorMsg();
}
public RowIngestionMetersTotals getTaskStats(String taskId)
{
try {
- Object buildSegment = ((IngestionStatsAndErrorsTaskReportData) getTaskReport(taskId).get("ingestionStatsAndErrors").getPayload()).getRowStats().get("buildSegments");
+ Object buildSegment = getTaskReport(taskId).get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
+ .getPayload().getRowStats().get("buildSegments");
return jsonMapper.convertValue(buildSegment, RowIngestionMetersTotals.class);
}
catch (Exception e) {
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index 0f160580db5..d419bbb48d3 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -23,8 +23,8 @@ import com.google.common.collect.FluentIterable;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.partitions.SecondaryPartitionType;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionCardinalityTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
@@ -370,7 +370,7 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
if (segmentAvailabilityConfirmationPair.lhs != null && segmentAvailabilityConfirmationPair.lhs) {
TaskReport reportRaw = indexer.getTaskReport(taskID).get("ingestionStatsAndErrors");
IngestionStatsAndErrorsTaskReport report = (IngestionStatsAndErrorsTaskReport) reportRaw;
- IngestionStatsAndErrorsTaskReportData reportData = (IngestionStatsAndErrorsTaskReportData) report.getPayload();
+ IngestionStatsAndErrors reportData = (IngestionStatsAndErrors) report.getPayload();
// Confirm that the task waited longer than 0ms for the task to complete.
Assert.assertTrue(reportData.getSegmentAvailabilityWaitTimeMs() > 0);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index 296f7fe747a..2b37c7a27d2 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -21,8 +21,8 @@ package org.apache.druid.tests.indexer;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.GranularityType;
@@ -176,13 +176,13 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
Assert.assertEquals(2,
reports.values()
.stream()
- .mapToLong(r -> ((IngestionStatsAndErrorsTaskReportData) r.getPayload()).getSegmentsPublished())
+ .mapToLong(r -> ((IngestionStatsAndErrors) r.getPayload()).getSegmentsPublished())
.sum()
);
Assert.assertEquals(4,
reports.values()
.stream()
- .mapToLong(r -> ((IngestionStatsAndErrorsTaskReportData) r.getPayload()).getSegmentsRead())
+ .mapToLong(r -> ((IngestionStatsAndErrors) r.getPayload()).getSegmentsRead())
.sum()
);
}
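
Note: the following is a minimal sketch, assembled from the test patterns in the patch above, of how a caller reads the renamed payload after this change. The class name ReportReadExample and its arguments are illustrative and not part of the commit.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.TaskReport;

import java.io.File;
import java.io.IOException;
import java.util.Map;

public class ReportReadExample
{
  // Illustrative helper (not in the commit): deserializes the report file
  // written by a completed task and extracts the stats payload via the
  // method shown in the getTaskReportData() implementations above.
  public static IngestionStatsAndErrors readPayload(ObjectMapper mapper, File reportsFile)
      throws IOException
  {
    final Map<String, TaskReport> taskReports = mapper.readValue(
        reportsFile,
        new TypeReference<Map<String, TaskReport>>() {}
    );
    return IngestionStatsAndErrors.getPayloadFromTaskReports(taskReports);
  }
}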