This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a83125e4a00 Track IngestionState more accurately in realtime tasks.
(#16934)
a83125e4a00 is described below
commit a83125e4a00c56563709e664a4e9685a834c7ec8
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Aug 21 23:13:46 2024 -0700
Track IngestionState more accurately in realtime tasks. (#16934)
Previously, SeekableStreamIndexTaskRunner set the ingestion state to
COMPLETED as soon as it finished reading data from Kafka. This was incorrect,
because the task still had to build, publish, and hand off its segments.
With this patch, the state transitions are:
1) The task stays in BUILD_SEGMENTS after it finishes reading from Kafka,
while it builds its final set of segments to publish.
2) The task transitions to SEGMENT_AVAILABILITY_WAIT after publishing,
while it waits for handoff.
3) The task transitions to COMPLETED immediately before exiting, once all
of its work, including handoff, is finished.
---
docs/ingestion/tasks.md | 1 +
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 34 ++++++++++++++++++++--
.../indexing/kinesis/KinesisIndexTaskTest.java | 9 ++++++
.../SeekableStreamIndexTaskRunner.java | 3 +-
4 files changed, 44 insertions(+), 3 deletions(-)
diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md
index 3291abb01a0..743a46cc854 100644
--- a/docs/ingestion/tasks.md
+++ b/docs/ingestion/tasks.md
@@ -239,6 +239,7 @@ The `ingestionState` shows what step of ingestion the task
reached. Possible sta
- `NOT_STARTED`: The task has not begun reading any rows
- `DETERMINE_PARTITIONS`: The task is processing rows to determine partitioning
- `BUILD_SEGMENTS`: The task is processing rows to construct segments
+- `SEGMENT_AVAILABILITY_WAIT`: The task has published its segments and is waiting for them to become available.
- `COMPLETED`: The task has finished its work.
Only batch tasks have the DETERMINE_PARTITIONS phase. Realtime tasks such as
those created by the Kafka Indexing Service do not have a DETERMINE_PARTITIONS
phase.
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 15b77be307d..f8c6b23aae9 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -52,6 +52,7 @@ import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat;
+import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
@@ -1617,6 +1618,10 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
IngestionStatsAndErrors reportData = getTaskReportData();
+ // Verify ingestion state and error message
+ Assert.assertEquals(IngestionState.COMPLETED,
reportData.getIngestionState());
+ Assert.assertNull(reportData.getErrorMsg());
+
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
@@ -1697,6 +1702,10 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
IngestionStatsAndErrors reportData = getTaskReportData();
+ // Verify ingestion state and error message
+ Assert.assertEquals(IngestionState.BUILD_SEGMENTS,
reportData.getIngestionState());
+ Assert.assertNotNull(reportData.getErrorMsg());
+
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
@@ -3057,9 +3066,13 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
newDataSchemaMetadata()
);
- // Verify unparseable data
IngestionStatsAndErrors reportData = getTaskReportData();
+ // Verify ingestion state and error message
+ Assert.assertEquals(IngestionState.COMPLETED,
reportData.getIngestionState());
+ Assert.assertNull(reportData.getErrorMsg());
+
+ // Verify unparseable data
ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(reportData,
RowIngestionMeters.BUILD_SEGMENTS);
@@ -3190,9 +3203,14 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
Assert.assertEquals(ImmutableList.of(), publishedDescriptors());
Assert.assertNull(newDataSchemaMetadata());
+ // Verify ingestion state and error message
+ final IngestionStatsAndErrors reportData = getTaskReportData();
+ Assert.assertEquals(IngestionState.BUILD_SEGMENTS,
reportData.getIngestionState());
+ Assert.assertNotNull(reportData.getErrorMsg());
+
// Verify there is no unparseable data in the report since we've 0 saved
parse exceptions
ParseExceptionReport parseExceptionReport =
- ParseExceptionReport.forPhase(getTaskReportData(),
RowIngestionMeters.BUILD_SEGMENTS);
+ ParseExceptionReport.forPhase(reportData,
RowIngestionMeters.BUILD_SEGMENTS);
Assert.assertEquals(ImmutableList.of(),
parseExceptionReport.getErrorMessages());
}
@@ -3231,6 +3249,12 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
IngestionStatsAndErrors reportData = getTaskReportData();
+
+ // Verify ingestion state and error message
+ Assert.assertEquals(IngestionState.COMPLETED,
reportData.getIngestionState());
+ Assert.assertNull(reportData.getErrorMsg());
+
+ // Verify report metrics
Assert.assertEquals(reportData.getRecordsProcessed().size(), 1);
Assert.assertEquals(reportData.getRecordsProcessed().values().iterator().next(),
(Long) 6L);
}
@@ -3279,6 +3303,12 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
IngestionStatsAndErrors reportData = getTaskReportData();
+
+ // Verify ingestion state and error message
+ Assert.assertEquals(IngestionState.COMPLETED,
reportData.getIngestionState());
+ Assert.assertNull(reportData.getErrorMsg());
+
+ // Verify report metrics
Assert.assertEquals(reportData.getRecordsProcessed().size(), 2);
Assert.assertTrue(reportData.getRecordsProcessed().values().containsAll(ImmutableSet.of(6L,
2L)));
}
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 80bded2031d..510eaa797e0 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -43,6 +43,7 @@ import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.kinesis.KinesisRecordEntity;
+import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
@@ -1186,6 +1187,10 @@ public class KinesisIndexTaskTest extends
SeekableStreamIndexTaskTestBase
IngestionStatsAndErrors reportData = getTaskReportData();
+ // Verify ingestion state and error message
+ Assert.assertEquals(IngestionState.COMPLETED,
reportData.getIngestionState());
+ Assert.assertNull(reportData.getErrorMsg());
+
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
@@ -1272,6 +1277,10 @@ public class KinesisIndexTaskTest extends
SeekableStreamIndexTaskTestBase
IngestionStatsAndErrors reportData = getTaskReportData();
+ // Verify ingestion state and error message
+ Assert.assertEquals(IngestionState.BUILD_SEGMENTS,
reportData.getIngestionState());
+ Assert.assertNotNull(reportData.getErrorMsg());
+
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index d347fd81503..df8e220145b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -768,7 +768,6 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
}
}
- ingestionState = IngestionState.COMPLETED;
}
catch (Exception e) {
// (1) catch all exceptions while reading from kafka
@@ -835,6 +834,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
// failed to persist sequences. It might also return null if handoff
failed, but was recoverable.
// See publishAndRegisterHandoff() for details.
List<SegmentsAndCommitMetadata> handedOffList = Collections.emptyList();
+ ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
if (tuningConfig.getHandoffConditionTimeout() == 0) {
handedOffList = Futures.allAsList(handOffWaitList).get();
} else {
@@ -928,6 +928,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
}
+ ingestionState = IngestionState.COMPLETED;
toolbox.getTaskReportFileWriter().write(task.getId(),
getTaskCompletionReports(null, handoffWaitMs));
return TaskStatus.success(task.getId());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]