This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
     new 4df4896674f Refactor: Add common method in AbstractBatchIndexTask to create ingestion stats report (#16202)
4df4896674f is described below

commit 4df4896674f57ef49e2736aa613f271112a2a111
Author: Kashif Faraz <kashif.fa...@gmail.com>
AuthorDate: Thu Mar 28 23:07:00 2024 +0530

    Refactor: Add common method in AbstractBatchIndexTask to create ingestion stats report (#16202)

    Changes
    - No functional changes
    - Add method `AbstractBatchIndexTask.buildIngestionStatsReport()` used in several batch tasks
    - Add utility method `AbstractBatchIndexTask.addBuildSegmentStatsToReport()`
    - Use boolean argument to represent a full report instead of the String `full` in internal methods. (REST API remains unchanged.)
    - Rename `IngestionStatsAndErrorsTaskReportData` to `IngestionStatsAndErrors`
    - Clean up some of the methods
---
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |  12 +-
 .../indexing/kinesis/KinesisIndexTaskTest.java     |   6 +-
 ...eportData.java => IngestionStatsAndErrors.java} |  49 +-
 .../common/IngestionStatsAndErrorsTaskReport.java  |   6 +-
 .../common/task/AbstractBatchIndexTask.java        | 128 ++--
 .../task/AppenderatorDriverRealtimeIndexTask.java  |   4 +-
 .../indexing/common/task/HadoopIndexTask.java      |  22 +-
 .../druid/indexing/common/task/IndexTask.java      |  97 +--
 .../parallel/ParallelIndexSupervisorTask.java      |  96 +--
 .../batch/parallel/PartialSegmentGenerateTask.java |  32 +-
 .../task/batch/parallel/SinglePhaseSubTask.java    |  77 +--
 .../SeekableStreamIndexTaskRunner.java             |   4 +-
 .../AppenderatorDriverRealtimeIndexTaskTest.java   |  16 +-
 .../common/task/CompactionTaskParallelRunTest.java |   8 +-
 .../common/task/CompactionTaskRunTest.java         |   6 +-
 .../druid/indexing/common/task/IndexTaskTest.java  | 735 ++++++---------------
 .../indexing/common/task/ParseExceptionReport.java |   4 +-
 .../indexing/common/task/TaskReportSerdeTest.java  |   6 +-
 .../AbstractParallelIndexSupervisorTaskTest.java   |   8 +-
 .../parallel/SinglePhaseParallelIndexingTest.java  |   4 +-
 .../SeekableStreamIndexTaskTestBase.java           |   6 +-
 .../testsEx/indexer/AbstractITBatchIndexTest.java  |   4 +-
 .../clients/OverlordResourceTestClient.java        |   7 +-
 .../tests/indexer/AbstractITBatchIndexTest.java    |   4 +-
 .../druid/tests/indexer/ITCompactionTaskTest.java  |   6 +-
 25 files changed, 470 insertions(+), 877 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index c3d1d213383..85a9fdb6254 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -54,7 +54,7 @@ import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
 import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.task.IndexTaskTest;
@@ -1615,7 +1615,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase newDataSchemaMetadata() ); -
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); + IngestionStatsAndErrors reportData = getTaskReportData(); Map<String, Object> expectedMetrics = ImmutableMap.of( RowIngestionMeters.BUILD_SEGMENTS, @@ -1695,7 +1695,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase Assert.assertEquals(ImmutableList.of(), publishedDescriptors()); Assert.assertNull(newDataSchemaMetadata()); - IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); + IngestionStatsAndErrors reportData = getTaskReportData(); Map<String, Object> expectedMetrics = ImmutableMap.of( RowIngestionMeters.BUILD_SEGMENTS, @@ -3061,7 +3061,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase ); // Verify unparseable data - IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); + IngestionStatsAndErrors reportData = getTaskReportData(); ParseExceptionReport parseExceptionReport = ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS); @@ -3233,7 +3233,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase TaskStatus status = future.get(); Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode()); - IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); + IngestionStatsAndErrors reportData = getTaskReportData(); Assert.assertEquals(reportData.getRecordsProcessed().size(), 1); Assert.assertEquals(reportData.getRecordsProcessed().values().iterator().next(), (Long) 6L); } @@ -3281,7 +3281,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase TaskStatus status = future.get(); Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode()); - IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); + IngestionStatsAndErrors reportData = getTaskReportData(); Assert.assertEquals(reportData.getRecordsProcessed().size(), 2); Assert.assertTrue(reportData.getRecordsProcessed().values().containsAll(ImmutableSet.of(6L, 2L))); } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index e14b6679b09..ad8964d712c 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -43,7 +43,7 @@ import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; +import org.apache.druid.indexing.common.IngestionStatsAndErrors; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; @@ -1182,7 +1182,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase newDataSchemaMetadata() ); - IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); + IngestionStatsAndErrors reportData = getTaskReportData(); Map<String, Object> expectedMetrics = ImmutableMap.of( RowIngestionMeters.BUILD_SEGMENTS, @@ -1268,7 +1268,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase Assert.assertEquals(ImmutableList.of(), 
publishedDescriptors()); Assert.assertNull(newDataSchemaMetadata()); - IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); + IngestionStatsAndErrors reportData = getTaskReportData(); Map<String, Object> expectedMetrics = ImmutableMap.of( RowIngestionMeters.BUILD_SEGMENTS, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrors.java similarity index 83% rename from indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java rename to indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrors.java index 97ea58e1c5c..d55f5ed0d19 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrors.java @@ -27,36 +27,19 @@ import javax.annotation.Nullable; import java.util.Map; import java.util.Objects; -public class IngestionStatsAndErrorsTaskReportData +public class IngestionStatsAndErrors { - @JsonProperty - private IngestionState ingestionState; - - @JsonProperty - private Map<String, Object> unparseableEvents; - - @JsonProperty - private Map<String, Object> rowStats; - - @JsonProperty - @Nullable - private String errorMsg; - - @JsonProperty - private boolean segmentAvailabilityConfirmed; - - @JsonProperty - private long segmentAvailabilityWaitTimeMs; - - @JsonProperty - private Map<String, Long> recordsProcessed; - - @JsonProperty - private Long segmentsRead; - @JsonProperty - private Long segmentsPublished; - - public IngestionStatsAndErrorsTaskReportData( + private final IngestionState ingestionState; + private final Map<String, Object> unparseableEvents; + private final Map<String, Object> rowStats; + private final String errorMsg; + private final boolean segmentAvailabilityConfirmed; + private final long segmentAvailabilityWaitTimeMs; + private final Map<String, Long> recordsProcessed; + private final Long segmentsRead; + private final Long segmentsPublished; + + public IngestionStatsAndErrors( @JsonProperty("ingestionState") IngestionState ingestionState, @JsonProperty("unparseableEvents") Map<String, Object> unparseableEvents, @JsonProperty("rowStats") Map<String, Object> rowStats, @@ -139,12 +122,12 @@ public class IngestionStatsAndErrorsTaskReportData return segmentsPublished; } - public static IngestionStatsAndErrorsTaskReportData getPayloadFromTaskReports( + public static IngestionStatsAndErrors getPayloadFromTaskReports( Map<String, TaskReport> taskReports ) { - return (IngestionStatsAndErrorsTaskReportData) taskReports.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY) - .getPayload(); + return (IngestionStatsAndErrors) taskReports.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY) + .getPayload(); } @Override @@ -156,7 +139,7 @@ public class IngestionStatsAndErrorsTaskReportData if (o == null || getClass() != o.getClass()) { return false; } - IngestionStatsAndErrorsTaskReportData that = (IngestionStatsAndErrorsTaskReportData) o; + IngestionStatsAndErrors that = (IngestionStatsAndErrors) o; return getIngestionState() == that.getIngestionState() && Objects.equals(getUnparseableEvents(), that.getUnparseableEvents()) && Objects.equals(getRowStats(), that.getRowStats()) && diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java index 7518523b1de..0122a8d3ffb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java @@ -32,12 +32,12 @@ public class IngestionStatsAndErrorsTaskReport implements TaskReport private final String taskId; @JsonProperty - private final IngestionStatsAndErrorsTaskReportData payload; + private final IngestionStatsAndErrors payload; @JsonCreator public IngestionStatsAndErrorsTaskReport( @JsonProperty("taskId") String taskId, - @JsonProperty("payload") IngestionStatsAndErrorsTaskReportData payload + @JsonProperty("payload") IngestionStatsAndErrors payload ) { this.taskId = taskId; @@ -57,7 +57,7 @@ public class IngestionStatsAndErrorsTaskReport implements TaskReport } @Override - public IngestionStatsAndErrorsTaskReportData getPayload() + public IngestionStatsAndErrors getPayload() { return payload; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index b1d3e992696..a5acadf704e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -30,9 +30,13 @@ import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.indexer.IngestionState; +import org.apache.druid.indexing.common.IngestionStatsAndErrors; +import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; @@ -53,6 +57,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.NonnullPair; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularity; @@ -110,8 +115,8 @@ public abstract class AbstractBatchIndexTask extends AbstractTask { private static final Logger log = new Logger(AbstractBatchIndexTask.class); - protected boolean segmentAvailabilityConfirmationCompleted = false; - protected long segmentAvailabilityWaitTimeMs = 0L; + private boolean segmentAvailabilityConfirmationCompleted = false; + private long segmentAvailabilityWaitTimeMs = 0L; @GuardedBy("this") private final TaskResourceCleaner resourceCloserOnAbnormalExit = new TaskResourceCleaner(); @@ -123,8 +128,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask private final int maxAllowedLockCount; - // Store lock versions - Map<Interval, String> intervalToVersion = new HashMap<>(); + private final 
Map<Interval, String> intervalToLockVersion = new HashMap<>(); protected AbstractBatchIndexTask(String id, String dataSource, Map<String, Object> context, IngestionMode ingestionMode) { @@ -481,7 +485,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", cur)); } locksAcquired++; - intervalToVersion.put(cur, lock.getVersion()); + intervalToLockVersion.put(cur, lock.getVersion()); } return true; } @@ -685,10 +689,8 @@ public abstract class AbstractBatchIndexTask extends AbstractTask * the cluster. Doing so gives an end user assurance that a Successful task status means their data is available * for querying. * - * @param toolbox {@link TaskToolbox} object with for assisting with task work. - * @param segmentsToWaitFor {@link List} of segments to wait for availability. - * @param waitTimeout Millis to wait before giving up - * @return True if all segments became available, otherwise False. + * @return True if all segments became available before the {@code waitTimeoutMillis} + * elapsed, otherwise false. */ protected boolean waitForSegmentAvailability( TaskToolbox toolbox, @@ -697,22 +699,22 @@ public abstract class AbstractBatchIndexTask extends AbstractTask ) { if (segmentsToWaitFor.isEmpty()) { - log.info("Asked to wait for segments to be available, but I wasn't provided with any segments."); + log.info("No segments to wait for availability."); return true; } else if (waitTimeout < 0) { - log.warn("Asked to wait for availability for < 0 seconds?! Requested waitTimeout: [%s]", waitTimeout); + log.warn("Not waiting for segment availability as waitTimeout[%s] is less than zero.", waitTimeout); return false; } log.info("Waiting for [%d] segments to be loaded by the cluster...", segmentsToWaitFor.size()); - final long start = System.nanoTime(); + final Stopwatch stopwatch = Stopwatch.createStarted(); try ( SegmentHandoffNotifier notifier = toolbox.getSegmentHandoffNotifierFactory() .createSegmentHandoffNotifier(segmentsToWaitFor.get(0).getDataSource()) ) { - ExecutorService exec = Execs.directExecutor(); - CountDownLatch doneSignal = new CountDownLatch(segmentsToWaitFor.size()); + final ExecutorService exec = Execs.directExecutor(); + final CountDownLatch doneSignal = new CountDownLatch(segmentsToWaitFor.size()); notifier.start(); for (DataSegment s : segmentsToWaitFor) { @@ -720,11 +722,11 @@ public abstract class AbstractBatchIndexTask extends AbstractTask new SegmentDescriptor(s.getInterval(), s.getVersion(), s.getShardSpec().getPartitionNum()), exec, () -> { + doneSignal.countDown(); log.debug( - "Confirmed availability for [%s]. 
Removing from list of segments to wait for", - s.getId() + "Segment[%s] is now available, [%d] segments remaining.", + s.getId(), doneSignal.getCount() ); - doneSignal.countDown(); } ); } @@ -737,7 +739,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask return false; } finally { - segmentAvailabilityWaitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + segmentAvailabilityWaitTimeMs = stopwatch.millisElapsed(); toolbox.getEmitter().emit( new ServiceMetricEvent.Builder() .setDimension("dataSource", getDataSource()) @@ -782,12 +784,12 @@ public abstract class AbstractBatchIndexTask extends AbstractTask if (revokedLock != null) { throw new ISE("Lock revoked: [%s]", revokedLock); } - final Map<Interval, String> versions = locks - .stream() - .collect(Collectors.toMap(TaskLock::getInterval, TaskLock::getVersion)); + final Map<Interval, String> versions = locks.stream().collect( + Collectors.toMap(TaskLock::getInterval, TaskLock::getVersion) + ); - Interval interval; - String version; + final Interval interval; + final String version; if (!materializedBucketIntervals.isEmpty()) { // If granularity spec has explicit intervals, we just need to find the version associated to the interval. // This is because we should have gotten all required locks up front when the task starts up. @@ -809,8 +811,8 @@ public abstract class AbstractBatchIndexTask extends AbstractTask // We don't have explicit intervals. We can use the segment granularity to figure out what // interval we need, but we might not have already locked it. interval = granularitySpec.getSegmentGranularity().bucket(timestamp); - version = AbstractBatchIndexTask.findVersion(versions, interval); - if (version == null) { + final String existingLockVersion = AbstractBatchIndexTask.findVersion(versions, interval); + if (existingLockVersion == null) { if (ingestionSpec.getTuningConfig() instanceof ParallelIndexTuningConfig) { final int maxAllowedLockCount = ((ParallelIndexTuningConfig) ingestionSpec.getTuningConfig()) .getMaxAllowedLockCount(); @@ -830,6 +832,8 @@ public abstract class AbstractBatchIndexTask extends AbstractTask throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval)); } version = lock.getVersion(); + } else { + version = existingLockVersion; } } return new NonnullPair<>(interval, version); @@ -856,23 +860,19 @@ public abstract class AbstractBatchIndexTask extends AbstractTask } /** - * Get the version from the locks for a given timestamp. This will work if the locks were acquired upfront - * @param timestamp - * @return The interval andversion if n interval that contains an interval was found or null otherwise + * @return The interval and version containing the given timestamp if one exists, otherwise null. 
*/ @Nullable - Pair<Interval, String> lookupVersion(DateTime timestamp) - { - java.util.Optional<Map.Entry<Interval, String>> intervalAndVersion = intervalToVersion.entrySet() - .stream() - .filter(e -> e.getKey() - .contains( - timestamp)) - .findFirst(); - if (!intervalAndVersion.isPresent()) { - return null; - } - return new Pair(intervalAndVersion.get().getKey(), intervalAndVersion.get().getValue()); + private Pair<Interval, String> lookupVersion(DateTime timestamp) + { + java.util.Optional<Map.Entry<Interval, String>> intervalAndVersion + = intervalToLockVersion.entrySet() + .stream() + .filter(e -> e.getKey().contains(timestamp)) + .findFirst(); + return intervalAndVersion.map( + entry -> new Pair<>(entry.getKey(), entry.getValue()) + ).orElse(null); } protected SegmentIdWithShardSpec allocateNewSegmentForTombstone( @@ -891,4 +891,52 @@ public abstract class AbstractBatchIndexTask extends AbstractTask ); } + @Nullable + protected Map<String, Object> getTaskCompletionRowStats() + { + return null; + } + + @Nullable + protected Map<String, Object> getTaskCompletionUnparseableEvents() + { + return null; + } + + /** + * Builds a singleton map with {@link IngestionStatsAndErrorsTaskReport#REPORT_KEY} + * as key and an {@link IngestionStatsAndErrorsTaskReport} for this task as value. + */ + protected Map<String, TaskReport> buildIngestionStatsReport( + IngestionState ingestionState, + String errorMessage, + Long segmentsRead, + Long segmentsPublished + ) + { + return TaskReport.buildTaskReports( + new IngestionStatsAndErrorsTaskReport( + getId(), + new IngestionStatsAndErrors( + ingestionState, + getTaskCompletionUnparseableEvents(), + getTaskCompletionRowStats(), + errorMessage, + segmentAvailabilityConfirmationCompleted, + segmentAvailabilityWaitTimeMs, + Collections.emptyMap(), + segmentsRead, + segmentsPublished + ) + ) + ); + } + + protected static boolean addBuildSegmentStatsToReport(boolean isFullReport, IngestionState ingestionState) + { + return isFullReport + || ingestionState == IngestionState.BUILD_SEGMENTS + || ingestionState == IngestionState.COMPLETED; + } + } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index e0c7f9bc934..c00f219c777 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -42,8 +42,8 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator; import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import org.apache.druid.indexing.common.IngestionStatsAndErrors; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; @@ -614,7 +614,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements return TaskReport.buildTaskReports( new IngestionStatsAndErrorsTaskReport( getId(), - new IngestionStatsAndErrorsTaskReportData( + new IngestionStatsAndErrors( ingestionState, 
getTaskCompletionUnparseableEvents(), getTaskCompletionRowStats(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 019b63520e7..5fd3572f9e2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -43,8 +43,6 @@ import org.apache.druid.indexer.MetadataStorageUpdaterJobHandler; import org.apache.druid.indexer.TaskMetricsGetter; import org.apache.druid.indexer.TaskMetricsUtils; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskReport; @@ -684,25 +682,11 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler private Map<String, TaskReport> getTaskCompletionReports() { - return TaskReport.buildTaskReports( - new IngestionStatsAndErrorsTaskReport( - getId(), - new IngestionStatsAndErrorsTaskReportData( - ingestionState, - null, - getTaskCompletionRowStats(), - errorMsg, - segmentAvailabilityConfirmationCompleted, - segmentAvailabilityWaitTimeMs, - Collections.emptyMap(), - null, - null - ) - ) - ); + return buildIngestionStatsReport(ingestionState, errorMsg, null, null); } - private Map<String, Object> getTaskCompletionRowStats() + @Override + protected Map<String, Object> getTaskCompletionRowStats() { Map<String, Object> metrics = new HashMap<>(); if (determineConfigStatus != null) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 950515d7348..49041ce4d70 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -48,8 +48,6 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.SecondaryPartitionType; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; import org.apache.druid.indexing.common.TaskReport; @@ -335,34 +333,14 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler ) { IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); - return Response.ok(doGetUnparseableEvents(full)).build(); + return Response.ok(doGetUnparseableEvents(full != null)).build(); } - public Map<String, Object> doGetUnparseableEvents(String full) + public Map<String, Object> doGetUnparseableEvents(boolean isFullReport) { - Map<String, Object> events = new HashMap<>(); + final Map<String, Object> events = new HashMap<>(); - boolean needsDeterminePartitions = false; - boolean needsBuildSegments = false; - - if (full != null) { - needsDeterminePartitions = true; - 
needsBuildSegments = true; - } else { - switch (ingestionState) { - case DETERMINE_PARTITIONS: - needsDeterminePartitions = true; - break; - case BUILD_SEGMENTS: - case COMPLETED: - needsBuildSegments = true; - break; - default: - break; - } - } - - if (needsDeterminePartitions) { + if (addDeterminePartitionStatsToReport(isFullReport, ingestionState)) { events.put( RowIngestionMeters.DETERMINE_PARTITIONS, IndexTaskUtils.getReportListFromSavedParseExceptions( @@ -371,7 +349,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler ); } - if (needsBuildSegments) { + if (addBuildSegmentStatsToReport(isFullReport, ingestionState)) { events.put( RowIngestionMeters.BUILD_SEGMENTS, IndexTaskUtils.getReportListFromSavedParseExceptions( @@ -382,33 +360,13 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler return events; } - public Map<String, Object> doGetRowStats(String full) + public Map<String, Object> doGetRowStats(boolean isFullReport) { Map<String, Object> returnMap = new HashMap<>(); Map<String, Object> totalsMap = new HashMap<>(); Map<String, Object> averagesMap = new HashMap<>(); - boolean needsDeterminePartitions = false; - boolean needsBuildSegments = false; - - if (full != null) { - needsDeterminePartitions = true; - needsBuildSegments = true; - } else { - switch (ingestionState) { - case DETERMINE_PARTITIONS: - needsDeterminePartitions = true; - break; - case BUILD_SEGMENTS: - case COMPLETED: - needsBuildSegments = true; - break; - default: - break; - } - } - - if (needsDeterminePartitions) { + if (addDeterminePartitionStatsToReport(isFullReport, ingestionState)) { totalsMap.put( RowIngestionMeters.DETERMINE_PARTITIONS, determinePartitionsMeters.getTotals() @@ -419,7 +377,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler ); } - if (needsBuildSegments) { + if (addBuildSegmentStatsToReport(isFullReport, ingestionState)) { totalsMap.put( RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsMeters.getTotals() @@ -444,7 +402,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler ) { IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); - return Response.ok(doGetRowStats(full)).build(); + return Response.ok(doGetRowStats(full != null)).build(); } @GET @@ -463,7 +421,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler payload.put("ingestionState", ingestionState); payload.put("unparseableEvents", events); - payload.put("rowStats", doGetRowStats(full)); + payload.put("rowStats", doGetRowStats(full != null)); ingestionStatsAndErrors.put("taskId", getId()); ingestionStatsAndErrors.put("payload", payload); @@ -580,36 +538,16 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler } } - private void updateAndWriteCompletionReports(TaskToolbox toolbox) { - completionReports = getTaskCompletionReports(); + completionReports = buildIngestionStatsReport(ingestionState, errorMsg, null, null); if (isStandAloneTask) { toolbox.getTaskReportFileWriter().write(getId(), completionReports); } } - private Map<String, TaskReport> getTaskCompletionReports() - { - return TaskReport.buildTaskReports( - new IngestionStatsAndErrorsTaskReport( - getId(), - new IngestionStatsAndErrorsTaskReportData( - ingestionState, - getTaskCompletionUnparseableEvents(), - getTaskCompletionRowStats(), - errorMsg, - segmentAvailabilityConfirmationCompleted, - segmentAvailabilityWaitTimeMs, - Collections.emptyMap(), - null, - null - 
) - ) - ); - } - - private Map<String, Object> getTaskCompletionUnparseableEvents() + @Override + protected Map<String, Object> getTaskCompletionUnparseableEvents() { Map<String, Object> unparseableEventsMap = new HashMap<>(); CircularBuffer<ParseExceptionReport> determinePartitionsParseExceptionReports = @@ -631,7 +569,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler return unparseableEventsMap; } - private Map<String, Object> getTaskCompletionRowStats() + @Override + protected Map<String, Object> getTaskCompletionRowStats() { Map<String, Object> metrics = new HashMap<>(); metrics.put( @@ -709,6 +648,12 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler } } + private static boolean addDeterminePartitionStatsToReport(boolean isFullReport, IngestionState ingestionState) + { + return isFullReport + || ingestionState == IngestionState.DETERMINE_PARTITIONS; + } + private static LinearPartitionAnalysis createLinearPartitionAnalysis( GranularitySpec granularitySpec, @Nonnull DynamicPartitionsSpec partitionsSpec diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index db497dff5ec..4d5b9717ca9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -40,8 +40,8 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexing.common.IngestionStatsAndErrors; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; @@ -1237,37 +1237,32 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen /** * Generate an IngestionStatsAndErrorsTaskReport for the task. - * - * @param taskStatus {@link TaskStatus} - * @param segmentAvailabilityConfirmed Whether or not the segments were confirmed to be available for query when - * when the task completed. 
- * @return */ - private Map<String, TaskReport> getTaskCompletionReports(TaskStatus taskStatus, boolean segmentAvailabilityConfirmed) - { - Pair<Map<String, Object>, Map<String, Object>> rowStatsAndUnparseableEvents = - doGetRowStatsAndUnparseableEvents("true", true); - return TaskReport.buildTaskReports( - new IngestionStatsAndErrorsTaskReport( - getId(), - new IngestionStatsAndErrorsTaskReportData( - IngestionState.COMPLETED, - rowStatsAndUnparseableEvents.rhs, - rowStatsAndUnparseableEvents.lhs, - taskStatus.getErrorMsg(), - segmentAvailabilityConfirmed, - segmentAvailabilityWaitTimeMs, - Collections.emptyMap(), - segmentsRead, - segmentsPublished - ) - ) + private Map<String, TaskReport> getTaskCompletionReports(TaskStatus taskStatus) + { + return buildIngestionStatsReport( + IngestionState.COMPLETED, + taskStatus.getErrorMsg(), + segmentsRead, + segmentsPublished ); } + @Override + protected Map<String, Object> getTaskCompletionRowStats() + { + return doGetRowStatsAndUnparseableEvents(true, false).lhs; + } + + @Override + protected Map<String, Object> getTaskCompletionUnparseableEvents() + { + return doGetRowStatsAndUnparseableEvents(true, true).rhs; + } + private void updateAndWriteCompletionReports(TaskStatus status) { - completionReports = getTaskCompletionReports(status, segmentAvailabilityConfirmationCompleted); + completionReports = getTaskCompletionReports(status); writeCompletionReports(); } @@ -1609,11 +1604,13 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen for (PushedSegmentsReport pushedSegmentsReport : completedSubtaskReports.values()) { Map<String, TaskReport> taskReport = pushedSegmentsReport.getTaskReport(); if (taskReport == null || taskReport.isEmpty()) { - LOG.warn("Got an empty task report from subtask: " + pushedSegmentsReport.getTaskId()); + LOG.warn("Received an empty report from subtask[%s]", pushedSegmentsReport.getTaskId()); continue; } - RowIngestionMetersTotals rowIngestionMetersTotals = - getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable, unparseableEvents); + RowIngestionMetersTotals rowIngestionMetersTotals = getBuildSegmentsStatsFromTaskReport( + taskReport, + includeUnparseable ?
unparseableEvents : null + ); buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals); } @@ -1647,11 +1644,11 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) { Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport(); if (taskReport == null || taskReport.isEmpty()) { - LOG.warn("Got an empty task report from subtask: " + generatedPartitionsReport.getTaskId()); + LOG.warn("Received an empty report from subtask[%s]", generatedPartitionsReport.getTaskId()); continue; } RowIngestionMetersTotals rowStatsForCompletedTask = - getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents); + getBuildSegmentsStatsFromTaskReport(taskReport, unparseableEvents); buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask); @@ -1730,26 +1727,26 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport( Map<String, TaskReport> taskReport, - boolean includeUnparseable, - List<ParseExceptionReport> unparseableEvents) + List<ParseExceptionReport> unparseableEvents + ) { - IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport) taskReport.get( - IngestionStatsAndErrorsTaskReport.REPORT_KEY); - IngestionStatsAndErrorsTaskReportData reportData = - (IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload(); + IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport) + taskReport.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY); + IngestionStatsAndErrors reportData = ingestionStatsAndErrorsReport.getPayload(); RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats( reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS) ); - if (includeUnparseable) { - List<ParseExceptionReport> taskUnparsebleEvents = (List<ParseExceptionReport>) reportData.getUnparseableEvents() - .get(RowIngestionMeters.BUILD_SEGMENTS); - unparseableEvents.addAll(taskUnparsebleEvents); + if (unparseableEvents != null) { + unparseableEvents.addAll( + (List<ParseExceptionReport>) + reportData.getUnparseableEvents().get(RowIngestionMeters.BUILD_SEGMENTS) + ); } return totals; } private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEvents( - String full, + boolean isFullReport, boolean includeUnparseable ) { @@ -1779,7 +1776,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen } } else { IndexTask currentSequentialTask = (IndexTask) currentRunner; - return Pair.of(currentSequentialTask.doGetRowStats(full), currentSequentialTask.doGetUnparseableEvents(full)); + return Pair.of( + currentSequentialTask.doGetRowStats(isFullReport), + currentSequentialTask.doGetUnparseableEvents(isFullReport) + ); } } @@ -1800,25 +1800,25 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen ) { IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); - return Response.ok(doGetRowStatsAndUnparseableEvents(full, false).lhs).build(); + return Response.ok(doGetRowStatsAndUnparseableEvents(full != null, false).lhs).build(); } @VisibleForTesting - public Map<String, Object> doGetLiveReports(String full) + public Map<String, Object> doGetLiveReports(boolean isFullReport) { Map<String, Object> returnMap = new HashMap<>(); 
Map<String, Object> ingestionStatsAndErrors = new HashMap<>(); Map<String, Object> payload = new HashMap<>(); Pair<Map<String, Object>, Map<String, Object>> rowStatsAndUnparsebleEvents = - doGetRowStatsAndUnparseableEvents(full, true); + doGetRowStatsAndUnparseableEvents(isFullReport, true); // use the sequential task's ingestion state if we were running that mode IngestionState ingestionStateForReport; if (isParallelMode()) { ingestionStateForReport = ingestionState; } else { - IndexTask currentSequentialTask = (IndexTask) currentSubTaskHolder.getTask(); + IndexTask currentSequentialTask = currentSubTaskHolder.getTask(); ingestionStateForReport = currentSequentialTask == null ? ingestionState : currentSequentialTask.getIngestionState(); @@ -1846,7 +1846,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen { IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); - return Response.ok(doGetLiveReports(full)).build(); + return Response.ok(doGetLiveReports(full != null)).build(); } /** diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java index 46be219d878..bbeb00aa845 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java @@ -25,8 +25,6 @@ import org.apache.druid.data.input.InputSource; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.PartitionsSpec; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; @@ -253,25 +251,16 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e */ private Map<String, TaskReport> getTaskCompletionReports(Long segmentsRead) { - return TaskReport.buildTaskReports( - new IngestionStatsAndErrorsTaskReport( - getId(), - new IngestionStatsAndErrorsTaskReportData( - IngestionState.COMPLETED, - getTaskCompletionUnparseableEvents(), - getTaskCompletionRowStats(), - "", - false, // not applicable for parallel subtask - segmentAvailabilityWaitTimeMs, - Collections.emptyMap(), - segmentsRead, - null - ) - ) + return buildIngestionStatsReport( + IngestionState.COMPLETED, + "", + segmentsRead, + null ); } - private Map<String, Object> getTaskCompletionUnparseableEvents() + @Override + protected Map<String, Object> getTaskCompletionUnparseableEvents() { Map<String, Object> unparseableEventsMap = new HashMap<>(); List<ParseExceptionReport> parseExceptionMessages = IndexTaskUtils.getReportListFromSavedParseExceptions( @@ -287,13 +276,12 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e return unparseableEventsMap; } - private Map<String, Object> getTaskCompletionRowStats() + @Override + protected Map<String, Object> getTaskCompletionRowStats() { - Map<String, Object> metrics = new HashMap<>(); - metrics.put( + return Collections.singletonMap( RowIngestionMeters.BUILD_SEGMENTS, 
buildSegmentsMeters.getTotals() ); - return metrics; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index 2ce427f7c84..465750bad03 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -22,7 +22,6 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.FluentIterable; @@ -33,8 +32,6 @@ import org.apache.druid.data.input.InputSource; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; @@ -499,22 +496,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); Map<String, List<ParseExceptionReport>> events = new HashMap<>(); - boolean needsBuildSegments = false; - - if (full != null) { - needsBuildSegments = true; - } else { - switch (ingestionState) { - case BUILD_SEGMENTS: - case COMPLETED: - needsBuildSegments = true; - break; - default: - break; - } - } - - if (needsBuildSegments) { + if (addBuildSegmentStatsToReport(full != null, ingestionState)) { events.put( RowIngestionMeters.BUILD_SEGMENTS, IndexTaskUtils.getReportListFromSavedParseExceptions( @@ -526,28 +508,13 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand return Response.ok(events).build(); } - private Map<String, Object> doGetRowStats(String full) + private Map<String, Object> doGetRowStats(boolean isFullReport) { Map<String, Object> returnMap = new HashMap<>(); Map<String, Object> totalsMap = new HashMap<>(); Map<String, Object> averagesMap = new HashMap<>(); - boolean needsBuildSegments = false; - - if (full != null) { - needsBuildSegments = true; - } else { - switch (ingestionState) { - case BUILD_SEGMENTS: - case COMPLETED: - needsBuildSegments = true; - break; - default: - break; - } - } - - if (needsBuildSegments) { + if (addBuildSegmentStatsToReport(isFullReport, ingestionState)) { totalsMap.put( RowIngestionMeters.BUILD_SEGMENTS, rowIngestionMeters.getTotals() @@ -572,11 +539,10 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand ) { IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); - return Response.ok(doGetRowStats(full)).build(); + return Response.ok(doGetRowStats(full != null)).build(); } - @VisibleForTesting - public Map<String, Object> doGetLiveReports(String full) + private Map<String, Object> doGetLiveReports(boolean isFullReport) { 
Map<String, Object> returnMap = new HashMap<>(); Map<String, Object> ingestionStatsAndErrors = new HashMap<>(); @@ -585,7 +551,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand payload.put("ingestionState", ingestionState); payload.put("unparseableEvents", events); - payload.put("rowStats", doGetRowStats(full)); + payload.put("rowStats", doGetRowStats(isFullReport)); ingestionStatsAndErrors.put("taskId", getId()); ingestionStatsAndErrors.put("payload", payload); @@ -604,45 +570,28 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand ) { IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); - return Response.ok(doGetLiveReports(full)).build(); + return Response.ok(doGetLiveReports(full != null)).build(); } - private Map<String, Object> getTaskCompletionRowStats() + @Override + protected Map<String, Object> getTaskCompletionRowStats() { - Map<String, Object> metrics = new HashMap<>(); - metrics.put( + return Collections.singletonMap( RowIngestionMeters.BUILD_SEGMENTS, rowIngestionMeters.getTotals() ); - return metrics; } /** * Generate an IngestionStatsAndErrorsTaskReport for the task. - ** - * @return */ private Map<String, TaskReport> getTaskCompletionReports() { - return TaskReport.buildTaskReports( - new IngestionStatsAndErrorsTaskReport( - getId(), - new IngestionStatsAndErrorsTaskReportData( - IngestionState.COMPLETED, - getTaskCompletionUnparseableEvents(), - getTaskCompletionRowStats(), - errorMsg, - false, // not applicable for parallel subtask - segmentAvailabilityWaitTimeMs, - Collections.emptyMap(), - null, - null - ) - ) - ); + return buildIngestionStatsReport(IngestionState.COMPLETED, errorMsg, null, null); } - private Map<String, Object> getTaskCompletionUnparseableEvents() + @Override + protected Map<String, Object> getTaskCompletionUnparseableEvents() { Map<String, Object> unparseableEventsMap = new HashMap<>(); List<ParseExceptionReport> parseExceptionMessages = IndexTaskUtils.getReportListFromSavedParseExceptions( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 09e89b1d601..f3e1e4a06d2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -51,8 +51,8 @@ import org.apache.druid.error.ErrorResponse; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexing.common.IngestionStatsAndErrors; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; @@ -1123,7 +1123,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff return TaskReport.buildTaskReports( new IngestionStatsAndErrorsTaskReport( task.getId(), - new IngestionStatsAndErrorsTaskReportData( + new IngestionStatsAndErrors( ingestionState, getTaskCompletionUnparseableEvents(), getTaskCompletionRowStats(), diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index ef769d73006..e91d25c0b03 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -52,7 +52,7 @@ import org.apache.druid.error.DruidException; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; +import org.apache.druid.indexing.common.IngestionStatsAndErrors; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskReport; @@ -690,7 +690,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand TaskStatus status = statusFuture.get(); Assert.assertTrue(status.getErrorMsg().contains("org.apache.druid.java.util.common.RE: Max parse exceptions[0] exceeded")); - IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); + IngestionStatsAndErrors reportData = getTaskReportData(); ParseExceptionReport parseExceptionReport = ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS); @@ -798,7 +798,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand final TaskStatus taskStatus = statusFuture.get(); Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); - IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); + IngestionStatsAndErrors reportData = getTaskReportData(); Assert.assertEquals(expectedMetrics, reportData.getRowStats()); } @@ -901,7 +901,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand final TaskStatus taskStatus = statusFuture.get(); Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); - IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); + IngestionStatsAndErrors reportData = getTaskReportData(); Assert.assertEquals(expectedMetrics, reportData.getRowStats()); ParseExceptionReport parseExceptionReport = @@ -981,7 +981,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode()); Assert.assertTrue(taskStatus.getErrorMsg().contains("Max parse exceptions[3] exceeded")); - IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); + IngestionStatsAndErrors reportData = getTaskReportData(); Map<String, Object> expectedMetrics = ImmutableMap.of( RowIngestionMeters.BUILD_SEGMENTS, @@ -1257,7 +1257,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand ) ); - IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); + IngestionStatsAndErrors reportData = getTaskReportData(); Assert.assertEquals(expectedMetrics, reportData.getRowStats()); Pattern errorPattern = Pattern.compile( @@ -1676,7 +1676,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand } } - private IngestionStatsAndErrorsTaskReportData getTaskReportData() throws IOException + private IngestionStatsAndErrors getTaskReportData() throws IOException { 
Map<String, TaskReport> taskReports = OBJECT_MAPPER.readValue( reportsFile, @@ -1684,7 +1684,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand { } ); - return IngestionStatsAndErrorsTaskReportData.getPayloadFromTaskReports( + return IngestionStatsAndErrors.getPayloadFromTaskReports( taskReports ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 9b68cebe9ee..5c42ea4da4d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -39,7 +39,7 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; +import org.apache.druid.indexing.common.IngestionStatsAndErrors; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; @@ -248,18 +248,18 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis Assert.assertEquals("Compaction state for " + segment.getId(), expectedState, segment.getLastCompactionState()); } - List<IngestionStatsAndErrorsTaskReportData> reports = getIngestionReports(); + List<IngestionStatsAndErrors> reports = getIngestionReports(); Assert.assertEquals(reports.size(), 3); // since three index tasks are run by single compaction task // this test reads 3 segments and publishes 6 segments Assert.assertEquals( 3, - reports.stream().mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsRead).sum() + reports.stream().mapToLong(IngestionStatsAndErrors::getSegmentsRead).sum() ); Assert.assertEquals( 6, reports.stream() - .mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsPublished) + .mapToLong(IngestionStatsAndErrors::getSegmentsPublished) .sum() ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 03994e35e51..6f6b3e8ef48 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -557,7 +557,7 @@ public class CompactionTaskRunTest extends IngestionTestBase Granularities.MINUTE, null ), - IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, false, true), + IndexTaskTest.createTuningConfig(2, 2, 2L, null, false, true), false, false ), @@ -1900,7 +1900,7 @@ public class CompactionTaskRunTest extends IngestionTestBase Granularities.MINUTE, null ), - IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, false, true), + IndexTaskTest.createTuningConfig(2, 2, 2L, null, false, true), appendToExisting, false ), @@ -1940,7 +1940,7 @@ public class CompactionTaskRunTest extends IngestionTestBase Granularities.MINUTE, null ), - IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, false, true), + IndexTaskTest.createTuningConfig(2, 2, 2L, null, false, 
true), appendToExisting, false ), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index b049b472d35..2619a23ba33 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -45,7 +45,7 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; +import org.apache.druid.indexing.common.IngestionStatsAndErrors; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; @@ -77,7 +77,6 @@ import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.handoff.SegmentHandoffNotifier; import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory; import org.apache.druid.segment.incremental.RowIngestionMeters; -import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec; import org.apache.druid.segment.indexing.granularity.GranularitySpec; @@ -86,7 +85,6 @@ import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.SegmentLocalCacheManager; import org.apache.druid.segment.loading.StorageLocationConfig; -import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter; import org.apache.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFactory; import org.apache.druid.segment.transform.ExpressionTransform; @@ -106,17 +104,19 @@ import org.apache.druid.timeline.partition.PartitionIds; import org.apache.druid.timeline.partition.ShardSpec; import org.easymock.EasyMock; import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import javax.annotation.Nullable; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.Response; import java.io.BufferedWriter; import java.io.File; import java.io.IOException; @@ -129,6 +129,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; @RunWith(Parameterized.class) public class IndexTaskTest extends IngestionTestBase @@ -136,9 +137,6 @@ public class IndexTaskTest extends IngestionTestBase @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); - @Rule - public ExpectedException expectedException = ExpectedException.none(); - private static final String DATASOURCE = "test"; private static final TimestampSpec DEFAULT_TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null); private static final DimensionsSpec DEFAULT_DIMENSIONS_SPEC = new 
DimensionsSpec( @@ -173,19 +171,17 @@ public class IndexTaskTest extends IngestionTestBase private static final IndexSpec INDEX_SPEC = IndexSpec.DEFAULT; private final ObjectMapper jsonMapper; private final IndexIO indexIO; - private final RowIngestionMetersFactory rowIngestionMetersFactory; private final LockGranularity lockGranularity; private final boolean useInputFormatApi; - private AppenderatorsManager appenderatorsManager; private SegmentCacheManager segmentCacheManager; private TestTaskRunner taskRunner; + private File tmpDir; public IndexTaskTest(LockGranularity lockGranularity, boolean useInputFormatApi) { this.jsonMapper = getObjectMapper(); this.indexIO = getIndexIO(); - this.rowIngestionMetersFactory = getRowIngestionMetersFactory(); this.lockGranularity = lockGranularity; this.useInputFormatApi = useInputFormatApi; } @@ -193,9 +189,8 @@ public class IndexTaskTest extends IngestionTestBase @Before public void setup() throws IOException { - appenderatorsManager = new TestAppenderatorsManager(); - final File cacheDir = temporaryFolder.newFolder(); + tmpDir = temporaryFolder.newFolder(); segmentCacheManager = new SegmentLocalCacheManager( new SegmentLoaderConfig() { @@ -213,12 +208,9 @@ public class IndexTaskTest extends IngestionTestBase } @Test - public void testCorrectInputSourceResources() throws IOException + public void testCorrectInputSourceResources() { - File tmpDir = temporaryFolder.newFolder(); - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( new IndexIngestionSpec( new DataSchema( "test-json", @@ -263,19 +255,13 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testIngestNullOnlyColumns() throws Exception { - File tmpDir = temporaryFolder.newFolder(); - - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("2014-01-01T00:00:10Z,,\n"); writer.write("2014-01-01T01:00:20Z,,\n"); writer.write("2014-01-01T02:00:30Z,,\n"); } - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( new IndexIngestionSpec( new DataSchema( "test-json", @@ -318,19 +304,13 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testIngestNullOnlyColumns_storeEmptyColumnsOff_shouldNotStoreEmptyColumns() throws Exception { - File tmpDir = temporaryFolder.newFolder(); - - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("2014-01-01T00:00:10Z,,\n"); writer.write("2014-01-01T01:00:20Z,,\n"); writer.write("2014-01-01T02:00:30Z,,\n"); } - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( new IndexIngestionSpec( new DataSchema( "test-json", @@ -374,23 +354,14 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testDeterminePartitions() throws Exception { - File tmpDir = temporaryFolder.newFolder(); - - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("2014-01-01T00:00:10Z,a,1\n"); 
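// The hunks above and below all apply the same two refactors across this test
// class: repeated four-argument `new IndexTask(null, null, spec, context)`
// constructor calls collapse into a factory method, and the per-test
// tmpDir/tmpFile boilerplate collapses into a shared helper. For reference,
// both helpers as they are defined further down in this patch:

private IndexTask createIndexTask(IndexIngestionSpec ingestionSchema, Map<String, Object> context)
{
  return new IndexTask(null, null, ingestionSchema, context);
}

private File createTempFile() throws IOException
{
  // all test input files now live under the shared tmpDir created in setup()
  return File.createTempFile("druid", "index", tmpDir);
}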
writer.write("2014-01-01T01:00:20Z,b,1\n"); writer.write("2014-01-01T02:00:30Z,c,1\n"); } - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, - null, null, createTuningConfigWithMaxRowsPerSegment(2, true), false, @@ -429,11 +400,7 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testTransformSpec() throws Exception { - File tmpDir = temporaryFolder.newFolder(); - - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("2014-01-01T00:00:10Z,a,an|array,1|2|3,1\n"); writer.write("2014-01-01T01:00:20Z,b,another|array,3|4,1\n"); writer.write("2014-01-01T02:00:30Z,c,and|another,0|1,1\n"); @@ -472,8 +439,6 @@ public class IndexTaskTest extends IngestionTestBase final IndexIngestionSpec indexIngestionSpec; if (useInputFormatApi) { indexIngestionSpec = createIngestionSpec( - jsonMapper, - tmpDir, DEFAULT_TIMESTAMP_SPEC, dimensionsSpec, new CsvInputFormat(columns, listDelimiter, null, false, 0), @@ -496,12 +461,7 @@ public class IndexTaskTest extends IngestionTestBase ); } - IndexTask indexTask = new IndexTask( - null, - null, - indexIngestionSpec, - null - ); + IndexTask indexTask = createIndexTask(indexIngestionSpec, null); Assert.assertEquals(indexTask.getId(), indexTask.getGroupId()); @@ -568,27 +528,18 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testWithArbitraryGranularity() throws Exception { - File tmpDir = temporaryFolder.newFolder(); - - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("2014-01-01T00:00:10Z,a,1\n"); writer.write("2014-01-01T01:00:20Z,b,1\n"); writer.write("2014-01-01T02:00:30Z,c,1\n"); } - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new ArbitraryGranularitySpec( Granularities.MINUTE, Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")) ), - null, createTuningConfigWithMaxRowsPerSegment(10, true), false, false @@ -597,34 +548,29 @@ public class IndexTaskTest extends IngestionTestBase ); final List<DataSegment> segments = runSuccessfulTask(indexTask); - Assert.assertEquals(1, segments.size()); + + invokeApi(req -> indexTask.getLiveReports(req, null)); + invokeApi(req -> indexTask.getLiveReports(req, "full")); + invokeApi(req -> indexTask.getRowStats(req, null)); + invokeApi(req -> indexTask.getRowStats(req, "full")); } @Test public void testIntervalBucketing() throws Exception { - File tmpDir = temporaryFolder.newFolder(); - - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("2014-01-01T07:59:59.977Z,a,1\n"); writer.write("2014-01-01T08:00:00.000Z,b,1\n"); } - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( Granularities.HOUR, Granularities.HOUR, 
Collections.singletonList(Intervals.of("2014-01-01T08:00:00Z/2014-01-01T09:00:00Z")) ), - null, createTuningConfigWithMaxRowsPerSegment(50, true), false, false @@ -640,24 +586,16 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testNumShardsProvided() throws Exception { - File tmpDir = temporaryFolder.newFolder(); - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("2014-01-01T00:00:10Z,a,1\n"); writer.write("2014-01-01T01:00:20Z,b,1\n"); writer.write("2014-01-01T02:00:30Z,c,1\n"); } - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, null, - null, - createTuningConfigWithPartitionsSpec(new HashedPartitionsSpec(null, 1, null), true), + createTuningConfigWithPartitionsSpec(new HashedPartitionsSpec(null, 1, null)), false, false ), @@ -681,25 +619,17 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testNumShardsAndHashPartitionFunctionProvided() throws Exception { - File tmpDir = temporaryFolder.newFolder(); - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("2014-01-01T00:00:10Z,a,1\n"); writer.write("2014-01-01T01:00:20Z,b,1\n"); writer.write("2014-01-01T02:00:30Z,c,1\n"); } - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, - null, null, createTuningConfigWithPartitionsSpec( - new HashedPartitionsSpec(null, 1, null, HashPartitionFunction.MURMUR3_32_ABS), true + new HashedPartitionsSpec(null, 1, null, HashPartitionFunction.MURMUR3_32_ABS) ), false, false @@ -724,24 +654,16 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testNumShardsAndPartitionDimensionsProvided() throws Exception { - final File tmpDir = temporaryFolder.newFolder(); - final File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("2014-01-01T00:00:10Z,a,1\n"); writer.write("2014-01-01T01:00:20Z,b,1\n"); writer.write("2014-01-01T02:00:30Z,c,1\n"); } - final IndexTask indexTask = new IndexTask( - null, - null, + final IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, - null, null, - createTuningConfigWithPartitionsSpec(new HashedPartitionsSpec(null, 2, ImmutableList.of("dim")), true), + createTuningConfigWithPartitionsSpec(new HashedPartitionsSpec(null, 2, ImmutableList.of("dim"))), false, false ), @@ -797,22 +719,14 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testWriteNewSegmentsWithAppendToExistingWithLinearPartitioningSuccessfullyAppend() throws Exception { - File tmpDir = temporaryFolder.newFolder(); - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("2014-01-01T00:00:10Z,a,1\n"); 
writer.write("2014-01-01T01:00:20Z,b,1\n"); writer.write("2014-01-01T02:00:30Z,c,1\n"); } - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, - null, null, createTuningConfigWithMaxRowsPerSegment(2, false), true, @@ -842,27 +756,19 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testIntervalNotSpecified() throws Exception { - File tmpDir = temporaryFolder.newFolder(); - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("2014-01-01T00:00:10Z,a,1\n"); writer.write("2014-01-01T01:00:20Z,b,1\n"); writer.write("2014-01-01T02:00:30Z,c,1\n"); } - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( Granularities.HOUR, Granularities.MINUTE, null ), - null, createTuningConfigWithMaxRowsPerSegment(2, true), false, false @@ -893,49 +799,39 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testIntervalNotSpecifiedWithReplace() throws Exception { - File tmpDir = temporaryFolder.newFolder(); - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("2014-01-01T00:00:10Z,a,1\n"); writer.write("2014-01-01T01:00:20Z,b,1\n"); writer.write("2014-01-01T02:00:30Z,c,1\n"); } // Expect exception if reingest with dropExisting and null intervals is attempted - expectedException.expect(IAE.class); - expectedException.expectMessage( - "GranularitySpec's intervals cannot be empty for replace." 
- ); - IndexTask indexTask = new IndexTask( - null, - null, - createDefaultIngestionSpec( - jsonMapper, - tmpDir, - new UniformGranularitySpec( - Granularities.HOUR, - Granularities.MINUTE, - null + Exception exception = Assert.assertThrows( + IAE.class, + () -> createIndexTask( + createDefaultIngestionSpec( + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.MINUTE, + null + ), + createTuningConfigWithMaxRowsPerSegment(2, true), + false, + true ), - null, - createTuningConfigWithMaxRowsPerSegment(2, true), - false, - true - ), - null + null + ) + ); + Assert.assertEquals( + "GranularitySpec's intervals cannot be empty for replace.", + exception.getMessage() ); - } @Test public void testCSVFileWithHeader() throws Exception { - File tmpDir = temporaryFolder.newFolder(); - - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("time,d,val\n"); writer.write("2014-01-01T00:00:10Z,a,1\n"); } @@ -956,8 +852,6 @@ public class IndexTaskTest extends IngestionTestBase ); } else { ingestionSpec = createIngestionSpec( - jsonMapper, - tmpDir, timestampSpec, DimensionsSpec.EMPTY, new CsvInputFormat(null, null, null, true, 0), @@ -969,9 +863,7 @@ public class IndexTaskTest extends IngestionTestBase ); } - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( ingestionSpec, null ); @@ -988,11 +880,7 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testCSVFileWithHeaderColumnOverride() throws Exception { - File tmpDir = temporaryFolder.newFolder(); - - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("time,d,val\n"); writer.write("2014-01-01T00:00:10Z,a,1\n"); } @@ -1003,8 +891,6 @@ public class IndexTaskTest extends IngestionTestBase final IndexIngestionSpec ingestionSpec; if (useInputFormatApi) { ingestionSpec = createIngestionSpec( - jsonMapper, - tmpDir, timestampSpec, DimensionsSpec.EMPTY, new CsvInputFormat(columns, null, null, true, 0), @@ -1027,9 +913,7 @@ public class IndexTaskTest extends IngestionTestBase ); } - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( ingestionSpec, null ); @@ -1046,10 +930,7 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testWithSmallMaxTotalRows() throws Exception { - File tmpDir = temporaryFolder.newFolder(); - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("2014-01-01T00:00:10Z,a,1\n"); writer.write("2014-01-01T00:00:10Z,b,2\n"); writer.write("2014-01-01T00:00:10Z,c,3\n"); @@ -1061,19 +942,14 @@ public class IndexTaskTest extends IngestionTestBase writer.write("2014-01-01T02:00:30Z,c,3\n"); } - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( Granularities.HOUR, Granularities.MINUTE, null ), - null, - createTuningConfig(2, 2, null, 2L, null, false, true), + createTuningConfig(2, 2, 2L, null, false, 
true), false, false ), @@ -1099,25 +975,17 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testPerfectRollup() throws Exception { - File tmpDir = temporaryFolder.newFolder(); - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - populateRollupTestData(tmpFile); + populateRollupTestData(createTempFile()); - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( Granularities.DAY, Granularities.DAY, true, null ), - null, - createTuningConfig(3, 2, null, 2L, null, true, true), + createTuningConfig(3, 2, 2L, null, true, true), false, false ), @@ -1134,7 +1002,7 @@ public class IndexTaskTest extends IngestionTestBase Assert.assertEquals(DATASOURCE, segment.getDataSource()); Assert.assertEquals(expectedInterval, segment.getInterval()); - Assert.assertTrue(segment.getShardSpec().getClass().equals(HashBasedNumberedShardSpec.class)); + Assert.assertEquals(segment.getShardSpec().getClass(), HashBasedNumberedShardSpec.class); Assert.assertEquals(i, segment.getShardSpec().getPartitionNum()); } } @@ -1142,25 +1010,17 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testBestEffortRollup() throws Exception { - File tmpDir = temporaryFolder.newFolder(); - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - populateRollupTestData(tmpFile); + populateRollupTestData(createTempFile()); - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( Granularities.DAY, Granularities.DAY, true, null ), - null, - createTuningConfig(3, 2, null, 2L, null, false, true), + createTuningConfig(3, 2, 2L, null, false, true), false, false ), @@ -1183,24 +1043,17 @@ public class IndexTaskTest extends IngestionTestBase } @Test - public void testWaitForSegmentAvailabilityNoSegments() throws IOException + public void testWaitForSegmentAvailabilityNoSegments() { - final File tmpDir = temporaryFolder.newFolder(); - TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class); List<DataSegment> segmentsToWaitFor = new ArrayList<>(); - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( Granularities.HOUR, Granularities.MINUTE, null ), - null, createTuningConfigWithMaxRowsPerSegment(2, true), false, false @@ -1214,25 +1067,18 @@ public class IndexTaskTest extends IngestionTestBase } @Test - public void testWaitForSegmentAvailabilityInvalidWaitTimeout() throws IOException + public void testWaitForSegmentAvailabilityInvalidWaitTimeout() { - final File tmpDir = temporaryFolder.newFolder(); - TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class); List<DataSegment> segmentsToWaitFor = new ArrayList<>(); segmentsToWaitFor.add(EasyMock.createMock(DataSegment.class)); - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( Granularities.HOUR, Granularities.MINUTE, null ), - null, createTuningConfigWithMaxRowsPerSegment(2, true), false, false @@ -1246,10 +1092,8 @@ public class IndexTaskTest extends IngestionTestBase } @Test - public void testWaitForSegmentAvailabilityMultipleSegmentsTimeout() throws IOException + public void testWaitForSegmentAvailabilityMultipleSegmentsTimeout() { 
- final File tmpDir = temporaryFolder.newFolder(); - TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class); SegmentHandoffNotifierFactory mockFactory = EasyMock.createMock(SegmentHandoffNotifierFactory.class); SegmentHandoffNotifier mockNotifier = EasyMock.createMock(SegmentHandoffNotifier.class); @@ -1260,18 +1104,13 @@ public class IndexTaskTest extends IngestionTestBase segmentsToWaitFor.add(mockDataSegment1); segmentsToWaitFor.add(mockDataSegment2); - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( Granularities.HOUR, Granularities.MINUTE, null ), - null, createTuningConfigWithMaxRowsPerSegment(2, true), false, false @@ -1309,10 +1148,8 @@ public class IndexTaskTest extends IngestionTestBase } @Test - public void testWaitForSegmentAvailabilityMultipleSegmentsSuccess() throws IOException + public void testWaitForSegmentAvailabilityMultipleSegmentsSuccess() { - final File tmpDir = temporaryFolder.newFolder(); - TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class); DataSegment mockDataSegment1 = EasyMock.createMock(DataSegment.class); @@ -1321,18 +1158,13 @@ public class IndexTaskTest extends IngestionTestBase segmentsToWaitFor.add(mockDataSegment1); segmentsToWaitFor.add(mockDataSegment2); - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( Granularities.HOUR, Granularities.MINUTE, null ), - null, createTuningConfigWithMaxRowsPerSegment(2, true), false, false @@ -1365,10 +1197,8 @@ public class IndexTaskTest extends IngestionTestBase } @Test - public void testWaitForSegmentAvailabilityEmitsExpectedMetric() throws IOException + public void testWaitForSegmentAvailabilityEmitsExpectedMetric() { - final File tmpDir = temporaryFolder.newFolder(); - TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class); DataSegment mockDataSegment1 = EasyMock.createMock(DataSegment.class); @@ -1377,18 +1207,13 @@ public class IndexTaskTest extends IngestionTestBase segmentsToWaitFor.add(mockDataSegment1); segmentsToWaitFor.add(mockDataSegment2); - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( Granularities.HOUR, Granularities.MINUTE, null ), - null, createTuningConfigWithMaxRowsPerSegment(2, true), false, false @@ -1438,14 +1263,15 @@ public class IndexTaskTest extends IngestionTestBase } } + private File createTempFile() throws IOException + { + return File.createTempFile("druid", "index", tmpDir); + } + @Test public void testIgnoreParseException() throws Exception { - final File tmpDir = temporaryFolder.newFolder(); - - final File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("time,d,val\n"); writer.write("unparseable,a,1\n"); writer.write("2014-01-01T00:00:10Z,a,1\n"); @@ -1454,15 +1280,13 @@ public class IndexTaskTest extends IngestionTestBase final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null); final List<String> columns = Arrays.asList("time", "dim", "val"); // ignore parse exception - final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, 
false); + final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, false, false); // GranularitySpec.intervals and numShards must be null to verify reportParseException=false is respected both in // IndexTask.determineShardSpecs() and IndexTask.generateAndPublishSegments() final IndexIngestionSpec parseExceptionIgnoreSpec; if (useInputFormatApi) { parseExceptionIgnoreSpec = createIngestionSpec( - jsonMapper, - tmpDir, timestampSpec, DimensionsSpec.EMPTY, new CsvInputFormat(columns, null, null, true, 0), @@ -1485,12 +1309,7 @@ public class IndexTaskTest extends IngestionTestBase ); } - IndexTask indexTask = new IndexTask( - null, - null, - parseExceptionIgnoreSpec, - null - ); + IndexTask indexTask = createIndexTask(parseExceptionIgnoreSpec, null); final List<DataSegment> segments = runSuccessfulTask(indexTask); @@ -1502,10 +1321,7 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testReportParseException() throws Exception { - final File tmpDir = temporaryFolder.newFolder(); - - final File tmpFile = File.createTempFile("druid", "index", tmpDir); - + final File tmpFile = createTempFile(); try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { writer.write("time,d,val\n"); writer.write("unparseable,a,1\n"); @@ -1515,13 +1331,11 @@ public class IndexTaskTest extends IngestionTestBase final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null); final List<String> columns = Arrays.asList("time", "dim", "val"); // report parse exception - final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, true); + final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, false, true); final IndexIngestionSpec indexIngestionSpec; List<String> expectedMessages; if (useInputFormatApi) { indexIngestionSpec = createIngestionSpec( - jsonMapper, - tmpDir, timestampSpec, DimensionsSpec.EMPTY, new CsvInputFormat(columns, null, null, true, 0), @@ -1550,18 +1364,13 @@ public class IndexTaskTest extends IngestionTestBase tmpFile.toURI() ) ); - IndexTask indexTask = new IndexTask( - null, - null, - indexIngestionSpec, - null - ); + IndexTask indexTask = createIndexTask(indexIngestionSpec, null); TaskStatus status = runTask(indexTask).lhs; Assert.assertEquals(TaskState.FAILED, status.getStatusCode()); checkTaskStatusErrorMsgForParseExceptionsExceeded(status); - IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); + IngestionStatsAndErrors reportData = getTaskReportData(); ParseExceptionReport parseExceptionReport = ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS); @@ -1574,10 +1383,7 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testMultipleParseExceptionsSuccess() throws Exception { - final File tmpDir = temporaryFolder.newFolder(); - - final File tmpFile = File.createTempFile("druid", "index", tmpDir); - + final File tmpFile = createTempFile(); try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { writer.write("{\"time\":\"unparseable\",\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}\n"); // unparseable time writer.write("{\"time\":\"2014-01-01T00:00:10Z\",\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}\n"); // valid row @@ -1633,8 +1439,6 @@ public class IndexTaskTest extends IngestionTestBase final IndexIngestionSpec ingestionSpec; if (useInputFormatApi) { ingestionSpec = createIngestionSpec( - jsonMapper, - tmpDir, timestampSpec, dimensionsSpec, new 
JsonInputFormat(null, null, null, null, null), @@ -1657,18 +1461,13 @@ public class IndexTaskTest extends IngestionTestBase ); } - IndexTask indexTask = new IndexTask( - null, - null, - ingestionSpec, - null - ); + IndexTask indexTask = createIndexTask(ingestionSpec, null); TaskStatus status = runTask(indexTask).lhs; Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode()); Assert.assertNull(status.getErrorMsg()); - IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); + IngestionStatsAndErrors reportData = getTaskReportData(); Map<String, Object> expectedMetrics = ImmutableMap.of( RowIngestionMeters.DETERMINE_PARTITIONS, @@ -1759,10 +1558,7 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testMultipleParseExceptionsFailure() throws Exception { - final File tmpDir = temporaryFolder.newFolder(); - - final File tmpFile = File.createTempFile("druid", "index", tmpDir); - + final File tmpFile = createTempFile(); try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { writer.write("time,dim,dimLong,dimFloat,val\n"); writer.write("unparseable,a,2,3.0,1\n"); // unparseable @@ -1815,8 +1611,6 @@ public class IndexTaskTest extends IngestionTestBase List<String> expectedMessages; if (useInputFormatApi) { ingestionSpec = createIngestionSpec( - jsonMapper, - tmpDir, timestampSpec, dimensionsSpec, new CsvInputFormat(columns, null, null, true, 0), @@ -1853,9 +1647,7 @@ public class IndexTaskTest extends IngestionTestBase tmpFile.toURI() ) ); - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( ingestionSpec, null ); @@ -1864,7 +1656,7 @@ public class IndexTaskTest extends IngestionTestBase Assert.assertEquals(TaskState.FAILED, status.getStatusCode()); checkTaskStatusErrorMsgForParseExceptionsExceeded(status); - IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); + IngestionStatsAndErrors reportData = getTaskReportData(); Map<String, Object> expectedMetrics = ImmutableMap.of( RowIngestionMeters.DETERMINE_PARTITIONS, @@ -1902,10 +1694,7 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exception { - final File tmpDir = temporaryFolder.newFolder(); - - final File tmpFile = File.createTempFile("druid", "index", tmpDir); - + final File tmpFile = createTempFile(); try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { writer.write("time,dim,dimLong,dimFloat,val\n"); writer.write("unparseable,a,2,3.0,1\n"); // unparseable @@ -1958,8 +1747,6 @@ public class IndexTaskTest extends IngestionTestBase List<String> expectedMessages; if (useInputFormatApi) { ingestionSpec = createIngestionSpec( - jsonMapper, - tmpDir, timestampSpec, dimensionsSpec, new CsvInputFormat(columns, null, null, true, 0), @@ -1987,9 +1774,7 @@ public class IndexTaskTest extends IngestionTestBase StringUtils.format("Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 2, Line: 4)", tmpFile.toURI()), StringUtils.format("Timestamp[unparseable] is unparseable! 
Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 1, Line: 2)", tmpFile.toURI()) ); - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( ingestionSpec, null ); @@ -1998,7 +1783,7 @@ public class IndexTaskTest extends IngestionTestBase Assert.assertEquals(TaskState.FAILED, status.getStatusCode()); checkTaskStatusErrorMsgForParseExceptionsExceeded(status); - IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); + IngestionStatsAndErrors reportData = getTaskReportData(); Map<String, Object> expectedMetrics = ImmutableMap.of( RowIngestionMeters.DETERMINE_PARTITIONS, @@ -2036,36 +1821,26 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testCsvWithHeaderOfEmptyColumns() throws Exception { - final File tmpDir = temporaryFolder.newFolder(); - - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("ts,,\n"); writer.write("2014-01-01T00:00:10Z,a,1\n"); } - tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("ts,dim,\n"); writer.write("2014-01-01T00:00:10Z,a,1\n"); } - tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("ts,,val\n"); writer.write("2014-01-01T00:00:10Z,a,1\n"); } // report parse exception - final IndexTuningConfig tuningConfig = createTuningConfig(2, 1, null, null, null, true, true); + final IndexTuningConfig tuningConfig = createTuningConfig(2, 1, null, null, true, true); final IndexIngestionSpec ingestionSpec; if (useInputFormatApi) { ingestionSpec = createIngestionSpec( - jsonMapper, - tmpDir, DEFAULT_TIMESTAMP_SPEC, DimensionsSpec.EMPTY, new CsvInputFormat(null, null, null, true, 0), @@ -2088,9 +1863,7 @@ public class IndexTaskTest extends IngestionTestBase ); } - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( ingestionSpec, null ); @@ -2123,10 +1896,7 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testCsvWithHeaderOfEmptyTimestamp() throws Exception { - final File tmpDir = temporaryFolder.newFolder(); - - final File tmpFile = File.createTempFile("druid", "index", tmpDir); - + final File tmpFile = createTempFile(); try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { writer.write(",,\n"); writer.write("2014-01-01T00:00:10Z,a,1\n"); @@ -2134,13 +1904,11 @@ public class IndexTaskTest extends IngestionTestBase final List<String> columns = Arrays.asList("ts", "", ""); // report parse exception - final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, true); + final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, false, true); final IndexIngestionSpec ingestionSpec; List<String> expectedMessages; if (useInputFormatApi) { ingestionSpec = createIngestionSpec( - jsonMapper, - tmpDir, DEFAULT_TIMESTAMP_SPEC, DimensionsSpec.EMPTY, new CsvInputFormat(columns, null, null, true, 0), @@ -2169,9 +1937,7 @@ public class IndexTaskTest extends 
IngestionTestBase tmpFile.toURI() ) ); - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( ingestionSpec, null ); @@ -2181,7 +1947,7 @@ public class IndexTaskTest extends IngestionTestBase checkTaskStatusErrorMsgForParseExceptionsExceeded(status); - IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); + IngestionStatsAndErrors reportData = getTaskReportData(); ParseExceptionReport parseExceptionReport = ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS); @@ -2196,26 +1962,18 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testOverwriteWithSameSegmentGranularity() throws Exception { - final File tmpDir = temporaryFolder.newFolder(); - final File tmpFile = File.createTempFile("druid", "index", tmpDir); - - populateRollupTestData(tmpFile); + populateRollupTestData(createTempFile()); for (int i = 0; i < 2; i++) { - final IndexTask indexTask = new IndexTask( - null, - null, + final IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( Granularities.DAY, Granularities.DAY, true, null ), - null, - createTuningConfig(3, 2, null, 2L, null, false, true), + createTuningConfig(3, 2, 2L, null, false, true), false, false ), @@ -2260,27 +2018,19 @@ public class IndexTaskTest extends IngestionTestBase @Test public void testOverwriteWithDifferentSegmentGranularity() throws Exception { - final File tmpDir = temporaryFolder.newFolder(); - final File tmpFile = File.createTempFile("druid", "index", tmpDir); - - populateRollupTestData(tmpFile); + populateRollupTestData(createTempFile()); for (int i = 0; i < 2; i++) { final Granularity segmentGranularity = i == 0 ? Granularities.DAY : Granularities.MONTH; - final IndexTask indexTask = new IndexTask( - null, - null, + final IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( segmentGranularity, Granularities.DAY, true, null ), - null, - createTuningConfig(3, 2, null, 2L, null, false, true), + createTuningConfig(3, 2, 2L, null, false, true), false, false ), @@ -2305,55 +2055,43 @@ public class IndexTaskTest extends IngestionTestBase } @Test - public void testIndexTaskWithSingleDimPartitionsSpecThrowingException() throws Exception + public void testIndexTaskWithSingleDimPartitionsSpecThrowingException() { - final File tmpDir = temporaryFolder.newFolder(); - final IndexTask task = new IndexTask( - null, - null, + final IndexTask task = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, - null, null, - createTuningConfigWithPartitionsSpec(new SingleDimensionPartitionsSpec(null, 1, null, false), true), + createTuningConfigWithPartitionsSpec(new SingleDimensionPartitionsSpec(null, 1, null, false)), false, false ), null ); - expectedException.expect(UnsupportedOperationException.class); - expectedException.expectMessage( - "partitionsSpec[org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec] is not supported" + Exception exception = Assert.assertThrows( + UnsupportedOperationException.class, + () -> task.isReady(createActionClient(task)) + ); + Assert.assertEquals( + "partitionsSpec[org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec] is not supported", + exception.getMessage() ); - task.isReady(createActionClient(task)); } @Test public void testOldSegmentNotReplacedWhenDropFlagFalse() throws Exception { - File tmpDir = temporaryFolder.newFolder(); - - File tmpFile = 
File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("2014-01-01T00:00:10Z,a,1\n"); writer.write("2014-01-01T01:00:20Z,b,1\n"); writer.write("2014-01-01T02:00:30Z,c,1\n"); } - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( Granularities.YEAR, Granularities.MINUTE, Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")) ), - null, createTuningConfigWithMaxRowsPerSegment(10, true), false, false @@ -2365,24 +2103,19 @@ public class IndexTaskTest extends IngestionTestBase List<DataSegment> segments = runSuccessfulTask(indexTask); Assert.assertEquals(1, segments.size()); - Set<DataSegment> usedSegmentsBeforeOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get()); + Set<DataSegment> usedSegmentsBeforeOverwrite = getAllUsedSegments(); Assert.assertEquals(1, usedSegmentsBeforeOverwrite.size()); for (DataSegment segment : usedSegmentsBeforeOverwrite) { Assert.assertTrue(Granularities.YEAR.isAligned(segment.getInterval())); } - indexTask = new IndexTask( - null, - null, + indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( Granularities.MINUTE, Granularities.MINUTE, Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")) ), - null, createTuningConfigWithMaxRowsPerSegment(10, true), false, false @@ -2394,7 +2127,7 @@ public class IndexTaskTest extends IngestionTestBase segments = runSuccessfulTask(indexTask); Assert.assertEquals(3, segments.size()); - Set<DataSegment> usedSegmentsBeforeAfterOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get()); + Set<DataSegment> usedSegmentsBeforeAfterOverwrite = getAllUsedSegments(); Assert.assertEquals(4, usedSegmentsBeforeAfterOverwrite.size()); int yearSegmentFound = 0; int minuteSegmentFound = 0; @@ -2414,30 +2147,22 @@ public class IndexTaskTest extends IngestionTestBase } @Test - public void testOldSegmentNotCoveredByTombstonesWhenDropFlagTrueSinceIngestionIntervalDoesNotContainsOldSegment() throws Exception + public void testOldSegmentNotCoveredByTombstonesWhenDropFlagTrueSinceIngestionIntervalDoesNotContainsOldSegment() + throws Exception { - File tmpDir = temporaryFolder.newFolder(); - - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("2014-01-01T01:00:10Z,a,1\n"); writer.write("2014-01-01T01:10:20Z,b,1\n"); writer.write("2014-01-01T01:20:30Z,c,1\n"); } - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( Granularities.DAY, Granularities.MINUTE, Collections.singletonList(Intervals.of("2014-01-01T01:00:00Z/2014-01-01T02:00:00Z")) ), - null, createTuningConfigWithMaxRowsPerSegment(10, true), false, false @@ -2449,24 +2174,19 @@ public class IndexTaskTest extends IngestionTestBase List<DataSegment> segments = runSuccessfulTask(indexTask); 
Assert.assertEquals(1, segments.size()); - Set<DataSegment> usedSegmentsBeforeOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get()); + Set<DataSegment> usedSegmentsBeforeOverwrite = getAllUsedSegments(); Assert.assertEquals(1, usedSegmentsBeforeOverwrite.size()); for (DataSegment segment : usedSegmentsBeforeOverwrite) { Assert.assertTrue(Granularities.DAY.isAligned(segment.getInterval())); } - indexTask = new IndexTask( - null, - null, + indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( Granularities.HOUR, Granularities.MINUTE, Collections.singletonList(Intervals.of("2014-01-01T01:10:00Z/2014-01-01T02:00:00Z")) ), - null, createTuningConfigWithMaxRowsPerSegment(10, true), false, true @@ -2478,7 +2198,7 @@ public class IndexTaskTest extends IngestionTestBase segments = runSuccessfulTask(indexTask); Assert.assertEquals(1, segments.size()); - Set<DataSegment> usedSegmentsBeforeAfterOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get()); + Set<DataSegment> usedSegmentsBeforeAfterOverwrite = getAllUsedSegments(); Assert.assertEquals(2, usedSegmentsBeforeAfterOverwrite.size()); int segmentFound = 0; int tombstonesFound = 0; @@ -2506,30 +2226,22 @@ public class IndexTaskTest extends IngestionTestBase } @Test - public void testOldSegmentCoveredByTombstonesWhenDropFlagTrueSinceIngestionIntervalContainsOldSegment() throws Exception + public void testOldSegmentCoveredByTombstonesWhenDropFlagTrueSinceIngestionIntervalContainsOldSegment() + throws Exception { - File tmpDir = temporaryFolder.newFolder(); - - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("2014-01-01T00:00:10Z,a,1\n"); writer.write("2014-01-01T01:00:20Z,b,1\n"); writer.write("2014-01-01T02:00:30Z,c,1\n"); } - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( Granularities.DAY, Granularities.MINUTE, Collections.singletonList(Intervals.of("2014-01-01T01:00:00Z/2014-01-01T02:00:00Z")) ), - null, createTuningConfigWithMaxRowsPerSegment(10, true), false, false @@ -2541,24 +2253,19 @@ public class IndexTaskTest extends IngestionTestBase List<DataSegment> segments = runSuccessfulTask(indexTask); Assert.assertEquals(1, segments.size()); - Set<DataSegment> usedSegmentsBeforeOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get()); + Set<DataSegment> usedSegmentsBeforeOverwrite = getAllUsedSegments(); Assert.assertEquals(1, usedSegmentsBeforeOverwrite.size()); for (DataSegment segment : usedSegmentsBeforeOverwrite) { Assert.assertTrue(Granularities.DAY.isAligned(segment.getInterval())); } - indexTask = new IndexTask( - null, - null, + indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( Granularities.HOUR, Granularities.MINUTE, Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")) ), - null, createTuningConfigWithMaxRowsPerSegment(10, true), false, true @@ -2570,7 +2277,7 @@ public class 
IndexTaskTest extends IngestionTestBase segments = runSuccessfulTask(indexTask); Assert.assertEquals(24, segments.size()); - Set<DataSegment> usedSegmentsBeforeAfterOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get()); + Set<DataSegment> usedSegmentsBeforeAfterOverwrite = getAllUsedSegments(); Assert.assertEquals(24, usedSegmentsBeforeAfterOverwrite.size()); for (DataSegment segment : usedSegmentsBeforeAfterOverwrite) { // Used segments after overwrite and drop will contain only the @@ -2586,28 +2293,19 @@ public class IndexTaskTest extends IngestionTestBase @Test public void verifyPublishingOnlyTombstones() throws Exception { - File tmpDir = temporaryFolder.newFolder(); - - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("2014-03-01T00:00:10Z,a,1\n"); writer.write("2014-03-01T01:00:20Z,b,1\n"); writer.write("2014-03-01T02:00:30Z,c,1\n"); } - IndexTask indexTask = new IndexTask( - null, - null, + IndexTask indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( Granularities.DAY, Granularities.MINUTE, Collections.singletonList(Intervals.of("2014-01-03/2014-04-01")) ), - null, createTuningConfigWithMaxRowsPerSegment(10, true), false, false @@ -2619,7 +2317,7 @@ public class IndexTaskTest extends IngestionTestBase List<DataSegment> segments = runSuccessfulTask(indexTask); Assert.assertEquals(1, segments.size()); - Set<DataSegment> usedSegmentsBeforeOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get()); + Set<DataSegment> usedSegmentsBeforeOverwrite = getAllUsedSegments(); Assert.assertEquals(1, usedSegmentsBeforeOverwrite.size()); for (DataSegment segment : usedSegmentsBeforeOverwrite) { Assert.assertTrue(Granularities.DAY.isAligned(segment.getInterval())); @@ -2628,25 +2326,19 @@ public class IndexTaskTest extends IngestionTestBase // create new data but with an ingestion interval appropriate to filter it all out so that only tombstones // are created: tmpDir = temporaryFolder.newFolder(); - tmpFile = File.createTempFile("druid", "index", tmpDir); - try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + try (BufferedWriter writer = Files.newWriter(createTempFile(), StandardCharsets.UTF_8)) { writer.write("2014-01-01T00:00:10Z,a,1\n"); writer.write("2014-01-01T01:00:20Z,b,1\n"); writer.write("2014-12-01T02:00:30Z,c,1\n"); } - indexTask = new IndexTask( - null, - null, + indexTask = createIndexTask( createDefaultIngestionSpec( - jsonMapper, - tmpDir, new UniformGranularitySpec( Granularities.DAY, Granularities.MINUTE, Collections.singletonList(Intervals.of("2014-03-01/2014-04-01")) // filter out all data ), - null, createTuningConfigWithMaxRowsPerSegment(10, true), false, true @@ -2661,37 +2353,35 @@ public class IndexTaskTest extends IngestionTestBase Assert.assertTrue(segments.get(0).isTombstone()); } - + @Test - public void testErrorWhenDropFlagTrueAndOverwriteFalse() throws Exception + public void testErrorWhenDropFlagTrueAndOverwriteFalse() { - expectedException.expect(IAE.class); - expectedException.expectMessage( - "Cannot simultaneously replace and append to existing segments. 
Either dropExisting or appendToExisting should be set to false" - ); - new IndexTask( - null, - null, - createDefaultIngestionSpec( - jsonMapper, - temporaryFolder.newFolder(), - new UniformGranularitySpec( - Granularities.MINUTE, - Granularities.MINUTE, - Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")) + Exception exception = Assert.assertThrows( + IAE.class, + () -> createIndexTask( + createDefaultIngestionSpec( + new UniformGranularitySpec( + Granularities.MINUTE, + Granularities.MINUTE, + Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")) + ), + createTuningConfigWithMaxRowsPerSegment(10, true), + true, + true ), - null, - createTuningConfigWithMaxRowsPerSegment(10, true), - true, - true - ), - null + null + ) + ); + Assert.assertEquals( + "Cannot simultaneously replace and append to existing segments." + + " Either dropExisting or appendToExisting should be set to false", + exception.getMessage() ); } - // If isStandaloneTask is false, cleanup should be a no-op @Test - public void testCleanupIndexTask() throws Exception + public void testCleanupIsNoopIfNotStandaloneTask() throws Exception { new IndexTask( null, @@ -2700,14 +2390,11 @@ public class IndexTaskTest extends IngestionTestBase "dataSource", null, createDefaultIngestionSpec( - jsonMapper, - temporaryFolder.newFolder(), new UniformGranularitySpec( Granularities.MINUTE, Granularities.MINUTE, Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")) ), - null, createTuningConfigWithMaxRowsPerSegment(10, true), false, false @@ -2718,11 +2405,8 @@ public class IndexTaskTest extends IngestionTestBase ).cleanUp(null, null); } - /* if shouldCleanup is true, we should fall back to AbstractTask.cleanup, - * check isEncapsulatedTask=false, and then exit. - */ @Test - public void testCleanup() throws Exception + public void testCleanupIsDoneIfStandaloneTask() throws Exception { TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class); TaskConfig taskConfig = EasyMock.createMock(TaskConfig.class); @@ -2736,14 +2420,11 @@ public class IndexTaskTest extends IngestionTestBase "dataSource", null, createDefaultIngestionSpec( - jsonMapper, - temporaryFolder.newFolder(), new UniformGranularitySpec( Granularities.MINUTE, Granularities.MINUTE, Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")) ), - null, createTuningConfigWithMaxRowsPerSegment(10, true), false, false @@ -2758,7 +2439,7 @@ public class IndexTaskTest extends IngestionTestBase public static void checkTaskStatusErrorMsgForParseExceptionsExceeded(TaskStatus status) { // full stacktrace will be too long and make tests brittle (e.g. 
if line # changes), just match the main message - Assert.assertThat( + MatcherAssert.assertThat( status.getErrorMsg(), CoreMatchers.containsString("Max parse exceptions") ); @@ -2789,24 +2470,21 @@ public class IndexTaskTest extends IngestionTestBase 1, null, null, - null, forceGuaranteedRollup, true ); } private static IndexTuningConfig createTuningConfigWithPartitionsSpec( - PartitionsSpec partitionsSpec, - boolean forceGuaranteedRollup + PartitionsSpec partitionsSpec ) { return createTuningConfig( null, 1, null, - null, partitionsSpec, - forceGuaranteedRollup, + true, true ); } @@ -2814,7 +2492,6 @@ public class IndexTaskTest extends IngestionTestBase static IndexTuningConfig createTuningConfig( @Nullable Integer maxRowsPerSegment, @Nullable Integer maxRowsInMemory, - @Nullable Long maxBytesInMemory, @Nullable Long maxTotalRows, @Nullable PartitionsSpec partitionsSpec, boolean forceGuaranteedRollup, @@ -2826,7 +2503,7 @@ public class IndexTaskTest extends IngestionTestBase maxRowsPerSegment, null, maxRowsInMemory, - maxBytesInMemory, + null, null, maxTotalRows, null, @@ -2850,7 +2527,26 @@ public class IndexTaskTest extends IngestionTestBase ); } - private IngestionStatsAndErrorsTaskReportData getTaskReportData() throws IOException + @SuppressWarnings("unchecked") + private <T> T invokeApi(Function<HttpServletRequest, Response> api) + { + final HttpServletRequest request = EasyMock.mock(HttpServletRequest.class); + EasyMock.expect(request.getAttribute(EasyMock.anyString())) + .andReturn("allow-all"); + EasyMock.replay(request); + return (T) api.apply(request).getEntity(); + } + + private Set<DataSegment> getAllUsedSegments() + { + return Sets.newHashSet( + getSegmentsMetadataManager() + .iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true) + .get() + ); + } + + private IngestionStatsAndErrors getTaskReportData() throws IOException { Map<String, TaskReport> taskReports = jsonMapper.readValue( taskRunner.getTaskReportsFile(), @@ -2858,16 +2554,19 @@ public class IndexTaskTest extends IngestionTestBase { } ); - return IngestionStatsAndErrorsTaskReportData.getPayloadFromTaskReports( - taskReports - ); + return IngestionStatsAndErrors.getPayloadFromTaskReports(taskReports); + } + + private IndexTask createIndexTask( + IndexIngestionSpec ingestionSchema, + Map<String, Object> context + ) + { + return new IndexTask(null, null, ingestionSchema, context); } private IndexIngestionSpec createDefaultIngestionSpec( - ObjectMapper objectMapper, - File baseDir, @Nullable GranularitySpec granularitySpec, - @Nullable TransformSpec transformSpec, IndexTuningConfig tuningConfig, boolean appendToExisting, Boolean dropExisting @@ -2875,12 +2574,10 @@ public class IndexTaskTest extends IngestionTestBase { if (useInputFormatApi) { return createIngestionSpec( - objectMapper, - baseDir, DEFAULT_TIMESTAMP_SPEC, DEFAULT_DIMENSIONS_SPEC, DEFAULT_INPUT_FORMAT, - transformSpec, + null, granularitySpec, tuningConfig, appendToExisting, @@ -2888,10 +2585,10 @@ public class IndexTaskTest extends IngestionTestBase ); } else { return createIngestionSpec( - objectMapper, - baseDir, + jsonMapper, + tmpDir, DEFAULT_PARSE_SPEC, - transformSpec, + null, granularitySpec, tuningConfig, appendToExisting, @@ -2926,9 +2623,7 @@ public class IndexTaskTest extends IngestionTestBase ); } - static IndexIngestionSpec createIngestionSpec( - ObjectMapper objectMapper, - File baseDir, + private IndexIngestionSpec createIngestionSpec( TimestampSpec timestampSpec, DimensionsSpec dimensionsSpec, 
InputFormat inputFormat, @@ -2940,8 +2635,8 @@ public class IndexTaskTest extends IngestionTestBase ) { return createIngestionSpec( - objectMapper, - baseDir, + jsonMapper, + tmpDir, null, timestampSpec, dimensionsSpec, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java index bc7148c5401..0c95773da92 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java @@ -19,7 +19,7 @@ package org.apache.druid.indexing.common.task; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; +import org.apache.druid.indexing.common.IngestionStatsAndErrors; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -41,7 +41,7 @@ public class ParseExceptionReport @SuppressWarnings("unchecked") public static ParseExceptionReport forPhase( - IngestionStatsAndErrorsTaskReportData reportData, + IngestionStatsAndErrors reportData, String phase ) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java index 53b8ace317a..aece6edb3f4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java @@ -26,8 +26,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; import org.apache.druid.indexer.IngestionState; +import org.apache.druid.indexing.common.IngestionStatsAndErrors; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TestUtils; @@ -59,7 +59,7 @@ public class TaskReportSerdeTest { IngestionStatsAndErrorsTaskReport report1 = new IngestionStatsAndErrorsTaskReport( "testID", - new IngestionStatsAndErrorsTaskReportData( + new IngestionStatsAndErrors( IngestionState.BUILD_SEGMENTS, ImmutableMap.of( "hello", "world" @@ -118,7 +118,7 @@ public class TaskReportSerdeTest IngestionStatsAndErrorsTaskReport expected = new IngestionStatsAndErrorsTaskReport( IngestionStatsAndErrorsTaskReport.REPORT_KEY, - new IngestionStatsAndErrorsTaskReportData( + new IngestionStatsAndErrors( IngestionState.COMPLETED, ImmutableMap.of( "hello", "world" diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 6b662e473b0..08b0c584f76 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -50,8 +50,8 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 6b662e473b0..08b0c584f76 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -50,8 +50,8 @@ import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@@ -558,7 +558,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
       if (!task.isPresent()) {
         return null;
       }
-      return Futures.immediateFuture(((ParallelIndexSupervisorTask) task.get()).doGetLiveReports("full"));
+      return Futures.immediateFuture(((ParallelIndexSupervisorTask) task.get()).doGetLiveReports(true));
     }

     public TaskContainer getTaskContainer(String taskId)
@@ -1074,12 +1074,12 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
       });
     }

-    public List<IngestionStatsAndErrorsTaskReportData> getIngestionReports() throws IOException
+    public List<IngestionStatsAndErrors> getIngestionReports() throws IOException
    {
      return getReports().entrySet()
                         .stream()
                         .filter(entry -> entry.getKey().contains(IngestionStatsAndErrorsTaskReport.REPORT_KEY))
-                        .map(entry -> (IngestionStatsAndErrorsTaskReportData) entry.getValue().getPayload())
+                        .map(entry -> (IngestionStatsAndErrors) entry.getValue().getPayload())
                         .collect(Collectors.toList());
    }
  }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index 0da7ed0934e..7ad9f3c9464 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -446,7 +446,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
         false,
         Collections.emptyList()
     );
-    Map<String, Object> actualReports = task.doGetLiveReports("full");
+    Map<String, Object> actualReports = task.doGetLiveReports(true);
     final long processedBytes = useInputFormatApi ? 335 : 0;
     Map<String, Object> expectedReports = buildExpectedTaskReportParallel(
         task.getId(),
@@ -497,7 +497,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv

     TaskContainer taskContainer = getIndexingServiceClient().getTaskContainer(task.getId());
     final ParallelIndexSupervisorTask executedTask = (ParallelIndexSupervisorTask) taskContainer.getTask();
-    Map<String, Object> actualReports = executedTask.doGetLiveReports("full");
+    Map<String, Object> actualReports = executedTask.doGetLiveReports(true);

     final long processedBytes = useInputFormatApi ? 335 : 0;
     RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(10, processedBytes, 1, 1, 1);
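The `doGetLiveReports` call sites above show the internal change: the method now takes a boolean instead of the magic string "full". Per the commit description the REST API is unchanged, so the resource layer still accepts the `full` query parameter; a hedged sketch of that bridging, with the handler shape assumed for illustration rather than lifted from this commit:

    // Presence of ?full=<anything> maps to the boolean the internals now want.
    @GET
    @Path("/liveReports")
    public Response getLiveReports(@QueryParam("full") String full)
    {
      return Response.ok(doGetLiveReports(full != null)).build();
    }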
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index 2db5da143ed..fb454acafec 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -52,7 +52,7 @@ import org.apache.druid.discovery.DruidNodeAnnouncer;
 import org.apache.druid.discovery.LookupNodeService;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
@@ -459,7 +459,7 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport
     return new SegmentDescriptorAndExpectedDim1Values(interval, partitionNum, expectedDim1Values);
   }

-  protected IngestionStatsAndErrorsTaskReportData getTaskReportData() throws IOException
+  protected IngestionStatsAndErrors getTaskReportData() throws IOException
   {
     Map<String, TaskReport> taskReports = OBJECT_MAPPER.readValue(
         reportsFile,
@@ -467,7 +467,7 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport
         {
         }
     );
-    return IngestionStatsAndErrorsTaskReportData.getPayloadFromTaskReports(
+    return IngestionStatsAndErrors.getPayloadFromTaskReports(
         taskReports
     );
   }
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
index fb02d8f8dad..8eadbdf8f11 100644
--- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
@@ -23,8 +23,8 @@ import com.google.common.collect.FluentIterable;
 import com.google.inject.Inject;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.indexer.partitions.SecondaryPartitionType;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionCardinalityTask;
 import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
@@ -487,7 +487,7 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
     if (segmentAvailabilityConfirmationPair.lhs != null && segmentAvailabilityConfirmationPair.lhs) {
       TaskReport reportRaw = indexer.getTaskReport(taskID).get("ingestionStatsAndErrors");
       IngestionStatsAndErrorsTaskReport report = (IngestionStatsAndErrorsTaskReport) reportRaw;
-      IngestionStatsAndErrorsTaskReportData reportData = (IngestionStatsAndErrorsTaskReportData) report.getPayload();
+      IngestionStatsAndErrors reportData = (IngestionStatsAndErrors) report.getPayload();

       // Confirm that the task waited longer than 0ms for the task to complete.
       Assert.assertTrue(reportData.getSegmentAvailabilityWaitTimeMs() > 0);
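Both test bases now share the same two-step retrieval: deserialize the report file into a map keyed by report type, then let the payload class locate its own entry. A condensed sketch using the field names visible in SeekableStreamIndexTaskTestBase above (the TypeReference line is standard Jackson usage, filled in here for completeness):

    // Read the task's report file, then extract the ingestion-stats payload.
    final Map<String, TaskReport> taskReports = OBJECT_MAPPER.readValue(
        reportsFile,
        new TypeReference<Map<String, TaskReport>>() {}
    );
    final IngestionStatsAndErrors stats = IngestionStatsAndErrors.getPayloadFromTaskReports(taskReports);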
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index ac4ef536b02..ceaeddea0dc 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -27,7 +27,6 @@ import org.apache.druid.client.indexing.TaskStatusResponse;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.overlord.http.TaskPayloadResponse;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.java.util.common.ISE;
@@ -263,13 +262,15 @@ public class OverlordResourceTestClient

   public String getTaskErrorMessage(String taskId)
   {
-    return ((IngestionStatsAndErrorsTaskReportData) getTaskReport(taskId).get("ingestionStatsAndErrors").getPayload()).getErrorMsg();
+    return getTaskReport(taskId).get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
+                                .getPayload().getErrorMsg();
   }

   public RowIngestionMetersTotals getTaskStats(String taskId)
   {
     try {
-      Object buildSegment = ((IngestionStatsAndErrorsTaskReportData) getTaskReport(taskId).get("ingestionStatsAndErrors").getPayload()).getRowStats().get("buildSegments");
+      Object buildSegment = getTaskReport(taskId).get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
+                                                 .getPayload().getRowStats().get("buildSegments");
       return jsonMapper.convertValue(buildSegment, RowIngestionMetersTotals.class);
     }
     catch (Exception e) {
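With the `"ingestionStatsAndErrors"` string literal replaced by `IngestionStatsAndErrorsTaskReport.REPORT_KEY`, client and server agree on the map key by construction, and the helpers read straight through to the payload fields. A small hedged example of how an integration test might combine the two helpers (the failure handling here is illustrative):

    // Fail fast when ingestion reported an error; otherwise inspect the totals.
    final String errorMsg = getTaskErrorMessage(taskId);
    if (errorMsg != null) {
      throw new ISE("Task[%s] reported error[%s]", taskId, errorMsg);
    }
    final RowIngestionMetersTotals totals = getTaskStats(taskId);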
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index 0f160580db5..d419bbb48d3 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -23,8 +23,8 @@ import com.google.common.collect.FluentIterable;
 import com.google.inject.Inject;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.indexer.partitions.SecondaryPartitionType;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionCardinalityTask;
 import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
@@ -370,7 +370,7 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
     if (segmentAvailabilityConfirmationPair.lhs != null && segmentAvailabilityConfirmationPair.lhs) {
       TaskReport reportRaw = indexer.getTaskReport(taskID).get("ingestionStatsAndErrors");
       IngestionStatsAndErrorsTaskReport report = (IngestionStatsAndErrorsTaskReport) reportRaw;
-      IngestionStatsAndErrorsTaskReportData reportData = (IngestionStatsAndErrorsTaskReportData) report.getPayload();
+      IngestionStatsAndErrors reportData = (IngestionStatsAndErrors) report.getPayload();

       // Confirm that the task waited longer than 0ms for the task to complete.
       Assert.assertTrue(reportData.getSegmentAvailabilityWaitTimeMs() > 0);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index 296f7fe747a..2b37c7a27d2 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -21,8 +21,8 @@ package org.apache.druid.tests.indexer;

 import com.google.inject.Inject;
 import org.apache.commons.io.IOUtils;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.GranularityType;
@@ -176,13 +176,13 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
     Assert.assertEquals(2,
                         reports.values()
                                .stream()
-                               .mapToLong(r -> ((IngestionStatsAndErrorsTaskReportData) r.getPayload()).getSegmentsPublished())
+                               .mapToLong(r -> ((IngestionStatsAndErrors) r.getPayload()).getSegmentsPublished())
                                .sum()
     );
     Assert.assertEquals(4,
                         reports.values()
                                .stream()
-                               .mapToLong(r -> ((IngestionStatsAndErrorsTaskReportData) r.getPayload()).getSegmentsRead())
+                               .mapToLong(r -> ((IngestionStatsAndErrors) r.getPayload()).getSegmentsRead())
                                .sum()
     );
   }
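For downstream code that referenced the old class, the migration is a pure rename: the getters and the serialized shape are untouched. A before-and-after sketch grounded in the call sites above:

    // Before (pre-#16202):
    // IngestionStatsAndErrorsTaskReportData reportData =
    //     (IngestionStatsAndErrorsTaskReportData) report.getPayload();

    // After: same payload and JSON, new Java name.
    final IngestionStatsAndErrors reportData = (IngestionStatsAndErrors) report.getPayload();
    final long published = reportData.getSegmentsPublished();
    final long read = reportData.getSegmentsRead();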