This is an automated email from the ASF dual-hosted git repository.
asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new bac5ef347c Add ingest/input/bytes metric and Kafka consumer metrics. (#14582)
bac5ef347c is described below
commit bac5ef347ceaedcc202bc842aa58a9ee64d8f890
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Jul 19 19:56:22 2023 -0700
Add ingest/input/bytes metric and Kafka consumer metrics. (#14582)
* Add ingest/input/bytes metric and Kafka consumer metrics.
New metrics:
1) ingest/input/bytes. Equivalent to processedBytes in the task reports.
2) kafka/consumer/bytesConsumed: Equivalent to the Kafka consumer
metric "bytes-consumed-total". Only emitted for Kafka tasks.
3) kafka/consumer/recordsConsumed: Equivalent to the Kafka consumer
metric "records-consumed-total". Only emitted for Kafka tasks.
* Fix anchor.
* Fix KafkaConsumerMonitor.
* Interface updates.
* Doc changes.
* Update indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
Co-authored-by: Benedict Jin <[email protected]>
---------
Co-authored-by: Benedict Jin <[email protected]>
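The new KafkaConsumerMonitor in this patch turns Kafka's cumulative consumer totals (such as "bytes-consumed-total") into the per-emission deltas that Druid metrics expect, by remembering the last observed value per metric. A minimal standalone sketch of that bookkeeping pattern, using a hypothetical DeltaCounterSketch class rather than the actual monitor:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the delta-emission pattern: Kafka exposes cumulative totals,
// while Druid metrics are deltas per emission period. We remember the last
// cumulative value per metric name and report only the growth since then.
public class DeltaCounterSketch
{
  private final Map<String, AtomicLong> counters = new HashMap<>();

  /** Returns the delta since the last call for this metric name. */
  public long delta(String metricName, long cumulativeValue)
  {
    // getAndSet reads the prior value and stores the new one in one step.
    final long prior = counters.computeIfAbsent(metricName, ignored -> new AtomicLong())
                               .getAndSet(cumulativeValue);
    return cumulativeValue - prior;
  }

  public static void main(String[] args)
  {
    DeltaCounterSketch sketch = new DeltaCounterSketch();
    // First observation: the whole cumulative total is the first delta.
    System.out.println(sketch.delta("bytes-consumed-total", 100)); // 100
    // Later observation: only the growth since the prior emission.
    System.out.println(sketch.delta("bytes-consumed-total", 250)); // 150
  }
}
```

This mirrors the bookkeeping that KafkaConsumerMonitor.doMonitor performs (via the same computeIfAbsent/getAndSet idiom) before emitting each kafka/consumer metric.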
---
docs/operations/metrics.md | 6 +-
.../druid/indexing/kafka/KafkaConsumerMonitor.java | 94 ++++++++++
.../druid/indexing/kafka/KafkaIndexTask.java | 12 +-
.../druid/indexing/kafka/KafkaRecordSupplier.java | 13 ++
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 2 -
.../indexing/kafka/KafkaRecordSupplierTest.java | 192 +++++++++++----------
.../druid/indexing/kinesis/KinesisIndexTask.java | 2 +-
.../indexing/kinesis/KinesisIndexTaskTest.java | 3 +-
.../common/stats/TaskRealtimeMetricsMonitor.java | 6 +
.../seekablestream/SeekableStreamIndexTask.java | 22 ++-
.../SeekableStreamIndexTaskRunner.java | 4 +-
.../SeekableStreamIndexTaskRunnerAuthTest.java | 2 +-
.../SeekableStreamSupervisorStateTest.java | 3 +-
.../segment/incremental/RowIngestionMeters.java | 11 +-
14 files changed, 270 insertions(+), 102 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 1c3b6817d6..b1922e3519 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -210,10 +210,12 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
-|`ingest/events/thrownAway`|Number of events rejected because they are either null, or filtered by the transform spec, or outside the windowPeriod.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
+|`ingest/events/processed`|Number of events processed per emission period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Equal to the number of events per emission period.|
+|`ingest/events/processedWithError`|Number of events processed with some partial errors per emission period. Events processed with partial errors are counted towards both this metric and `ingest/events/processed`.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
+|`ingest/events/thrownAway`|Number of events rejected because they are null, or filtered by `transformSpec`, or outside one of `lateMessageRejectionPeriod`, `earlyMessageRejectionPeriod`, or `windowPeriod`.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
-|`ingest/events/processed`|Number of events successfully processed per emission period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Equal to the number of events per emission period.|
+|`ingest/input/bytes`|Number of bytes read from input sources, after decompression but prior to parsing. This covers all data read, including data that does not end up being fully processed and ingested. For example, this includes data that ends up being rejected for being unparseable or filtered out.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the amount of data read.|
|`ingest/rows/output`|Number of Druid rows persisted.|`dataSource`, `taskId`, `taskType`, `groupId`|Your number of events with rollup.|
|`ingest/persists/count`|Number of times persist occurred.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration.|
|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration. Generally a few minutes at most.|
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
new file mode 100644
index 0000000000..dd10335045
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class KafkaConsumerMonitor extends AbstractMonitor
+{
+ private volatile boolean stopAfterNext = false;
+
+ // Kafka metric name -> Druid metric name
+ private static final Map<String, String> METRICS =
+ ImmutableMap.<String, String>builder()
+ .put("bytes-consumed-total", "kafka/consumer/bytesConsumed")
.put("records-consumed-total", "kafka/consumer/recordsConsumed")
+ .build();
+ private static final String TOPIC_TAG = "topic";
+ private static final Set<String> TOPIC_METRIC_TAGS = ImmutableSet.of("client-id", TOPIC_TAG);
+
+ private final KafkaConsumer<?, ?> consumer;
+ private final Map<String, AtomicLong> counters = new HashMap<>();
+
+ public KafkaConsumerMonitor(final KafkaConsumer<?, ?> consumer)
+ {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public boolean doMonitor(final ServiceEmitter emitter)
+ {
+ for (final Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
+ final MetricName metricName = entry.getKey();
+
+ if (METRICS.containsKey(metricName.name()) && isTopicMetric(metricName)) {
+ final String topic = metricName.tags().get(TOPIC_TAG);
+ final long newValue = ((Number) entry.getValue().metricValue()).longValue();
+ final long priorValue =
+ counters.computeIfAbsent(metricName.name(), ignored -> new AtomicLong())
+ .getAndSet(newValue);
+
+ if (newValue != priorValue) {
+ final ServiceMetricEvent.Builder builder =
+ new ServiceMetricEvent.Builder().setDimension(TOPIC_TAG, topic);
+ emitter.emit(builder.build(METRICS.get(metricName.name()), newValue - priorValue));
+ }
+ }
+ }
+
+ return !stopAfterNext;
+ }
+
+ public void stopAfterNextEmit()
+ {
+ stopAfterNext = true;
+ }
+
+ private static boolean isTopicMetric(final MetricName metricName)
+ {
+ // Certain metrics are emitted both as grand totals and broken down by topic; we want to ignore the grand total and
+ // only look at the per-topic metrics. See https://kafka.apache.org/documentation/#consumer_fetch_monitoring.
+ return TOPIC_METRIC_TAGS.equals(metricName.tags().keySet())
+ && !Strings.isNullOrEmpty(metricName.tags().get(TOPIC_TAG));
+ }
+}
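The isTopicMetric check above skips Kafka's grand-total rows and keeps only the per-topic ones. A minimal standalone sketch of that filter, modeling a metric's tags as a plain Map (TopicMetricFilterSketch is a hypothetical name, not part of this patch):

```java
import java.util.Map;
import java.util.Set;

// Kafka reports some consumer metrics twice: once as a grand total
// (tagged only with "client-id") and once per topic (tagged with both
// "client-id" and "topic"). The monitor keeps only the per-topic rows.
public class TopicMetricFilterSketch
{
  private static final String TOPIC_TAG = "topic";
  private static final Set<String> TOPIC_METRIC_TAGS = Set.of("client-id", TOPIC_TAG);

  public static boolean isTopicMetric(Map<String, String> tags)
  {
    final String topic = tags.get(TOPIC_TAG);
    // Keep the row only if its tag set is exactly {client-id, topic}
    // and the topic tag carries a non-empty value.
    return TOPIC_METRIC_TAGS.equals(tags.keySet()) && topic != null && !topic.isEmpty();
  }

  public static void main(String[] args)
  {
    // Grand-total row: no "topic" tag, so it is ignored.
    System.out.println(isTopicMetric(Map.of("client-id", "consumer-1"))); // false
    // Per-topic row: kept.
    System.out.println(isTopicMetric(Map.of("client-id", "consumer-1", TOPIC_TAG, "wiki"))); // true
  }
}
```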
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 72227a8474..14e1dbe276 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
+import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
@@ -97,7 +98,7 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, Kafka
}
@Override
- protected KafkaRecordSupplier newTaskRecordSupplier()
+ protected KafkaRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox)
{
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
@@ -107,7 +108,14 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, Kafka
props.put("auto.offset.reset", "none");
- return new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides());
+ final KafkaRecordSupplier recordSupplier =
+ new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides());
+
+ if (toolbox.getMonitorScheduler() != null) {
+ toolbox.getMonitorScheduler().addMonitor(recordSupplier.monitor());
+ }
+
+ return recordSupplier;
}
finally {
Thread.currentThread().setContextClassLoader(currCtxCl);
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index eb3833a0b6..ce5a901fd7 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -33,6 +33,7 @@ import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -60,6 +61,7 @@ import java.util.stream.Collectors;
public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaRecordEntity>
{
private final KafkaConsumer<byte[], byte[]> consumer;
+ private final KafkaConsumerMonitor monitor;
private boolean closed;
public KafkaRecordSupplier(
@@ -77,6 +79,7 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
)
{
this.consumer = consumer;
+ this.monitor = new KafkaConsumerMonitor(consumer);
}
@Override
@@ -190,6 +193,14 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
});
}
+ /**
+ * Returns a Monitor that emits Kafka consumer metrics.
+ */
+ public Monitor monitor()
+ {
+ return monitor;
+ }
+
@Override
public void close()
{
@@ -197,6 +208,8 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
return;
}
closed = true;
+
+ monitor.stopAfterNextEmit();
consumer.close();
}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 8ec664af89..f9fbeea298 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -76,7 +76,6 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequences;
-import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -152,7 +151,6 @@ import java.util.stream.Stream;
@RunWith(Parameterized.class)
public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
{
- private static final Logger log = new Logger(KafkaIndexTaskTest.class);
private static final long POLL_RETRY_MS = 100;
private static final Iterable<Header> SAMPLE_HEADERS = ImmutableList.of(new Header()
{
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index b480ec146f..7a8b9b1514 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -30,6 +30,8 @@ import org.apache.druid.indexing.kafka.test.TestBroker;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.metrics.Monitor;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.MapStringDynamicConfigProvider;
import org.apache.druid.segment.TestHelper;
@@ -59,15 +61,15 @@ import java.util.stream.Collectors;
public class KafkaRecordSupplierTest
{
- private static String topic = "topic";
- private static String additonal_parameter = "additional.parameter";
- private static long poll_timeout_millis = 1000;
- private static int pollRetry = 5;
- private static int topicPosFix = 0;
+ private static final String ADDITIONAL_PARAMETER = "additional.parameter";
+ private static final long POLL_TIMEOUT_MILLIS = 1000;
+ private static final int POLL_RETRY = 5;
private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
- private static TestingCluster zkServer;
- private static TestBroker kafkaServer;
+ private static String TOPIC = "topic";
+ private static int TOPIC_POS_FIX = 0;
+ private static TestingCluster ZK_SERVER;
+ private static TestBroker KAFKA_SERVER;
private List<ProducerRecord<byte[], byte[]>> records;
@@ -112,9 +114,9 @@ public class KafkaRecordSupplierTest
}
}
- private static String getTopicName()
+ private static String nextTopicName()
{
- return "topic-" + topicPosFix++;
+ return "topic-" + TOPIC_POS_FIX++;
}
private List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> createOrderedPartitionableRecords()
@@ -129,7 +131,7 @@ public class KafkaRecordSupplierTest
partitionToOffset.put(r.partition(), 1L);
}
return new OrderedPartitionableRecord<>(
- topic,
+ TOPIC,
r.partition(),
offset,
r.value() == null ? null : Collections.singletonList(new KafkaRecordEntity(
@@ -187,34 +189,34 @@ public class KafkaRecordSupplierTest
@BeforeClass
public static void setupClass() throws Exception
{
- zkServer = new TestingCluster(1);
- zkServer.start();
+ ZK_SERVER = new TestingCluster(1);
+ ZK_SERVER.start();
- kafkaServer = new TestBroker(
- zkServer.getConnectString(),
+ KAFKA_SERVER = new TestBroker(
+ ZK_SERVER.getConnectString(),
null,
1,
ImmutableMap.of("num.partitions", "2")
);
- kafkaServer.start();
+ KAFKA_SERVER.start();
}
@Before
public void setupTest()
{
- topic = getTopicName();
- records = generateRecords(topic);
+ TOPIC = nextTopicName();
+ records = generateRecords(TOPIC);
}
@AfterClass
public static void tearDownClass() throws Exception
{
- kafkaServer.close();
- kafkaServer = null;
+ KAFKA_SERVER.close();
+ KAFKA_SERVER = null;
- zkServer.stop();
- zkServer = null;
+ ZK_SERVER.stop();
+ ZK_SERVER = null;
}
@Test
@@ -225,19 +227,19 @@ public class KafkaRecordSupplierTest
insertData();
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
- StreamPartition.of(topic, 0),
- StreamPartition.of(topic, 1)
+ StreamPartition.of(TOPIC, 0),
+ StreamPartition.of(TOPIC, 1)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
recordSupplier.assign(partitions);
Assert.assertEquals(partitions, recordSupplier.getAssignment());
- Assert.assertEquals(ImmutableSet.of(0, 1), recordSupplier.getPartitionIds(topic));
+ Assert.assertEquals(ImmutableSet.of(0, 1), recordSupplier.getPartitionIds(TOPIC));
recordSupplier.close();
}
@@ -250,11 +252,11 @@ public class KafkaRecordSupplierTest
insertData();
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
- StreamPartition.of(topic, 0),
- StreamPartition.of(topic, 1)
+ StreamPartition.of(TOPIC, 0),
+ StreamPartition.of(TOPIC, 1)
);
- Map<String, Object> properties = kafkaServer.consumerProperties();
+ Map<String, Object> properties = KAFKA_SERVER.consumerProperties();
properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
@@ -269,7 +271,7 @@ public class KafkaRecordSupplierTest
recordSupplier.assign(partitions);
Assert.assertEquals(partitions, recordSupplier.getAssignment());
- Assert.assertEquals(ImmutableSet.of(0, 1), recordSupplier.getPartitionIds(topic));
+ Assert.assertEquals(ImmutableSet.of(0, 1), recordSupplier.getPartitionIds(TOPIC));
recordSupplier.close();
}
@@ -279,10 +281,10 @@ public class KafkaRecordSupplierTest
public void testSupplierSetupCustomDeserializerRequiresParameter()
{
- Map<String, Object> properties = kafkaServer.consumerProperties();
+ Map<String, Object> properties = KAFKA_SERVER.consumerProperties();
properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
- properties.put(additonal_parameter, "stringValue");
+ properties.put(ADDITIONAL_PARAMETER, "stringValue");
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
properties,
@@ -298,7 +300,7 @@ public class KafkaRecordSupplierTest
public void testSupplierSetupCustomDeserializerRequiresParameterButMissingIt()
{
- Map<String, Object> properties = kafkaServer.consumerProperties();
+ Map<String, Object> properties = KAFKA_SERVER.consumerProperties();
properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
@@ -320,11 +322,11 @@ public class KafkaRecordSupplierTest
insertData();
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
- StreamPartition.of(topic, 0),
- StreamPartition.of(topic, 1)
+ StreamPartition.of(TOPIC, 0),
+ StreamPartition.of(TOPIC, 1)
);
- Map<String, Object> properties = kafkaServer.consumerProperties();
+ Map<String, Object> properties = KAFKA_SERVER.consumerProperties();
properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
@@ -339,9 +341,10 @@ public class KafkaRecordSupplierTest
List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> initialRecords = new ArrayList<>(createOrderedPartitionableRecords());
- List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
- for (int i = 0; polledRecords.size() != initialRecords.size() && i < pollRetry; i++) {
- polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
+ List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(
+ POLL_TIMEOUT_MILLIS);
+ for (int i = 0; polledRecords.size() != initialRecords.size() && i < POLL_RETRY; i++) {
+ polledRecords.addAll(recordSupplier.poll(POLL_TIMEOUT_MILLIS));
Thread.sleep(200);
}
@@ -360,24 +363,27 @@ public class KafkaRecordSupplierTest
insertData();
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
- StreamPartition.of(topic, 0),
- StreamPartition.of(topic, 1)
+ StreamPartition.of(TOPIC, 0),
+ StreamPartition.of(TOPIC, 1)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- kafkaServer.consumerProperties(),
+ KAFKA_SERVER.consumerProperties(),
OBJECT_MAPPER,
null
);
+ final Monitor monitor = recordSupplier.monitor();
+ monitor.start();
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> initialRecords = new ArrayList<>(createOrderedPartitionableRecords());
- List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
- for (int i = 0; polledRecords.size() != initialRecords.size() && i < pollRetry; i++) {
- polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
+ List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords =
+ recordSupplier.poll(POLL_TIMEOUT_MILLIS);
+ for (int i = 0; polledRecords.size() != initialRecords.size() && i < POLL_RETRY; i++) {
+ polledRecords.addAll(recordSupplier.poll(POLL_TIMEOUT_MILLIS));
Thread.sleep(200);
}
@@ -385,7 +391,14 @@ public class KafkaRecordSupplierTest
Assert.assertEquals(initialRecords.size(), polledRecords.size());
Assert.assertTrue(initialRecords.containsAll(polledRecords));
+ // Verify metrics
+ final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ Assert.assertTrue(monitor.monitor(emitter));
+ emitter.verifyEmitted("kafka/consumer/bytesConsumed", 1);
+ emitter.verifyEmitted("kafka/consumer/recordsConsumed", 1);
+
recordSupplier.close();
+ Assert.assertFalse(monitor.monitor(emitter));
}
@@ -393,7 +406,7 @@ public class KafkaRecordSupplierTest
public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionException
{
// Insert data
- try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+ try (final KafkaProducer<byte[], byte[]> kafkaProducer = KAFKA_SERVER.newProducer()) {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
for (ProducerRecord<byte[], byte[]> record : records.subList(0, 13)) {
@@ -403,25 +416,26 @@ public class KafkaRecordSupplierTest
}
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
- StreamPartition.of(topic, 0),
- StreamPartition.of(topic, 1)
+ StreamPartition.of(TOPIC, 0),
+ StreamPartition.of(TOPIC, 1)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
- List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
- for (int i = 0; polledRecords.size() != 13 && i < pollRetry; i++) {
- polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
+ List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(
+ POLL_TIMEOUT_MILLIS);
+ for (int i = 0; polledRecords.size() != 13 && i < POLL_RETRY; i++) {
+ polledRecords.addAll(recordSupplier.poll(POLL_TIMEOUT_MILLIS));
Thread.sleep(200);
}
// Insert data
- try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+ try (final KafkaProducer<byte[], byte[]> kafkaProducer = KAFKA_SERVER.newProducer()) {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
for (ProducerRecord<byte[], byte[]> record : records.subList(13, 15)) {
@@ -431,8 +445,8 @@ public class KafkaRecordSupplierTest
}
- for (int i = 0; polledRecords.size() != records.size() && i < pollRetry; i++) {
- polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
+ for (int i = 0; polledRecords.size() != records.size() && i < POLL_RETRY; i++) {
+ polledRecords.addAll(recordSupplier.poll(POLL_TIMEOUT_MILLIS));
Thread.sleep(200);
}
@@ -471,16 +485,16 @@ public class KafkaRecordSupplierTest
// Insert data
insertData();
- StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
- StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
+ StreamPartition<Integer> partition0 = StreamPartition.of(TOPIC, 0);
+ StreamPartition<Integer> partition1 = StreamPartition.of(TOPIC, 1);
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
- StreamPartition.of(topic, 0),
- StreamPartition.of(topic, 1)
+ StreamPartition.of(TOPIC, 0),
+ StreamPartition.of(TOPIC, 1)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
@@ -493,9 +507,10 @@ public class KafkaRecordSupplierTest
List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>>
initialRecords = createOrderedPartitionableRecords();
- List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
- for (int i = 0; polledRecords.size() != 11 && i < pollRetry; i++) {
- polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
+ List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(
+ POLL_TIMEOUT_MILLIS);
+ for (int i = 0; polledRecords.size() != 11 && i < POLL_RETRY; i++) {
+ polledRecords.addAll(recordSupplier.poll(POLL_TIMEOUT_MILLIS));
Thread.sleep(200);
}
@@ -514,16 +529,16 @@ public class KafkaRecordSupplierTest
// Insert data
insertData();
- StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
- StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
+ StreamPartition<Integer> partition0 = StreamPartition.of(TOPIC, 0);
+ StreamPartition<Integer> partition1 = StreamPartition.of(TOPIC, 1);
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
- StreamPartition.of(topic, 0),
- StreamPartition.of(topic, 1)
+ StreamPartition.of(TOPIC, 0),
+ StreamPartition.of(TOPIC, 1)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
@@ -532,7 +547,8 @@ public class KafkaRecordSupplierTest
Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition1));
recordSupplier.seekToLatest(partitions);
- List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
+ List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(
+ POLL_TIMEOUT_MILLIS);
Assert.assertEquals(Collections.emptyList(), polledRecords);
recordSupplier.close();
@@ -542,21 +558,21 @@ public class KafkaRecordSupplierTest
public void testSeekUnassigned() throws InterruptedException, ExecutionException
{
// Insert data
- try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+ try (final KafkaProducer<byte[], byte[]> kafkaProducer = KAFKA_SERVER.newProducer()) {
for (ProducerRecord<byte[], byte[]> record : records) {
kafkaProducer.send(record).get();
}
}
- StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
- StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
+ StreamPartition<Integer> partition0 = StreamPartition.of(TOPIC, 0);
+ StreamPartition<Integer> partition1 = StreamPartition.of(TOPIC, 1);
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
- StreamPartition.of(topic, 0)
+ StreamPartition.of(TOPIC, 0)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
recordSupplier.assign(partitions);
@@ -573,16 +589,16 @@ public class KafkaRecordSupplierTest
// Insert data
insertData();
- StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
- StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
+ StreamPartition<Integer> partition0 = StreamPartition.of(TOPIC, 0);
+ StreamPartition<Integer> partition1 = StreamPartition.of(TOPIC, 1);
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
- StreamPartition.of(topic, 0),
- StreamPartition.of(topic, 1)
+ StreamPartition.of(TOPIC, 0),
+ StreamPartition.of(TOPIC, 1)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
@@ -617,8 +633,8 @@ public class KafkaRecordSupplierTest
public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
- StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
+ StreamPartition<Integer> streamPartition = StreamPartition.of(TOPIC, 0);
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
@@ -629,8 +645,8 @@ public class KafkaRecordSupplierTest
public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
- StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
+ StreamPartition<Integer> streamPartition = StreamPartition.of(TOPIC, 0);
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
@@ -641,8 +657,8 @@ public class KafkaRecordSupplierTest
public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull()
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
- StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
+ StreamPartition<Integer> streamPartition = StreamPartition.of(TOPIC, 0);
Set<StreamPartition<Integer>> partitions =
ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
recordSupplier.seekToLatest(partitions);
@@ -653,8 +669,8 @@ public class KafkaRecordSupplierTest
  public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull()
  {
    KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
-    StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
+    StreamPartition<Integer> streamPartition = StreamPartition.of(TOPIC, 0);
    Set<StreamPartition<Integer>> partitions = ImmutableSet.of(streamPartition);
    recordSupplier.assign(partitions);
    recordSupplier.seekToLatest(partitions);
@@ -693,7 +709,7 @@ public class KafkaRecordSupplierTest
  {
    KafkaConsumer<byte[], byte[]> kafkaConsumer = KafkaRecordSupplier.getKafkaConsumer(
        OBJECT_MAPPER,
-        kafkaServer.consumerProperties(),
+        KAFKA_SERVER.consumerProperties(),
        originalConsumerProperties -> {
          final Map<String, Object> newMap = new HashMap<>(originalConsumerProperties);
          newMap.put("client.id", "overrideConfigTest");
@@ -711,7 +727,7 @@ public class KafkaRecordSupplierTest
  private void insertData() throws ExecutionException, InterruptedException
  {
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = KAFKA_SERVER.newProducer()) {
      kafkaProducer.initTransactions();
      kafkaProducer.beginTransaction();
      for (ProducerRecord<byte[], byte[]> record : records) {
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index 6ab91c9585..1aa6d5b2e3 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -100,7 +100,7 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, By
  }
  @Override
-  protected KinesisRecordSupplier newTaskRecordSupplier()
+  protected KinesisRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox)
      throws RuntimeException
  {
    KinesisIndexTaskIOConfig ioConfig = ((KinesisIndexTaskIOConfig) super.ioConfig);
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 17beea1cce..b12ec56de9 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -45,6 +45,7 @@ import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.IndexTaskTest;
import org.apache.druid.indexing.common.task.ParseExceptionReport;
@@ -2485,7 +2486,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
  }
  @Override
-  protected KinesisRecordSupplier newTaskRecordSupplier()
+  protected KinesisRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox)
  {
    return localSupplier == null ? recordSupplier : localSupplier;
  }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
index f708bf95d8..42438ebd36 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
@@ -108,6 +108,12 @@ public class TaskRealtimeMetricsMonitor extends AbstractMonitor
      log.warn("[%,d] duplicate events!", dedup);
    }
    emitter.emit(builder.build("ingest/events/duplicate", dedup));
+    emitter.emit(
+        builder.build(
+            "ingest/input/bytes",
+            rowIngestionMetersTotals.getProcessedBytes() - previousRowIngestionMetersTotals.getProcessedBytes()
+        )
+    );
    emitter.emit(builder.build("ingest/rows/output", metrics.rowOutput() - previousFireDepartmentMetrics.rowOutput()));
    emitter.emit(builder.build("ingest/persists/count", metrics.numPersists() - previousFireDepartmentMetrics.numPersists()));
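The hunk above emits cumulative counters as per-interval values by subtracting the totals seen at the previous poll. Here is a minimal, self-contained sketch of that delta pattern; the names (BytesMonitor, addBytes, doMonitor) are illustrative, not Druid's actual monitor API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class BytesMonitor
{
  private final AtomicLong processedBytes = new AtomicLong();
  private long previousProcessedBytes = 0L;
  final List<Long> emitted = new ArrayList<>();

  // Ingestion code increments the cumulative counter as data is read.
  void addBytes(long n)
  {
    processedBytes.addAndGet(n);
  }

  // Called periodically, like a Monitor's doMonitor(): emits only the
  // delta since the last poll, then remembers the current total.
  void doMonitor()
  {
    final long current = processedBytes.get();
    emitted.add(current - previousProcessedBytes); // the "ingest/input/bytes" value
    previousProcessedBytes = current;
  }
}
```

The same snapshot-and-subtract step applies to each cumulative metric (rows output, persist count), which is why every `builder.build(...)` call above subtracts a `previous...` value.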
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 302099685b..3aca46fbfa 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -268,7 +268,27 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
  protected abstract SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, RecordType> createTaskRunner();
-  protected abstract RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> newTaskRecordSupplier();
+  /**
+   * Deprecated method for providing the {@link RecordSupplier} that connects with the stream. New extensions should
+   * override {@link #newTaskRecordSupplier(TaskToolbox)} instead.
+   */
+  @Deprecated
+  protected RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> newTaskRecordSupplier()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Subclasses must override this method to provide the {@link RecordSupplier} that connects with the stream.
+   *
+   * The default implementation delegates to {@link #newTaskRecordSupplier()}, which is deprecated, in order to support
+   * existing extensions that have implemented that older method instead of this newer one. New extensions should
+   * override this method, not {@link #newTaskRecordSupplier()}.
+   */
+  protected RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> newTaskRecordSupplier(final TaskToolbox toolbox)
+  {
+    return newTaskRecordSupplier();
+  }
@VisibleForTesting
public Appenderator getAppenderator()
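The hunk above evolves an abstract method without breaking existing subclasses: the old no-arg method becomes a deprecated concrete method, and the new overload's default implementation delegates to it. A stripped-down sketch of that deprecate-and-delegate pattern (class and method names here are illustrative, not Druid's):

```java
abstract class BaseTask
{
  // Old extension point: no longer abstract, so new subclasses need not implement it.
  @Deprecated
  protected String newSupplier()
  {
    throw new UnsupportedOperationException();
  }

  // New extension point: the default delegates to the deprecated method,
  // so subclasses that only override the old signature keep working.
  protected String newSupplier(final String toolbox)
  {
    return newSupplier();
  }
}

// A "legacy" extension that predates the new signature...
class LegacyTask extends BaseTask
{
  @Override
  protected String newSupplier()
  {
    return "legacy";
  }

  String run()
  {
    return newSupplier("toolbox"); // ...still resolves through the delegating default.
  }
}
```

The `UnsupportedOperationException` in the old method means a subclass that overrides neither signature fails loudly at runtime rather than silently returning null.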
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index f50159a0c0..1bfbd62bb8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -416,8 +416,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
    //milliseconds waited for created segments to be handed off
    long handoffWaitMs = 0L;
-    try (final RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier = task.newTaskRecordSupplier()) {
-
+    try (final RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier =
+        task.newTaskRecordSupplier(toolbox)) {
      if (toolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) {
        toolbox.getDataSegmentServerAnnouncer().announce();
        toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
index db51a416fc..0f280059e0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
@@ -367,7 +367,7 @@ public class SeekableStreamIndexTaskRunnerAuthTest
  }
  @Override
-  protected RecordSupplier<String, String, ByteEntity> newTaskRecordSupplier()
+  protected RecordSupplier<String, String, ByteEntity> newTaskRecordSupplier(final TaskToolbox toolbox)
  {
    return null;
  }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index a347541a4e..24a2ed60b3 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
@@ -1297,7 +1298,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
  }
  @Override
-  protected RecordSupplier<String, String, ByteEntity> newTaskRecordSupplier()
+  protected RecordSupplier<String, String, ByteEntity> newTaskRecordSupplier(final TaskToolbox toolbox)
  {
    return recordSupplier;
  }
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
index 81c128744e..3085376b82 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
@@ -37,11 +37,20 @@ public interface RowIngestionMeters extends InputStats
String DETERMINE_PARTITIONS = "determinePartitions";
String PROCESSED = "processed";
- String PROCESSED_BYTES = "processedBytes";
String PROCESSED_WITH_ERROR = "processedWithError";
String UNPARSEABLE = "unparseable";
String THROWN_AWAY = "thrownAway";
+  /**
+   * Number of bytes read by an ingestion task.
+   *
+   * Note: processedBytes is a misleading name; this generally measures size when data is initially read or fetched,
+   * not when it is processed by the ingest task. It's measuring a stage somewhat earlier in the pipeline. In other
+   * words, "processed" and "processedBytes" do not use the same definition of "process". A better name might be
+   * "bytesRead" or "inputBytes", although if we change it, we must consider compatibility with existing readers.
+   */
+  String PROCESSED_BYTES = "processedBytes";
+
long getProcessed();
void incrementProcessed();
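The new Javadoc above distinguishes bytes measured where data is first read from bytes measured during processing. An illustrative sketch (not Druid code) of counting at read time, by wrapping the input stream so the tally happens before any parsing:

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

class CountingInputStream extends FilterInputStream
{
  long bytesRead = 0L;

  CountingInputStream(InputStream in)
  {
    super(in);
  }

  @Override
  public int read() throws IOException
  {
    int b = super.read();
    if (b != -1) {
      bytesRead++; // counted when the byte enters, not when a row is processed
    }
    return b;
  }

  @Override
  public int read(byte[] buf, int off, int len) throws IOException
  {
    int n = super.read(buf, off, len);
    if (n > 0) {
      bytesRead += n;
    }
    return n;
  }
}
```

Counting here explains why "processedBytes" can exceed or lag the processed-row count: unparseable or thrown-away input still contributes bytes, because the tally is taken a stage earlier in the pipeline.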
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]