This is an automated email from the ASF dual-hosted git repository.
tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
     new c4b5ea6aed6 Add drop reason dimension to ingest/events/thrownAway metric (#18855)
c4b5ea6aed6 is described below
commit c4b5ea6aed6e03a2ab953e04ea56483df53637a2
Author: jtuglu1 <[email protected]>
AuthorDate: Tue Dec 23 12:01:55 2025 -0500
Add drop reason dimension to ingest/events/thrownAway metric (#18855)
This adds:
- Better logging in task logs showing the breakdown of thrown-away events by reason.
- A `reason` dimension to the `ingest/events/thrownAway` metric, for aggregating by drop reason (see the illustration below).
- A `thrownAwayByReason` map to the row statistics task API response payload, so future consumers can make use of it should they need to.
- A better interface for row filters, making it easy to add more filtering reasons in the future.
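As a hypothetical illustration (dimension syntax abbreviated), an emission period that drops two null records and three records older than the minimum message time now yields two metric events rather than one, since the monitor emits one `ingest/events/thrownAway` event per reason with a non-zero delta:

    ingest/events/thrownAway{dataSource=..., reason="null"} = 2
    ingest/events/thrownAway{dataSource=..., reason="beforeMinimumMessageTime"} = 3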
---
docs/operations/metrics.md | 2 +-
.../main/resources/defaultMetricDimensions.json | 3 +-
.../src/main/resources/defaultMetrics.json | 3 +-
.../src/main/resources/defaultMetrics.json | 2 +-
.../main/resources/defaultMetricDimensions.json | 2 +-
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 31 ++--
.../indexing/kinesis/KinesisIndexTaskTest.java | 17 +-
.../common/stats/DropwizardRowIngestionMeters.java | 39 +++--
.../common/stats/TaskRealtimeMetricsMonitor.java | 33 +++-
.../common/task/AbstractBatchIndexTask.java | 2 +-
.../task/FilteringCloseableInputRowIterator.java | 17 +-
.../druid/indexing/common/task/InputRowFilter.java | 73 +++++++++
.../parallel/ParallelIndexSupervisorTask.java | 3 +
.../SeekableStreamIndexTaskRunner.java | 30 ++--
.../indexing/seekablestream/StreamChunkParser.java | 11 +-
.../common/TaskRealtimeMetricsMonitorTest.java | 171 ++++++++++++++++-----
.../stats/DropwizardRowIngestionMetersTest.java | 104 +++++++++++++
.../FilteringCloseableInputRowIteratorTest.java | 113 +++++++++++++-
.../druid/indexing/common/task/IndexTaskTest.java | 31 +++-
.../indexing/common/task/InputRowFilterTest.java | 122 +++++++++++++++
.../parallel/SinglePhaseParallelIndexingTest.java | 7 +-
.../SeekableStreamIndexTaskRunnerTest.java | 58 ++++++-
.../seekablestream/StreamChunkParserTest.java | 21 +--
.../java/org/apache/druid/query/DruidMetrics.java | 1 +
.../segment/incremental/InputRowFilterResult.java | 101 ++++++++++++
.../incremental/NoopRowIngestionMeters.java | 12 +-
.../segment/incremental/RowIngestionMeters.java | 13 +-
.../incremental/RowIngestionMetersTotals.java | 66 +++++++-
.../incremental/SimpleRowIngestionMeters.java | 34 +++-
.../druid/indexer/report/TaskReportSerdeTest.java | 3 +
.../incremental/InputRowFilterResultTest.java | 64 ++++++++
.../druid/segment/incremental/RowMeters.java | 15 +-
.../incremental/SimpleRowIngestionMetersTest.java | 43 +++++-
33 files changed, 1095 insertions(+), 152 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index aa1c25a693a..e2c957b9cee 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -270,7 +270,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each emission period.
|`ingest/events/processed`|Number of events processed per emission period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Equal to the number of events per emission period.|
|`ingest/events/processedWithError`|Number of events processed with some partial errors per emission period. Events processed with partial errors are counted towards both this metric and `ingest/events/processed`.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
-|`ingest/events/thrownAway`|Number of events rejected because they are null, or filtered by `transformSpec`, or outside one of `lateMessageRejectionPeriod`, `earlyMessageRejectionPeriod`.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
+|`ingest/events/thrownAway`|Number of events rejected because they are null, or filtered by `transformSpec`, or outside one of `lateMessageRejectionPeriod`, `earlyMessageRejectionPeriod`. The `reason` dimension indicates why the event was thrown away.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`, `reason`|0|
|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/input/bytes`|Number of bytes read from input sources, after decompression but prior to parsing. This covers all data read, including data that does not end up being fully processed and ingested. For example, this includes data that ends up being rejected for being unparseable or filtered out.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the amount of data read.|
|`ingest/rows/output`|Number of Druid rows persisted.|`dataSource`, `taskId`, `taskType`, `groupId`|Your number of events with rollup.|
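For reference, the `reason` values emitted by this patch (per the test assertions further down) map one-to-one to the rejected `InputRowFilterResult` variants:

    "null"                      // NULL_OR_EMPTY_RECORD
    "beforeMinimumMessageTime"  // BEFORE_MIN_MESSAGE_TIME
    "afterMaximumMessageTime"   // AFTER_MAX_MESSAGE_TIME
    "filtered"                  // CUSTOM_FILTER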
diff --git a/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json
index 6b8d9a9887d..bc0c451870f 100644
--- a/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json
+++ b/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json
@@ -126,7 +126,8 @@
},
"ingest/events/thrownAway": {
"dimensions": [
- "dataSource"
+ "dataSource",
+ "reason"
],
"type": "counter"
},
diff --git a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
index 311a06e6e4c..69100697a62 100644
--- a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
+++ b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
@@ -54,7 +54,8 @@
"query/cache/total/timeouts": [],
"query/cache/total/errors": [],
"ingest/events/thrownAway": [
- "dataSource"
+ "dataSource",
+ "reason"
],
"ingest/events/unparseable": [
"dataSource"
diff --git a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
index 82ced1e9abb..3702039c283 100644
--- a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
+++ b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
@@ -104,7 +104,7 @@
"ingest/events/processed" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events successfully processed per emission period." },
"ingest/events/processedWithError" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events processed with some partial errors per emission period." },
"ingest/events/unparseable" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events rejected because the events are unparseable." },
- "ingest/events/thrownAway" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events rejected because they are outside the windowPeriod."},
+ "ingest/events/thrownAway" : { "dimensions" : ["dataSource", "reason"], "type" : "count", "help": "Number of events rejected because they are null, filtered by transformSpec, or outside the message rejection periods. The `reason` dimension indicates why the event was thrown away."},
"ingest/events/duplicate" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events rejected because the events are duplicated."},
"ingest/input/bytes" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of bytes read from input sources, after decompression but prior to parsing." },
"ingest/rows/output" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of Druid rows persisted."},
diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
index 0e3862f3da5..e6b7f9f594a 100644
--- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
+++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
@@ -38,7 +38,7 @@
"query/cache/total/timeouts" : { "dimensions" : [], "type" : "gauge" },
"query/cache/total/errors" : { "dimensions" : [], "type" : "gauge" },
- "ingest/events/thrownAway" : { "dimensions" : ["dataSource"], "type" :
"count" },
+ "ingest/events/thrownAway" : { "dimensions" : ["dataSource", "reason"],
"type" : "count" },
"ingest/events/unparseable" : { "dimensions" : ["dataSource"], "type" :
"count" },
"ingest/events/duplicate" : { "dimensions" : ["dataSource"], "type" :
"count" },
"ingest/events/processed" : { "dimensions" : ["dataSource"], "type" :
"count" },
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 96dd04d42cf..cb359896ee2 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -113,6 +113,7 @@ import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.incremental.RowMeters;
@@ -717,7 +718,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
);
long totalBytes = getTotalSizeOfRecords(0, 10) + getTotalSizeOfRecords(13, 15);
- verifyTaskMetrics(task, RowMeters.with().bytes(totalBytes).unparseable(3).thrownAway(1).totalProcessed(8));
+ verifyTaskMetrics(task, RowMeters.with().bytes(totalBytes).unparseable(3).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 1).totalProcessed(8));
// Check published metadata and segments in deep storage
assertEqualsExceptVersion(
@@ -851,7 +852,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
);
long totalBytes = getTotalSizeOfRecords(0, 10) + getTotalSizeOfRecords(13, 15);
- verifyTaskMetrics(task, RowMeters.with().bytes(totalBytes).unparseable(3).thrownAway(1).totalProcessed(8));
+ verifyTaskMetrics(task, RowMeters.with().bytes(totalBytes).unparseable(3).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 1).totalProcessed(8));
// Check published metadata and segments in deep storage
assertEqualsExceptVersion(
@@ -1165,7 +1166,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
- verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAway(2).totalProcessed(3));
+ verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAwayByReason(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME, 2).totalProcessed(3));
// Check published metadata and segments in deep storage
assertEqualsExceptVersion(
@@ -1214,7 +1215,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
- verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAway(2).totalProcessed(3));
+ verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAwayByReason(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME, 2).totalProcessed(3));
// Check published metadata and segments in deep storage
assertEqualsExceptVersion(
@@ -1272,7 +1273,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
- verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAway(4).totalProcessed(1));
+ verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 4).totalProcessed(1));
// Check published metadata
final List<SegmentDescriptor> publishedDescriptors = publishedDescriptors();
@@ -1642,7 +1643,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
verifyTaskMetrics(task, RowMeters.with()
.bytes(totalRecordBytes)
.unparseable(3).errors(3)
- .thrownAway(1).totalProcessed(4));
+ .thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 1).totalProcessed(4));
// Check published metadata
assertEqualsExceptVersion(
@@ -1660,6 +1661,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
Assert.assertNull(reportData.getErrorMsg());
+ // Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
+ Map<String, Integer> expectedThrownAwayByReason = Map.of(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason(), 1);
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
@@ -1667,7 +1670,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
RowIngestionMeters.PROCESSED_BYTES, (int) totalRecordBytes,
RowIngestionMeters.PROCESSED_WITH_ERROR, 3,
RowIngestionMeters.UNPARSEABLE, 3,
- RowIngestionMeters.THROWN_AWAY, 1
+ RowIngestionMeters.THROWN_AWAY, 1,
+ RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedThrownAwayByReason
)
);
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
@@ -1745,6 +1749,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState());
Assert.assertNotNull(reportData.getErrorMsg());
+ // Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
+ Map<String, Integer> expectedThrownAwayByReason = Map.of();
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
@@ -1752,7 +1758,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
RowIngestionMeters.PROCESSED_BYTES, (int) totalBytes,
RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
RowIngestionMeters.UNPARSEABLE, 3,
- RowIngestionMeters.THROWN_AWAY, 0
+ RowIngestionMeters.THROWN_AWAY, 0,
+ RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedThrownAwayByReason
)
);
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
@@ -1887,7 +1894,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
verifyTaskMetrics(task1, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
verifyTaskMetrics(task2, RowMeters.with().bytes(getTotalSizeOfRecords(3, 10))
- .unparseable(3).thrownAway(1).totalProcessed(3));
+ .unparseable(3).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 1).totalProcessed(3));
// Check published segments & metadata, should all be from the first task
final List<SegmentDescriptor> publishedDescriptors = publishedDescriptors();
@@ -1961,7 +1968,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
verifyTaskMetrics(task1, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
verifyTaskMetrics(task2, RowMeters.with().bytes(getTotalSizeOfRecords(3, 10))
- .unparseable(3).thrownAway(1).totalProcessed(3));
+ .unparseable(3).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 1).totalProcessed(3));
// Check published segments & metadata
SegmentDescriptorAndExpectedDim1Values desc3 = sdd("2011/P1D", 1, ImmutableList.of("d", "e"));
@@ -2576,7 +2583,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
long totalBytes = getTotalSizeOfRecords(0, 2) + getTotalSizeOfRecords(5, 11);
verifyTaskMetrics(task, RowMeters.with().bytes(totalBytes)
- .unparseable(3).errors(1).thrownAway(1).totalProcessed(3));
+ .unparseable(3).errors(1).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 1).totalProcessed(3));
// Check published metadata and segments in deep storage
assertEqualsExceptVersion(
@@ -3437,7 +3444,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
- verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAway(4).totalProcessed(1));
+ verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 4).totalProcessed(1));
// Check published metadata
final List<SegmentDescriptor> publishedDescriptors = publishedDescriptors();
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 98dfea4333c..bd2a8aa2712 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -82,6 +82,7 @@ import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.RowMeters;
import org.apache.druid.segment.indexing.DataSchema;
@@ -801,7 +802,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
verifyAll();
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSize(RECORDS, 0, 5))
- .thrownAway(2).totalProcessed(3));
+ .thrownAwayByReason(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME, 2).totalProcessed(3));
// Check published metadata
assertEqualsExceptVersion(
@@ -864,7 +865,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
verifyAll();
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSize(RECORDS, 0, 5))
- .thrownAway(2).totalProcessed(3));
+ .thrownAwayByReason(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME, 2).totalProcessed(3));
// Check published metadata and segments in deep storage
assertEqualsExceptVersion(
@@ -923,7 +924,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
verifyAll();
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSize(RECORDS, 0, 5))
- .thrownAway(4).totalProcessed(1));
+ .thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 4).totalProcessed(1));
// Check published metadata
assertEqualsExceptVersion(ImmutableList.of(sdd("2009/P1D", 0)), publishedDescriptors());
@@ -1194,6 +1195,8 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
Assert.assertNull(reportData.getErrorMsg());
+ // Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
+ Map<String, Integer> expectedThrownAwayByReason = Map.of();
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
@@ -1201,7 +1204,8 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
RowIngestionMeters.PROCESSED_BYTES, 763,
RowIngestionMeters.PROCESSED_WITH_ERROR, 3,
RowIngestionMeters.UNPARSEABLE, 4,
- RowIngestionMeters.THROWN_AWAY, 0
+ RowIngestionMeters.THROWN_AWAY, 0,
+ RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedThrownAwayByReason
)
);
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
@@ -1284,6 +1288,8 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState());
Assert.assertNotNull(reportData.getErrorMsg());
+ // Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
+ Map<String, Integer> expectedThrownAwayByReason = Map.of();
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
@@ -1291,7 +1297,8 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
RowIngestionMeters.PROCESSED_BYTES, (int) totalBytes,
RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
RowIngestionMeters.UNPARSEABLE, 3,
- RowIngestionMeters.THROWN_AWAY, 0
+ RowIngestionMeters.THROWN_AWAY, 0,
+ RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedThrownAwayByReason
)
);
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java
index e9fe0683f22..49a19b23799 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java
@@ -21,9 +21,11 @@ package org.apache.druid.indexing.common.stats;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -37,7 +39,7 @@ public class DropwizardRowIngestionMeters implements RowIngestionMeters
private final Meter processedBytes;
private final Meter processedWithError;
private final Meter unparseable;
- private final Meter thrownAway;
+ private final Meter[] thrownAwayByReason = new Meter[InputRowFilterResult.numValues()];
public DropwizardRowIngestionMeters()
{
@@ -46,7 +48,9 @@ public class DropwizardRowIngestionMeters implements RowIngestionMeters
this.processedBytes = metricRegistry.meter(PROCESSED_BYTES);
this.processedWithError = metricRegistry.meter(PROCESSED_WITH_ERROR);
this.unparseable = metricRegistry.meter(UNPARSEABLE);
- this.thrownAway = metricRegistry.meter(THROWN_AWAY);
+ for (InputRowFilterResult reason : InputRowFilterResult.values()) {
+ this.thrownAwayByReason[reason.ordinal()] = metricRegistry.meter(THROWN_AWAY + "_" + reason.name());
+ }
}
@Override
@@ -100,13 +104,30 @@ public class DropwizardRowIngestionMeters implements RowIngestionMeters
@Override
public long getThrownAway()
{
- return thrownAway.getCount();
+ long totalThrownAway = 0;
+ for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) {
+ totalThrownAway += thrownAwayByReason[reason.ordinal()].getCount();
+ }
+ return totalThrownAway;
}
@Override
- public void incrementThrownAway()
+ public void incrementThrownAway(InputRowFilterResult reason)
{
- thrownAway.mark();
+ thrownAwayByReason[reason.ordinal()].mark();
+ }
+
+ @Override
+ public Map<String, Long> getThrownAwayByReason()
+ {
+ Map<String, Long> result = new HashMap<>();
+ for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) {
+ long count = thrownAwayByReason[reason.ordinal()].getCount();
+ if (count > 0) {
+ result.put(reason.getReason(), count);
+ }
+ }
+ return result;
}
@Override
@@ -116,7 +137,7 @@ public class DropwizardRowIngestionMeters implements RowIngestionMeters
processed.getCount(),
processedBytes.getCount(),
processedWithError.getCount(),
- thrownAway.getCount(),
+ getThrownAwayByReason(),
unparseable.getCount()
);
}
@@ -131,21 +152,21 @@ public class DropwizardRowIngestionMeters implements RowIngestionMeters
oneMinute.put(PROCESSED_BYTES, processedBytes.getOneMinuteRate());
oneMinute.put(PROCESSED_WITH_ERROR, processedWithError.getOneMinuteRate());
oneMinute.put(UNPARSEABLE, unparseable.getOneMinuteRate());
- oneMinute.put(THROWN_AWAY, thrownAway.getOneMinuteRate());
+ oneMinute.put(THROWN_AWAY, Arrays.stream(thrownAwayByReason).mapToDouble(Meter::getOneMinuteRate).sum());
Map<String, Object> fiveMinute = new HashMap<>();
fiveMinute.put(PROCESSED, processed.getFiveMinuteRate());
fiveMinute.put(PROCESSED_BYTES, processedBytes.getFiveMinuteRate());
fiveMinute.put(PROCESSED_WITH_ERROR, processedWithError.getFiveMinuteRate());
fiveMinute.put(UNPARSEABLE, unparseable.getFiveMinuteRate());
- fiveMinute.put(THROWN_AWAY, thrownAway.getFiveMinuteRate());
+ fiveMinute.put(THROWN_AWAY, Arrays.stream(thrownAwayByReason).mapToDouble(Meter::getFiveMinuteRate).sum());
Map<String, Object> fifteenMinute = new HashMap<>();
fifteenMinute.put(PROCESSED, processed.getFifteenMinuteRate());
fifteenMinute.put(PROCESSED_BYTES, processedBytes.getFifteenMinuteRate());
fifteenMinute.put(PROCESSED_WITH_ERROR, processedWithError.getFifteenMinuteRate());
fifteenMinute.put(UNPARSEABLE, unparseable.getFifteenMinuteRate());
- fifteenMinute.put(THROWN_AWAY, thrownAway.getFifteenMinuteRate());
+ fifteenMinute.put(THROWN_AWAY, Arrays.stream(thrownAwayByReason).mapToDouble(Meter::getFifteenMinuteRate).sum());
movingAverages.put(ONE_MINUTE_NAME, oneMinute);
movingAverages.put(FIVE_MINUTE_NAME, fiveMinute);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
index 4895776a796..462fc234812 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
@@ -23,10 +23,15 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* Emits metrics from {@link SegmentGenerationMetrics} and {@link RowIngestionMeters}.
*/
@@ -51,7 +56,7 @@ public class TaskRealtimeMetricsMonitor extends AbstractMonitor
this.rowIngestionMeters = rowIngestionMeters;
this.builder = metricEventBuilder;
previousSegmentGenerationMetrics = new SegmentGenerationMetrics();
- previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, 0, 0);
+ previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, Map.of(), 0);
}
@Override
@@ -60,14 +65,30 @@ public class TaskRealtimeMetricsMonitor extends AbstractMonitor
SegmentGenerationMetrics metrics = segmentGenerationMetrics.snapshot();
RowIngestionMetersTotals rowIngestionMetersTotals = rowIngestionMeters.getTotals();
- final long thrownAway = rowIngestionMetersTotals.getThrownAway() - previousRowIngestionMetersTotals.getThrownAway();
- if (thrownAway > 0) {
+ // Emit per-reason metrics with the reason dimension
+ final Map<String, Long> currentThrownAwayByReason = rowIngestionMetersTotals.getThrownAwayByReason();
+ final Map<String, Long> previousThrownAwayByReason = previousRowIngestionMetersTotals.getThrownAwayByReason();
+ final Map<String, Long> deltaThrownAwayByReason = new HashMap<>();
+ for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) {
+ final long currentCount = currentThrownAwayByReason.getOrDefault(reason.getReason(), 0L);
+ final long previousCount = previousThrownAwayByReason.getOrDefault(reason.getReason(), 0L);
+ final long delta = currentCount - previousCount;
+ if (delta > 0) {
+ deltaThrownAwayByReason.put(reason.getReason(), delta);
+ emitter.emit(
+ builder.setDimension(DruidMetrics.REASON, reason.getReason())
+ .setMetric("ingest/events/thrownAway", delta)
+ );
+ }
+ }
+ final long totalThrownAway = deltaThrownAwayByReason.values().stream().reduce(0L, Long::sum);
+ if (totalThrownAway > 0) {
log.warn(
- "[%,d] events thrown away. Possible causes: null events, events filtered out by transformSpec, or events outside earlyMessageRejectionPeriod / lateMessageRejectionPeriod.",
- thrownAway
+ "[%,d] events thrown away. Breakdown: [%s]",
+ totalThrownAway,
+ deltaThrownAwayByReason
);
}
- emitter.emit(builder.setMetric("ingest/events/thrownAway", thrownAway));
final long unparseable = rowIngestionMetersTotals.getUnparseable() - previousRowIngestionMetersTotals.getUnparseable();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 97dc70f3357..ccdc99dcd05 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -230,7 +230,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
);
return new FilteringCloseableInputRowIterator(
inputSourceReader.read(ingestionMeters),
- rowFilter,
+ InputRowFilter.fromPredicate(rowFilter),
ingestionMeters,
parseExceptionHandler
);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java
index 30af8febaae..074ae6d9f09 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java
@@ -22,22 +22,22 @@ package org.apache.druid.indexing.common.task;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import java.io.IOException;
import java.util.NoSuchElementException;
-import java.util.function.Predicate;
/**
* An {@link InputRow} iterator used by ingestion {@link Task}s. It can filter out rows which do not satisfy the given
- * {@link #filter} or throw {@link ParseException} while parsing them. The relevant metric should be counted whenever
+ * {@link InputRowFilter} or throw {@link ParseException} while parsing them. The relevant metric should be counted whenever
* it filters out rows based on the filter. ParseException handling is delegated to {@link ParseExceptionHandler}.
*/
public class FilteringCloseableInputRowIterator implements CloseableIterator<InputRow>
{
private final CloseableIterator<InputRow> delegate;
- private final Predicate<InputRow> filter;
+ private final InputRowFilter rowFilter;
private final RowIngestionMeters rowIngestionMeters;
private final ParseExceptionHandler parseExceptionHandler;
@@ -45,13 +45,13 @@ public class FilteringCloseableInputRowIterator implements CloseableIterator<InputRow>
public FilteringCloseableInputRowIterator(
CloseableIterator<InputRow> delegate,
- Predicate<InputRow> filter,
+ InputRowFilter rowFilter,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler
)
{
this.delegate = delegate;
- this.filter = filter;
+ this.rowFilter = rowFilter;
this.rowIngestionMeters = rowIngestionMeters;
this.parseExceptionHandler = parseExceptionHandler;
}
@@ -66,11 +66,12 @@ public class FilteringCloseableInputRowIterator implements CloseableIterator<InputRow>
while (next == null && delegate.hasNext()) {
// delegate.next() can throw ParseException
final InputRow row = delegate.next();
- // filter.test() can throw ParseException
- if (filter.test(row)) {
+ // rowFilter.test() can throw ParseException
+ final InputRowFilterResult filterResult = rowFilter.test(row);
+ if (!filterResult.isRejected()) {
next = row;
} else {
- rowIngestionMeters.incrementThrownAway();
+ rowIngestionMeters.incrementThrownAway(filterResult);
}
}
break;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputRowFilter.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputRowFilter.java
new file mode 100644
index 00000000000..8f65b9bcbe9
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputRowFilter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
+
+import java.util.function.Predicate;
+
+/**
+ * A filter for input rows during ingestion that returns an {@link InputRowFilterResult} per row.
+ * This is similar to {@link Predicate} but returns an {@link InputRowFilterResult} instead of a boolean.
+ */
+@FunctionalInterface
+public interface InputRowFilter
+{
+ /**
+ * Tests whether the given row should be accepted.
+ *
+ * @return {@link InputRowFilterResult#ACCEPTED} only if the row should be accepted, otherwise another {@link InputRowFilterResult} value.
+ */
+ InputRowFilterResult test(InputRow row);
+
+ /**
+ * Creates an {@link InputRowFilter} from a {@link Predicate}.
+ * Callers wishing to return custom rejection reasons should implement their own {@link InputRowFilter} directly.
+ */
+ static InputRowFilter fromPredicate(Predicate<InputRow> predicate)
+ {
+ return row -> predicate.test(row) ? InputRowFilterResult.ACCEPTED : InputRowFilterResult.CUSTOM_FILTER;
+ }
+
+ /**
+ * Fully-permissive {@link InputRowFilter} used mainly for tests.
+ */
+ static InputRowFilter allowAll()
+ {
+ return row -> InputRowFilterResult.ACCEPTED;
+ }
+
+ /**
+ * Combines this filter with another filter. A row is rejected if either filter rejects it.
+ * The rejection reason from the first rejecting filter (this filter first) is returned.
+ */
+ default InputRowFilter and(InputRowFilter other)
+ {
+ return row -> {
+ InputRowFilterResult result = this.test(row);
+ if (result.isRejected()) {
+ return result;
+ }
+ return other.test(row);
+ };
+ }
+}
+
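As a usage sketch of the interface above (the `dim1` predicate is a made-up example; everything else is the API added in this file and in RowIngestionMeters):

    // Reject null rows first, then apply a custom predicate. Per and(), the
    // first rejecting filter's InputRowFilterResult is the one returned.
    InputRowFilter nonNull = row -> row == null
        ? InputRowFilterResult.NULL_OR_EMPTY_RECORD
        : InputRowFilterResult.ACCEPTED;
    InputRowFilter combined = nonNull.and(
        InputRowFilter.fromPredicate(row -> row.getRaw("dim1") != null)
    );

    InputRowFilterResult result = combined.test(row);
    if (result.isRejected()) {
      // counted under reason "null" or "filtered", respectively
      rowIngestionMeters.incrementThrownAway(result);
    }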
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 9faa4c2013e..74678086d59 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -1602,11 +1602,14 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
return (RowIngestionMetersTotals) buildSegmentsRowStats;
} else if (buildSegmentsRowStats instanceof Map) {
Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) buildSegmentsRowStats;
+ Map<String, Integer> thrownAwayByReason = (Map) buildSegmentsRowStatsMap.get("thrownAwayByReason");
return new RowIngestionMetersTotals(
((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
((Number) buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
+ // Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
+ thrownAwayByReason != null ? CollectionUtils.mapValues(thrownAwayByReason, Integer::longValue) : null,
((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
);
} else {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 16505f761bb..a448e01f918 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -78,6 +78,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.PendingSegmentRecord;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMeters;
@@ -419,7 +420,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
inputRowSchema,
task.getDataSchema().getTransformSpec(),
toolbox.getIndexingTmpDir(),
- row -> row != null && withinMinMaxRecordTime(row),
+ this::ensureRowIsNonNullAndWithinMessageTimeBounds,
rowIngestionMeters,
parseExceptionHandler
);
@@ -2144,26 +2145,33 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
);
}
- public boolean withinMinMaxRecordTime(final InputRow row)
+ /**
+ * Returns {@link InputRowFilterResult#ACCEPTED} if the row should be accepted,
+ * or a rejection reason otherwise.
+ */
+ InputRowFilterResult ensureRowIsNonNullAndWithinMessageTimeBounds(@Nullable InputRow row)
{
- final boolean beforeMinimumMessageTime = minMessageTime.isAfter(row.getTimestamp());
- final boolean afterMaximumMessageTime = maxMessageTime.isBefore(row.getTimestamp());
-
- if (log.isDebugEnabled()) {
- if (beforeMinimumMessageTime) {
+ if (row == null) {
+ return InputRowFilterResult.NULL_OR_EMPTY_RECORD;
+ } else if (minMessageTime.isAfter(row.getTimestamp())) {
+ if (log.isDebugEnabled()) {
log.debug(
- "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]",
+ "CurrentTimeStamp[%s] is before minimumMessageTime[%s]",
row.getTimestamp(),
minMessageTime
);
- } else if (afterMaximumMessageTime) {
+ }
+ return InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME;
+ } else if (maxMessageTime.isBefore(row.getTimestamp())) {
+ if (log.isDebugEnabled()) {
log.debug(
- "CurrentTimeStamp[%s] is after MaximumMessageTime[%s]",
+ "CurrentTimeStamp[%s] is after maximumMessageTime[%s]",
row.getTimestamp(),
maxMessageTime
);
}
+ return InputRowFilterResult.AFTER_MAX_MESSAGE_TIME;
}
- return !beforeMinimumMessageTime && !afterMaximumMessageTime;
+ return InputRowFilterResult.ACCEPTED;
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
index 3ac952c16c2..ad9886e424a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
@@ -28,9 +28,11 @@ import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.indexing.common.task.FilteringCloseableInputRowIterator;
+import org.apache.druid.indexing.common.task.InputRowFilter;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.transform.TransformSpec;
@@ -42,7 +44,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.function.Predicate;
/**
* Abstraction for parsing stream data which internally uses {@link org.apache.druid.data.input.InputEntityReader}
@@ -54,7 +55,7 @@ class StreamChunkParser<RecordType extends ByteEntity>
private final InputRowParser<ByteBuffer> parser;
@Nullable
private final SettableByteEntityReader<RecordType> byteEntityReader;
- private final Predicate<InputRow> rowFilter;
+ private final InputRowFilter rowFilter;
private final RowIngestionMeters rowIngestionMeters;
private final ParseExceptionHandler parseExceptionHandler;
@@ -67,7 +68,7 @@ class StreamChunkParser<RecordType extends ByteEntity>
InputRowSchema inputRowSchema,
TransformSpec transformSpec,
File indexingTmpDir,
- Predicate<InputRow> rowFilter,
+ InputRowFilter rowFilter,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler
)
@@ -96,7 +97,7 @@ class StreamChunkParser<RecordType extends ByteEntity>
StreamChunkParser(
@Nullable InputRowParser<ByteBuffer> parser,
@Nullable SettableByteEntityReader<RecordType> byteEntityReader,
- Predicate<InputRow> rowFilter,
+ InputRowFilter rowFilter,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler
)
@@ -117,7 +118,7 @@ class StreamChunkParser<RecordType extends ByteEntity>
if (!isEndOfShard) {
// We do not count end of shard record as thrown away event since this is a record created by Druid
// Note that this only applies to Kinesis
- rowIngestionMeters.incrementThrownAway();
+ rowIngestionMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
}
return Collections.emptyList();
} else {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
index 252d265d67d..bcfebef2a6c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
@@ -21,28 +21,22 @@ package org.apache.druid.indexing.common;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
-import org.apache.druid.java.util.emitter.core.Event;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.MonitorUtils;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Answers;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnitRunner;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-@RunWith(MockitoJUnitRunner.class)
public class TaskRealtimeMetricsMonitorTest
{
private static final Map<String, String[]> DIMENSIONS = ImmutableMap.of(
@@ -53,28 +47,18 @@ public class TaskRealtimeMetricsMonitorTest
);
private static final Map<String, Object> TAGS = ImmutableMap.of("author", "Author Name", "version", 10);
- private SegmentGenerationMetrics segmentGenerationMetrics;
- @Mock(answer = Answers.RETURNS_MOCKS)
+ private SegmentGenerationMetrics segmentGenerationMetrics;
private RowIngestionMeters rowIngestionMeters;
- @Mock
- private ServiceEmitter emitter;
- private Map<String, ServiceMetricEvent> emittedEvents;
+ private StubServiceEmitter emitter;
private TaskRealtimeMetricsMonitor target;
@Before
public void setUp()
{
- emittedEvents = new HashMap<>();
segmentGenerationMetrics = new SegmentGenerationMetrics();
- Mockito.doCallRealMethod().when(emitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
- Mockito
- .doAnswer(invocation -> {
- ServiceMetricEvent e = invocation.getArgument(0);
- emittedEvents.put(e.getMetric(), e);
- return null;
- })
- .when(emitter).emit(ArgumentMatchers.any(Event.class));
+ rowIngestionMeters = new SimpleRowIngestionMeters();
+ emitter = new StubServiceEmitter();
target = new TaskRealtimeMetricsMonitor(
segmentGenerationMetrics,
rowIngestionMeters,
@@ -86,7 +70,10 @@ public class TaskRealtimeMetricsMonitorTest
public void testdoMonitorShouldEmitUserProvidedTags()
{
target.doMonitor(emitter);
- for (ServiceMetricEvent sme : emittedEvents.values()) {
+
+ List<ServiceMetricEvent> events = emitter.getMetricEvents("ingest/events/unparseable");
+ Assert.assertFalse(events.isEmpty());
+ for (ServiceMetricEvent sme : events) {
Assert.assertEquals(TAGS, sme.getUserDims().get(DruidMetrics.TAGS));
}
}
@@ -94,12 +81,19 @@ public class TaskRealtimeMetricsMonitorTest
@Test
public void testdoMonitorWithoutTagsShouldNotEmitTags()
{
+ ServiceMetricEvent.Builder builderWithoutTags = new ServiceMetricEvent.Builder();
+ MonitorUtils.addDimensionsToBuilder(builderWithoutTags, DIMENSIONS);
+
target = new TaskRealtimeMetricsMonitor(
segmentGenerationMetrics,
rowIngestionMeters,
- createMetricEventBuilder()
+ builderWithoutTags
);
- for (ServiceMetricEvent sme : emittedEvents.values()) {
+ target.doMonitor(emitter);
+
+ List<ServiceMetricEvent> events = emitter.getMetricEvents("ingest/events/unparseable");
+ Assert.assertFalse(events.isEmpty());
+ for (ServiceMetricEvent sme : events) {
Assert.assertFalse(sme.getUserDims().containsKey(DruidMetrics.TAGS));
}
}
@@ -107,24 +101,123 @@ public class TaskRealtimeMetricsMonitorTest
@Test
public void testMessageGapAggStats()
{
- target = new TaskRealtimeMetricsMonitor(
+ target.doMonitor(emitter);
+ Assert.assertTrue(emitter.getMetricEvents("ingest/events/minMessageGap").isEmpty());
+ Assert.assertTrue(emitter.getMetricEvents("ingest/events/maxMessageGap").isEmpty());
+ Assert.assertTrue(emitter.getMetricEvents("ingest/events/avgMessageGap").isEmpty());
+
+ emitter.flush();
+ segmentGenerationMetrics.reportMessageGap(1);
+ target.doMonitor(emitter);
+
+ Assert.assertFalse(emitter.getMetricEvents("ingest/events/minMessageGap").isEmpty());
+ Assert.assertFalse(emitter.getMetricEvents("ingest/events/maxMessageGap").isEmpty());
+ Assert.assertFalse(emitter.getMetricEvents("ingest/events/avgMessageGap").isEmpty());
+ }
+
+ @Test
+ public void testThrownAwayEmitsReasonDimension()
+ {
+ SimpleRowIngestionMeters realMeters = new SimpleRowIngestionMeters();
+ realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+ realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+ realMeters.incrementThrownAway(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME);
+ realMeters.incrementThrownAway(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME);
+ realMeters.incrementThrownAway(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME);
+ realMeters.incrementThrownAway(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME);
+ realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+ realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+ realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+ realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+
+ TaskRealtimeMetricsMonitor monitor = new TaskRealtimeMetricsMonitor(
segmentGenerationMetrics,
- rowIngestionMeters,
+ realMeters,
createMetricEventBuilder()
);
- target.doMonitor(emitter);
- Assert.assertFalse(emittedEvents.containsKey("ingest/events/minMessageGap"));
- Assert.assertFalse(emittedEvents.containsKey("ingest/events/maxMessageGap"));
- Assert.assertFalse(emittedEvents.containsKey("ingest/events/avgMessageGap"));
+ monitor.doMonitor(emitter);
- emittedEvents.clear();
- segmentGenerationMetrics.reportMessageGap(1);
- target.doMonitor(emitter);
+ Map<String, Long> thrownAwayByReason = new HashMap<>();
+ for (ServiceMetricEvent event : emitter.getMetricEvents("ingest/events/thrownAway")) {
+ Object reason = event.getUserDims().get("reason");
+ thrownAwayByReason.put(reason.toString(), event.getValue().longValue());
+ }
+
+ Assert.assertEquals(Long.valueOf(2), thrownAwayByReason.get("null"));
+ Assert.assertEquals(Long.valueOf(3), thrownAwayByReason.get("beforeMinimumMessageTime"));
+ Assert.assertEquals(Long.valueOf(1), thrownAwayByReason.get("afterMaximumMessageTime"));
+ Assert.assertEquals(Long.valueOf(4), thrownAwayByReason.get("filtered"));
+ }
+
+ @Test
+ public void testThrownAwayReasonDimensionOnlyEmittedWhenNonZero()
+ {
+ SimpleRowIngestionMeters realMeters = new SimpleRowIngestionMeters();
+ realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+ realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+
+ TaskRealtimeMetricsMonitor monitor = new TaskRealtimeMetricsMonitor(
+ segmentGenerationMetrics,
+ realMeters,
+ createMetricEventBuilder()
+ );
+
+ monitor.doMonitor(emitter);
+
+ Map<String, Long> thrownAwayByReason = new HashMap<>();
+ for (ServiceMetricEvent event : emitter.getMetricEvents("ingest/events/thrownAway")) {
+ Object reason = event.getUserDims().get("reason");
+ thrownAwayByReason.put(reason.toString(), event.getValue().longValue());
+ }
+
+ // Only reasons with non-zero counts should be emitted
+ Assert.assertEquals(2, thrownAwayByReason.size());
+ Assert.assertTrue(thrownAwayByReason.containsKey("null"));
+ Assert.assertTrue(thrownAwayByReason.containsKey("filtered"));
+
Assert.assertFalse(thrownAwayByReason.containsKey("beforeMinimumMessageTime"));
+
Assert.assertFalse(thrownAwayByReason.containsKey("afterMaximumMessageTime"));
+ }
+
+ @Test
+ public void testThrownAwayReasonDeltaAcrossMonitorCalls()
+ {
+ SimpleRowIngestionMeters realMeters = new SimpleRowIngestionMeters();
+
+ TaskRealtimeMetricsMonitor monitor = new TaskRealtimeMetricsMonitor(
+ segmentGenerationMetrics,
+ realMeters,
+ createMetricEventBuilder()
+ );
+
+ realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+ realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+ monitor.doMonitor(emitter);
+
+ long firstCallNullCount = 0;
+ for (ServiceMetricEvent event : emitter.getMetricEvents("ingest/events/thrownAway")) {
+ if ("null".equals(event.getUserDims().get("reason"))) {
+ firstCallNullCount = event.getValue().longValue();
+ }
+ }
+ Assert.assertEquals(2, firstCallNullCount);
+
+ emitter.flush();
+ realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+ realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+ realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+ monitor.doMonitor(emitter);
+
+ // Find counts from second call - should be deltas only
+ Map<String, Long> secondCallCounts = new HashMap<>();
+ for (ServiceMetricEvent event : emitter.getMetricEvents("ingest/events/thrownAway")) {
+ Object reason = event.getUserDims().get("reason");
+ secondCallCounts.put(reason.toString(), event.getValue().longValue());
+ }
- Assert.assertTrue(emittedEvents.containsKey("ingest/events/minMessageGap"));
- Assert.assertTrue(emittedEvents.containsKey("ingest/events/maxMessageGap"));
- Assert.assertTrue(emittedEvents.containsKey("ingest/events/avgMessageGap"));
+ // Should emit only the delta (1 more NULL, 2 new FILTERED)
+ Assert.assertEquals(Long.valueOf(1), secondCallCounts.get("null"));
+ Assert.assertEquals(Long.valueOf(2), secondCallCounts.get("filtered"));
}
private ServiceMetricEvent.Builder createMetricEventBuilder()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMetersTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMetersTest.java
new file mode 100644
index 00000000000..d9227670000
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMetersTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.stats;
+
+import org.apache.druid.segment.incremental.InputRowFilterResult;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class DropwizardRowIngestionMetersTest
+{
+ @Test
+ public void testBasicIncrements()
+ {
+ DropwizardRowIngestionMeters meters = new DropwizardRowIngestionMeters();
+ meters.incrementProcessed();
+ meters.incrementProcessedBytes(100);
+ meters.incrementProcessedWithError();
+ meters.incrementUnparseable();
+ meters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+
+ Assert.assertEquals(1, meters.getProcessed());
+ Assert.assertEquals(100, meters.getProcessedBytes());
+ Assert.assertEquals(1, meters.getProcessedWithError());
+ Assert.assertEquals(1, meters.getUnparseable());
+ Assert.assertEquals(1, meters.getThrownAway());
+
+ RowIngestionMetersTotals totals = meters.getTotals();
+ Assert.assertEquals(1, totals.getProcessed());
+ Assert.assertEquals(100, totals.getProcessedBytes());
+ Assert.assertEquals(1, totals.getProcessedWithError());
+ Assert.assertEquals(1, totals.getUnparseable());
+ Assert.assertEquals(1, totals.getThrownAway());
+ }
+
+ @Test
+ public void testIncrementThrownAwayWithReason()
+ {
+ DropwizardRowIngestionMeters meters = new DropwizardRowIngestionMeters();
+
+ meters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+ meters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+ meters.incrementThrownAway(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME);
+ meters.incrementThrownAway(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME);
+ meters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+ meters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+ meters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+
+ // Total thrownAway should be sum of all reasons
+ Assert.assertEquals(7, meters.getThrownAway());
+
+ // Check per-reason counts
+ Map<String, Long> byReason = meters.getThrownAwayByReason();
+ Assert.assertEquals(Long.valueOf(2), byReason.get(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason()));
+ Assert.assertEquals(Long.valueOf(1), byReason.get(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.getReason()));
+ Assert.assertEquals(Long.valueOf(1), byReason.get(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME.getReason()));
+ Assert.assertEquals(Long.valueOf(3), byReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason()));
+ }
+
+ @Test
+ public void testGetThrownAwayByReasonIsEmptyWithNoIncrements()
+ {
+ DropwizardRowIngestionMeters meters = new DropwizardRowIngestionMeters();
+
+ // With no increments, the map should be empty
+ Map<String, Long> byReason = meters.getThrownAwayByReason();
+ Assert.assertTrue(byReason.isEmpty());
+ }
+
+ @Test
+ public void testMovingAverages()
+ {
+ DropwizardRowIngestionMeters meters = new DropwizardRowIngestionMeters();
+
+ meters.incrementProcessed();
+ meters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+
+ Map<String, Object> movingAverages = meters.getMovingAverages();
+ Assert.assertNotNull(movingAverages);
+ Assert.assertTrue(movingAverages.containsKey(DropwizardRowIngestionMeters.ONE_MINUTE_NAME));
+ Assert.assertTrue(movingAverages.containsKey(DropwizardRowIngestionMeters.FIVE_MINUTE_NAME));
+ Assert.assertTrue(movingAverages.containsKey(DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME));
+ }
+}
+
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java
index 2b4ffe26f31..e037d9adf72 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
@@ -42,6 +43,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -78,7 +80,7 @@ public class FilteringCloseableInputRowIteratorTest
final Predicate<InputRow> filter = row -> (Integer) row.getRaw("dim1") == 10;
final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
CloseableIterators.withEmptyBaggage(ROWS.iterator()),
- filter,
+ InputRowFilter.fromPredicate(filter),
rowIngestionMeters,
parseExceptionHandler
);
@@ -125,7 +127,7 @@ public class FilteringCloseableInputRowIteratorTest
final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
parseExceptionThrowingIterator,
- row -> true,
+ InputRowFilter.allowAll(),
rowIngestionMeters,
parseExceptionHandler
);
@@ -163,7 +165,7 @@ public class FilteringCloseableInputRowIteratorTest
final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
parseExceptionThrowingIterator,
- filter,
+ InputRowFilter.fromPredicate(filter),
rowIngestionMeters,
parseExceptionHandler
);
@@ -214,7 +216,7 @@ public class FilteringCloseableInputRowIteratorTest
final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
parseExceptionThrowingIterator,
- row -> true,
+ InputRowFilter.allowAll(),
rowIngestionMeters,
parseExceptionHandler
);
@@ -260,7 +262,7 @@ public class FilteringCloseableInputRowIteratorTest
final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
parseExceptionThrowingIterator,
- row -> true,
+ InputRowFilter.allowAll(),
rowIngestionMeters,
parseExceptionHandler
);
@@ -281,7 +283,7 @@ public class FilteringCloseableInputRowIteratorTest
);
final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
delegate,
- row -> true,
+ InputRowFilter.allowAll(),
rowIngestionMeters,
parseExceptionHandler
);
@@ -330,7 +332,7 @@ public class FilteringCloseableInputRowIteratorTest
final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
parseExceptionThrowingIterator,
- row -> true,
+ InputRowFilter.allowAll(),
rowIngestionMeters,
parseExceptionHandler
);
@@ -350,6 +352,103 @@ public class FilteringCloseableInputRowIteratorTest
}
+ @Test
+ public void testRowFilterWithReasons()
+ {
+ // RowFilter that returns different reasons based on dim1 value
+ final InputRowFilter rowFilter = row -> {
+ int dim1 = (Integer) row.getRaw("dim1");
+ if (dim1 == 10) {
+ return InputRowFilterResult.ACCEPTED;
+ } else if (dim1 == 20) {
+ return InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME;
+ } else {
+ return InputRowFilterResult.CUSTOM_FILTER;
+ }
+ };
+
+ final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
+ CloseableIterators.withEmptyBaggage(ROWS.iterator()),
+ rowFilter,
+ rowIngestionMeters,
+ parseExceptionHandler
+ );
+
+ final List<InputRow> filteredRows = new ArrayList<>();
+ rowIterator.forEachRemaining(filteredRows::add);
+
+ // Only rows with dim1=10 should pass
+ Assert.assertEquals(4, filteredRows.size());
+ for (InputRow row : filteredRows) {
+ Assert.assertEquals(10, row.getRaw("dim1"));
+ }
+
+ // Check total thrown away
+ Assert.assertEquals(2, rowIngestionMeters.getThrownAway());
+
+ // Check per-reason counts
+ Map<String, Long> byReason = rowIngestionMeters.getThrownAwayByReason();
+ Assert.assertEquals(2, byReason.size());
+ Assert.assertEquals(Long.valueOf(1), byReason.get(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.getReason())); // dim1=20
+ Assert.assertEquals(Long.valueOf(1), byReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason())); // dim1=30
+ }
+
+ @Test
+ public void testRowFilterFromPredicate()
+ {
+ // Use the static helper to convert a Predicate to RowFilter
+ final Predicate<InputRow> predicate = row -> (Integer) row.getRaw("dim1") == 10;
+ final InputRowFilter rowFilter = InputRowFilter.fromPredicate(predicate);
+
+ final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
+ CloseableIterators.withEmptyBaggage(ROWS.iterator()),
+ rowFilter,
+ rowIngestionMeters,
+ parseExceptionHandler
+ );
+
+ final List<InputRow> filteredRows = new ArrayList<>();
+ rowIterator.forEachRemaining(filteredRows::add);
+
+ Assert.assertEquals(4, filteredRows.size());
+ Assert.assertEquals(2, rowIngestionMeters.getThrownAway());
+
+ // All thrown away rows should have the CUSTOM_FILTER ("filtered") reason when using fromPredicate
+ Map<String, Long> byReason = rowIngestionMeters.getThrownAwayByReason();
+ Assert.assertEquals(1, byReason.size());
+ Assert.assertEquals(Long.valueOf(2), byReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason()));
+ }
+
+ @Test
+ public void testRowFilterAnd()
+ {
+ // First filter: reject null rows
+ final InputRowFilter nullFilter = row -> row == null ? InputRowFilterResult.NULL_OR_EMPTY_RECORD : InputRowFilterResult.ACCEPTED;
+
+ // Second filter: reject if dim1 != 10
+ final InputRowFilter valueFilter = row -> (Integer) row.getRaw("dim1") ==
10 ? InputRowFilterResult.ACCEPTED : InputRowFilterResult.CUSTOM_FILTER;
+
+ // Combine filters
+ final InputRowFilter combinedFilter = nullFilter.and(valueFilter);
+
+ final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
+ CloseableIterators.withEmptyBaggage(ROWS.iterator()),
+ combinedFilter,
+ rowIngestionMeters,
+ parseExceptionHandler
+ );
+
+ final List<InputRow> filteredRows = new ArrayList<>();
+ rowIterator.forEachRemaining(filteredRows::add);
+
+ Assert.assertEquals(4, filteredRows.size());
+ Assert.assertEquals(2, rowIngestionMeters.getThrownAway());
+
+ // All rejected rows should have the CUSTOM_FILTER ("filtered") reason (from the second filter)
+ Map<String, Long> byReason = rowIngestionMeters.getThrownAwayByReason();
+ Assert.assertEquals(Long.valueOf(2), byReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason()));
+ }
+
private static InputRow newRow(DateTime timestamp, Object dim1Val, Object dim2Val)
{
return new MapBasedInputRow(timestamp, DIMENSIONS, ImmutableMap.of("dim1", dim1Val, "dim2", dim2Val));
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 2d999110288..9a2eeb8c1a6 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -85,6 +85,7 @@ import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.handoff.NoopSegmentHandoffNotifierFactory;
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import
org.apache.druid.segment.loading.LeastBytesUsedStorageLocationSelectorStrategy;
@@ -1508,6 +1509,8 @@ public class IndexTaskTest extends IngestionTestBase
IngestionStatsAndErrors reportData = getTaskReportData();
+ // Jackson deserializes numeric values that fit in 32 bits as Integers rather than Longs
+ Map<String, Integer> expectedThrownAwayByReason = Map.of(InputRowFilterResult.CUSTOM_FILTER.getReason(), 1);
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.DETERMINE_PARTITIONS,
ImmutableMap.of(
@@ -1515,7 +1518,8 @@ public class IndexTaskTest extends IngestionTestBase
RowIngestionMeters.PROCESSED, 4,
RowIngestionMeters.PROCESSED_BYTES, 657,
RowIngestionMeters.UNPARSEABLE, 4,
- RowIngestionMeters.THROWN_AWAY, 1
+ RowIngestionMeters.THROWN_AWAY, 1,
+ RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedThrownAwayByReason
),
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
@@ -1523,7 +1527,8 @@ public class IndexTaskTest extends IngestionTestBase
RowIngestionMeters.PROCESSED, 1,
RowIngestionMeters.PROCESSED_BYTES, 657,
RowIngestionMeters.UNPARSEABLE, 4,
- RowIngestionMeters.THROWN_AWAY, 1
+ RowIngestionMeters.THROWN_AWAY, 1,
+ RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedThrownAwayByReason
)
);
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
@@ -1680,6 +1685,9 @@ public class IndexTaskTest extends IngestionTestBase
IngestionStatsAndErrors reportData = getTaskReportData();
+ // Jackson deserializes numeric values that fit in 32 bits as Integers rather than Longs
+ Map<String, Integer> expectedDeterminePartitionsThrownAwayByReason = Map.of();
+ Map<String, Integer> expectedBuildSegmentsThrownAwayByReason = Map.of(InputRowFilterResult.CUSTOM_FILTER.getReason(), 1);
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.DETERMINE_PARTITIONS,
ImmutableMap.of(
@@ -1687,7 +1695,8 @@ public class IndexTaskTest extends IngestionTestBase
RowIngestionMeters.PROCESSED, 0,
RowIngestionMeters.PROCESSED_BYTES, 0,
RowIngestionMeters.UNPARSEABLE, 0,
- RowIngestionMeters.THROWN_AWAY, 0
+ RowIngestionMeters.THROWN_AWAY, 0,
+ RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedDeterminePartitionsThrownAwayByReason
),
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
@@ -1695,7 +1704,8 @@ public class IndexTaskTest extends IngestionTestBase
RowIngestionMeters.PROCESSED, 1,
RowIngestionMeters.PROCESSED_BYTES, 182,
RowIngestionMeters.UNPARSEABLE, 3,
- RowIngestionMeters.THROWN_AWAY, 1
+ RowIngestionMeters.THROWN_AWAY, 1,
+ RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedBuildSegmentsThrownAwayByReason
)
);
@@ -1790,6 +1800,8 @@ public class IndexTaskTest extends IngestionTestBase
IngestionStatsAndErrors reportData = getTaskReportData();
+ Map<String, Integer> expectedDeterminePartitionsThrownAwayByReason = Map.of(InputRowFilterResult.CUSTOM_FILTER.getReason(), 1);
+ Map<String, Integer> expectedBuildSegmentsThrownAwayByReason = Map.of();
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.DETERMINE_PARTITIONS,
ImmutableMap.of(
@@ -1797,7 +1809,8 @@ public class IndexTaskTest extends IngestionTestBase
RowIngestionMeters.PROCESSED, 1,
RowIngestionMeters.PROCESSED_BYTES, 182,
RowIngestionMeters.UNPARSEABLE, 3,
- RowIngestionMeters.THROWN_AWAY, 1
+ RowIngestionMeters.THROWN_AWAY, 1,
+ RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedDeterminePartitionsThrownAwayByReason
),
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
@@ -1805,7 +1818,8 @@ public class IndexTaskTest extends IngestionTestBase
RowIngestionMeters.PROCESSED, 0,
RowIngestionMeters.PROCESSED_BYTES, 0,
RowIngestionMeters.UNPARSEABLE, 0,
- RowIngestionMeters.THROWN_AWAY, 0
+ RowIngestionMeters.THROWN_AWAY, 0,
+ RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedBuildSegmentsThrownAwayByReason
)
);
@@ -2738,7 +2752,10 @@ public class IndexTaskTest extends IngestionTestBase
Map<String, AggregatorFactory> aggregatorFactoryMap
)
{
- Assert.assertEquals(segmentWithSchemas.getSegments().size(), segmentWithSchemas.getSegmentSchemaMapping().getSegmentIdToMetadataMap().size());
+ Assert.assertEquals(
+ segmentWithSchemas.getSegments().size(),
+ segmentWithSchemas.getSegmentSchemaMapping().getSegmentIdToMetadataMap().size()
+ );
Assert.assertEquals(1, segmentWithSchemas.getSegmentSchemaMapping().getSchemaFingerprintToPayloadMap().size());
Assert.assertEquals(
actualRowSignature,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/InputRowFilterTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/InputRowFilterTest.java
new file mode 100644
index 00000000000..53616c974e7
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/InputRowFilterTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class InputRowFilterTest
+{
+ private static final List<String> DIMENSIONS = ImmutableList.of("dim1");
+
+ @Test
+ public void testFromPredicateAccept()
+ {
+ InputRowFilter filter = InputRowFilter.fromPredicate(row -> true);
+ InputRow row = newRow(100);
+
+ Assert.assertEquals(InputRowFilterResult.ACCEPTED, filter.test(row));
+ }
+
+ @Test
+ public void testFromPredicateReject()
+ {
+ InputRowFilter filter = InputRowFilter.fromPredicate(row -> false);
+ InputRow row = newRow(100);
+
+ Assert.assertEquals(InputRowFilterResult.CUSTOM_FILTER, filter.test(row));
+ }
+
+ @Test
+ public void testAndBothAccept()
+ {
+ InputRowFilter filter1 = InputRowFilter.allowAll();
+ InputRowFilter filter2 = InputRowFilter.allowAll();
+ InputRowFilter combined = filter1.and(filter2);
+
+ InputRow row = newRow(100);
+ Assert.assertEquals(InputRowFilterResult.ACCEPTED, combined.test(row));
+ }
+
+ @Test
+ public void testAndFirstRejects()
+ {
+ InputRowFilter filter1 = row -> InputRowFilterResult.NULL_OR_EMPTY_RECORD;
+ InputRowFilter filter2 = InputRowFilter.allowAll();
+ InputRowFilter combined = filter1.and(filter2);
+
+ InputRow row = newRow(100);
+ Assert.assertEquals(InputRowFilterResult.NULL_OR_EMPTY_RECORD, combined.test(row));
+ }
+
+ @Test
+ public void testAndSecondRejects()
+ {
+ InputRowFilter filter1 = InputRowFilter.allowAll();
+ InputRowFilter filter2 = row -> InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME;
+ InputRowFilter combined = filter1.and(filter2);
+
+ InputRow row = newRow(100);
+ Assert.assertEquals(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME, combined.test(row));
+ }
+
+ @Test
+ public void testAndBothRejectReturnsFirst()
+ {
+ InputRowFilter filter1 = row -> InputRowFilterResult.NULL_OR_EMPTY_RECORD;
+ InputRowFilter filter2 = row -> InputRowFilterResult.CUSTOM_FILTER;
+ InputRowFilter combined = filter1.and(filter2);
+
+ InputRow row = newRow(100);
+ // Should return reason from first filter
+ Assert.assertEquals(InputRowFilterResult.NULL_OR_EMPTY_RECORD, combined.test(row));
+ }
+
+ @Test
+ public void testChainedAnd()
+ {
+ InputRowFilter filter1 = InputRowFilter.allowAll();
+ InputRowFilter filter2 = InputRowFilter.allowAll();
+ InputRowFilter filter3 = row -> InputRowFilterResult.AFTER_MAX_MESSAGE_TIME;
+
+ InputRowFilter combined = filter1.and(filter2).and(filter3);
+
+ InputRow row = newRow(100);
+ Assert.assertEquals(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME, combined.test(row));
+ }
+
+ private static InputRow newRow(Object dim1Val)
+ {
+ return new MapBasedInputRow(
+ DateTimes.of("2020-01-01"),
+ DIMENSIONS,
+ ImmutableMap.of("dim1", dim1Val)
+ );
+ }
+}
+
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index cc3cdffa747..a0e3f3245f2 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -48,6 +48,7 @@ import
org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.DataSegmentsWithSchemas;
import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.indexing.DataSchema;
@@ -492,6 +493,7 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
Collections.emptyList()
);
TaskReport.ReportMap actualReports = task.doGetLiveReports(true);
+ Map<String, Long> expectedThrownAwayByReason = Map.of(InputRowFilterResult.CUSTOM_FILTER.getReason(), 1L);
TaskReport.ReportMap expectedReports = buildExpectedTaskReportParallel(
task.getId(),
ImmutableList.of(
@@ -508,7 +510,7 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
1L
)
),
- new RowIngestionMetersTotals(10, 335, 1, 1, 1)
+ new RowIngestionMetersTotals(10, 335, 1, expectedThrownAwayByReason, 1)
);
compareTaskReports(expectedReports, actualReports);
}
@@ -543,7 +545,8 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
final ParallelIndexSupervisorTask executedTask =
(ParallelIndexSupervisorTask) taskContainer.getTask();
TaskReport.ReportMap actualReports = executedTask.doGetLiveReports(true);
- final RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(10, 335, 1, 1, 1);
+ Map<String, Long> expectedThrownAwayByReason = Map.of(InputRowFilterResult.CUSTOM_FILTER.getReason(), 1L);
+ final RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(10, 335, 1, expectedThrownAwayByReason, 1);
List<ParseExceptionReport> expectedUnparseableEvents = ImmutableList.of(
new ParseExceptionReport(
"{ts=2017unparseable}",
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
index 7518bb7f172..eb08944d46b 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
@@ -36,6 +36,7 @@ import
org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
import org.apache.druid.segment.indexing.DataSchema;
import org.joda.time.DateTime;
import org.junit.Assert;
@@ -105,13 +106,13 @@ public class SeekableStreamIndexTaskRunnerTest
LockGranularity.TIME_CHUNK);
Mockito.when(row.getTimestamp()).thenReturn(now);
- Assert.assertTrue(runner.withinMinMaxRecordTime(row));
+ Assert.assertEquals(InputRowFilterResult.ACCEPTED, runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row));
Mockito.when(row.getTimestamp()).thenReturn(now.minusHours(2).minusMinutes(1));
- Assert.assertFalse(runner.withinMinMaxRecordTime(row));
+ Assert.assertEquals(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME, runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row));
Mockito.when(row.getTimestamp()).thenReturn(now.plusHours(2).plusMinutes(1));
- Assert.assertFalse(runner.withinMinMaxRecordTime(row));
+ Assert.assertEquals(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME, runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row));
}
@Test
@@ -157,13 +158,58 @@ public class SeekableStreamIndexTaskRunnerTest
TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null, LockGranularity.TIME_CHUNK);
- Assert.assertTrue(runner.withinMinMaxRecordTime(row));
+ Mockito.when(row.getTimestamp()).thenReturn(now);
+ Assert.assertEquals(InputRowFilterResult.ACCEPTED, runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row));
Mockito.when(row.getTimestamp()).thenReturn(now.minusHours(2).minusMinutes(1));
- Assert.assertTrue(runner.withinMinMaxRecordTime(row));
+ Assert.assertEquals(InputRowFilterResult.ACCEPTED, runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row));
Mockito.when(row.getTimestamp()).thenReturn(now.plusHours(2).plusMinutes(1));
- Assert.assertTrue(runner.withinMinMaxRecordTime(row));
+ Assert.assertEquals(InputRowFilterResult.ACCEPTED, runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row));
+ }
+
+ @Test
+ public void testEnsureRowRejectionReasonForNullRow()
+ {
+ DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("d1"),
+ new StringDimensionSchema("d2")
+ )
+ );
+ DataSchema schema =
+ DataSchema.builder()
+ .withDataSource("datasource")
+ .withTimestamp(new TimestampSpec(null, null, null))
+ .withDimensions(dimensionsSpec)
+ .withGranularity(
+ new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null)
+ )
+ .build();
+
+ SeekableStreamIndexTaskTuningConfig tuningConfig = Mockito.mock(SeekableStreamIndexTaskTuningConfig.class);
+ SeekableStreamIndexTaskIOConfig<String, String> ioConfig = Mockito.mock(SeekableStreamIndexTaskIOConfig.class);
+ SeekableStreamStartSequenceNumbers<String, String> sequenceNumbers = Mockito.mock(SeekableStreamStartSequenceNumbers.class);
+ SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers = Mockito.mock(SeekableStreamEndSequenceNumbers.class);
+
+ Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(null);
+ Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(null);
+ Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(null);
+ Mockito.when(ioConfig.getInputFormat()).thenReturn(new JsonInputFormat(null, null, null, null, null));
+ Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers);
+ Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers);
+
+ Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of());
+ Mockito.when(sequenceNumbers.getStream()).thenReturn("test");
+
+ Mockito.when(task.getDataSchema()).thenReturn(schema);
+ Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
+ Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
+
+ TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null, LockGranularity.TIME_CHUNK);
+
+ Assert.assertEquals(InputRowFilterResult.NULL_OR_EMPTY_RECORD, runner.ensureRowIsNonNullAndWithinMessageTimeBounds(null));
}
@Test
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
index 649c0d57e46..92ed5785abb 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
@@ -33,6 +33,7 @@ import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.task.InputRowFilter;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.RE;
@@ -104,7 +105,7 @@ public class StreamChunkParserTest
null,
null,
null,
- row -> true,
+ InputRowFilter.allowAll(),
rowIngestionMeters,
parseExceptionHandler
);
@@ -121,7 +122,7 @@ public class StreamChunkParserTest
new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY,
ColumnsFilter.all()),
TransformSpec.NONE,
temporaryFolder.newFolder(),
- row -> true,
+ InputRowFilter.allowAll(),
rowIngestionMeters,
parseExceptionHandler
);
@@ -139,7 +140,7 @@ public class StreamChunkParserTest
null,
null,
null,
- row -> true,
+ InputRowFilter.allowAll(),
rowIngestionMeters,
parseExceptionHandler
);
@@ -169,7 +170,7 @@ public class StreamChunkParserTest
new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY,
ColumnsFilter.all()),
TransformSpec.NONE,
temporaryFolder.newFolder(),
- row -> true,
+ InputRowFilter.allowAll(),
rowIngestionMeters,
parseExceptionHandler
);
@@ -190,7 +191,7 @@ public class StreamChunkParserTest
new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY,
ColumnsFilter.all()),
TransformSpec.NONE,
temporaryFolder.newFolder(),
- row -> true,
+ InputRowFilter.allowAll(),
rowIngestionMeters,
parseExceptionHandler
);
@@ -213,7 +214,7 @@ public class StreamChunkParserTest
new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY,
ColumnsFilter.all()),
TransformSpec.NONE,
temporaryFolder.newFolder(),
- row -> true,
+ InputRowFilter.allowAll(),
rowIngestionMeters,
parseExceptionHandler
);
@@ -241,7 +242,7 @@ public class StreamChunkParserTest
final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
parser,
mockedByteEntityReader,
- row -> true,
+ InputRowFilter.allowAll(),
rowIngestionMeters,
new ParseExceptionHandler(
rowIngestionMeters,
@@ -279,7 +280,7 @@ public class StreamChunkParserTest
final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
parser,
mockedByteEntityReader,
- row -> true,
+ InputRowFilter.allowAll(),
rowIngestionMeters,
parseExceptionHandler
);
@@ -318,7 +319,7 @@ public class StreamChunkParserTest
final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
parser,
mockedByteEntityReader,
- row -> true,
+ InputRowFilter.allowAll(),
rowIngestionMeters,
new ParseExceptionHandler(
rowIngestionMeters,
@@ -354,7 +355,7 @@ public class StreamChunkParserTest
() -> new StreamChunkParser<>(
null,
null,
- row -> true,
+ InputRowFilter.allowAll(),
rowIngestionMeters,
parseExceptionHandler
)
diff --git a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
index 578d792df49..532553aef6e 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
@@ -57,6 +57,7 @@ public class DruidMetrics
public static final String STREAM = "stream";
public static final String PARTITION = "partition";
public static final String SUPERVISOR_ID = "supervisorId";
+ public static final String REASON = "reason";
public static final String TAGS = "tags";
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/InputRowFilterResult.java
b/processing/src/main/java/org/apache/druid/segment/incremental/InputRowFilterResult.java
new file mode 100644
index 00000000000..fcb4d8aec11
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/InputRowFilterResult.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import java.util.Arrays;
+
+/**
+ * Result of filtering an input row during ingestion.
+ */
+public enum InputRowFilterResult
+{
+ /**
+ * The row passed the filter and should be processed.
+ */
+ ACCEPTED("accepted"),
+ /**
+ * The row was null or the input record was empty.
+ */
+ NULL_OR_EMPTY_RECORD("null"),
+
+ /**
+ * The row's timestamp is before the minimum message time (late message rejection).
+ */
+ BEFORE_MIN_MESSAGE_TIME("beforeMinimumMessageTime"),
+
+ /**
+ * The row's timestamp is after the maximum message time (early message rejection).
+ */
+ AFTER_MAX_MESSAGE_TIME("afterMaximumMessageTime"),
+
+ /**
+ * The row was filtered out by a transformSpec filter or other row filter.
+ */
+ CUSTOM_FILTER("filtered"),
+
+ /**
+ * A backwards-compatible value used when thrown-away counts come from ingestion tasks running older Druid versions that do not track filter reasons.
+ */
+ UNKNOWN("unknown");
+
+ private static final InputRowFilterResult[] REJECTED_VALUES = Arrays.stream(InputRowFilterResult.values())
+ .filter(InputRowFilterResult::isRejected)
+ .toArray(InputRowFilterResult[]::new);
+
+ private final String reason;
+
+ InputRowFilterResult(String reason)
+ {
+ this.reason = reason;
+ }
+
+ /**
+ * Returns the string representation of this {@link InputRowFilterResult} for metric emission.
+ */
+ public String getReason()
+ {
+ return reason;
+ }
+
+ /**
+ * Returns true if this result indicates the row was rejected (thrown away).
+ * Returns false for {@link #ACCEPTED}.
+ */
+ public boolean isRejected()
+ {
+ return this != ACCEPTED;
+ }
+
+ /**
+ * Returns {@link InputRowFilterResult} that are rejection states.
+ */
+ public static InputRowFilterResult[] rejectedValues()
+ {
+ return REJECTED_VALUES;
+ }
+
+ /**
+ * Returns total number of {@link InputRowFilterResult} values.
+ */
+ public static int numValues()
+ {
+ return values().length;
+ }
+}
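
To make the intended mapping concrete, a minimal sketch (not part of this commit) of an InputRowFilter, the functional interface added earlier in this patch, yielding these results; `minMessageTime` and `maxMessageTime` are hypothetical DateTime bounds:

    // Sketch only: a filter mapping message-time bounds onto InputRowFilterResult.
    InputRowFilter timeBoundsFilter = row -> {
      if (row == null) {
        return InputRowFilterResult.NULL_OR_EMPTY_RECORD;    // reason "null"
      }
      if (row.getTimestamp().isBefore(minMessageTime)) {
        return InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME; // late message rejection
      }
      if (row.getTimestamp().isAfter(maxMessageTime)) {
        return InputRowFilterResult.AFTER_MAX_MESSAGE_TIME;  // early message rejection
      }
      return InputRowFilterResult.ACCEPTED;
    };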
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java
b/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java
index bff4f2e6de3..67eb80a2dc8 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java
@@ -23,13 +23,13 @@ import java.util.Collections;
import java.util.Map;
/**
- * This class is used only in {@code RealtimeIndexTask} which is deprecated now.
+ * This class is used only in {@code DartFrameContext}.
*
* Consider using {@link RowIngestionMetersFactory} instead.
*/
public class NoopRowIngestionMeters implements RowIngestionMeters
{
- private static final RowIngestionMetersTotals EMPTY_TOTALS = new RowIngestionMetersTotals(0, 0, 0, 0, 0);
+ private static final RowIngestionMetersTotals EMPTY_TOTALS = new RowIngestionMetersTotals(0, 0, 0, Map.of(), 0);
@Override
public long getProcessed()
@@ -74,11 +74,17 @@ public class NoopRowIngestionMeters implements
RowIngestionMeters
}
@Override
- public void incrementThrownAway()
+ public void incrementThrownAway(InputRowFilterResult reason)
{
}
+ @Override
+ public Map<String, Long> getThrownAwayByReason()
+ {
+ return Map.of();
+ }
+
@Override
public RowIngestionMetersTotals getTotals()
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
index 25fc3bae481..502c2057ebc 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
@@ -40,6 +40,7 @@ public interface RowIngestionMeters extends InputStats
String PROCESSED_WITH_ERROR = "processedWithError";
String UNPARSEABLE = "unparseable";
String THROWN_AWAY = "thrownAway";
+ String THROWN_AWAY_BY_REASON = "thrownAwayByReason";
/**
* Number of bytes read by an ingestion task.
@@ -73,7 +74,17 @@ public interface RowIngestionMeters extends InputStats
void incrementUnparseable();
long getThrownAway();
- void incrementThrownAway();
+
+ /**
+ * Increments the thrown away counter for the specified {@link InputRowFilterResult} reason.
+ */
+ void incrementThrownAway(InputRowFilterResult reason);
+
+ /**
+ * Returns the count of thrown away events for each reason.
+ * Keyed by {@link InputRowFilterResult#getReason()}.
+ */
+ Map<String, Long> getThrownAwayByReason();
RowIngestionMetersTotals getTotals();
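
The contract implied by the tests in this patch is that getThrownAway() equals the sum of the per-reason counts. A small sketch of that invariant:

    // Sketch only: the aggregate counter matches the per-reason breakdown.
    RowIngestionMeters meters = new SimpleRowIngestionMeters();
    meters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
    meters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
    long sum = meters.getThrownAwayByReason().values().stream().mapToLong(Long::longValue).sum();
    // sum == meters.getThrownAway() == 2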
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
index 2002bb24ac0..43e1b985705 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
@@ -21,7 +21,11 @@ package org.apache.druid.segment.incremental;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
public class RowIngestionMetersTotals
@@ -30,6 +34,7 @@ public class RowIngestionMetersTotals
private final long processedBytes;
private final long processedWithError;
private final long thrownAway;
+ private final Map<String, Long> thrownAwayByReason;
private final long unparseable;
@JsonCreator
@@ -38,13 +43,49 @@ public class RowIngestionMetersTotals
@JsonProperty("processedBytes") long processedBytes,
@JsonProperty("processedWithError") long processedWithError,
@JsonProperty("thrownAway") long thrownAway,
+ @JsonProperty("thrownAwayByReason") @Nullable Map<String, Long>
thrownAwayByReason,
@JsonProperty("unparseable") long unparseable
)
+ {
+ this(
+ processed,
+ processedBytes,
+ processedWithError,
+ Configs.valueOrDefault(thrownAwayByReason, getBackwardsCompatibleThrownAwayByReason(thrownAway)),
+ unparseable
+ );
+ }
+
+ public RowIngestionMetersTotals(
+ long processed,
+ long processedBytes,
+ long processedWithError,
+ long thrownAway,
+ long unparseable
+ )
+ {
+ this(
+ processed,
+ processedBytes,
+ processedWithError,
+ getBackwardsCompatibleThrownAwayByReason(thrownAway),
+ unparseable
+ );
+ }
+
+ public RowIngestionMetersTotals(
+ long processed,
+ long processedBytes,
+ long processedWithError,
+ Map<String, Long> thrownAwayByReason,
+ long unparseable
+ )
{
this.processed = processed;
this.processedBytes = processedBytes;
this.processedWithError = processedWithError;
- this.thrownAway = thrownAway;
+ this.thrownAway = thrownAwayByReason.values().stream().reduce(0L, Long::sum);
+ this.thrownAwayByReason = thrownAwayByReason;
this.unparseable = unparseable;
}
@@ -72,6 +113,12 @@ public class RowIngestionMetersTotals
return thrownAway;
}
+ @JsonProperty
+ public Map<String, Long> getThrownAwayByReason()
+ {
+ return thrownAwayByReason;
+ }
+
@JsonProperty
public long getUnparseable()
{
@@ -92,13 +139,14 @@ public class RowIngestionMetersTotals
&& processedBytes == that.processedBytes
&& processedWithError == that.processedWithError
&& thrownAway == that.thrownAway
+ && thrownAwayByReason.equals(that.thrownAwayByReason)
&& unparseable == that.unparseable;
}
@Override
public int hashCode()
{
- return Objects.hash(processed, processedBytes, processedWithError, thrownAway, unparseable);
+ return Objects.hash(processed, processedBytes, processedWithError, thrownAway, thrownAwayByReason, unparseable);
}
@Override
@@ -109,7 +157,21 @@ public class RowIngestionMetersTotals
", processedBytes=" + processedBytes +
", processedWithError=" + processedWithError +
", thrownAway=" + thrownAway +
+ ", thrownAwayByReason=" + thrownAwayByReason +
", unparseable=" + unparseable +
'}';
}
+
+ /**
+ * For backwards compatibility, key the map by {@link InputRowFilterResult#UNKNOWN} when no thrownAwayByReason input is available, as during rolling Druid upgrades.
+ * This can occur when tasks running on older Druid versions return ingest statistic payloads to an overlord running on a newer Druid version.
+ */
+ private static Map<String, Long> getBackwardsCompatibleThrownAwayByReason(long thrownAway)
+ {
+ Map<String, Long> results = new HashMap<>();
+ if (thrownAway > 0) {
+ results.put(InputRowFilterResult.UNKNOWN.getReason(), thrownAway);
+ }
+ return results;
+ }
}
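
A short sketch of the rolling-upgrade fallback described above: deserializing a legacy payload that lacks thrownAwayByReason buckets the whole count under "unknown". The plain ObjectMapper usage is illustrative, not how Druid wires its mappers:

    // Sketch only: legacy JSON without thrownAwayByReason.
    ObjectMapper mapper = new ObjectMapper();
    RowIngestionMetersTotals totals = mapper.readValue(
        "{\"processed\":10,\"processedBytes\":0,\"processedWithError\":0,\"thrownAway\":3,\"unparseable\":0}",
        RowIngestionMetersTotals.class
    );
    // totals.getThrownAwayByReason() => {unknown=3}, totals.getThrownAway() => 3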
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/SimpleRowIngestionMeters.java
b/processing/src/main/java/org/apache/druid/segment/incremental/SimpleRowIngestionMeters.java
index 10293e4e24a..2140b781241 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/SimpleRowIngestionMeters.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/SimpleRowIngestionMeters.java
@@ -19,6 +19,7 @@
package org.apache.druid.segment.incremental;
+import java.util.HashMap;
import java.util.Map;
public class SimpleRowIngestionMeters implements RowIngestionMeters
@@ -26,8 +27,8 @@ public class SimpleRowIngestionMeters implements
RowIngestionMeters
private long processed;
private long processedWithError;
private long unparseable;
- private long thrownAway;
private long processedBytes;
+ private final long[] thrownAwayByReason = new long[InputRowFilterResult.numValues()];
@Override
public long getProcessed()
@@ -80,13 +81,30 @@ public class SimpleRowIngestionMeters implements
RowIngestionMeters
@Override
public long getThrownAway()
{
- return thrownAway;
+ long total = 0;
+ for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) {
+ total += thrownAwayByReason[reason.ordinal()];
+ }
+ return total;
}
@Override
- public void incrementThrownAway()
+ public void incrementThrownAway(InputRowFilterResult reason)
{
- thrownAway++;
+ ++thrownAwayByReason[reason.ordinal()];
+ }
+
+ @Override
+ public Map<String, Long> getThrownAwayByReason()
+ {
+ final Map<String, Long> result = new HashMap<>();
+ for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) {
+ long count = thrownAwayByReason[reason.ordinal()];
+ if (count > 0) {
+ result.put(reason.getReason(), count);
+ }
+ }
+ return result;
}
@Override
@@ -96,7 +114,7 @@ public class SimpleRowIngestionMeters implements
RowIngestionMeters
processed,
processedBytes,
processedWithError,
- thrownAway,
+ getThrownAwayByReason(),
unparseable
);
}
@@ -112,7 +130,11 @@ public class SimpleRowIngestionMeters implements
RowIngestionMeters
this.processed += rowIngestionMetersTotals.getProcessed();
this.processedWithError += rowIngestionMetersTotals.getProcessedWithError();
this.unparseable += rowIngestionMetersTotals.getUnparseable();
- this.thrownAway += rowIngestionMetersTotals.getThrownAway();
this.processedBytes += rowIngestionMetersTotals.getProcessedBytes();
+
+ final Map<String, Long> thrownAwayByReason = rowIngestionMetersTotals.getThrownAwayByReason();
+ for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) {
+ this.thrownAwayByReason[reason.ordinal()] += thrownAwayByReason.getOrDefault(reason.getReason(), 0L);
+ }
}
}
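
The ordinal-indexed long[] keeps the hot incrementThrownAway path allocation-free; the per-reason map is only materialized on read. A sketch of the resulting merge behavior:

    // Sketch only: merging totals preserves the per-reason breakdown.
    SimpleRowIngestionMeters a = new SimpleRowIngestionMeters();
    a.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);

    SimpleRowIngestionMeters b = new SimpleRowIngestionMeters();
    b.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
    b.addRowIngestionMetersTotals(a.getTotals());
    // b.getThrownAwayByReason() => {filtered=2}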
diff --git
a/processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java
b/processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java
index 4ff36d731df..b137ffe1f35 100644
---
a/processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java
+++
b/processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java
@@ -30,6 +30,7 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.incremental.RowMeters;
+import org.apache.druid.utils.CollectionUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -353,6 +354,7 @@ public class TaskReportSerdeTest
"processedBytes", (int)
determinePartitionTotalStats.getProcessedBytes(),
"processedWithError", (int)
determinePartitionTotalStats.getProcessedWithError(),
"thrownAway", (int) determinePartitionTotalStats.getThrownAway(),
+ "thrownAwayByReason",
CollectionUtils.mapValues(determinePartitionTotalStats.getThrownAwayByReason(),
Long::intValue),
"unparseable", (int) determinePartitionTotalStats.getUnparseable()
),
observedTotals.get("determinePartitions")
@@ -363,6 +365,7 @@ public class TaskReportSerdeTest
"processedBytes", (int) buildSegmentTotalStats.getProcessedBytes(),
"processedWithError", (int)
buildSegmentTotalStats.getProcessedWithError(),
"thrownAway", (int) buildSegmentTotalStats.getThrownAway(),
+ "thrownAwayByReason",
CollectionUtils.mapValues(buildSegmentTotalStats.getThrownAwayByReason(),
Long::intValue),
"unparseable", (int) buildSegmentTotalStats.getUnparseable()
),
observedTotals.get("buildSegments")
diff --git
a/processing/src/test/java/org/apache/druid/segment/incremental/InputRowFilterResultTest.java
b/processing/src/test/java/org/apache/druid/segment/incremental/InputRowFilterResultTest.java
new file mode 100644
index 00000000000..a8d932e4dcd
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/segment/incremental/InputRowFilterResultTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class InputRowFilterResultTest
+{
+ @Test
+ public void testOrdinalValues()
+ {
+ Assert.assertEquals(0, InputRowFilterResult.ACCEPTED.ordinal());
+ Assert.assertEquals(1, InputRowFilterResult.NULL_OR_EMPTY_RECORD.ordinal());
+ Assert.assertEquals(2, InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.ordinal());
+ Assert.assertEquals(3, InputRowFilterResult.AFTER_MAX_MESSAGE_TIME.ordinal());
+ Assert.assertEquals(4, InputRowFilterResult.CUSTOM_FILTER.ordinal());
+ }
+
+ @Test
+ public void testMetricValues()
+ {
+ Assert.assertEquals("accepted", InputRowFilterResult.ACCEPTED.getReason());
+ Assert.assertEquals("null",
InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason());
+ Assert.assertEquals("beforeMinimumMessageTime",
InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.getReason());
+ Assert.assertEquals("afterMaximumMessageTime",
InputRowFilterResult.AFTER_MAX_MESSAGE_TIME.getReason());
+ Assert.assertEquals("filtered",
InputRowFilterResult.CUSTOM_FILTER.getReason());
+ }
+
+ @Test
+ public void testEnumCardinality()
+ {
+ Assert.assertEquals(6, InputRowFilterResult.values().length);
+ }
+
+ @Test
+ public void testIsRejected()
+ {
+ Assert.assertFalse(InputRowFilterResult.ACCEPTED.isRejected());
+ Assert.assertTrue(InputRowFilterResult.NULL_OR_EMPTY_RECORD.isRejected());
+ Assert.assertTrue(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.isRejected());
+ Assert.assertTrue(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME.isRejected());
+ Assert.assertTrue(InputRowFilterResult.CUSTOM_FILTER.isRejected());
+ Assert.assertTrue(InputRowFilterResult.UNKNOWN.isRejected());
+ }
+}
+
diff --git
a/processing/src/test/java/org/apache/druid/segment/incremental/RowMeters.java
b/processing/src/test/java/org/apache/druid/segment/incremental/RowMeters.java
index 02b1d290003..f155e70424d 100644
---
a/processing/src/test/java/org/apache/druid/segment/incremental/RowMeters.java
+++
b/processing/src/test/java/org/apache/druid/segment/incremental/RowMeters.java
@@ -19,6 +19,9 @@
package org.apache.druid.segment.incremental;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* Utility class to build {@link RowIngestionMetersTotals}, used in tests.
*/
@@ -27,7 +30,7 @@ public class RowMeters
private long processedBytes;
private long processedWithError;
private long unparseable;
- private long thrownAway;
+ private final Map<String, Long> thrownAwayByReason = new HashMap<>();
/**
* Creates a new {@link RowMeters}, that can be used to build an instance of
@@ -56,14 +59,20 @@ public class RowMeters
return this;
}
+ public RowMeters thrownAwayByReason(InputRowFilterResult reason, long thrownAway)
+ {
+ this.thrownAwayByReason.put(reason.getReason(), thrownAway);
+ return this;
+ }
+
public RowMeters thrownAway(long thrownAway)
{
- this.thrownAway = thrownAway;
+ this.thrownAwayByReason.put(InputRowFilterResult.UNKNOWN.getReason(), thrownAway);
return this;
}
public RowIngestionMetersTotals totalProcessed(long processed)
{
- return new RowIngestionMetersTotals(processed, processedBytes, processedWithError, thrownAway, unparseable);
+ return new RowIngestionMetersTotals(processed, processedBytes, processedWithError, thrownAwayByReason, unparseable);
}
}
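
For tests, the builder now offers two paths: the legacy thrownAway(long), which buckets the count under "unknown", and the per-reason variant. A sketch, where `newRowMeters()` stands in for however tests obtain a fresh builder (the factory method is not shown in this hunk):

    // Sketch only: `newRowMeters()` is a hypothetical stand-in for the builder factory.
    RowIngestionMetersTotals legacy = newRowMeters().thrownAway(3).totalProcessed(10);
    // legacy.getThrownAwayByReason() => {unknown=3}
    RowIngestionMetersTotals perReason = newRowMeters().thrownAwayByReason(InputRowFilterResult.CUSTOM_FILTER, 3).totalProcessed(10);
    // perReason.getThrownAwayByReason() => {filtered=3}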
diff --git
a/processing/src/test/java/org/apache/druid/segment/incremental/SimpleRowIngestionMetersTest.java
b/processing/src/test/java/org/apache/druid/segment/incremental/SimpleRowIngestionMetersTest.java
index 5f46129c170..9cccff0c880 100644
---
a/processing/src/test/java/org/apache/druid/segment/incremental/SimpleRowIngestionMetersTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/incremental/SimpleRowIngestionMetersTest.java
@@ -22,6 +22,8 @@ package org.apache.druid.segment.incremental;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Map;
+
public class SimpleRowIngestionMetersTest
{
@Test
@@ -32,16 +34,49 @@ public class SimpleRowIngestionMetersTest
rowIngestionMeters.incrementProcessedBytes(5);
rowIngestionMeters.incrementProcessedWithError();
rowIngestionMeters.incrementUnparseable();
- rowIngestionMeters.incrementThrownAway();
- Assert.assertEquals(rowIngestionMeters.getTotals(), new RowIngestionMetersTotals(1, 5, 1, 1, 1));
+ rowIngestionMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+ final Map<String, Long> expected = Map.of(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason(), 1L);
+ Assert.assertEquals(new RowIngestionMetersTotals(1, 5, 1, expected, 1), rowIngestionMeters.getTotals());
}
@Test
public void testAddRowIngestionMetersTotals()
{
SimpleRowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
- RowIngestionMetersTotals rowIngestionMetersTotals = new RowIngestionMetersTotals(10, 0, 1, 0, 1);
+ RowIngestionMetersTotals rowIngestionMetersTotals = new RowIngestionMetersTotals(10, 0, 1, 1, 1);
rowIngestionMeters.addRowIngestionMetersTotals(rowIngestionMetersTotals);
- Assert.assertEquals(rowIngestionMeters.getTotals(), rowIngestionMetersTotals);
+ Assert.assertEquals(rowIngestionMetersTotals, rowIngestionMeters.getTotals());
+ }
+
+ @Test
+ public void testIncrementThrownAwayWithReason()
+ {
+ SimpleRowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
+
+ rowIngestionMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+ rowIngestionMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+ rowIngestionMeters.incrementThrownAway(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME);
+ rowIngestionMeters.incrementThrownAway(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME);
+ rowIngestionMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+ rowIngestionMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+ rowIngestionMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+
+ Assert.assertEquals(7, rowIngestionMeters.getThrownAway());
+
+ Map<String, Long> byReason = rowIngestionMeters.getThrownAwayByReason();
+ Assert.assertEquals(Long.valueOf(2), byReason.get(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason()));
+ Assert.assertEquals(Long.valueOf(1), byReason.get(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.getReason()));
+ Assert.assertEquals(Long.valueOf(1), byReason.get(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME.getReason()));
+ Assert.assertEquals(Long.valueOf(3), byReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason()));
+ }
+
+ @Test
+ public void testGetThrownAwayByReasonReturnsNoRejectedReasons()
+ {
+ SimpleRowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
+
+ // With no increments, no rejected reasons should be present
+ Map<String, Long> byReason = rowIngestionMeters.getThrownAwayByReason();
+ Assert.assertTrue(byReason.isEmpty());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]