kfaraz commented on code in PR #18855:
URL: https://github.com/apache/druid/pull/18855#discussion_r2633804227
##########
docs/operations/metrics.md:
##########
@@ -270,7 +270,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
|`ingest/events/processed`|Number of events processed per emission period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Equal to the number of events per emission period.|
|`ingest/events/processedWithError`|Number of events processed with some partial errors per emission period. Events processed with partial errors are counted towards both this metric and `ingest/events/processed`.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
-|`ingest/events/thrownAway`|Number of events rejected because they are null, or filtered by `transformSpec`, or outside one of `lateMessageRejectionPeriod`, `earlyMessageRejectionPeriod`.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
+|`ingest/events/thrownAway`|Number of events rejected because they are null, or filtered by `transformSpec`, or outside one of `lateMessageRejectionPeriod`, `earlyMessageRejectionPeriod`. The `reason` dimension indicates why the event was thrown away.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`, `reason`|0|
Review Comment:
I would advise using the `description` dimension instead, since it is
essentially a free-form dimension already baked into Druid and suits this
use case well.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java:
##########
@@ -25,21 +25,27 @@
import org.apache.druid.java.util.metrics.AbstractMonitor;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.ThrownAwayReason;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
+import java.util.EnumMap;
+import java.util.Map;
+
/**
* Emits metrics from {@link SegmentGenerationMetrics} and {@link RowIngestionMeters}.
*/
public class TaskRealtimeMetricsMonitor extends AbstractMonitor
{
private static final EmittingLogger log = new EmittingLogger(TaskRealtimeMetricsMonitor.class);
+ private static final String REASON_DIMENSION = "reason";
private final SegmentGenerationMetrics segmentGenerationMetrics;
private final RowIngestionMeters rowIngestionMeters;
private final ServiceMetricEvent.Builder builder;
private SegmentGenerationMetrics previousSegmentGenerationMetrics;
private RowIngestionMetersTotals previousRowIngestionMetersTotals;
+ private Map<ThrownAwayReason, Long> previousThrownAwayByReason;
Review Comment:
I feel it would be cleaner to have this map live inside
`RowIngestionMetersTotals` itself. Since it already captures all the
aggregates, it might as well contain the counts by category.
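A minimal sketch of this suggestion, under stated assumptions: `TotalsSketch` is a hypothetical, simplified stand-in for `RowIngestionMetersTotals` that models only the thrown-away counts, and `deltaSince` is an illustrative helper, not an existing Druid method. It shows how the monitor could compute per-reason deltas from two snapshots without keeping its own map.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;

// Stand-in for the PR's enum; constants copied from the diff.
enum ThrownAwayReason { NULL, BEFORE_MIN_MESSAGE_TIME, AFTER_MAX_MESSAGE_TIME, FILTERED }

// Hypothetical, simplified stand-in for RowIngestionMetersTotals.
final class TotalsSketch
{
  private final Map<ThrownAwayReason, Long> thrownAwayByReason;

  TotalsSketch(Map<ThrownAwayReason, Long> thrownAwayByReason)
  {
    // Defensive copy so that a snapshot never changes after creation.
    this.thrownAwayByReason = new EnumMap<>(ThrownAwayReason.class);
    this.thrownAwayByReason.putAll(thrownAwayByReason);
  }

  // The aggregate count is derived from the per-reason counts.
  long getThrownAway()
  {
    long total = 0;
    for (long count : thrownAwayByReason.values()) {
      total += count;
    }
    return total;
  }

  Map<ThrownAwayReason, Long> getThrownAwayByReason()
  {
    return Collections.unmodifiableMap(thrownAwayByReason);
  }

  // Illustrative helper: per-reason difference between this snapshot and an earlier one.
  Map<ThrownAwayReason, Long> deltaSince(TotalsSketch previous)
  {
    final Map<ThrownAwayReason, Long> delta = new EnumMap<>(ThrownAwayReason.class);
    for (ThrownAwayReason reason : ThrownAwayReason.values()) {
      final long now = thrownAwayByReason.getOrDefault(reason, 0L);
      final long before = previous.thrownAwayByReason.getOrDefault(reason, 0L);
      delta.put(reason, now - before);
    }
    return delta;
  }
}
```

With this shape, the monitor would only need to remember the previous totals object, which it already does via `previousRowIngestionMetersTotals`.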
##########
processing/src/main/java/org/apache/druid/segment/incremental/ThrownAwayReason.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+/**
+ * Reasons why an input row may be thrown away during ingestion.
+ */
+public enum ThrownAwayReason
+{
+ /**
+ * The row was null or the input record was empty.
+ */
+ NULL,
Review Comment:
```suggestion
NULL_OR_EMPTY_RECORD,
```
##########
processing/src/main/java/org/apache/druid/segment/incremental/ThrownAwayReason.java:
##########
@@ -0,0 +1,64 @@
+
+package org.apache.druid.segment.incremental;
+
+/**
+ * Reasons why an input row may be thrown away during ingestion.
+ */
+public enum ThrownAwayReason
+{
+ /**
+ * The row was null or the input record was empty.
+ */
+ NULL,
+
+ /**
+ * The row's timestamp is before the minimum message time (late message rejection).
+ */
+ BEFORE_MIN_MESSAGE_TIME,
+
+ /**
+ * The row's timestamp is after the maximum message time (early message rejection).
+ */
+ AFTER_MAX_MESSAGE_TIME,
Review Comment:
```suggestion
AFTER_MAX_MESSAGE_TIME("Record after max message time"),
```
##########
processing/src/main/java/org/apache/druid/segment/incremental/ThrownAwayReason.java:
##########
@@ -0,0 +1,64 @@
+
+package org.apache.druid.segment.incremental;
+
+/**
+ * Reasons why an input row may be thrown away during ingestion.
+ */
+public enum ThrownAwayReason
+{
+ /**
+ * The row was null or the input record was empty.
+ */
+ NULL,
+
+ /**
+ * The row's timestamp is before the minimum message time (late message rejection).
+ */
+ BEFORE_MIN_MESSAGE_TIME,
+
+ /**
+ * The row's timestamp is after the maximum message time (early message rejection).
+ */
+ AFTER_MAX_MESSAGE_TIME,
+
+ /**
+ * The row was filtered out by a transformSpec filter or other row filter.
+ */
+ FILTERED;
+
+ /**
+ * Pre-computed metric dimension values, indexed by ordinal.
+ */
+ private static final String[] METRIC_VALUES = {
+ "null",
+ "beforeMinMessageTime",
+ "afterMaxMessageTime",
+ "filtered"
+ };
+
+ /**
+ * Returns the value to be used as the dimension value in metrics.
+ */
+ public String getMetricValue()
Review Comment:
```suggestion
public String emittedDimensionValue()
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -2144,26 +2146,36 @@ private void refreshMinMaxMessageTime()
);
}
- public boolean withinMinMaxRecordTime(final InputRow row)
+ /**
+ * Returns the rejection reason for a row, or null if the row should be accepted.
+ * This method is used as a {@link RowFilter} for the {@link StreamChunkParser}.
+ */
+ @Nullable
+ ThrownAwayReason getRowRejectionReason(final InputRow row)
{
- final boolean beforeMinimumMessageTime = minMessageTime.isAfter(row.getTimestamp());
- final boolean afterMaximumMessageTime = maxMessageTime.isBefore(row.getTimestamp());
-
- if (log.isDebugEnabled()) {
- if (beforeMinimumMessageTime) {
+ if (row == null) {
+ return ThrownAwayReason.NULL;
+ }
+ if (minMessageTime.isAfter(row.getTimestamp())) {
Review Comment:
Cleaner to chain these using `else`.
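A rough sketch of the chained form, with assumptions called out: `MessageTimeBounds` and `rejectionReason` are hypothetical names, `java.time.Instant` stands in for the row timestamp type, and a null timestamp models a null `InputRow`. Only the `isAfter` / `isBefore` comparisons and the enum constants come from the diff.

```java
import java.time.Instant;

// Stand-in for the PR's enum; constants copied from the diff.
enum ThrownAwayReason { NULL, BEFORE_MIN_MESSAGE_TIME, AFTER_MAX_MESSAGE_TIME, FILTERED }

// Hypothetical holder for the min/max message time bounds.
final class MessageTimeBounds
{
  private final Instant minMessageTime;
  private final Instant maxMessageTime;

  MessageTimeBounds(Instant minMessageTime, Instant maxMessageTime)
  {
    this.minMessageTime = minMessageTime;
    this.maxMessageTime = maxMessageTime;
  }

  // Returns the rejection reason, or null if the row should be accepted.
  // A null rowTimestamp models a null row in this sketch.
  ThrownAwayReason rejectionReason(Instant rowTimestamp)
  {
    if (rowTimestamp == null) {
      return ThrownAwayReason.NULL;
    } else if (minMessageTime.isAfter(rowTimestamp)) {
      return ThrownAwayReason.BEFORE_MIN_MESSAGE_TIME;
    } else if (maxMessageTime.isBefore(rowTimestamp)) {
      return ThrownAwayReason.AFTER_MAX_MESSAGE_TIME;
    } else {
      return null;
    }
  }
}
```

The `else` chain makes it explicit that the checks are mutually exclusive and that exactly one branch decides the outcome.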
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/RowFilter.java:
##########
@@ -0,0 +1,76 @@
+
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.segment.incremental.ThrownAwayReason;
+
+import javax.annotation.Nullable;
+import java.util.function.Predicate;
+
+/**
+ * A filter for input rows during ingestion that can report the reason for rejection or null for acceptance.
+ * This is similar to {@link Predicate} but returns the rejection reason instead of just a boolean.
+ */
+@FunctionalInterface
+public interface RowFilter
Review Comment:
```suggestion
public interface InputRowFilter
```
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/RowFilterTest.java:
##########
@@ -0,0 +1,122 @@
+
+package org.apache.druid.indexing.common.task;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.segment.incremental.ThrownAwayReason;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class RowFilterTest
+{
+ private static final List<String> DIMENSIONS = ImmutableList.of("dim1");
+
+ @Test
+ public void testFromPredicateAccept()
Review Comment:
Nit: A different style that makes the tested method name more apparent, but
it is okay if you prefer the current style.
```suggestion
public void test_fromPredicate_whichAllowsAll()
```
##########
processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java:
##########
@@ -73,7 +73,16 @@ default long getProcessedBytes()
void incrementUnparseable();
long getThrownAway();
- void incrementThrownAway();
+
+ /**
+ * Increments the thrown away counter for the specified reason.
+ */
+ void incrementThrownAway(ThrownAwayReason reason);
+
+ /**
+ * Returns the count of thrown away events for each reason.
+ */
+ Map<ThrownAwayReason, Long> getThrownAwayByReason();
Review Comment:
Is this method used anywhere directly?
I feel it should just be included inside the result of `getTotals()` and
need not be exposed as a separate map.
##########
processing/src/main/java/org/apache/druid/segment/incremental/ThrownAwayReason.java:
##########
@@ -0,0 +1,64 @@
+
+package org.apache.druid.segment.incremental;
+
+/**
+ * Reasons why an input row may be thrown away during ingestion.
+ */
+public enum ThrownAwayReason
+{
+ /**
+ * The row was null or the input record was empty.
+ */
+ NULL,
+
+ /**
+ * The row's timestamp is before the minimum message time (late message rejection).
+ */
+ BEFORE_MIN_MESSAGE_TIME,
+
+ /**
+ * The row's timestamp is after the maximum message time (early message rejection).
+ */
+ AFTER_MAX_MESSAGE_TIME,
+
+ /**
+ * The row was filtered out by a transformSpec filter or other row filter.
+ */
+ FILTERED;
Review Comment:
```suggestion
FILTERED("Filtered by transform spec etc");
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/RowFilter.java:
##########
@@ -0,0 +1,76 @@
+
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.segment.incremental.ThrownAwayReason;
+
+import javax.annotation.Nullable;
+import java.util.function.Predicate;
+
+/**
+ * A filter for input rows during ingestion that can report the reason for rejection or null for acceptance.
+ * This is similar to {@link Predicate} but returns the rejection reason instead of just a boolean.
+ */
+@FunctionalInterface
+public interface RowFilter
+{
+ /**
+ * Tests whether the given row should be accepted.
+ *
+ * @param row the input row to test
+ * @return null if the row should be accepted, or the {@link ThrownAwayReason} if the row should be rejected
+ */
+ @Nullable
+ ThrownAwayReason test(InputRow row);
Review Comment:
Would this be cleaner?
```suggestion
RowFilterTestResult test(InputRow row);
```
where `RowFilterTestResult` contains a boolean `isRejected` and a `reason`.
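One possible shape for such a result type, as a sketch only: the factory methods `accepted()` and `rejected(...)`, the accessor `getReason()`, and the generic parameter `T` (standing in for `InputRow` so the example compiles on its own) are assumptions, not part of the PR.

```java
import java.util.Objects;
import java.util.function.Predicate;

// Stand-in for the PR's enum; constants copied from the diff.
enum ThrownAwayReason { NULL, BEFORE_MIN_MESSAGE_TIME, AFTER_MAX_MESSAGE_TIME, FILTERED }

// Hypothetical result type: a rejection flag plus the reason for rejection.
final class RowFilterTestResult
{
  private static final RowFilterTestResult ACCEPTED = new RowFilterTestResult(null);

  private final ThrownAwayReason reason;

  private RowFilterTestResult(ThrownAwayReason reason)
  {
    this.reason = reason;
  }

  static RowFilterTestResult accepted()
  {
    return ACCEPTED;
  }

  static RowFilterTestResult rejected(ThrownAwayReason reason)
  {
    return new RowFilterTestResult(Objects.requireNonNull(reason, "reason"));
  }

  boolean isRejected()
  {
    return reason != null;
  }

  // Only meaningful for rejected rows; fails fast otherwise.
  ThrownAwayReason getReason()
  {
    if (reason == null) {
      throw new IllegalStateException("Row was accepted, there is no rejection reason");
    }
    return reason;
  }
}

// T stands in for InputRow in this self-contained sketch.
@FunctionalInterface
interface InputRowFilter<T>
{
  RowFilterTestResult test(T row);

  static <T> InputRowFilter<T> fromPredicate(Predicate<T> predicate)
  {
    return row -> predicate.test(row)
                  ? RowFilterTestResult.accepted()
                  : RowFilterTestResult.rejected(ThrownAwayReason.FILTERED);
  }
}
```

Compared with a nullable return value, callers check `isRejected()` explicitly, at the cost of one extra small class. Sharing a single `ACCEPTED` instance avoids allocating on the common accept path.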
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/RowFilter.java:
##########
@@ -0,0 +1,76 @@
+
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.segment.incremental.ThrownAwayReason;
+
+import javax.annotation.Nullable;
+import java.util.function.Predicate;
+
+/**
+ * A filter for input rows during ingestion that can report the reason for rejection or null for acceptance.
+ * This is similar to {@link Predicate} but returns the rejection reason instead of just a boolean.
+ */
+@FunctionalInterface
+public interface RowFilter
+{
+ /**
+ * Tests whether the given row should be accepted.
+ *
+ * @param row the input row to test
+ * @return null if the row should be accepted, or the {@link ThrownAwayReason} if the row should be rejected
+ */
+ @Nullable
+ ThrownAwayReason test(InputRow row);
+
+ /**
+ * Creates a {@link RowFilter} from a Predicate. When the predicate returns false,
+ * the rejection reason will be {@link ThrownAwayReason#FILTERED}.
+ */
+ static RowFilter fromPredicate(Predicate<InputRow> predicate)
+ {
+ return row -> predicate.test(row) ? null : ThrownAwayReason.FILTERED;
+ }
+
+ /**
+ * Fully-permissive {@link RowFilter} used mainly for tests.
+ */
+ static RowFilter allow()
Review Comment:
```suggestion
static RowFilter allowAll()
```
##########
processing/src/main/java/org/apache/druid/segment/incremental/ThrownAwayReason.java:
##########
@@ -0,0 +1,64 @@
+
+package org.apache.druid.segment.incremental;
+
+/**
+ * Reasons why an input row may be thrown away during ingestion.
+ */
+public enum ThrownAwayReason
Review Comment:
Should we align this enum name with `InputRowFilter`, e.g. `RowFilterReason`?
##########
processing/src/main/java/org/apache/druid/segment/incremental/ThrownAwayReason.java:
##########
@@ -0,0 +1,64 @@
+
+package org.apache.druid.segment.incremental;
+
+/**
+ * Reasons why an input row may be thrown away during ingestion.
+ */
+public enum ThrownAwayReason
+{
+ /**
+ * The row was null or the input record was empty.
+ */
+ NULL,
+
+ /**
+ * The row's timestamp is before the minimum message time (late message rejection).
+ */
+ BEFORE_MIN_MESSAGE_TIME,
Review Comment:
```suggestion
BEFORE_MIN_MESSAGE_TIME("Record before min message time"),
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -2144,26 +2146,36 @@ private void refreshMinMaxMessageTime()
);
}
- public boolean withinMinMaxRecordTime(final InputRow row)
+ /**
+ * Returns the rejection reason for a row, or null if the row should be accepted.
+ * This method is used as a {@link RowFilter} for the {@link StreamChunkParser}.
+ */
+ @Nullable
+ ThrownAwayReason getRowRejectionReason(final InputRow row)
Review Comment:
The method name should reflect when it rejects something.
```suggestion
ThrownAwayReason ensureRowIsNonNullAndWithinMessageTimeBounds(final InputRow row)
```
##########
processing/src/main/java/org/apache/druid/segment/incremental/ThrownAwayReason.java:
##########
@@ -0,0 +1,64 @@
+
+package org.apache.druid.segment.incremental;
+
+/**
+ * Reasons why an input row may be thrown away during ingestion.
+ */
+public enum ThrownAwayReason
+{
+ /**
+ * The row was null or the input record was empty.
+ */
+ NULL,
+
+ /**
+ * The row's timestamp is before the minimum message time (late message rejection).
+ */
+ BEFORE_MIN_MESSAGE_TIME,
+
+ /**
+ * The row's timestamp is after the maximum message time (early message rejection).
+ */
+ AFTER_MAX_MESSAGE_TIME,
+
+ /**
+ * The row was filtered out by a transformSpec filter or other row filter.
+ */
+ FILTERED;
+
+ /**
+ * Pre-computed metric dimension values, indexed by ordinal.
+ */
+ private static final String[] METRIC_VALUES = {
+ "null",
+ "beforeMinMessageTime",
+ "afterMaxMessageTime",
+ "filtered"
+ };
Review Comment:
The dimension value should be passed to the constructor of each enum
constant instead of being looked up in an array indexed by ordinal.
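Combined with the other suggestions in this review, the enum could look roughly like the sketch below. The constant name `NULL_OR_EMPTY_RECORD`, the method name `emittedDimensionValue()`, and three of the strings are taken from the suggestions above; the string for `NULL_OR_EMPTY_RECORD` is an assumption, since no value was suggested for it.

```java
/**
 * Reasons why an input row may be thrown away during ingestion.
 */
enum ThrownAwayReason
{
  // Assumed value: the review does not suggest a string for this constant.
  NULL_OR_EMPTY_RECORD("Null or empty record"),
  BEFORE_MIN_MESSAGE_TIME("Record before min message time"),
  AFTER_MAX_MESSAGE_TIME("Record after max message time"),
  FILTERED("Filtered by transform spec etc");

  private final String dimensionValue;

  ThrownAwayReason(String dimensionValue)
  {
    this.dimensionValue = dimensionValue;
  }

  /**
   * Returns the value to be used as the dimension value in metrics.
   */
  public String emittedDimensionValue()
  {
    return dimensionValue;
  }
}
```

Binding each value to its constant means that reordering or adding constants cannot silently shift the emitted values, which is a risk with an array indexed by `ordinal()`.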
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]