This is an automated email from the ASF dual-hosted git repository.

tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c4b5ea6aed6 Add drop reason dimension to ingest/events/thrownAway 
metric (#18855)
c4b5ea6aed6 is described below

commit c4b5ea6aed6e03a2ab953e04ea56483df53637a2
Author: jtuglu1 <[email protected]>
AuthorDate: Tue Dec 23 12:01:55 2025 -0500

    Add drop reason dimension to ingest/events/thrownAway metric (#18855)
    
    This adds:
    - Better logging in task logs indicating the breakdown of thrown away 
events by reason.
    - A `reason` dimension to the `ingest/events/thrownAway` metric for 
aggregating on thrown away reason.
    - A `thrownAwayByReason` map to row statistics task API response payload, 
so future consumers can make use of it should they need to.
    - Better interface for row filters, making it easy to adding more filtering 
reasons in the future.
---
 docs/operations/metrics.md                         |   2 +-
 .../main/resources/defaultMetricDimensions.json    |   3 +-
 .../src/main/resources/defaultMetrics.json         |   3 +-
 .../src/main/resources/defaultMetrics.json         |   2 +-
 .../main/resources/defaultMetricDimensions.json    |   2 +-
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |  31 ++--
 .../indexing/kinesis/KinesisIndexTaskTest.java     |  17 +-
 .../common/stats/DropwizardRowIngestionMeters.java |  39 +++--
 .../common/stats/TaskRealtimeMetricsMonitor.java   |  33 +++-
 .../common/task/AbstractBatchIndexTask.java        |   2 +-
 .../task/FilteringCloseableInputRowIterator.java   |  17 +-
 .../druid/indexing/common/task/InputRowFilter.java |  73 +++++++++
 .../parallel/ParallelIndexSupervisorTask.java      |   3 +
 .../SeekableStreamIndexTaskRunner.java             |  30 ++--
 .../indexing/seekablestream/StreamChunkParser.java |  11 +-
 .../common/TaskRealtimeMetricsMonitorTest.java     | 171 ++++++++++++++++-----
 .../stats/DropwizardRowIngestionMetersTest.java    | 104 +++++++++++++
 .../FilteringCloseableInputRowIteratorTest.java    | 113 +++++++++++++-
 .../druid/indexing/common/task/IndexTaskTest.java  |  31 +++-
 .../indexing/common/task/InputRowFilterTest.java   | 122 +++++++++++++++
 .../parallel/SinglePhaseParallelIndexingTest.java  |   7 +-
 .../SeekableStreamIndexTaskRunnerTest.java         |  58 ++++++-
 .../seekablestream/StreamChunkParserTest.java      |  21 +--
 .../java/org/apache/druid/query/DruidMetrics.java  |   1 +
 .../segment/incremental/InputRowFilterResult.java  | 101 ++++++++++++
 .../incremental/NoopRowIngestionMeters.java        |  12 +-
 .../segment/incremental/RowIngestionMeters.java    |  13 +-
 .../incremental/RowIngestionMetersTotals.java      |  66 +++++++-
 .../incremental/SimpleRowIngestionMeters.java      |  34 +++-
 .../druid/indexer/report/TaskReportSerdeTest.java  |   3 +
 .../incremental/InputRowFilterResultTest.java      |  64 ++++++++
 .../druid/segment/incremental/RowMeters.java       |  15 +-
 .../incremental/SimpleRowIngestionMetersTest.java  |  43 +++++-
 33 files changed, 1095 insertions(+), 152 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index aa1c25a693a..e2c957b9cee 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -270,7 +270,7 @@ batch ingestion emit the following metrics. These metrics 
are deltas for each em
 |`ingest/events/processed`|Number of events processed per emission 
period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Equal to the 
number of events per emission period.|
 |`ingest/events/processedWithError`|Number of events processed with some 
partial errors per emission period. Events processed with partial errors are 
counted towards both this metric and `ingest/events/processed`.|`dataSource`, 
`taskId`, `taskType`, `groupId`, `tags`|0|
 |`ingest/events/unparseable`|Number of events rejected because the events are 
unparseable.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
-|`ingest/events/thrownAway`|Number of events rejected because they are null, 
or filtered by `transformSpec`, or outside one of `lateMessageRejectionPeriod`, 
`earlyMessageRejectionPeriod`.|`dataSource`, `taskId`, `taskType`, `groupId`, 
`tags`|0|
+|`ingest/events/thrownAway`|Number of events rejected because they are null, 
or filtered by `transformSpec`, or outside one of `lateMessageRejectionPeriod`, 
`earlyMessageRejectionPeriod`. The `reason` dimension indicates why the event 
was thrown away.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`, 
`reason`|0|
 |`ingest/events/duplicate`|Number of events rejected because the events are 
duplicated.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
 |`ingest/input/bytes`|Number of bytes read from input sources, after 
decompression but prior to parsing. This covers all data read, including data 
that does not end up being fully processed and ingested. For example, this 
includes data that ends up being rejected for being unparseable or filtered 
out.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the 
amount of data read.|
 |`ingest/rows/output`|Number of Druid rows persisted.|`dataSource`, `taskId`, 
`taskType`, `groupId`|Your number of events with rollup.|
diff --git 
a/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json
 
b/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json
index 6b8d9a9887d..bc0c451870f 100644
--- 
a/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json
+++ 
b/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json
@@ -126,7 +126,8 @@
   },
   "ingest/events/thrownAway": {
     "dimensions": [
-      "dataSource"
+      "dataSource",
+      "reason"
     ],
     "type": "counter"
   },
diff --git 
a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json 
b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
index 311a06e6e4c..69100697a62 100644
--- a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
+++ b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
@@ -54,7 +54,8 @@
   "query/cache/total/timeouts": [],
   "query/cache/total/errors": [],
   "ingest/events/thrownAway": [
-    "dataSource"
+    "dataSource",
+    "reason"
   ],
   "ingest/events/unparseable": [
     "dataSource"
diff --git 
a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json 
b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
index 82ced1e9abb..3702039c283 100644
--- 
a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
+++ 
b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
@@ -104,7 +104,7 @@
   "ingest/events/processed" : { "dimensions" : ["dataSource"], "type" : 
"count", "help": "Number of events successfully processed per emission period." 
},
   "ingest/events/processedWithError" : { "dimensions" : ["dataSource"], "type" 
: "count", "help": "Number of events processed with some partial errors per 
emission period." },
   "ingest/events/unparseable" : { "dimensions" : ["dataSource"], "type" : 
"count", "help": "Number of events rejected because the events are 
unparseable." },
-  "ingest/events/thrownAway" : { "dimensions" : ["dataSource"], "type" : 
"count", "help": "Number of events rejected because they are outside the 
windowPeriod."},
+  "ingest/events/thrownAway" : { "dimensions" : ["dataSource", "reason"], 
"type" : "count", "help": "Number of events rejected because they are null, 
filtered by transformSpec, or outside the message rejection periods. The 
`reason` dimension indicates why the event was thrown away."},
   "ingest/events/duplicate" : { "dimensions" : ["dataSource"], "type" : 
"count", "help": "Number of events rejected because the events are 
duplicated."},
   "ingest/input/bytes" : { "dimensions" : ["dataSource"], "type" : "count", 
"help": "Number of bytes read from input sources, after decompression but prior 
to parsing." },
   "ingest/rows/output" : { "dimensions" : ["dataSource"], "type" : "count", 
"help": "Number of Druid rows persisted."},
diff --git 
a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
 
b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
index 0e3862f3da5..e6b7f9f594a 100644
--- 
a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
+++ 
b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
@@ -38,7 +38,7 @@
   "query/cache/total/timeouts" : { "dimensions" : [], "type" : "gauge" },
   "query/cache/total/errors" : { "dimensions" : [], "type" : "gauge" },
 
-  "ingest/events/thrownAway" : { "dimensions" : ["dataSource"], "type" : 
"count" },
+  "ingest/events/thrownAway" : { "dimensions" : ["dataSource", "reason"], 
"type" : "count" },
   "ingest/events/unparseable" : { "dimensions" : ["dataSource"], "type" : 
"count" },
   "ingest/events/duplicate" : { "dimensions" : ["dataSource"], "type" : 
"count" },
   "ingest/events/processed" : { "dimensions" : ["dataSource"], "type" : 
"count" },
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 96dd04d42cf..cb359896ee2 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -113,6 +113,7 @@ import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
 import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
 import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
 import org.apache.druid.segment.incremental.RowMeters;
@@ -717,7 +718,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     );
 
     long totalBytes = getTotalSizeOfRecords(0, 10) + getTotalSizeOfRecords(13, 
15);
-    verifyTaskMetrics(task, 
RowMeters.with().bytes(totalBytes).unparseable(3).thrownAway(1).totalProcessed(8));
+    verifyTaskMetrics(task, 
RowMeters.with().bytes(totalBytes).unparseable(3).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD,
 1).totalProcessed(8));
 
     // Check published metadata and segments in deep storage
     assertEqualsExceptVersion(
@@ -851,7 +852,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     );
 
     long totalBytes = getTotalSizeOfRecords(0, 10) + getTotalSizeOfRecords(13, 
15);
-    verifyTaskMetrics(task, 
RowMeters.with().bytes(totalBytes).unparseable(3).thrownAway(1).totalProcessed(8));
+    verifyTaskMetrics(task, 
RowMeters.with().bytes(totalBytes).unparseable(3).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD,
 1).totalProcessed(8));
 
     // Check published metadata and segments in deep storage
     assertEqualsExceptVersion(
@@ -1165,7 +1166,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     // Wait for task to exit
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
-    verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 
5)).thrownAway(2).totalProcessed(3));
+    verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 
5)).thrownAwayByReason(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME, 
2).totalProcessed(3));
 
     // Check published metadata and segments in deep storage
     assertEqualsExceptVersion(
@@ -1214,7 +1215,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     // Wait for task to exit
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
-    verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 
5)).thrownAway(2).totalProcessed(3));
+    verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 
5)).thrownAwayByReason(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME, 
2).totalProcessed(3));
 
     // Check published metadata and segments in deep storage
     assertEqualsExceptVersion(
@@ -1272,7 +1273,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     // Wait for task to exit
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
-    verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 
5)).thrownAway(4).totalProcessed(1));
+    verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 
5)).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 
4).totalProcessed(1));
 
     // Check published metadata
     final List<SegmentDescriptor> publishedDescriptors = 
publishedDescriptors();
@@ -1642,7 +1643,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     verifyTaskMetrics(task, RowMeters.with()
                                      .bytes(totalRecordBytes)
                                      .unparseable(3).errors(3)
-                                     .thrownAway(1).totalProcessed(4));
+                                     
.thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 
1).totalProcessed(4));
 
     // Check published metadata
     assertEqualsExceptVersion(
@@ -1660,6 +1661,8 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     Assert.assertEquals(IngestionState.COMPLETED, 
reportData.getIngestionState());
     Assert.assertNull(reportData.getErrorMsg());
 
+    // Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
+    Map<String, Integer> expectedThrownAwayByReason = 
Map.of(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason(), 1);
     Map<String, Object> expectedMetrics = ImmutableMap.of(
         RowIngestionMeters.BUILD_SEGMENTS,
         ImmutableMap.of(
@@ -1667,7 +1670,8 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
             RowIngestionMeters.PROCESSED_BYTES, (int) totalRecordBytes,
             RowIngestionMeters.PROCESSED_WITH_ERROR, 3,
             RowIngestionMeters.UNPARSEABLE, 3,
-            RowIngestionMeters.THROWN_AWAY, 1
+            RowIngestionMeters.THROWN_AWAY, 1,
+            RowIngestionMeters.THROWN_AWAY_BY_REASON, 
expectedThrownAwayByReason
         )
     );
     Assert.assertEquals(expectedMetrics, reportData.getRowStats());
@@ -1745,6 +1749,8 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     Assert.assertEquals(IngestionState.BUILD_SEGMENTS, 
reportData.getIngestionState());
     Assert.assertNotNull(reportData.getErrorMsg());
 
+    // Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
+    Map<String, Integer> expectedThrownAwayByReason = Map.of();
     Map<String, Object> expectedMetrics = ImmutableMap.of(
         RowIngestionMeters.BUILD_SEGMENTS,
         ImmutableMap.of(
@@ -1752,7 +1758,8 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
             RowIngestionMeters.PROCESSED_BYTES, (int) totalBytes,
             RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
             RowIngestionMeters.UNPARSEABLE, 3,
-            RowIngestionMeters.THROWN_AWAY, 0
+            RowIngestionMeters.THROWN_AWAY, 0,
+            RowIngestionMeters.THROWN_AWAY_BY_REASON, 
expectedThrownAwayByReason
         )
     );
     Assert.assertEquals(expectedMetrics, reportData.getRowStats());
@@ -1887,7 +1894,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     verifyTaskMetrics(task1, RowMeters.with().bytes(getTotalSizeOfRecords(2, 
5)).totalProcessed(3));
     verifyTaskMetrics(task2, RowMeters.with().bytes(getTotalSizeOfRecords(3, 
10))
-                                      
.unparseable(3).thrownAway(1).totalProcessed(3));
+                                      
.unparseable(3).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 
1).totalProcessed(3));
 
     // Check published segments & metadata, should all be from the first task
     final List<SegmentDescriptor> publishedDescriptors = 
publishedDescriptors();
@@ -1961,7 +1968,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     verifyTaskMetrics(task1, RowMeters.with().bytes(getTotalSizeOfRecords(2, 
5)).totalProcessed(3));
     verifyTaskMetrics(task2, RowMeters.with().bytes(getTotalSizeOfRecords(3, 
10))
-                                      
.unparseable(3).thrownAway(1).totalProcessed(3));
+                                      
.unparseable(3).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 
1).totalProcessed(3));
 
     // Check published segments & metadata
     SegmentDescriptorAndExpectedDim1Values desc3 = sdd("2011/P1D", 1, 
ImmutableList.of("d", "e"));
@@ -2576,7 +2583,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     long totalBytes = getTotalSizeOfRecords(0, 2) + getTotalSizeOfRecords(5, 
11);
     verifyTaskMetrics(task, RowMeters.with().bytes(totalBytes)
-                                     
.unparseable(3).errors(1).thrownAway(1).totalProcessed(3));
+                                     
.unparseable(3).errors(1).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD,
 1).totalProcessed(3));
 
     // Check published metadata and segments in deep storage
     assertEqualsExceptVersion(
@@ -3437,7 +3444,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     // Wait for task to exit
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
-    verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 
5)).thrownAway(4).totalProcessed(1));
+    verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 
5)).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 
4).totalProcessed(1));
 
     // Check published metadata
     final List<SegmentDescriptor> publishedDescriptors = 
publishedDescriptors();
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 98dfea4333c..bd2a8aa2712 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -82,6 +82,7 @@ import 
org.apache.druid.query.timeseries.TimeseriesQueryEngine;
 import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
 import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.incremental.RowMeters;
 import org.apache.druid.segment.indexing.DataSchema;
@@ -801,7 +802,7 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     verifyAll();
 
     verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSize(RECORDS, 0, 5))
-                                     .thrownAway(2).totalProcessed(3));
+                                     
.thrownAwayByReason(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME, 
2).totalProcessed(3));
 
     // Check published metadata
     assertEqualsExceptVersion(
@@ -864,7 +865,7 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     verifyAll();
 
     verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSize(RECORDS, 0, 5))
-                                     .thrownAway(2).totalProcessed(3));
+                                     
.thrownAwayByReason(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME, 
2).totalProcessed(3));
 
     // Check published metadata and segments in deep storage
     assertEqualsExceptVersion(
@@ -923,7 +924,7 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     verifyAll();
 
     verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSize(RECORDS, 0, 5))
-                                     .thrownAway(4).totalProcessed(1));
+                                     
.thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 
4).totalProcessed(1));
 
     // Check published metadata
     assertEqualsExceptVersion(ImmutableList.of(sdd("2009/P1D", 0)), 
publishedDescriptors());
@@ -1194,6 +1195,8 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     Assert.assertEquals(IngestionState.COMPLETED, 
reportData.getIngestionState());
     Assert.assertNull(reportData.getErrorMsg());
 
+    // Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
+    Map<String, Integer> expectedThrownAwayByReason = Map.of();
     Map<String, Object> expectedMetrics = ImmutableMap.of(
         RowIngestionMeters.BUILD_SEGMENTS,
         ImmutableMap.of(
@@ -1201,7 +1204,8 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
             RowIngestionMeters.PROCESSED_BYTES, 763,
             RowIngestionMeters.PROCESSED_WITH_ERROR, 3,
             RowIngestionMeters.UNPARSEABLE, 4,
-            RowIngestionMeters.THROWN_AWAY, 0
+            RowIngestionMeters.THROWN_AWAY, 0,
+            RowIngestionMeters.THROWN_AWAY_BY_REASON, 
expectedThrownAwayByReason
         )
     );
     Assert.assertEquals(expectedMetrics, reportData.getRowStats());
@@ -1284,6 +1288,8 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     Assert.assertEquals(IngestionState.BUILD_SEGMENTS, 
reportData.getIngestionState());
     Assert.assertNotNull(reportData.getErrorMsg());
 
+    // Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
+    Map<String, Integer> expectedThrownAwayByReason = Map.of();
     Map<String, Object> expectedMetrics = ImmutableMap.of(
         RowIngestionMeters.BUILD_SEGMENTS,
         ImmutableMap.of(
@@ -1291,7 +1297,8 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
             RowIngestionMeters.PROCESSED_BYTES, (int) totalBytes,
             RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
             RowIngestionMeters.UNPARSEABLE, 3,
-            RowIngestionMeters.THROWN_AWAY, 0
+            RowIngestionMeters.THROWN_AWAY, 0,
+            RowIngestionMeters.THROWN_AWAY_BY_REASON, 
expectedThrownAwayByReason
         )
     );
     Assert.assertEquals(expectedMetrics, reportData.getRowStats());
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java
index e9fe0683f22..49a19b23799 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java
@@ -21,9 +21,11 @@ package org.apache.druid.indexing.common.stats;
 
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -37,7 +39,7 @@ public class DropwizardRowIngestionMeters implements 
RowIngestionMeters
   private final Meter processedBytes;
   private final Meter processedWithError;
   private final Meter unparseable;
-  private final Meter thrownAway;
+  private final Meter[] thrownAwayByReason = new 
Meter[InputRowFilterResult.numValues()];
 
   public DropwizardRowIngestionMeters()
   {
@@ -46,7 +48,9 @@ public class DropwizardRowIngestionMeters implements 
RowIngestionMeters
     this.processedBytes = metricRegistry.meter(PROCESSED_BYTES);
     this.processedWithError = metricRegistry.meter(PROCESSED_WITH_ERROR);
     this.unparseable = metricRegistry.meter(UNPARSEABLE);
-    this.thrownAway = metricRegistry.meter(THROWN_AWAY);
+    for (InputRowFilterResult reason : InputRowFilterResult.values()) {
+      this.thrownAwayByReason[reason.ordinal()] = 
metricRegistry.meter(THROWN_AWAY + "_" + reason.name());
+    }
   }
 
   @Override
@@ -100,13 +104,30 @@ public class DropwizardRowIngestionMeters implements 
RowIngestionMeters
   @Override
   public long getThrownAway()
   {
-    return thrownAway.getCount();
+    long totalThrownAway = 0;
+    for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) {
+      totalThrownAway += thrownAwayByReason[reason.ordinal()].getCount();
+    }
+    return totalThrownAway;
   }
 
   @Override
-  public void incrementThrownAway()
+  public void incrementThrownAway(InputRowFilterResult reason)
   {
-    thrownAway.mark();
+    thrownAwayByReason[reason.ordinal()].mark();
+  }
+
+  @Override
+  public Map<String, Long> getThrownAwayByReason()
+  {
+    Map<String, Long> result = new HashMap<>();
+    for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) {
+      long count = thrownAwayByReason[reason.ordinal()].getCount();
+      if (count > 0) {
+        result.put(reason.getReason(), count);
+      }
+    }
+    return result;
   }
 
   @Override
@@ -116,7 +137,7 @@ public class DropwizardRowIngestionMeters implements 
RowIngestionMeters
         processed.getCount(),
         processedBytes.getCount(),
         processedWithError.getCount(),
-        thrownAway.getCount(),
+        getThrownAwayByReason(),
         unparseable.getCount()
     );
   }
@@ -131,21 +152,21 @@ public class DropwizardRowIngestionMeters implements 
RowIngestionMeters
     oneMinute.put(PROCESSED_BYTES, processedBytes.getOneMinuteRate());
     oneMinute.put(PROCESSED_WITH_ERROR, processedWithError.getOneMinuteRate());
     oneMinute.put(UNPARSEABLE, unparseable.getOneMinuteRate());
-    oneMinute.put(THROWN_AWAY, thrownAway.getOneMinuteRate());
+    oneMinute.put(THROWN_AWAY, 
Arrays.stream(thrownAwayByReason).mapToDouble(Meter::getOneMinuteRate).sum());
 
     Map<String, Object> fiveMinute = new HashMap<>();
     fiveMinute.put(PROCESSED, processed.getFiveMinuteRate());
     fiveMinute.put(PROCESSED_BYTES, processedBytes.getFiveMinuteRate());
     fiveMinute.put(PROCESSED_WITH_ERROR, 
processedWithError.getFiveMinuteRate());
     fiveMinute.put(UNPARSEABLE, unparseable.getFiveMinuteRate());
-    fiveMinute.put(THROWN_AWAY, thrownAway.getFiveMinuteRate());
+    fiveMinute.put(THROWN_AWAY, 
Arrays.stream(thrownAwayByReason).mapToDouble(Meter::getFiveMinuteRate).sum());
 
     Map<String, Object> fifteenMinute = new HashMap<>();
     fifteenMinute.put(PROCESSED, processed.getFifteenMinuteRate());
     fifteenMinute.put(PROCESSED_BYTES, processedBytes.getFifteenMinuteRate());
     fifteenMinute.put(PROCESSED_WITH_ERROR, 
processedWithError.getFifteenMinuteRate());
     fifteenMinute.put(UNPARSEABLE, unparseable.getFifteenMinuteRate());
-    fifteenMinute.put(THROWN_AWAY, thrownAway.getFifteenMinuteRate());
+    fifteenMinute.put(THROWN_AWAY, 
Arrays.stream(thrownAwayByReason).mapToDouble(Meter::getFifteenMinuteRate).sum());
 
     movingAverages.put(ONE_MINUTE_NAME, oneMinute);
     movingAverages.put(FIVE_MINUTE_NAME, fiveMinute);
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
index 4895776a796..462fc234812 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
@@ -23,10 +23,15 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
 import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Emits metrics from {@link SegmentGenerationMetrics} and {@link 
RowIngestionMeters}.
  */
@@ -51,7 +56,7 @@ public class TaskRealtimeMetricsMonitor extends 
AbstractMonitor
     this.rowIngestionMeters = rowIngestionMeters;
     this.builder = metricEventBuilder;
     previousSegmentGenerationMetrics = new SegmentGenerationMetrics();
-    previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, 
0, 0);
+    previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, 
Map.of(), 0);
   }
 
   @Override
@@ -60,14 +65,30 @@ public class TaskRealtimeMetricsMonitor extends 
AbstractMonitor
     SegmentGenerationMetrics metrics = segmentGenerationMetrics.snapshot();
     RowIngestionMetersTotals rowIngestionMetersTotals = 
rowIngestionMeters.getTotals();
 
-    final long thrownAway = rowIngestionMetersTotals.getThrownAway() - 
previousRowIngestionMetersTotals.getThrownAway();
-    if (thrownAway > 0) {
+    // Emit per-reason metrics with the reason dimension
+    final Map<String, Long> currentThrownAwayByReason = 
rowIngestionMetersTotals.getThrownAwayByReason();
+    final Map<String, Long> previousThrownAwayByReason = 
previousRowIngestionMetersTotals.getThrownAwayByReason();
+    final Map<String, Long> deltaThrownAwayByReason = new HashMap<>();
+    for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) {
+      final long currentCount = 
currentThrownAwayByReason.getOrDefault(reason.getReason(), 0L);
+      final long previousCount = 
previousThrownAwayByReason.getOrDefault(reason.getReason(), 0L);
+      final long delta = currentCount - previousCount;
+      if (delta > 0) {
+        deltaThrownAwayByReason.put(reason.getReason(), delta);
+        emitter.emit(
+            builder.setDimension(DruidMetrics.REASON, reason.getReason())
+                   .setMetric("ingest/events/thrownAway", delta)
+        );
+      }
+    }
+    final long totalThrownAway = 
deltaThrownAwayByReason.values().stream().reduce(0L, Long::sum);
+    if (totalThrownAway > 0) {
       log.warn(
-          "[%,d] events thrown away. Possible causes: null events, events 
filtered out by transformSpec, or events outside earlyMessageRejectionPeriod / 
lateMessageRejectionPeriod.",
-          thrownAway
+          "[%,d] events thrown away. Breakdown: [%s]",
+          totalThrownAway,
+          deltaThrownAwayByReason
       );
     }
-    emitter.emit(builder.setMetric("ingest/events/thrownAway", thrownAway));
 
     final long unparseable = rowIngestionMetersTotals.getUnparseable()
                              - 
previousRowIngestionMetersTotals.getUnparseable();
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 97dc70f3357..ccdc99dcd05 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -230,7 +230,7 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
     );
     return new FilteringCloseableInputRowIterator(
         inputSourceReader.read(ingestionMeters),
-        rowFilter,
+        InputRowFilter.fromPredicate(rowFilter),
         ingestionMeters,
         parseExceptionHandler
     );
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java
index 30af8febaae..074ae6d9f09 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java
@@ -22,22 +22,22 @@ package org.apache.druid.indexing.common.task;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 
 import java.io.IOException;
 import java.util.NoSuchElementException;
-import java.util.function.Predicate;
 
 /**
  * An {@link InputRow} iterator used by ingestion {@link Task}s. It can filter 
out rows which do not satisfy the given
- * {@link #filter} or throw {@link ParseException} while parsing them. The 
relevant metric should be counted whenever
+ * {@link InputRowFilter} or throw {@link ParseException} while parsing them. 
The relevant metric should be counted whenever
  * it filters out rows based on the filter. ParseException handling is 
delegatged to {@link ParseExceptionHandler}.
  */
 public class FilteringCloseableInputRowIterator implements 
CloseableIterator<InputRow>
 {
   private final CloseableIterator<InputRow> delegate;
-  private final Predicate<InputRow> filter;
+  private final InputRowFilter rowFilter;
   private final RowIngestionMeters rowIngestionMeters;
   private final ParseExceptionHandler parseExceptionHandler;
 
@@ -45,13 +45,13 @@ public class FilteringCloseableInputRowIterator implements 
CloseableIterator<Inp
 
   public FilteringCloseableInputRowIterator(
       CloseableIterator<InputRow> delegate,
-      Predicate<InputRow> filter,
+      InputRowFilter rowFilter,
       RowIngestionMeters rowIngestionMeters,
       ParseExceptionHandler parseExceptionHandler
   )
   {
     this.delegate = delegate;
-    this.filter = filter;
+    this.rowFilter = rowFilter;
     this.rowIngestionMeters = rowIngestionMeters;
     this.parseExceptionHandler = parseExceptionHandler;
   }
@@ -66,11 +66,12 @@ public class FilteringCloseableInputRowIterator implements 
CloseableIterator<Inp
         while (next == null && delegate.hasNext()) {
           // delegate.next() can throw ParseException
           final InputRow row = delegate.next();
-          // filter.test() can throw ParseException
-          if (filter.test(row)) {
+          // rowFilter.test() can throw ParseException
+          final InputRowFilterResult filterResult = rowFilter.test(row);
+          if (!filterResult.isRejected()) {
             next = row;
           } else {
-            rowIngestionMeters.incrementThrownAway();
+            rowIngestionMeters.incrementThrownAway(filterResult);
           }
         }
         break;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputRowFilter.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputRowFilter.java
new file mode 100644
index 00000000000..8f65b9bcbe9
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputRowFilter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
+
+import java.util.function.Predicate;
+
+/**
+ * A filter for input rows during ingestion that returns a {@link 
InputRowFilterResult} per row.
+ * This is similar to {@link Predicate} but returns an {@link 
InputRowFilterResult} instead of a boolean.
+ */
+@FunctionalInterface
+public interface InputRowFilter
+{
+  /**
+   * Tests whether the given row should be accepted.
+   *
+   * @return {@link InputRowFilterResult#ACCEPTED} only if the row should be 
accepted, otherwise another {@link InputRowFilterResult} value.
+   */
+  InputRowFilterResult test(InputRow row);
+
+  /**
+   * Creates a {@link InputRowFilter} from a {@link Predicate}.
+   * Callers wishing to return custom rejection reason logic should implement 
their own {@link InputRowFilter} directly.
+   */
+  static InputRowFilter fromPredicate(Predicate<InputRow> predicate)
+  {
+    return row -> predicate.test(row) ? InputRowFilterResult.ACCEPTED : 
InputRowFilterResult.CUSTOM_FILTER;
+  }
+
+  /**
+   * Fully-permissive {@link InputRowFilter} used mainly for tests.
+   */
+  static InputRowFilter allowAll()
+  {
+    return row -> InputRowFilterResult.ACCEPTED;
+  }
+
+  /**
+   * Combines this filter with another filter. A row is rejected if either 
filter rejects it.
+   * The rejection reason from the first rejecting filter (this filter first) 
is returned.
+   */
+  default InputRowFilter and(InputRowFilter other)
+  {
+    return row -> {
+      InputRowFilterResult result = this.test(row);
+      if (result.isRejected()) {
+        return result;
+      }
+      return other.test(row);
+    };
+  }
+}
+
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 9faa4c2013e..74678086d59 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -1602,11 +1602,14 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask
       return (RowIngestionMetersTotals) buildSegmentsRowStats;
     } else if (buildSegmentsRowStats instanceof Map) {
       Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) 
buildSegmentsRowStats;
+      Map<String, Integer> thrownAwayByReason = (Map) 
buildSegmentsRowStatsMap.get("thrownAwayByReason");
       return new RowIngestionMetersTotals(
           ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
           ((Number) 
buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
           ((Number) 
buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
           ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
+          // Jackson will serde numerics ≤ 32bits as Integers, rather than 
Longs
+          thrownAwayByReason != null ? 
CollectionUtils.mapValues(thrownAwayByReason, Integer::longValue) : null,
           ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
       );
     } else {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 16505f761bb..a448e01f918 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -78,6 +78,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.metadata.PendingSegmentRecord;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
@@ -419,7 +420,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         inputRowSchema,
         task.getDataSchema().getTransformSpec(),
         toolbox.getIndexingTmpDir(),
-        row -> row != null && withinMinMaxRecordTime(row),
+        this::ensureRowIsNonNullAndWithinMessageTimeBounds,
         rowIngestionMeters,
         parseExceptionHandler
     );
@@ -2144,26 +2145,33 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     );
   }
 
-  public boolean withinMinMaxRecordTime(final InputRow row)
+  /**
+   * Returns {@link InputRowFilterResult#ACCEPTED} if the row should be 
accepted,
+   * or a rejection reason otherwise.
+   */
+  InputRowFilterResult ensureRowIsNonNullAndWithinMessageTimeBounds(@Nullable 
InputRow row)
   {
-    final boolean beforeMinimumMessageTime = 
minMessageTime.isAfter(row.getTimestamp());
-    final boolean afterMaximumMessageTime = 
maxMessageTime.isBefore(row.getTimestamp());
-
-    if (log.isDebugEnabled()) {
-      if (beforeMinimumMessageTime) {
+    if (row == null) {
+      return InputRowFilterResult.NULL_OR_EMPTY_RECORD;
+    } else if (minMessageTime.isAfter(row.getTimestamp())) {
+      if (log.isDebugEnabled()) {
         log.debug(
-            "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]",
+            "CurrentTimeStamp[%s] is before minimumMessageTime[%s]",
             row.getTimestamp(),
             minMessageTime
         );
-      } else if (afterMaximumMessageTime) {
+      }
+      return InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME;
+    } else if (maxMessageTime.isBefore(row.getTimestamp())) {
+      if (log.isDebugEnabled()) {
         log.debug(
-            "CurrentTimeStamp[%s] is after MaximumMessageTime[%s]",
+            "CurrentTimeStamp[%s] is after maximumMessageTime[%s]",
             row.getTimestamp(),
             maxMessageTime
         );
       }
+      return InputRowFilterResult.AFTER_MAX_MESSAGE_TIME;
     }
-    return !beforeMinimumMessageTime && !afterMaximumMessageTime;
+    return InputRowFilterResult.ACCEPTED;
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
index 3ac952c16c2..ad9886e424a 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
@@ -28,9 +28,11 @@ import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.impl.ByteEntity;
 import org.apache.druid.data.input.impl.InputRowParser;
 import 
org.apache.druid.indexing.common.task.FilteringCloseableInputRowIterator;
+import org.apache.druid.indexing.common.task.InputRowFilter;
 import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.transform.TransformSpec;
@@ -42,7 +44,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.function.Predicate;
 
 /**
  * Abstraction for parsing stream data which internally uses {@link 
org.apache.druid.data.input.InputEntityReader}
@@ -54,7 +55,7 @@ class StreamChunkParser<RecordType extends ByteEntity>
   private final InputRowParser<ByteBuffer> parser;
   @Nullable
   private final SettableByteEntityReader<RecordType> byteEntityReader;
-  private final Predicate<InputRow> rowFilter;
+  private final InputRowFilter rowFilter;
   private final RowIngestionMeters rowIngestionMeters;
   private final ParseExceptionHandler parseExceptionHandler;
 
@@ -67,7 +68,7 @@ class StreamChunkParser<RecordType extends ByteEntity>
       InputRowSchema inputRowSchema,
       TransformSpec transformSpec,
       File indexingTmpDir,
-      Predicate<InputRow> rowFilter,
+      InputRowFilter rowFilter,
       RowIngestionMeters rowIngestionMeters,
       ParseExceptionHandler parseExceptionHandler
   )
@@ -96,7 +97,7 @@ class StreamChunkParser<RecordType extends ByteEntity>
   StreamChunkParser(
       @Nullable InputRowParser<ByteBuffer> parser,
       @Nullable SettableByteEntityReader<RecordType> byteEntityReader,
-      Predicate<InputRow> rowFilter,
+      InputRowFilter rowFilter,
       RowIngestionMeters rowIngestionMeters,
       ParseExceptionHandler parseExceptionHandler
   )
@@ -117,7 +118,7 @@ class StreamChunkParser<RecordType extends ByteEntity>
       if (!isEndOfShard) {
         // We do not count end of shard record as thrown away event since this 
is a record created by Druid
         // Note that this only applies to Kinesis
-        rowIngestionMeters.incrementThrownAway();
+        
rowIngestionMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
       }
       return Collections.emptyList();
     } else {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
index 252d265d67d..bcfebef2a6c 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
@@ -21,28 +21,22 @@ package org.apache.druid.indexing.common;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
-import org.apache.druid.java.util.emitter.core.Event;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.java.util.metrics.MonitorUtils;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
 import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Answers;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
-@RunWith(MockitoJUnitRunner.class)
 public class TaskRealtimeMetricsMonitorTest
 {
   private static final Map<String, String[]> DIMENSIONS = ImmutableMap.of(
@@ -53,28 +47,18 @@ public class TaskRealtimeMetricsMonitorTest
   );
 
   private static final Map<String, Object> TAGS = ImmutableMap.of("author", 
"Author Name", "version", 10);
-  private SegmentGenerationMetrics segmentGenerationMetrics;
 
-  @Mock(answer = Answers.RETURNS_MOCKS)
+  private SegmentGenerationMetrics segmentGenerationMetrics;
   private RowIngestionMeters rowIngestionMeters;
-  @Mock
-  private ServiceEmitter emitter;
-  private Map<String, ServiceMetricEvent> emittedEvents;
+  private StubServiceEmitter emitter;
   private TaskRealtimeMetricsMonitor target;
 
   @Before
   public void setUp()
   {
-    emittedEvents = new HashMap<>();
     segmentGenerationMetrics = new SegmentGenerationMetrics();
-    
Mockito.doCallRealMethod().when(emitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
-    Mockito
-        .doAnswer(invocation -> {
-          ServiceMetricEvent e = invocation.getArgument(0);
-          emittedEvents.put(e.getMetric(), e);
-          return null;
-        })
-        .when(emitter).emit(ArgumentMatchers.any(Event.class));
+    rowIngestionMeters = new SimpleRowIngestionMeters();
+    emitter = new StubServiceEmitter();
     target = new TaskRealtimeMetricsMonitor(
         segmentGenerationMetrics,
         rowIngestionMeters,
@@ -86,7 +70,10 @@ public class TaskRealtimeMetricsMonitorTest
   public void testdoMonitorShouldEmitUserProvidedTags()
   {
     target.doMonitor(emitter);
-    for (ServiceMetricEvent sme : emittedEvents.values()) {
+
+    List<ServiceMetricEvent> events = 
emitter.getMetricEvents("ingest/events/unparseable");
+    Assert.assertFalse(events.isEmpty());
+    for (ServiceMetricEvent sme : events) {
       Assert.assertEquals(TAGS, sme.getUserDims().get(DruidMetrics.TAGS));
     }
   }
@@ -94,12 +81,19 @@ public class TaskRealtimeMetricsMonitorTest
   @Test
   public void testdoMonitorWithoutTagsShouldNotEmitTags()
   {
+    ServiceMetricEvent.Builder builderWithoutTags = new 
ServiceMetricEvent.Builder();
+    MonitorUtils.addDimensionsToBuilder(builderWithoutTags, DIMENSIONS);
+
     target = new TaskRealtimeMetricsMonitor(
         segmentGenerationMetrics,
         rowIngestionMeters,
-        createMetricEventBuilder()
+        builderWithoutTags
     );
-    for (ServiceMetricEvent sme : emittedEvents.values()) {
+    target.doMonitor(emitter);
+
+    List<ServiceMetricEvent> events = 
emitter.getMetricEvents("ingest/events/unparseable");
+    Assert.assertFalse(events.isEmpty());
+    for (ServiceMetricEvent sme : events) {
       Assert.assertFalse(sme.getUserDims().containsKey(DruidMetrics.TAGS));
     }
   }
@@ -107,24 +101,123 @@ public class TaskRealtimeMetricsMonitorTest
   @Test
   public void testMessageGapAggStats()
   {
-    target = new TaskRealtimeMetricsMonitor(
+    target.doMonitor(emitter);
+    
Assert.assertTrue(emitter.getMetricEvents("ingest/events/minMessageGap").isEmpty());
+    
Assert.assertTrue(emitter.getMetricEvents("ingest/events/maxMessageGap").isEmpty());
+    
Assert.assertTrue(emitter.getMetricEvents("ingest/events/avgMessageGap").isEmpty());
+
+    emitter.flush();
+    segmentGenerationMetrics.reportMessageGap(1);
+    target.doMonitor(emitter);
+
+    
Assert.assertFalse(emitter.getMetricEvents("ingest/events/minMessageGap").isEmpty());
+    
Assert.assertFalse(emitter.getMetricEvents("ingest/events/maxMessageGap").isEmpty());
+    
Assert.assertFalse(emitter.getMetricEvents("ingest/events/avgMessageGap").isEmpty());
+  }
+
+  @Test
+  public void testThrownAwayEmitsReasonDimension()
+  {
+    SimpleRowIngestionMeters realMeters = new SimpleRowIngestionMeters();
+    realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+    realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+    
realMeters.incrementThrownAway(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME);
+    
realMeters.incrementThrownAway(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME);
+    
realMeters.incrementThrownAway(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME);
+    
realMeters.incrementThrownAway(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME);
+    realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+    realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+    realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+    realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+
+    TaskRealtimeMetricsMonitor monitor = new TaskRealtimeMetricsMonitor(
         segmentGenerationMetrics,
-        rowIngestionMeters,
+        realMeters,
         createMetricEventBuilder()
     );
 
-    target.doMonitor(emitter);
-    
Assert.assertFalse(emittedEvents.containsKey("ingest/events/minMessageGap"));
-    
Assert.assertFalse(emittedEvents.containsKey("ingest/events/maxMessageGap"));
-    
Assert.assertFalse(emittedEvents.containsKey("ingest/events/avgMessageGap"));
+    monitor.doMonitor(emitter);
 
-    emittedEvents.clear();
-    segmentGenerationMetrics.reportMessageGap(1);
-    target.doMonitor(emitter);
+    Map<String, Long> thrownAwayByReason = new HashMap<>();
+    for (ServiceMetricEvent event : 
emitter.getMetricEvents("ingest/events/thrownAway")) {
+      Object reason = event.getUserDims().get("reason");
+      thrownAwayByReason.put(reason.toString(), event.getValue().longValue());
+    }
+
+    Assert.assertEquals(Long.valueOf(2), thrownAwayByReason.get("null"));
+    Assert.assertEquals(Long.valueOf(3), 
thrownAwayByReason.get("beforeMinimumMessageTime"));
+    Assert.assertEquals(Long.valueOf(1), 
thrownAwayByReason.get("afterMaximumMessageTime"));
+    Assert.assertEquals(Long.valueOf(4), thrownAwayByReason.get("filtered"));
+  }
+
+  @Test
+  public void testThrownAwayReasonDimensionOnlyEmittedWhenNonZero()
+  {
+    SimpleRowIngestionMeters realMeters = new SimpleRowIngestionMeters();
+    realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+    realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+
+    TaskRealtimeMetricsMonitor monitor = new TaskRealtimeMetricsMonitor(
+        segmentGenerationMetrics,
+        realMeters,
+        createMetricEventBuilder()
+    );
+
+    monitor.doMonitor(emitter);
+
+    Map<String, Long> thrownAwayByReason = new HashMap<>();
+    for (ServiceMetricEvent event : 
emitter.getMetricEvents("ingest/events/thrownAway")) {
+      Object reason = event.getUserDims().get("reason");
+      thrownAwayByReason.put(reason.toString(), event.getValue().longValue());
+    }
+
+    // Only reasons with non-zero counts should be emitted
+    Assert.assertEquals(2, thrownAwayByReason.size());
+    Assert.assertTrue(thrownAwayByReason.containsKey("null"));
+    Assert.assertTrue(thrownAwayByReason.containsKey("filtered"));
+    
Assert.assertFalse(thrownAwayByReason.containsKey("beforeMinimumMessageTime"));
+    
Assert.assertFalse(thrownAwayByReason.containsKey("afterMaximumMessageTime"));
+  }
+
+  @Test
+  public void testThrownAwayReasonDeltaAcrossMonitorCalls()
+  {
+    SimpleRowIngestionMeters realMeters = new SimpleRowIngestionMeters();
+
+    TaskRealtimeMetricsMonitor monitor = new TaskRealtimeMetricsMonitor(
+        segmentGenerationMetrics,
+        realMeters,
+        createMetricEventBuilder()
+    );
+
+    realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+    realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+    monitor.doMonitor(emitter);
+
+    long firstCallNullCount = 0;
+    for (ServiceMetricEvent event : 
emitter.getMetricEvents("ingest/events/thrownAway")) {
+      if ("null".equals(event.getUserDims().get("reason"))) {
+        firstCallNullCount = event.getValue().longValue();
+      }
+    }
+    Assert.assertEquals(2, firstCallNullCount);
+
+    emitter.flush();
+    realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+    realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+    realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+    monitor.doMonitor(emitter);
+
+    // Find counts from second call - should be deltas only
+    Map<String, Long> secondCallCounts = new HashMap<>();
+    for (ServiceMetricEvent event : 
emitter.getMetricEvents("ingest/events/thrownAway")) {
+      Object reason = event.getUserDims().get("reason");
+      secondCallCounts.put(reason.toString(), event.getValue().longValue());
+    }
 
-    
Assert.assertTrue(emittedEvents.containsKey("ingest/events/minMessageGap"));
-    
Assert.assertTrue(emittedEvents.containsKey("ingest/events/maxMessageGap"));
-    
Assert.assertTrue(emittedEvents.containsKey("ingest/events/avgMessageGap"));
+    // Should emit only the delta (1 more NULL, 2 new FILTERED)
+    Assert.assertEquals(Long.valueOf(1), secondCallCounts.get("null"));
+    Assert.assertEquals(Long.valueOf(2), secondCallCounts.get("filtered"));
   }
 
   private ServiceMetricEvent.Builder createMetricEventBuilder()
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMetersTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMetersTest.java
new file mode 100644
index 00000000000..d9227670000
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMetersTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.stats;
+
+import org.apache.druid.segment.incremental.InputRowFilterResult;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class DropwizardRowIngestionMetersTest
+{
+  @Test
+  public void testBasicIncrements()
+  {
+    DropwizardRowIngestionMeters meters = new DropwizardRowIngestionMeters();
+    meters.incrementProcessed();
+    meters.incrementProcessedBytes(100);
+    meters.incrementProcessedWithError();
+    meters.incrementUnparseable();
+    meters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+
+    Assert.assertEquals(1, meters.getProcessed());
+    Assert.assertEquals(100, meters.getProcessedBytes());
+    Assert.assertEquals(1, meters.getProcessedWithError());
+    Assert.assertEquals(1, meters.getUnparseable());
+    Assert.assertEquals(1, meters.getThrownAway());
+
+    RowIngestionMetersTotals totals = meters.getTotals();
+    Assert.assertEquals(1, totals.getProcessed());
+    Assert.assertEquals(100, totals.getProcessedBytes());
+    Assert.assertEquals(1, totals.getProcessedWithError());
+    Assert.assertEquals(1, totals.getUnparseable());
+    Assert.assertEquals(1, totals.getThrownAway());
+  }
+
+  @Test
+  public void testIncrementThrownAwayWithReason()
+  {
+    DropwizardRowIngestionMeters meters = new DropwizardRowIngestionMeters();
+
+    meters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+    meters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+    meters.incrementThrownAway(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME);
+    meters.incrementThrownAway(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME);
+    meters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+    meters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+    meters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+
+    // Total thrownAway should be sum of all reasons
+    Assert.assertEquals(7, meters.getThrownAway());
+
+    // Check per-reason counts
+    Map<String, Long> byReason = meters.getThrownAwayByReason();
+    Assert.assertEquals(Long.valueOf(2), 
byReason.get(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason()));
+    Assert.assertEquals(Long.valueOf(1), 
byReason.get(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.getReason()));
+    Assert.assertEquals(Long.valueOf(1), 
byReason.get(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME.getReason()));
+    Assert.assertEquals(Long.valueOf(3), 
byReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason()));
+  }
+
+  @Test
+  public void testGetThrownAwayByReasonReturnsAllReasons()
+  {
+    DropwizardRowIngestionMeters meters = new DropwizardRowIngestionMeters();
+
+    // With no increments, all reasons should be present with 0 counts
+    Map<String, Long> byReason = meters.getThrownAwayByReason();
+    Assert.assertTrue(byReason.isEmpty());
+  }
+
+  @Test
+  public void testMovingAverages()
+  {
+    DropwizardRowIngestionMeters meters = new DropwizardRowIngestionMeters();
+
+    meters.incrementProcessed();
+    meters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+
+    Map<String, Object> movingAverages = meters.getMovingAverages();
+    Assert.assertNotNull(movingAverages);
+    
Assert.assertTrue(movingAverages.containsKey(DropwizardRowIngestionMeters.ONE_MINUTE_NAME));
+    
Assert.assertTrue(movingAverages.containsKey(DropwizardRowIngestionMeters.FIVE_MINUTE_NAME));
+    
Assert.assertTrue(movingAverages.containsKey(DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME));
+  }
+}
+
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java
index 2b4ffe26f31..e037d9adf72 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
@@ -42,6 +43,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -78,7 +80,7 @@ public class FilteringCloseableInputRowIteratorTest
     final Predicate<InputRow> filter = row -> (Integer) row.getRaw("dim1") == 
10;
     final FilteringCloseableInputRowIterator rowIterator = new 
FilteringCloseableInputRowIterator(
         CloseableIterators.withEmptyBaggage(ROWS.iterator()),
-        filter,
+        InputRowFilter.fromPredicate(filter),
         rowIngestionMeters,
         parseExceptionHandler
     );
@@ -125,7 +127,7 @@ public class FilteringCloseableInputRowIteratorTest
 
     final FilteringCloseableInputRowIterator rowIterator = new 
FilteringCloseableInputRowIterator(
         parseExceptionThrowingIterator,
-        row -> true,
+        InputRowFilter.allowAll(),
         rowIngestionMeters,
         parseExceptionHandler
     );
@@ -163,7 +165,7 @@ public class FilteringCloseableInputRowIteratorTest
 
     final FilteringCloseableInputRowIterator rowIterator = new 
FilteringCloseableInputRowIterator(
         parseExceptionThrowingIterator,
-        filter,
+        InputRowFilter.fromPredicate(filter),
         rowIngestionMeters,
         parseExceptionHandler
     );
@@ -214,7 +216,7 @@ public class FilteringCloseableInputRowIteratorTest
 
     final FilteringCloseableInputRowIterator rowIterator = new 
FilteringCloseableInputRowIterator(
         parseExceptionThrowingIterator,
-        row -> true,
+        InputRowFilter.allowAll(),
         rowIngestionMeters,
         parseExceptionHandler
     );
@@ -260,7 +262,7 @@ public class FilteringCloseableInputRowIteratorTest
 
     final FilteringCloseableInputRowIterator rowIterator = new 
FilteringCloseableInputRowIterator(
         parseExceptionThrowingIterator,
-        row -> true,
+        InputRowFilter.allowAll(),
         rowIngestionMeters,
         parseExceptionHandler
     );
@@ -281,7 +283,7 @@ public class FilteringCloseableInputRowIteratorTest
     );
     final FilteringCloseableInputRowIterator rowIterator = new 
FilteringCloseableInputRowIterator(
         delegate,
-        row -> true,
+        InputRowFilter.allowAll(),
         rowIngestionMeters,
         parseExceptionHandler
     );
@@ -330,7 +332,7 @@ public class FilteringCloseableInputRowIteratorTest
 
     final FilteringCloseableInputRowIterator rowIterator = new 
FilteringCloseableInputRowIterator(
         parseExceptionThrowingIterator,
-        row -> true,
+        InputRowFilter.allowAll(),
         rowIngestionMeters,
         parseExceptionHandler
     );
@@ -350,6 +352,103 @@ public class FilteringCloseableInputRowIteratorTest
   }
 
 
+  @Test
+  public void testRowFilterWithReasons()
+  {
+    // RowFilter that returns different reasons based on dim1 value
+    final InputRowFilter rowFilter = row -> {
+      int dim1 = (Integer) row.getRaw("dim1");
+      if (dim1 == 10) {
+        return InputRowFilterResult.ACCEPTED;
+      } else if (dim1 == 20) {
+        return InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME;
+      } else {
+        return InputRowFilterResult.CUSTOM_FILTER;
+      }
+    };
+
+    final FilteringCloseableInputRowIterator rowIterator = new 
FilteringCloseableInputRowIterator(
+        CloseableIterators.withEmptyBaggage(ROWS.iterator()),
+        rowFilter,
+        rowIngestionMeters,
+        parseExceptionHandler
+    );
+
+    final List<InputRow> filteredRows = new ArrayList<>();
+    rowIterator.forEachRemaining(filteredRows::add);
+
+    // Only rows with dim1=10 should pass
+    Assert.assertEquals(4, filteredRows.size());
+    for (InputRow row : filteredRows) {
+      Assert.assertEquals(10, row.getRaw("dim1"));
+    }
+
+    // Check total thrown away
+    Assert.assertEquals(2, rowIngestionMeters.getThrownAway());
+
+    // Check per-reason counts
+    Map<String, Long> byReason = rowIngestionMeters.getThrownAwayByReason();
+    Assert.assertEquals(2, byReason.size());
+    Assert.assertEquals(Long.valueOf(1), 
byReason.get(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.getReason())); // 
dim1=20
+    Assert.assertEquals(Long.valueOf(1), 
byReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason())); // dim1=30
+  }
+
+  @Test
+  public void testRowFilterFromPredicate()
+  {
+    // Use the static helper to convert a Predicate to RowFilter
+    final Predicate<InputRow> predicate = row -> (Integer) row.getRaw("dim1") 
== 10;
+    final InputRowFilter rowFilter = InputRowFilter.fromPredicate(predicate);
+
+    final FilteringCloseableInputRowIterator rowIterator = new 
FilteringCloseableInputRowIterator(
+        CloseableIterators.withEmptyBaggage(ROWS.iterator()),
+        rowFilter,
+        rowIngestionMeters,
+        parseExceptionHandler
+    );
+
+    final List<InputRow> filteredRows = new ArrayList<>();
+    rowIterator.forEachRemaining(filteredRows::add);
+
+    Assert.assertEquals(4, filteredRows.size());
+    Assert.assertEquals(2, rowIngestionMeters.getThrownAway());
+
+    // All thrown away should have FILTERED reason when using fromPredicate
+    Map<String, Long> byReason = rowIngestionMeters.getThrownAwayByReason();
+    Assert.assertEquals(1, byReason.size());
+    Assert.assertEquals(Long.valueOf(2), 
byReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason()));
+  }
+
+  @Test
+  public void testRowFilterAnd()
+  {
+    // First filter: reject nulls (simulated by checking dim1)
+    final InputRowFilter nullFilter = row -> row == null ? 
InputRowFilterResult.NULL_OR_EMPTY_RECORD : InputRowFilterResult.ACCEPTED;
+
+    // Second filter: reject if dim1 != 10
+    final InputRowFilter valueFilter = row -> (Integer) row.getRaw("dim1") == 
10 ? InputRowFilterResult.ACCEPTED : InputRowFilterResult.CUSTOM_FILTER;
+
+    // Combine filters
+    final InputRowFilter combinedFilter = nullFilter.and(valueFilter);
+
+    final FilteringCloseableInputRowIterator rowIterator = new 
FilteringCloseableInputRowIterator(
+        CloseableIterators.withEmptyBaggage(ROWS.iterator()),
+        combinedFilter,
+        rowIngestionMeters,
+        parseExceptionHandler
+    );
+
+    final List<InputRow> filteredRows = new ArrayList<>();
+    rowIterator.forEachRemaining(filteredRows::add);
+
+    Assert.assertEquals(4, filteredRows.size());
+    Assert.assertEquals(2, rowIngestionMeters.getThrownAway());
+
+    // All rejected rows should have FILTERED reason (from second filter)
+    Map<String, Long> byReason = rowIngestionMeters.getThrownAwayByReason();
+    Assert.assertEquals(Long.valueOf(2), 
byReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason()));
+  }
+
   private static InputRow newRow(DateTime timestamp, Object dim1Val, Object 
dim2Val)
   {
     return new MapBasedInputRow(timestamp, DIMENSIONS, ImmutableMap.of("dim1", 
dim1Val, "dim2", dim2Val));
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 2d999110288..9a2eeb8c1a6 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -85,6 +85,7 @@ import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.handoff.NoopSegmentHandoffNotifierFactory;
 import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
 import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import 
org.apache.druid.segment.loading.LeastBytesUsedStorageLocationSelectorStrategy;
@@ -1508,6 +1509,8 @@ public class IndexTaskTest extends IngestionTestBase
 
     IngestionStatsAndErrors reportData = getTaskReportData();
 
+    // Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
+    Map<String, Integer> expectedThrownAwayByReason = 
Map.of(InputRowFilterResult.CUSTOM_FILTER.getReason(), 1);
     Map<String, Object> expectedMetrics = ImmutableMap.of(
         RowIngestionMeters.DETERMINE_PARTITIONS,
         ImmutableMap.of(
@@ -1515,7 +1518,8 @@ public class IndexTaskTest extends IngestionTestBase
             RowIngestionMeters.PROCESSED, 4,
             RowIngestionMeters.PROCESSED_BYTES, 657,
             RowIngestionMeters.UNPARSEABLE, 4,
-            RowIngestionMeters.THROWN_AWAY, 1
+            RowIngestionMeters.THROWN_AWAY, 1,
+            RowIngestionMeters.THROWN_AWAY_BY_REASON, 
expectedThrownAwayByReason
         ),
         RowIngestionMeters.BUILD_SEGMENTS,
         ImmutableMap.of(
@@ -1523,7 +1527,8 @@ public class IndexTaskTest extends IngestionTestBase
             RowIngestionMeters.PROCESSED, 1,
             RowIngestionMeters.PROCESSED_BYTES, 657,
             RowIngestionMeters.UNPARSEABLE, 4,
-            RowIngestionMeters.THROWN_AWAY, 1
+            RowIngestionMeters.THROWN_AWAY, 1,
+            RowIngestionMeters.THROWN_AWAY_BY_REASON, 
expectedThrownAwayByReason
         )
     );
     Assert.assertEquals(expectedMetrics, reportData.getRowStats());
@@ -1680,6 +1685,9 @@ public class IndexTaskTest extends IngestionTestBase
 
     IngestionStatsAndErrors reportData = getTaskReportData();
 
+    // Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
+    Map<String, Integer> expectedDeterminePartitionsThrownAwayByReason = 
Map.of();
+    Map<String, Integer> expectedBuildSegmentsThrownAwayByReason = 
Map.of(InputRowFilterResult.CUSTOM_FILTER.getReason(), 1);
     Map<String, Object> expectedMetrics = ImmutableMap.of(
         RowIngestionMeters.DETERMINE_PARTITIONS,
         ImmutableMap.of(
@@ -1687,7 +1695,8 @@ public class IndexTaskTest extends IngestionTestBase
             RowIngestionMeters.PROCESSED, 0,
             RowIngestionMeters.PROCESSED_BYTES, 0,
             RowIngestionMeters.UNPARSEABLE, 0,
-            RowIngestionMeters.THROWN_AWAY, 0
+            RowIngestionMeters.THROWN_AWAY, 0,
+            RowIngestionMeters.THROWN_AWAY_BY_REASON, 
expectedDeterminePartitionsThrownAwayByReason
         ),
         RowIngestionMeters.BUILD_SEGMENTS,
         ImmutableMap.of(
@@ -1695,7 +1704,8 @@ public class IndexTaskTest extends IngestionTestBase
             RowIngestionMeters.PROCESSED, 1,
             RowIngestionMeters.PROCESSED_BYTES, 182,
             RowIngestionMeters.UNPARSEABLE, 3,
-            RowIngestionMeters.THROWN_AWAY, 1
+            RowIngestionMeters.THROWN_AWAY, 1,
+            RowIngestionMeters.THROWN_AWAY_BY_REASON, 
expectedBuildSegmentsThrownAwayByReason
         )
     );
 
@@ -1790,6 +1800,8 @@ public class IndexTaskTest extends IngestionTestBase
 
     IngestionStatsAndErrors reportData = getTaskReportData();
 
+    Map<String, Integer> expectedDeterminePartitionsThrownAwayByReason = 
Map.of(InputRowFilterResult.CUSTOM_FILTER.getReason(), 1);
+    Map<String, Integer> expectedBuildSegmentsThrownAwayByReason = Map.of();
     Map<String, Object> expectedMetrics = ImmutableMap.of(
         RowIngestionMeters.DETERMINE_PARTITIONS,
         ImmutableMap.of(
@@ -1797,7 +1809,8 @@ public class IndexTaskTest extends IngestionTestBase
             RowIngestionMeters.PROCESSED, 1,
             RowIngestionMeters.PROCESSED_BYTES, 182,
             RowIngestionMeters.UNPARSEABLE, 3,
-            RowIngestionMeters.THROWN_AWAY, 1
+            RowIngestionMeters.THROWN_AWAY, 1,
+            RowIngestionMeters.THROWN_AWAY_BY_REASON, 
expectedDeterminePartitionsThrownAwayByReason
         ),
         RowIngestionMeters.BUILD_SEGMENTS,
         ImmutableMap.of(
@@ -1805,7 +1818,8 @@ public class IndexTaskTest extends IngestionTestBase
             RowIngestionMeters.PROCESSED, 0,
             RowIngestionMeters.PROCESSED_BYTES, 0,
             RowIngestionMeters.UNPARSEABLE, 0,
-            RowIngestionMeters.THROWN_AWAY, 0
+            RowIngestionMeters.THROWN_AWAY, 0,
+            RowIngestionMeters.THROWN_AWAY_BY_REASON, 
expectedBuildSegmentsThrownAwayByReason
         )
     );
 
@@ -2738,7 +2752,10 @@ public class IndexTaskTest extends IngestionTestBase
       Map<String, AggregatorFactory> aggregatorFactoryMap
   )
   {
-    Assert.assertEquals(segmentWithSchemas.getSegments().size(), 
segmentWithSchemas.getSegmentSchemaMapping().getSegmentIdToMetadataMap().size());
+    Assert.assertEquals(
+        segmentWithSchemas.getSegments().size(),
+        
segmentWithSchemas.getSegmentSchemaMapping().getSegmentIdToMetadataMap().size()
+    );
     Assert.assertEquals(1, 
segmentWithSchemas.getSegmentSchemaMapping().getSchemaFingerprintToPayloadMap().size());
     Assert.assertEquals(
         actualRowSignature,
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/InputRowFilterTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/InputRowFilterTest.java
new file mode 100644
index 00000000000..53616c974e7
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/InputRowFilterTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class InputRowFilterTest
+{
+  private static final List<String> DIMENSIONS = ImmutableList.of("dim1");
+
+  @Test
+  public void test_fromPredicate_whichAllowsAll()
+  {
+    InputRowFilter filter = InputRowFilter.fromPredicate(row -> true);
+    InputRow row = newRow(100);
+
+    Assert.assertEquals(InputRowFilterResult.ACCEPTED, filter.test(row));
+  }
+
+  @Test
+  public void testFromPredicateReject()
+  {
+    InputRowFilter filter = InputRowFilter.fromPredicate(row -> false);
+    InputRow row = newRow(100);
+
+    Assert.assertEquals(InputRowFilterResult.CUSTOM_FILTER, filter.test(row));
+  }
+
+  @Test
+  public void testAndBothAccept()
+  {
+    InputRowFilter filter1 = InputRowFilter.allowAll();
+    InputRowFilter filter2 = InputRowFilter.allowAll();
+    InputRowFilter combined = filter1.and(filter2);
+
+    InputRow row = newRow(100);
+    Assert.assertEquals(InputRowFilterResult.ACCEPTED, combined.test(row));
+  }
+
+  @Test
+  public void testAndFirstRejects()
+  {
+    InputRowFilter filter1 = row -> InputRowFilterResult.NULL_OR_EMPTY_RECORD;
+    InputRowFilter filter2 = InputRowFilter.allowAll();
+    InputRowFilter combined = filter1.and(filter2);
+
+    InputRow row = newRow(100);
+    Assert.assertEquals(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 
combined.test(row));
+  }
+
+  @Test
+  public void testAndSecondRejects()
+  {
+    InputRowFilter filter1 = InputRowFilter.allowAll();
+    InputRowFilter filter2 = row -> 
InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME;
+    InputRowFilter combined = filter1.and(filter2);
+
+    InputRow row = newRow(100);
+    Assert.assertEquals(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME, 
combined.test(row));
+  }
+
+  @Test
+  public void testAndBothRejectReturnsFirst()
+  {
+    InputRowFilter filter1 = row -> InputRowFilterResult.NULL_OR_EMPTY_RECORD;
+    InputRowFilter filter2 = row -> InputRowFilterResult.CUSTOM_FILTER;
+    InputRowFilter combined = filter1.and(filter2);
+
+    InputRow row = newRow(100);
+    // Should return reason from first filter
+    Assert.assertEquals(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 
combined.test(row));
+  }
+
+  @Test
+  public void testChainedAnd()
+  {
+    InputRowFilter filter1 = InputRowFilter.allowAll();
+    InputRowFilter filter2 = InputRowFilter.allowAll();
+    InputRowFilter filter3 = row -> 
InputRowFilterResult.AFTER_MAX_MESSAGE_TIME;
+
+    InputRowFilter combined = filter1.and(filter2).and(filter3);
+
+    InputRow row = newRow(100);
+    Assert.assertEquals(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME, 
combined.test(row));
+  }
+
+  private static InputRow newRow(Object dim1Val)
+  {
+    return new MapBasedInputRow(
+        DateTimes.of("2020-01-01"),
+        DIMENSIONS,
+        ImmutableMap.of("dim1", dim1Val)
+    );
+  }
+}
+
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index cc3cdffa747..a0e3f3245f2 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -48,6 +48,7 @@ import 
org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.segment.DataSegmentsWithSchemas;
 import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
 import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
 import org.apache.druid.segment.indexing.DataSchema;
@@ -492,6 +493,7 @@ public class SinglePhaseParallelIndexingTest extends 
AbstractParallelIndexSuperv
         Collections.emptyList()
     );
     TaskReport.ReportMap actualReports = task.doGetLiveReports(true);
+    Map<String, Long> expectedThrownAwayByReason = 
Map.of(InputRowFilterResult.CUSTOM_FILTER.getReason(), 1L);
     TaskReport.ReportMap expectedReports = buildExpectedTaskReportParallel(
         task.getId(),
         ImmutableList.of(
@@ -508,7 +510,7 @@ public class SinglePhaseParallelIndexingTest extends 
AbstractParallelIndexSuperv
                 1L
             )
         ),
-        new RowIngestionMetersTotals(10, 335, 1, 1, 1)
+        new RowIngestionMetersTotals(10, 335, 1, expectedThrownAwayByReason, 1)
     );
     compareTaskReports(expectedReports, actualReports);
   }
@@ -543,7 +545,8 @@ public class SinglePhaseParallelIndexingTest extends 
AbstractParallelIndexSuperv
     final ParallelIndexSupervisorTask executedTask = 
(ParallelIndexSupervisorTask) taskContainer.getTask();
     TaskReport.ReportMap actualReports = executedTask.doGetLiveReports(true);
 
-    final RowIngestionMetersTotals expectedTotals = new 
RowIngestionMetersTotals(10, 335, 1, 1, 1);
+    Map<String, Long> expectedThrownAwayByReason = 
Map.of(InputRowFilterResult.CUSTOM_FILTER.getReason(), 1L);
+    final RowIngestionMetersTotals expectedTotals = new 
RowIngestionMetersTotals(10, 335, 1, expectedThrownAwayByReason, 1);
     List<ParseExceptionReport> expectedUnparseableEvents = ImmutableList.of(
         new ParseExceptionReport(
             "{ts=2017unparseable}",
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
index 7518bb7f172..eb08944d46b 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
@@ -36,6 +36,7 @@ import 
org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.segment.incremental.InputRowFilterResult;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.joda.time.DateTime;
 import org.junit.Assert;
@@ -105,13 +106,13 @@ public class SeekableStreamIndexTaskRunnerTest
                                                                                
                LockGranularity.TIME_CHUNK);
 
     Mockito.when(row.getTimestamp()).thenReturn(now);
-    Assert.assertTrue(runner.withinMinMaxRecordTime(row));
+    Assert.assertEquals(InputRowFilterResult.ACCEPTED, 
runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row));
 
     
Mockito.when(row.getTimestamp()).thenReturn(now.minusHours(2).minusMinutes(1));
-    Assert.assertFalse(runner.withinMinMaxRecordTime(row));
+    Assert.assertEquals(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME, 
runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row));
 
     
Mockito.when(row.getTimestamp()).thenReturn(now.plusHours(2).plusMinutes(1));
-    Assert.assertFalse(runner.withinMinMaxRecordTime(row));
+    Assert.assertEquals(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME, 
runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row));
   }
 
   @Test
@@ -157,13 +158,58 @@ public class SeekableStreamIndexTaskRunnerTest
     TestasbleSeekableStreamIndexTaskRunner runner = new 
TestasbleSeekableStreamIndexTaskRunner(task, null,
                                                                                
                LockGranularity.TIME_CHUNK);
 
-    Assert.assertTrue(runner.withinMinMaxRecordTime(row));
+    Mockito.when(row.getTimestamp()).thenReturn(now);
+    Assert.assertEquals(InputRowFilterResult.ACCEPTED, 
runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row));
 
     
Mockito.when(row.getTimestamp()).thenReturn(now.minusHours(2).minusMinutes(1));
-    Assert.assertTrue(runner.withinMinMaxRecordTime(row));
+    Assert.assertEquals(InputRowFilterResult.ACCEPTED, 
runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row));
 
     
Mockito.when(row.getTimestamp()).thenReturn(now.plusHours(2).plusMinutes(1));
-    Assert.assertTrue(runner.withinMinMaxRecordTime(row));
+    Assert.assertEquals(InputRowFilterResult.ACCEPTED, 
runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row));
+  }
+
+  @Test
+  public void testEnsureRowRejectionReasonForNullRow()
+  {
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("d1"),
+            new StringDimensionSchema("d2")
+        )
+    );
+    DataSchema schema =
+        DataSchema.builder()
+                  .withDataSource("datasource")
+                  .withTimestamp(new TimestampSpec(null, null, null))
+                  .withDimensions(dimensionsSpec)
+                  .withGranularity(
+                      new UniformGranularitySpec(Granularities.MINUTE, 
Granularities.NONE, null)
+                  )
+                  .build();
+
+    SeekableStreamIndexTaskTuningConfig tuningConfig = 
Mockito.mock(SeekableStreamIndexTaskTuningConfig.class);
+    SeekableStreamIndexTaskIOConfig<String, String> ioConfig = 
Mockito.mock(SeekableStreamIndexTaskIOConfig.class);
+    SeekableStreamStartSequenceNumbers<String, String> sequenceNumbers = 
Mockito.mock(SeekableStreamStartSequenceNumbers.class);
+    SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers = 
Mockito.mock(SeekableStreamEndSequenceNumbers.class);
+
+    
Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(null);
+    Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(null);
+    Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(null);
+    Mockito.when(ioConfig.getInputFormat()).thenReturn(new 
JsonInputFormat(null, null, null, null, null));
+    
Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers);
+    
Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers);
+
+    
Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of());
+    Mockito.when(sequenceNumbers.getStream()).thenReturn("test");
+
+    Mockito.when(task.getDataSchema()).thenReturn(schema);
+    Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
+    Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
+
+    TestasbleSeekableStreamIndexTaskRunner runner = new 
TestasbleSeekableStreamIndexTaskRunner(task, null,
+                                                                               
                LockGranularity.TIME_CHUNK);
+
+    Assert.assertEquals(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 
runner.ensureRowIsNonNullAndWithinMessageTimeBounds(null));
   }
 
   @Test
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
index 649c0d57e46..92ed5785abb 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
@@ -33,6 +33,7 @@ import org.apache.druid.data.input.impl.JSONParseSpec;
 import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.task.InputRowFilter;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.RE;
@@ -104,7 +105,7 @@ public class StreamChunkParserTest
         null,
         null,
         null,
-        row -> true,
+        InputRowFilter.allowAll(),
         rowIngestionMeters,
         parseExceptionHandler
     );
@@ -121,7 +122,7 @@ public class StreamChunkParserTest
         new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, 
ColumnsFilter.all()),
         TransformSpec.NONE,
         temporaryFolder.newFolder(),
-        row -> true,
+        InputRowFilter.allowAll(),
         rowIngestionMeters,
         parseExceptionHandler
     );
@@ -139,7 +140,7 @@ public class StreamChunkParserTest
         null,
         null,
         null,
-        row -> true,
+        InputRowFilter.allowAll(),
         rowIngestionMeters,
         parseExceptionHandler
     );
@@ -169,7 +170,7 @@ public class StreamChunkParserTest
         new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, 
ColumnsFilter.all()),
         TransformSpec.NONE,
         temporaryFolder.newFolder(),
-        row -> true,
+        InputRowFilter.allowAll(),
         rowIngestionMeters,
         parseExceptionHandler
     );
@@ -190,7 +191,7 @@ public class StreamChunkParserTest
         new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, 
ColumnsFilter.all()),
         TransformSpec.NONE,
         temporaryFolder.newFolder(),
-        row -> true,
+        InputRowFilter.allowAll(),
         rowIngestionMeters,
         parseExceptionHandler
     );
@@ -213,7 +214,7 @@ public class StreamChunkParserTest
         new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, 
ColumnsFilter.all()),
         TransformSpec.NONE,
         temporaryFolder.newFolder(),
-        row -> true,
+        InputRowFilter.allowAll(),
         rowIngestionMeters,
         parseExceptionHandler
     );
@@ -241,7 +242,7 @@ public class StreamChunkParserTest
     final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
         parser,
         mockedByteEntityReader,
-        row -> true,
+        InputRowFilter.allowAll(),
         rowIngestionMeters,
         new ParseExceptionHandler(
             rowIngestionMeters,
@@ -279,7 +280,7 @@ public class StreamChunkParserTest
     final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
         parser,
         mockedByteEntityReader,
-        row -> true,
+        InputRowFilter.allowAll(),
         rowIngestionMeters,
         parseExceptionHandler
     );
@@ -318,7 +319,7 @@ public class StreamChunkParserTest
     final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
         parser,
         mockedByteEntityReader,
-        row -> true,
+        InputRowFilter.allowAll(),
         rowIngestionMeters,
         new ParseExceptionHandler(
             rowIngestionMeters,
@@ -354,7 +355,7 @@ public class StreamChunkParserTest
         () -> new StreamChunkParser<>(
             null,
             null,
-            row -> true,
+            InputRowFilter.allowAll(),
             rowIngestionMeters,
             parseExceptionHandler
         )
diff --git a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java 
b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
index 578d792df49..532553aef6e 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
@@ -57,6 +57,7 @@ public class DruidMetrics
   public static final String STREAM = "stream";
   public static final String PARTITION = "partition";
   public static final String SUPERVISOR_ID = "supervisorId";
+  public static final String REASON = "reason";
 
   public static final String TAGS = "tags";
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/InputRowFilterResult.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/InputRowFilterResult.java
new file mode 100644
index 00000000000..fcb4d8aec11
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/InputRowFilterResult.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import java.util.Arrays;
+
+/**
+ * Result of filtering an input row during ingestion.
+ */
+public enum InputRowFilterResult
+{
+  /**
+   * The row passed the filter and should be processed.
+   */
+  ACCEPTED("accepted"),
+  /**
+   * The row was null or the input record was empty.
+   */
+  NULL_OR_EMPTY_RECORD("null"),
+
+  /**
+   * The row's timestamp is before the minimum message time (late message 
rejection).
+   */
+  BEFORE_MIN_MESSAGE_TIME("beforeMinimumMessageTime"),
+
+  /**
+   * The row's timestamp is after the maximum message time (early message 
rejection).
+   */
+  AFTER_MAX_MESSAGE_TIME("afterMaximumMessageTime"),
+
+  /**
+   * The row was filtered out by a transformSpec filter or other row filter.
+   */
+  CUSTOM_FILTER("filtered"),
+
+  /**
+   * A backwards-compatible value for tracking filter reasons for ingestion 
tasks using older Druid versions without filter reason tracking.
+   */
+  UNKNOWN("unknown");
+
+  private static final InputRowFilterResult[] REJECTED_VALUES = 
Arrays.stream(InputRowFilterResult.values())
+                                                                      
.filter(InputRowFilterResult::isRejected)
+                                                                      
.toArray(InputRowFilterResult[]::new);
+
+  private final String reason;
+
+  InputRowFilterResult(String reason)
+  {
+    this.reason = reason;
+  }
+
+  /**
+   * Returns string value representation of this {@link InputRowFilterResult} 
for metric emission.
+   */
+  public String getReason()
+  {
+    return reason;
+  }
+
+  /**
+   * Returns true if this result indicates the row was rejected (thrown away).
+   * Returns false for {@link #ACCEPTED}.
+   */
+  public boolean isRejected()
+  {
+    return this != ACCEPTED;
+  }
+
+  /**
+   * Returns {@link InputRowFilterResult} that are rejection states.
+   */
+  public static InputRowFilterResult[] rejectedValues()
+  {
+    return REJECTED_VALUES;
+  }
+
+  /**
+   * Returns total number of {@link InputRowFilterResult} values.
+   */
+  public static int numValues()
+  {
+    return values().length;
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java
index bff4f2e6de3..67eb80a2dc8 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java
@@ -23,13 +23,13 @@ import java.util.Collections;
 import java.util.Map;
 
 /**
- * This class is used only in {@code RealtimeIndexTask} which is deprecated 
now.
+ * This class is used only in {@code DartFrameContext}.
  *
  * Consider using {@link RowIngestionMetersFactory} instead.
  */
 public class NoopRowIngestionMeters implements RowIngestionMeters
 {
-  private static final RowIngestionMetersTotals EMPTY_TOTALS = new 
RowIngestionMetersTotals(0, 0, 0, 0, 0);
+  private static final RowIngestionMetersTotals EMPTY_TOTALS = new 
RowIngestionMetersTotals(0, 0, 0, Map.of(), 0);
 
   @Override
   public long getProcessed()
@@ -74,11 +74,17 @@ public class NoopRowIngestionMeters implements 
RowIngestionMeters
   }
 
   @Override
-  public void incrementThrownAway()
+  public void incrementThrownAway(InputRowFilterResult reason)
   {
 
   }
 
+  @Override
+  public Map<String, Long> getThrownAwayByReason()
+  {
+    return Map.of();
+  }
+
   @Override
   public RowIngestionMetersTotals getTotals()
   {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
index 25fc3bae481..502c2057ebc 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
@@ -40,6 +40,7 @@ public interface RowIngestionMeters extends InputStats
   String PROCESSED_WITH_ERROR = "processedWithError";
   String UNPARSEABLE = "unparseable";
   String THROWN_AWAY = "thrownAway";
+  String THROWN_AWAY_BY_REASON = "thrownAwayByReason";
 
   /**
    * Number of bytes read by an ingestion task.
@@ -73,7 +74,17 @@ public interface RowIngestionMeters extends InputStats
   void incrementUnparseable();
 
   long getThrownAway();
-  void incrementThrownAway();
+
+  /**
+   * Increments the thrown away counter for the specified {@link 
InputRowFilterResult} reason.
+   */
+  void incrementThrownAway(InputRowFilterResult reason);
+
+  /**
+   * Returns the count of thrown away events for each reason.
+   * Keyed by {@link InputRowFilterResult#getReason()}.
+   */
+  Map<String, Long> getThrownAwayByReason();
 
   RowIngestionMetersTotals getTotals();
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
index 2002bb24ac0..43e1b985705 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
@@ -21,7 +21,11 @@ package org.apache.druid.segment.incremental;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
 
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 
 public class RowIngestionMetersTotals
@@ -30,6 +34,7 @@ public class RowIngestionMetersTotals
   private final long processedBytes;
   private final long processedWithError;
   private final long thrownAway;
+  private final Map<String, Long> thrownAwayByReason;
   private final long unparseable;
 
   @JsonCreator
@@ -38,13 +43,49 @@ public class RowIngestionMetersTotals
       @JsonProperty("processedBytes") long processedBytes,
       @JsonProperty("processedWithError") long processedWithError,
       @JsonProperty("thrownAway") long thrownAway,
+      @JsonProperty("thrownAwayByReason") @Nullable Map<String, Long> 
thrownAwayByReason,
       @JsonProperty("unparseable") long unparseable
   )
+  {
+    this(
+        processed,
+        processedBytes,
+        processedWithError,
+        Configs.valueOrDefault(thrownAwayByReason, 
getBackwardsCompatibleThrownAwayByReason(thrownAway)),
+        unparseable
+    );
+  }
+
+  public RowIngestionMetersTotals(
+      long processed,
+      long processedBytes,
+      long processedWithError,
+      long thrownAway,
+      long unparseable
+  )
+  {
+    this(
+        processed,
+        processedBytes,
+        processedWithError,
+        getBackwardsCompatibleThrownAwayByReason(thrownAway),
+        unparseable
+    );
+  }
+
+  public RowIngestionMetersTotals(
+      long processed,
+      long processedBytes,
+      long processedWithError,
+      Map<String, Long> thrownAwayByReason,
+      long unparseable
+  )
   {
     this.processed = processed;
     this.processedBytes = processedBytes;
     this.processedWithError = processedWithError;
-    this.thrownAway = thrownAway;
+    this.thrownAway = thrownAwayByReason.values().stream().reduce(0L, 
Long::sum);
+    this.thrownAwayByReason = thrownAwayByReason;
     this.unparseable = unparseable;
   }
 
@@ -72,6 +113,12 @@ public class RowIngestionMetersTotals
     return thrownAway;
   }
 
+  @JsonProperty
+  public Map<String, Long> getThrownAwayByReason()
+  {
+    return thrownAwayByReason;
+  }
+
   @JsonProperty
   public long getUnparseable()
   {
@@ -92,13 +139,14 @@ public class RowIngestionMetersTotals
            && processedBytes == that.processedBytes
            && processedWithError == that.processedWithError
            && thrownAway == that.thrownAway
+           && thrownAwayByReason.equals(that.thrownAwayByReason)
            && unparseable == that.unparseable;
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(processed, processedBytes, processedWithError, 
thrownAway, unparseable);
+    return Objects.hash(processed, processedBytes, processedWithError, 
thrownAway, thrownAwayByReason, unparseable);
   }
 
   @Override
@@ -109,7 +157,21 @@ public class RowIngestionMetersTotals
            ", processedBytes=" + processedBytes +
            ", processedWithError=" + processedWithError +
            ", thrownAway=" + thrownAway +
+           ", thrownAwayByReason=" + thrownAwayByReason +
            ", unparseable=" + unparseable +
            '}';
   }
+
+  /**
+   * For backwards compatibility, key by {@link InputRowFilterResult} in case 
of lack of thrownAwayByReason input during rolling Druid upgrades.
+   * This can occur when tasks running on older Druid versions return ingest 
statistic payloads to an overlord running on a newer Druid version.
+   */
+  private static Map<String, Long> 
getBackwardsCompatibleThrownAwayByReason(long thrownAway)
+  {
+    Map<String, Long> results = new HashMap<>();
+    if (thrownAway > 0) {
+      results.put(InputRowFilterResult.UNKNOWN.getReason(), thrownAway);
+    }
+    return results;
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/SimpleRowIngestionMeters.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/SimpleRowIngestionMeters.java
index 10293e4e24a..2140b781241 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/SimpleRowIngestionMeters.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/SimpleRowIngestionMeters.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment.incremental;
 
+import java.util.HashMap;
 import java.util.Map;
 
 public class SimpleRowIngestionMeters implements RowIngestionMeters
@@ -26,8 +27,8 @@ public class SimpleRowIngestionMeters implements 
RowIngestionMeters
   private long processed;
   private long processedWithError;
   private long unparseable;
-  private long thrownAway;
   private long processedBytes;
+  private final long[] thrownAwayByReason = new 
long[InputRowFilterResult.numValues()];
 
   @Override
   public long getProcessed()
@@ -80,13 +81,30 @@ public class SimpleRowIngestionMeters implements 
RowIngestionMeters
   @Override
   public long getThrownAway()
   {
-    return thrownAway;
+    long total = 0;
+    for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) {
+      total += thrownAwayByReason[reason.ordinal()];
+    }
+    return total;
   }
 
   @Override
-  public void incrementThrownAway()
+  public void incrementThrownAway(InputRowFilterResult reason)
   {
-    thrownAway++;
+    ++thrownAwayByReason[reason.ordinal()];
+  }
+
+  @Override
+  public Map<String, Long> getThrownAwayByReason()
+  {
+    final Map<String, Long> result = new HashMap<>();
+    for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) {
+      long count = thrownAwayByReason[reason.ordinal()];
+      if (count > 0) {
+        result.put(reason.getReason(), count);
+      }
+    }
+    return result;
   }
 
   @Override
@@ -96,7 +114,7 @@ public class SimpleRowIngestionMeters implements 
RowIngestionMeters
         processed,
         processedBytes,
         processedWithError,
-        thrownAway,
+        getThrownAwayByReason(),
         unparseable
     );
   }
@@ -112,7 +130,11 @@ public class SimpleRowIngestionMeters implements 
RowIngestionMeters
     this.processed += rowIngestionMetersTotals.getProcessed();
     this.processedWithError += 
rowIngestionMetersTotals.getProcessedWithError();
     this.unparseable += rowIngestionMetersTotals.getUnparseable();
-    this.thrownAway += rowIngestionMetersTotals.getThrownAway();
     this.processedBytes += rowIngestionMetersTotals.getProcessedBytes();
+
+    final Map<String, Long> thrownAwayByReason = 
rowIngestionMetersTotals.getThrownAwayByReason();
+    for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) {
+      this.thrownAwayByReason[reason.ordinal()] += 
thrownAwayByReason.getOrDefault(reason.getReason(), 0L);
+    }
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java
 
b/processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java
index 4ff36d731df..b137ffe1f35 100644
--- 
a/processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java
+++ 
b/processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java
@@ -30,6 +30,7 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
 import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
 import org.apache.druid.segment.incremental.RowMeters;
+import org.apache.druid.utils.CollectionUtils;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -353,6 +354,7 @@ public class TaskReportSerdeTest
             "processedBytes", (int) 
determinePartitionTotalStats.getProcessedBytes(),
             "processedWithError", (int) 
determinePartitionTotalStats.getProcessedWithError(),
             "thrownAway", (int) determinePartitionTotalStats.getThrownAway(),
+            "thrownAwayByReason", 
CollectionUtils.mapValues(determinePartitionTotalStats.getThrownAwayByReason(), 
Long::intValue),
             "unparseable", (int) determinePartitionTotalStats.getUnparseable()
         ),
         observedTotals.get("determinePartitions")
@@ -363,6 +365,7 @@ public class TaskReportSerdeTest
             "processedBytes", (int) buildSegmentTotalStats.getProcessedBytes(),
             "processedWithError", (int) 
buildSegmentTotalStats.getProcessedWithError(),
             "thrownAway", (int) buildSegmentTotalStats.getThrownAway(),
+            "thrownAwayByReason", 
CollectionUtils.mapValues(buildSegmentTotalStats.getThrownAwayByReason(), 
Long::intValue),
             "unparseable", (int) buildSegmentTotalStats.getUnparseable()
         ),
         observedTotals.get("buildSegments")
diff --git 
a/processing/src/test/java/org/apache/druid/segment/incremental/InputRowFilterResultTest.java
 
b/processing/src/test/java/org/apache/druid/segment/incremental/InputRowFilterResultTest.java
new file mode 100644
index 00000000000..a8d932e4dcd
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/segment/incremental/InputRowFilterResultTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class InputRowFilterResultTest
+{
+  @Test
+  public void testOrdinalValues()
+  {
+    Assert.assertEquals(0, InputRowFilterResult.ACCEPTED.ordinal());
+    Assert.assertEquals(1, 
InputRowFilterResult.NULL_OR_EMPTY_RECORD.ordinal());
+    Assert.assertEquals(2, 
InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.ordinal());
+    Assert.assertEquals(3, 
InputRowFilterResult.AFTER_MAX_MESSAGE_TIME.ordinal());
+    Assert.assertEquals(4, InputRowFilterResult.CUSTOM_FILTER.ordinal());
+  }
+
+  @Test
+  public void testMetricValues()
+  {
+    Assert.assertEquals("accepted", InputRowFilterResult.ACCEPTED.getReason());
+    Assert.assertEquals("null", 
InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason());
+    Assert.assertEquals("beforeMinimumMessageTime", 
InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.getReason());
+    Assert.assertEquals("afterMaximumMessageTime", 
InputRowFilterResult.AFTER_MAX_MESSAGE_TIME.getReason());
+    Assert.assertEquals("filtered", 
InputRowFilterResult.CUSTOM_FILTER.getReason());
+  }
+
+  @Test
+  public void testEnumCardinality()
+  {
+    Assert.assertEquals(6, InputRowFilterResult.values().length);
+  }
+
+  @Test
+  public void testIsRejected()
+  {
+    Assert.assertFalse(InputRowFilterResult.ACCEPTED.isRejected());
+    Assert.assertTrue(InputRowFilterResult.NULL_OR_EMPTY_RECORD.isRejected());
+    
Assert.assertTrue(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.isRejected());
+    
Assert.assertTrue(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME.isRejected());
+    Assert.assertTrue(InputRowFilterResult.CUSTOM_FILTER.isRejected());
+    Assert.assertTrue(InputRowFilterResult.UNKNOWN.isRejected());
+  }
+}
+
diff --git 
a/processing/src/test/java/org/apache/druid/segment/incremental/RowMeters.java 
b/processing/src/test/java/org/apache/druid/segment/incremental/RowMeters.java
index 02b1d290003..f155e70424d 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/incremental/RowMeters.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/incremental/RowMeters.java
@@ -19,6 +19,9 @@
 
 package org.apache.druid.segment.incremental;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Utility class to build {@link RowIngestionMetersTotals}, used in tests.
  */
@@ -27,7 +30,7 @@ public class RowMeters
   private long processedBytes;
   private long processedWithError;
   private long unparseable;
-  private long thrownAway;
+  private final Map<String, Long> thrownAwayByReason = new HashMap<>();
 
   /**
    * Creates a new {@link RowMeters}, that can be used to build an instance of
@@ -56,14 +59,20 @@ public class RowMeters
     return this;
   }
 
+  public RowMeters thrownAwayByReason(InputRowFilterResult thrownAwayByReason, 
long thrownAway)
+  {
+    this.thrownAwayByReason.put(thrownAwayByReason.getReason(), thrownAway);
+    return this;
+  }
+
   public RowMeters thrownAway(long thrownAway)
   {
-    this.thrownAway = thrownAway;
+    this.thrownAwayByReason.put(InputRowFilterResult.UNKNOWN.getReason(), 
thrownAway);
     return this;
   }
 
   public RowIngestionMetersTotals totalProcessed(long processed)
   {
-    return new RowIngestionMetersTotals(processed, processedBytes, 
processedWithError, thrownAway, unparseable);
+    return new RowIngestionMetersTotals(processed, processedBytes, 
processedWithError, thrownAwayByReason, unparseable);
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/incremental/SimpleRowIngestionMetersTest.java
 
b/processing/src/test/java/org/apache/druid/segment/incremental/SimpleRowIngestionMetersTest.java
index 5f46129c170..9cccff0c880 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/incremental/SimpleRowIngestionMetersTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/incremental/SimpleRowIngestionMetersTest.java
@@ -22,6 +22,8 @@ package org.apache.druid.segment.incremental;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Map;
+
 public class SimpleRowIngestionMetersTest
 {
   @Test
@@ -32,16 +34,49 @@ public class SimpleRowIngestionMetersTest
     rowIngestionMeters.incrementProcessedBytes(5);
     rowIngestionMeters.incrementProcessedWithError();
     rowIngestionMeters.incrementUnparseable();
-    rowIngestionMeters.incrementThrownAway();
-    Assert.assertEquals(rowIngestionMeters.getTotals(), new 
RowIngestionMetersTotals(1, 5, 1, 1, 1));
+    
rowIngestionMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+    final Map<String, Long> expected = 
Map.of(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason(), 1L);
+    Assert.assertEquals(new RowIngestionMetersTotals(1, 5, 1, expected, 1), 
rowIngestionMeters.getTotals());
   }
 
   @Test
   public void testAddRowIngestionMetersTotals()
   {
     SimpleRowIngestionMeters rowIngestionMeters = new 
SimpleRowIngestionMeters();
-    RowIngestionMetersTotals rowIngestionMetersTotals = new 
RowIngestionMetersTotals(10, 0, 1, 0, 1);
+    RowIngestionMetersTotals rowIngestionMetersTotals = new 
RowIngestionMetersTotals(10, 0, 1, 1, 1);
     rowIngestionMeters.addRowIngestionMetersTotals(rowIngestionMetersTotals);
-    Assert.assertEquals(rowIngestionMeters.getTotals(), 
rowIngestionMetersTotals);
+    Assert.assertEquals(rowIngestionMetersTotals, 
rowIngestionMeters.getTotals());
+  }
+
+  @Test
+  public void testIncrementThrownAwayWithReason()
+  {
+    SimpleRowIngestionMeters rowIngestionMeters = new 
SimpleRowIngestionMeters();
+
+    
rowIngestionMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+    
rowIngestionMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
+    
rowIngestionMeters.incrementThrownAway(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME);
+    
rowIngestionMeters.incrementThrownAway(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME);
+    rowIngestionMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+    rowIngestionMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+    rowIngestionMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
+
+    Assert.assertEquals(7, rowIngestionMeters.getThrownAway());
+
+    Map<String, Long> byReason = rowIngestionMeters.getThrownAwayByReason();
+    Assert.assertEquals(Long.valueOf(2), 
byReason.get(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason()));
+    Assert.assertEquals(Long.valueOf(1), 
byReason.get(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.getReason()));
+    Assert.assertEquals(Long.valueOf(1), 
byReason.get(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME.getReason()));
+    Assert.assertEquals(Long.valueOf(3), 
byReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason()));
+  }
+
+  @Test
+  public void testGetThrownAwayByReasonReturnsNoRejectedReasons()
+  {
+    SimpleRowIngestionMeters rowIngestionMeters = new 
SimpleRowIngestionMeters();
+
+    // With no increments, no rejected reasons should be present
+    Map<String, Long> byReason = rowIngestionMeters.getThrownAwayByReason();
+    Assert.assertTrue(byReason.isEmpty());
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to