This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a83125e4a00 Track IngestionState more accurately in realtime tasks. (#16934)
a83125e4a00 is described below

commit a83125e4a00c56563709e664a4e9685a834c7ec8
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Aug 21 23:13:46 2024 -0700

    Track IngestionState more accurately in realtime tasks. (#16934)
    
    Previously, SeekableStreamIndexTaskRunner set ingestion state to
    COMPLETED when it finished reading data from Kafka. This is incorrect.
    After the changes in this patch, the transitions go:
    
    1) The task stays in BUILD_SEGMENTS after it finishes reading from Kafka,
       while it is building its final set of segments to publish.
    
    2) The task transitions to SEGMENT_AVAILABILITY_WAIT after publishing,
       while waiting for handoff.
    
    3) The task transitions to COMPLETED immediately before exiting, when
       truly done.
---
 docs/ingestion/tasks.md                            |  1 +
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 34 ++++++++++++++++++++--
 .../indexing/kinesis/KinesisIndexTaskTest.java     |  9 ++++++
 .../SeekableStreamIndexTaskRunner.java             |  3 +-
 4 files changed, 44 insertions(+), 3 deletions(-)

diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md
index 3291abb01a0..743a46cc854 100644
--- a/docs/ingestion/tasks.md
+++ b/docs/ingestion/tasks.md
@@ -239,6 +239,7 @@ The `ingestionState` shows what step of ingestion the task reached. Possible sta
 - `NOT_STARTED`: The task has not begun reading any rows
 - `DETERMINE_PARTITIONS`: The task is processing rows to determine partitioning
 - `BUILD_SEGMENTS`: The task is processing rows to construct segments
+- `SEGMENT_AVAILABILITY_WAIT`: The task has published its segments and is waiting for them to become available.
 - `COMPLETED`: The task has finished its work.
 
 Only batch tasks have the DETERMINE_PARTITIONS phase. Realtime tasks such as those created by the Kafka Indexing Service do not have a DETERMINE_PARTITIONS phase.
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 15b77be307d..f8c6b23aae9 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -52,6 +52,7 @@ import org.apache.druid.data.input.kafka.KafkaRecordEntity;
 import org.apache.druid.data.input.kafka.KafkaTopicPartition;
 import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
 import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat;
+import org.apache.druid.indexer.IngestionState;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.report.IngestionStatsAndErrors;
@@ -1617,6 +1618,10 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     IngestionStatsAndErrors reportData = getTaskReportData();
 
+    // Verify ingestion state and error message
+    Assert.assertEquals(IngestionState.COMPLETED, 
reportData.getIngestionState());
+    Assert.assertNull(reportData.getErrorMsg());
+
     Map<String, Object> expectedMetrics = ImmutableMap.of(
         RowIngestionMeters.BUILD_SEGMENTS,
         ImmutableMap.of(
@@ -1697,6 +1702,10 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     IngestionStatsAndErrors reportData = getTaskReportData();
 
+    // Verify ingestion state and error message
+    Assert.assertEquals(IngestionState.BUILD_SEGMENTS, 
reportData.getIngestionState());
+    Assert.assertNotNull(reportData.getErrorMsg());
+
     Map<String, Object> expectedMetrics = ImmutableMap.of(
         RowIngestionMeters.BUILD_SEGMENTS,
         ImmutableMap.of(
@@ -3057,9 +3066,13 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         newDataSchemaMetadata()
     );
 
-    // Verify unparseable data
     IngestionStatsAndErrors reportData = getTaskReportData();
 
+    // Verify ingestion state and error message
+    Assert.assertEquals(IngestionState.COMPLETED, 
reportData.getIngestionState());
+    Assert.assertNull(reportData.getErrorMsg());
+
+    // Verify unparseable data
     ParseExceptionReport parseExceptionReport =
         ParseExceptionReport.forPhase(reportData, 
RowIngestionMeters.BUILD_SEGMENTS);
 
@@ -3190,9 +3203,14 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     Assert.assertEquals(ImmutableList.of(), publishedDescriptors());
     Assert.assertNull(newDataSchemaMetadata());
 
+    // Verify ingestion state and error message
+    final IngestionStatsAndErrors reportData = getTaskReportData();
+    Assert.assertEquals(IngestionState.BUILD_SEGMENTS, 
reportData.getIngestionState());
+    Assert.assertNotNull(reportData.getErrorMsg());
+
     // Verify there is no unparseable data in the report since we've 0 saved 
parse exceptions
     ParseExceptionReport parseExceptionReport =
-        ParseExceptionReport.forPhase(getTaskReportData(), 
RowIngestionMeters.BUILD_SEGMENTS);
+        ParseExceptionReport.forPhase(reportData, 
RowIngestionMeters.BUILD_SEGMENTS);
 
     Assert.assertEquals(ImmutableList.of(), 
parseExceptionReport.getErrorMessages());
   }
@@ -3231,6 +3249,12 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
     IngestionStatsAndErrors reportData = getTaskReportData();
+
+    // Verify ingestion state and error message
+    Assert.assertEquals(IngestionState.COMPLETED, 
reportData.getIngestionState());
+    Assert.assertNull(reportData.getErrorMsg());
+
+    // Verify report metrics
     Assert.assertEquals(reportData.getRecordsProcessed().size(), 1);
     
Assert.assertEquals(reportData.getRecordsProcessed().values().iterator().next(),
 (Long) 6L);
   }
@@ -3279,6 +3303,12 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
     IngestionStatsAndErrors reportData = getTaskReportData();
+
+    // Verify ingestion state and error message
+    Assert.assertEquals(IngestionState.COMPLETED, 
reportData.getIngestionState());
+    Assert.assertNull(reportData.getErrorMsg());
+
+    // Verify report metrics
     Assert.assertEquals(reportData.getRecordsProcessed().size(), 2);
     
Assert.assertTrue(reportData.getRecordsProcessed().values().containsAll(ImmutableSet.of(6L,
 2L)));
   }
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 80bded2031d..510eaa797e0 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -43,6 +43,7 @@ import org.apache.druid.data.input.impl.FloatDimensionSchema;
 import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.kinesis.KinesisRecordEntity;
+import org.apache.druid.indexer.IngestionState;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.report.IngestionStatsAndErrors;
@@ -1186,6 +1187,10 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     IngestionStatsAndErrors reportData = getTaskReportData();
 
+    // Verify ingestion state and error message
+    Assert.assertEquals(IngestionState.COMPLETED, 
reportData.getIngestionState());
+    Assert.assertNull(reportData.getErrorMsg());
+
     Map<String, Object> expectedMetrics = ImmutableMap.of(
         RowIngestionMeters.BUILD_SEGMENTS,
         ImmutableMap.of(
@@ -1272,6 +1277,10 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     IngestionStatsAndErrors reportData = getTaskReportData();
 
+    // Verify ingestion state and error message
+    Assert.assertEquals(IngestionState.BUILD_SEGMENTS, 
reportData.getIngestionState());
+    Assert.assertNotNull(reportData.getErrorMsg());
+
     Map<String, Object> expectedMetrics = ImmutableMap.of(
         RowIngestionMeters.BUILD_SEGMENTS,
         ImmutableMap.of(
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index d347fd81503..df8e220145b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -768,7 +768,6 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
             }
           }
         }
-        ingestionState = IngestionState.COMPLETED;
       }
       catch (Exception e) {
         // (1) catch all exceptions while reading from kafka
@@ -835,6 +834,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       // failed to persist sequences. It might also return null if handoff 
failed, but was recoverable.
       // See publishAndRegisterHandoff() for details.
       List<SegmentsAndCommitMetadata> handedOffList = Collections.emptyList();
+      ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
       if (tuningConfig.getHandoffConditionTimeout() == 0) {
         handedOffList = Futures.allAsList(handOffWaitList).get();
       } else {
@@ -928,6 +928,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       }
     }
 
+    ingestionState = IngestionState.COMPLETED;
     toolbox.getTaskReportFileWriter().write(task.getId(), 
getTaskCompletionReports(null, handoffWaitMs));
     return TaskStatus.success(task.getId());
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to