This is an automated email from the ASF dual-hosted git repository.

asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new bac5ef347c Add ingest/input/bytes metric and Kafka consumer metrics. (#14582)
bac5ef347c is described below

commit bac5ef347ceaedcc202bc842aa58a9ee64d8f890
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Jul 19 19:56:22 2023 -0700

    Add ingest/input/bytes metric and Kafka consumer metrics. (#14582)
    
    * Add ingest/input/bytes metric and Kafka consumer metrics.
    
    New metrics:
    
    1) ingest/input/bytes. Equivalent to processedBytes in the task reports.
    
    2) kafka/consumer/bytesConsumed: Equivalent to the Kafka consumer
       metric "bytes-consumed-total". Only emitted for Kafka tasks.
    
    3) kafka/consumer/recordsConsumed: Equivalent to the Kafka consumer
       metric "records-consumed-total". Only emitted for Kafka tasks.
    
    * Fix anchor.
    
    * Fix KafkaConsumerMonitor.
    
    * Interface updates.
    
    * Doc changes.
    
    * Update indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
    
    Co-authored-by: Benedict Jin <[email protected]>
    
    ---------
    
    Co-authored-by: Benedict Jin <[email protected]>
---
 docs/operations/metrics.md                         |   6 +-
 .../druid/indexing/kafka/KafkaConsumerMonitor.java |  94 ++++++++++
 .../druid/indexing/kafka/KafkaIndexTask.java       |  12 +-
 .../druid/indexing/kafka/KafkaRecordSupplier.java  |  13 ++
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |   2 -
 .../indexing/kafka/KafkaRecordSupplierTest.java    | 192 +++++++++++----------
 .../druid/indexing/kinesis/KinesisIndexTask.java   |   2 +-
 .../indexing/kinesis/KinesisIndexTaskTest.java     |   3 +-
 .../common/stats/TaskRealtimeMetricsMonitor.java   |   6 +
 .../seekablestream/SeekableStreamIndexTask.java    |  22 ++-
 .../SeekableStreamIndexTaskRunner.java             |   4 +-
 .../SeekableStreamIndexTaskRunnerAuthTest.java     |   2 +-
 .../SeekableStreamSupervisorStateTest.java         |   3 +-
 .../segment/incremental/RowIngestionMeters.java    |  11 +-
 14 files changed, 270 insertions(+), 102 deletions(-)
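
The Kafka consumer metrics named in the commit message ("bytes-consumed-total" and "records-consumed-total") are cumulative totals, whereas the monitor added in this commit emits only the increase since its previous run: it swaps in the new reading and emits newValue - priorValue. A minimal, self-contained sketch of that cumulative-to-delta conversion follows. CounterDeltaTracker and the key strings are hypothetical names used for illustration only; they are not part of this commit or of Druid. The commit's KafkaConsumerMonitor keeps one counter per Kafka metric name, while this sketch lets the caller choose the key, so the name-plus-topic key shown here is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the commit: converts cumulative counter
// readings into per-period deltas.
final class CounterDeltaTracker
{
  // Last observed cumulative value for each counter key.
  private final Map<String, Long> lastSeen = new HashMap<>();

  /**
   * Records a new cumulative reading and returns the increase since the
   * previous reading for the same key. The first reading for a key is
   * compared against zero.
   */
  long update(final String key, final long cumulativeValue)
  {
    final Long prior = lastSeen.put(key, cumulativeValue);
    return cumulativeValue - (prior == null ? 0L : prior);
  }

  public static void main(final String[] args)
  {
    final CounterDeltaTracker tracker = new CounterDeltaTracker();
    // The key format (metric name + "|" + topic) is an assumption of this sketch.
    final String key = "bytes-consumed-total|example-topic";

    System.out.println(tracker.update(key, 100)); // prints 100
    System.out.println(tracker.update(key, 250)); // prints 150
    System.out.println(tracker.update(key, 250)); // prints 0
  }
}
```

A caller that mirrors the commit's behavior would skip emission when the returned delta is zero, as the monitor does with its newValue != priorValue check.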

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 1c3b6817d6..b1922e3519 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -210,10 +210,12 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
 
 |Metric|Description|Dimensions|Normal Value|
 |------|-----------|----------|------------|
-|`ingest/events/thrownAway`|Number of events rejected because they are either null, or filtered by the transform spec, or outside the windowPeriod.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
+|`ingest/events/processed`|Number of events processed per emission period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Equal to the number of events per emission period.|
+|`ingest/events/processedWithError`|Number of events processed with some partial errors per emission period. Events processed with partial errors are counted towards both this metric and `ingest/events/processed`.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
 |`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
+|`ingest/events/thrownAway`|Number of events rejected because they are null, or filtered by `transformSpec`, or outside one of `lateMessageRejectionPeriod`, `earlyMessageRejectionPeriod`, or `windowPeriod`.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
 |`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
-|`ingest/events/processed`|Number of events successfully processed per emission period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Equal to the number of events per emission period.|
+|`ingest/input/bytes`|Number of bytes read from input sources, after decompression but prior to parsing. This covers all data read, including data that does not end up being fully processed and ingested. For example, this includes data that ends up being rejected for being unparseable or filtered out.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the amount of data read.|
 |`ingest/rows/output`|Number of Druid rows persisted.|`dataSource`, `taskId`, `taskType`, `groupId`|Your number of events with rollup.|
 |`ingest/persists/count`|Number of times persist occurred.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration.|
 |`ingest/persists/time`|Milliseconds spent doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration. Generally a few minutes at most.|
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
new file mode 100644
index 0000000000..dd10335045
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class KafkaConsumerMonitor extends AbstractMonitor
+{
+  private volatile boolean stopAfterNext = false;
+
+  // Kafka metric name -> Druid metric name
+  private static final Map<String, String> METRICS =
+      ImmutableMap.<String, String>builder()
+                  .put("bytes-consumed-total", "kafka/consumer/bytesConsumed")
+                  .put("records-consumed-total", "kafka/consumer/recordsConsumed")
+                  .build();
+  private static final String TOPIC_TAG = "topic";
+  private static final Set<String> TOPIC_METRIC_TAGS = ImmutableSet.of("client-id", TOPIC_TAG);
+
+  private final KafkaConsumer<?, ?> consumer;
+  private final Map<String, AtomicLong> counters = new HashMap<>();
+
+  public KafkaConsumerMonitor(final KafkaConsumer<?, ?> consumer)
+  {
+    this.consumer = consumer;
+  }
+
+  @Override
+  public boolean doMonitor(final ServiceEmitter emitter)
+  {
+    for (final Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
+      final MetricName metricName = entry.getKey();
+
+      if (METRICS.containsKey(metricName.name()) && isTopicMetric(metricName)) {
+        final String topic = metricName.tags().get(TOPIC_TAG);
+        final long newValue = ((Number) entry.getValue().metricValue()).longValue();
+        final long priorValue =
+            counters.computeIfAbsent(metricName.name(), ignored -> new AtomicLong())
+                    .getAndSet(newValue);
+
+        if (newValue != priorValue) {
+          final ServiceMetricEvent.Builder builder =
+              new ServiceMetricEvent.Builder().setDimension(TOPIC_TAG, topic);
+          emitter.emit(builder.build(METRICS.get(metricName.name()), newValue - priorValue));
+        }
+      }
+    }
+
+    return !stopAfterNext;
+  }
+
+  public void stopAfterNextEmit()
+  {
+    stopAfterNext = true;
+  }
+
+  private static boolean isTopicMetric(final MetricName metricName)
+  {
+    // Certain metrics are emitted both as grand totals and broken down by topic; we want to ignore the grand total and
+    // only look at the per-topic metrics. See https://kafka.apache.org/documentation/#consumer_fetch_monitoring.
+    return TOPIC_METRIC_TAGS.equals(metricName.tags().keySet())
+           && !Strings.isNullOrEmpty(metricName.tags().get(TOPIC_TAG));
+  }
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 72227a8474..14e1dbe276 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.kafka.KafkaRecordEntity;
+import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
@@ -97,7 +98,7 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, Kafka
   }
 
   @Override
-  protected KafkaRecordSupplier newTaskRecordSupplier()
+  protected KafkaRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox)
   {
     ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
     try {
@@ -107,7 +108,14 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, Kafka
 
       props.put("auto.offset.reset", "none");
 
-      return new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides());
+      final KafkaRecordSupplier recordSupplier =
+          new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides());
+
+      if (toolbox.getMonitorScheduler() != null) {
+        toolbox.getMonitorScheduler().addMonitor(recordSupplier.monitor());
+      }
+
+      return recordSupplier;
     }
     finally {
       Thread.currentThread().setContextClassLoader(currCtxCl);
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index eb3833a0b6..ce5a901fd7 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -33,6 +33,7 @@ import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.metrics.Monitor;
 import org.apache.druid.metadata.DynamicConfigProvider;
 import org.apache.druid.metadata.PasswordProvider;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -60,6 +61,7 @@ import java.util.stream.Collectors;
 public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaRecordEntity>
 {
   private final KafkaConsumer<byte[], byte[]> consumer;
+  private final KafkaConsumerMonitor monitor;
   private boolean closed;
 
   public KafkaRecordSupplier(
@@ -77,6 +79,7 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
   )
   {
     this.consumer = consumer;
+    this.monitor = new KafkaConsumerMonitor(consumer);
   }
 
   @Override
@@ -190,6 +193,14 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
     });
   }
 
+  /**
+   * Returns a Monitor that emits Kafka consumer metrics.
+   */
+  public Monitor monitor()
+  {
+    return monitor;
+  }
+
   @Override
   public void close()
   {
@@ -197,6 +208,8 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
       return;
     }
     closed = true;
+
+    monitor.stopAfterNextEmit();
     consumer.close();
   }
 
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 8ec664af89..f9fbeea298 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -76,7 +76,6 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequences;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -152,7 +151,6 @@ import java.util.stream.Stream;
 @RunWith(Parameterized.class)
 public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
 {
-  private static final Logger log = new Logger(KafkaIndexTaskTest.class);
   private static final long POLL_RETRY_MS = 100;
   private static final Iterable<Header> SAMPLE_HEADERS = ImmutableList.of(new Header()
   {
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index b480ec146f..7a8b9b1514 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -30,6 +30,8 @@ import org.apache.druid.indexing.kafka.test.TestBroker;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.metrics.Monitor;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.metadata.DynamicConfigProvider;
 import org.apache.druid.metadata.MapStringDynamicConfigProvider;
 import org.apache.druid.segment.TestHelper;
@@ -59,15 +61,15 @@ import java.util.stream.Collectors;
 public class KafkaRecordSupplierTest
 {
 
-  private static String topic = "topic";
-  private static String additonal_parameter = "additional.parameter";
-  private static long poll_timeout_millis = 1000;
-  private static int pollRetry = 5;
-  private static int topicPosFix = 0;
+  private static final String ADDITIONAL_PARAMETER = "additional.parameter";
+  private static final long POLL_TIMEOUT_MILLIS = 1000;
+  private static final int POLL_RETRY = 5;
   private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
 
-  private static TestingCluster zkServer;
-  private static TestBroker kafkaServer;
+  private static String TOPIC = "topic";
+  private static int TOPIC_POS_FIX = 0;
+  private static TestingCluster ZK_SERVER;
+  private static TestBroker KAFKA_SERVER;
 
   private List<ProducerRecord<byte[], byte[]>> records;
 
@@ -112,9 +114,9 @@ public class KafkaRecordSupplierTest
     }
   }
 
-  private static String getTopicName()
+  private static String nextTopicName()
   {
-    return "topic-" + topicPosFix++;
+    return "topic-" + TOPIC_POS_FIX++;
   }
 
   private List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> createOrderedPartitionableRecords()
@@ -129,7 +131,7 @@ public class KafkaRecordSupplierTest
         partitionToOffset.put(r.partition(), 1L);
       }
       return new OrderedPartitionableRecord<>(
-          topic,
+          TOPIC,
           r.partition(),
           offset,
           r.value() == null ? null : Collections.singletonList(new KafkaRecordEntity(
@@ -187,34 +189,34 @@ public class KafkaRecordSupplierTest
   @BeforeClass
   public static void setupClass() throws Exception
   {
-    zkServer = new TestingCluster(1);
-    zkServer.start();
+    ZK_SERVER = new TestingCluster(1);
+    ZK_SERVER.start();
 
-    kafkaServer = new TestBroker(
-        zkServer.getConnectString(),
+    KAFKA_SERVER = new TestBroker(
+        ZK_SERVER.getConnectString(),
         null,
         1,
         ImmutableMap.of("num.partitions", "2")
     );
-    kafkaServer.start();
+    KAFKA_SERVER.start();
 
   }
 
   @Before
   public void setupTest()
   {
-    topic = getTopicName();
-    records = generateRecords(topic);
+    TOPIC = nextTopicName();
+    records = generateRecords(TOPIC);
   }
 
   @AfterClass
   public static void tearDownClass() throws Exception
   {
-    kafkaServer.close();
-    kafkaServer = null;
+    KAFKA_SERVER.close();
+    KAFKA_SERVER = null;
 
-    zkServer.stop();
-    zkServer = null;
+    ZK_SERVER.stop();
+    ZK_SERVER = null;
   }
 
   @Test
@@ -225,19 +227,19 @@ public class KafkaRecordSupplierTest
     insertData();
 
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
-        StreamPartition.of(topic, 0),
-        StreamPartition.of(topic, 1)
+        StreamPartition.of(TOPIC, 0),
+        StreamPartition.of(TOPIC, 1)
     );
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
 
     Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
 
     recordSupplier.assign(partitions);
 
     Assert.assertEquals(partitions, recordSupplier.getAssignment());
-    Assert.assertEquals(ImmutableSet.of(0, 1), recordSupplier.getPartitionIds(topic));
+    Assert.assertEquals(ImmutableSet.of(0, 1), recordSupplier.getPartitionIds(TOPIC));
 
     recordSupplier.close();
   }
@@ -250,11 +252,11 @@ public class KafkaRecordSupplierTest
     insertData();
 
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
-        StreamPartition.of(topic, 0),
-        StreamPartition.of(topic, 1)
+        StreamPartition.of(TOPIC, 0),
+        StreamPartition.of(TOPIC, 1)
     );
 
-    Map<String, Object> properties = kafkaServer.consumerProperties();
+    Map<String, Object> properties = KAFKA_SERVER.consumerProperties();
     properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
     properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
 
@@ -269,7 +271,7 @@ public class KafkaRecordSupplierTest
     recordSupplier.assign(partitions);
 
     Assert.assertEquals(partitions, recordSupplier.getAssignment());
-    Assert.assertEquals(ImmutableSet.of(0, 1), recordSupplier.getPartitionIds(topic));
+    Assert.assertEquals(ImmutableSet.of(0, 1), recordSupplier.getPartitionIds(TOPIC));
 
     recordSupplier.close();
   }
@@ -279,10 +281,10 @@ public class KafkaRecordSupplierTest
   public void testSupplierSetupCustomDeserializerRequiresParameter()
   {
 
-    Map<String, Object> properties = kafkaServer.consumerProperties();
+    Map<String, Object> properties = KAFKA_SERVER.consumerProperties();
     properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
     properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
-    properties.put(additonal_parameter, "stringValue");
+    properties.put(ADDITIONAL_PARAMETER, "stringValue");
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
             properties,
@@ -298,7 +300,7 @@ public class KafkaRecordSupplierTest
   public void testSupplierSetupCustomDeserializerRequiresParameterButMissingIt()
   {
 
-    Map<String, Object> properties = kafkaServer.consumerProperties();
+    Map<String, Object> properties = KAFKA_SERVER.consumerProperties();
     properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
     properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
 
@@ -320,11 +322,11 @@ public class KafkaRecordSupplierTest
     insertData();
 
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
-        StreamPartition.of(topic, 0),
-        StreamPartition.of(topic, 1)
+        StreamPartition.of(TOPIC, 0),
+        StreamPartition.of(TOPIC, 1)
     );
 
-    Map<String, Object> properties = kafkaServer.consumerProperties();
+    Map<String, Object> properties = KAFKA_SERVER.consumerProperties();
     properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
     properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
 
@@ -339,9 +341,10 @@ public class KafkaRecordSupplierTest
 
     List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> initialRecords = new ArrayList<>(createOrderedPartitionableRecords());
 
-    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
-    for (int i = 0; polledRecords.size() != initialRecords.size() && i < pollRetry; i++) {
-      polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
+    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(
+        POLL_TIMEOUT_MILLIS);
+    for (int i = 0; polledRecords.size() != initialRecords.size() && i < POLL_RETRY; i++) {
+      polledRecords.addAll(recordSupplier.poll(POLL_TIMEOUT_MILLIS));
       Thread.sleep(200);
     }
 
@@ -360,24 +363,27 @@ public class KafkaRecordSupplierTest
     insertData();
 
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
-        StreamPartition.of(topic, 0),
-        StreamPartition.of(topic, 1)
+        StreamPartition.of(TOPIC, 0),
+        StreamPartition.of(TOPIC, 1)
     );
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(),
+        KAFKA_SERVER.consumerProperties(),
         OBJECT_MAPPER,
         null
     );
 
+    final Monitor monitor = recordSupplier.monitor();
+    monitor.start();
     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
 
     List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> initialRecords = new ArrayList<>(createOrderedPartitionableRecords());
 
-    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
-    for (int i = 0; polledRecords.size() != initialRecords.size() && i < pollRetry; i++) {
-      polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
+    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords =
+        recordSupplier.poll(POLL_TIMEOUT_MILLIS);
+    for (int i = 0; polledRecords.size() != initialRecords.size() && i < POLL_RETRY; i++) {
+      polledRecords.addAll(recordSupplier.poll(POLL_TIMEOUT_MILLIS));
       Thread.sleep(200);
     }
 
@@ -385,7 +391,14 @@ public class KafkaRecordSupplierTest
     Assert.assertEquals(initialRecords.size(), polledRecords.size());
     Assert.assertTrue(initialRecords.containsAll(polledRecords));
 
+    // Verify metrics
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+    Assert.assertTrue(monitor.monitor(emitter));
+    emitter.verifyEmitted("kafka/consumer/bytesConsumed", 1);
+    emitter.verifyEmitted("kafka/consumer/recordsConsumed", 1);
+
     recordSupplier.close();
+    Assert.assertFalse(monitor.monitor(emitter));
   }
 
 
@@ -393,7 +406,7 @@ public class KafkaRecordSupplierTest
   public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionException
   {
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = KAFKA_SERVER.newProducer()) {
       kafkaProducer.initTransactions();
       kafkaProducer.beginTransaction();
       for (ProducerRecord<byte[], byte[]> record : records.subList(0, 13)) {
@@ -403,25 +416,26 @@ public class KafkaRecordSupplierTest
     }
 
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
-        StreamPartition.of(topic, 0),
-        StreamPartition.of(topic, 1)
+        StreamPartition.of(TOPIC, 0),
+        StreamPartition.of(TOPIC, 1)
     );
 
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
 
     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
 
-    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
-    for (int i = 0; polledRecords.size() != 13 && i < pollRetry; i++) {
-      polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
+    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(
+        POLL_TIMEOUT_MILLIS);
+    for (int i = 0; polledRecords.size() != 13 && i < POLL_RETRY; i++) {
+      polledRecords.addAll(recordSupplier.poll(POLL_TIMEOUT_MILLIS));
       Thread.sleep(200);
     }
 
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = KAFKA_SERVER.newProducer()) {
       kafkaProducer.initTransactions();
       kafkaProducer.beginTransaction();
       for (ProducerRecord<byte[], byte[]> record : records.subList(13, 15)) {
@@ -431,8 +445,8 @@ public class KafkaRecordSupplierTest
     }
 
 
-    for (int i = 0; polledRecords.size() != records.size() && i < pollRetry; i++) {
-      polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
+    for (int i = 0; polledRecords.size() != records.size() && i < POLL_RETRY; i++) {
+      polledRecords.addAll(recordSupplier.poll(POLL_TIMEOUT_MILLIS));
       Thread.sleep(200);
     }
 
@@ -471,16 +485,16 @@ public class KafkaRecordSupplierTest
     // Insert data
     insertData();
 
-    StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
-    StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
+    StreamPartition<Integer> partition0 = StreamPartition.of(TOPIC, 0);
+    StreamPartition<Integer> partition1 = StreamPartition.of(TOPIC, 1);
 
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
-        StreamPartition.of(topic, 0),
-        StreamPartition.of(topic, 1)
+        StreamPartition.of(TOPIC, 0),
+        StreamPartition.of(TOPIC, 1)
     );
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
 
     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
@@ -493,9 +507,10 @@ public class KafkaRecordSupplierTest
 
     List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> initialRecords = createOrderedPartitionableRecords();
 
-    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
-    for (int i = 0; polledRecords.size() != 11 && i < pollRetry; i++) {
-      polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
+    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(
+        POLL_TIMEOUT_MILLIS);
+    for (int i = 0; polledRecords.size() != 11 && i < POLL_RETRY; i++) {
+      polledRecords.addAll(recordSupplier.poll(POLL_TIMEOUT_MILLIS));
       Thread.sleep(200);
     }
 
@@ -514,16 +529,16 @@ public class KafkaRecordSupplierTest
     // Insert data
     insertData();
 
-    StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
-    StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
+    StreamPartition<Integer> partition0 = StreamPartition.of(TOPIC, 0);
+    StreamPartition<Integer> partition1 = StreamPartition.of(TOPIC, 1);
 
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
-        StreamPartition.of(topic, 0),
-        StreamPartition.of(topic, 1)
+        StreamPartition.of(TOPIC, 0),
+        StreamPartition.of(TOPIC, 1)
     );
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
 
     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
@@ -532,7 +547,8 @@ public class KafkaRecordSupplierTest
     Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition1));
 
     recordSupplier.seekToLatest(partitions);
-    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
+    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(
+        POLL_TIMEOUT_MILLIS);
 
     Assert.assertEquals(Collections.emptyList(), polledRecords);
     recordSupplier.close();
@@ -542,21 +558,21 @@ public class KafkaRecordSupplierTest
   public void testSeekUnassigned() throws InterruptedException, ExecutionException
   {
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = KAFKA_SERVER.newProducer()) {
       for (ProducerRecord<byte[], byte[]> record : records) {
         kafkaProducer.send(record).get();
       }
     }
 
-    StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
-    StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
+    StreamPartition<Integer> partition0 = StreamPartition.of(TOPIC, 0);
+    StreamPartition<Integer> partition1 = StreamPartition.of(TOPIC, 1);
 
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
-        StreamPartition.of(topic, 0)
+        StreamPartition.of(TOPIC, 0)
     );
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
 
     recordSupplier.assign(partitions);
 
@@ -573,16 +589,16 @@ public class KafkaRecordSupplierTest
     // Insert data
     insertData();
 
-    StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
-    StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
+    StreamPartition<Integer> partition0 = StreamPartition.of(TOPIC, 0);
+    StreamPartition<Integer> partition1 = StreamPartition.of(TOPIC, 1);
 
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
-        StreamPartition.of(topic, 0),
-        StreamPartition.of(topic, 1)
+        StreamPartition.of(TOPIC, 0),
+        StreamPartition.of(TOPIC, 1)
     );
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
 
     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
@@ -617,8 +633,8 @@ public class KafkaRecordSupplierTest
   public void 
getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
   {
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
-    StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
+    StreamPartition<Integer> streamPartition = StreamPartition.of(TOPIC, 0);
     Set<StreamPartition<Integer>> partitions = 
ImmutableSet.of(streamPartition);
     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
@@ -629,8 +645,8 @@ public class KafkaRecordSupplierTest
   public void 
getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
   {
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
-    StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
+    StreamPartition<Integer> streamPartition = StreamPartition.of(TOPIC, 0);
     Set<StreamPartition<Integer>> partitions = 
ImmutableSet.of(streamPartition);
     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
@@ -641,8 +657,8 @@ public class KafkaRecordSupplierTest
   public void 
getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull()
   {
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
-    StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
+    StreamPartition<Integer> streamPartition = StreamPartition.of(TOPIC, 0);
     Set<StreamPartition<Integer>> partitions = 
ImmutableSet.of(streamPartition);
     recordSupplier.assign(partitions);
     recordSupplier.seekToLatest(partitions);
@@ -653,8 +669,8 @@ public class KafkaRecordSupplierTest
   public void 
getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull()
   {
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
-    StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
+    StreamPartition<Integer> streamPartition = StreamPartition.of(TOPIC, 0);
     Set<StreamPartition<Integer>> partitions = 
ImmutableSet.of(streamPartition);
     recordSupplier.assign(partitions);
     recordSupplier.seekToLatest(partitions);
@@ -693,7 +709,7 @@ public class KafkaRecordSupplierTest
   {
     KafkaConsumer<byte[], byte[]> kafkaConsumer = 
KafkaRecordSupplier.getKafkaConsumer(
         OBJECT_MAPPER,
-        kafkaServer.consumerProperties(),
+        KAFKA_SERVER.consumerProperties(),
         originalConsumerProperties -> {
           final Map<String, Object> newMap = new 
HashMap<>(originalConsumerProperties);
           newMap.put("client.id", "overrideConfigTest");
@@ -711,7 +727,7 @@ public class KafkaRecordSupplierTest
 
   private void insertData() throws ExecutionException, InterruptedException
   {
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = 
kafkaServer.newProducer()) {
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = 
KAFKA_SERVER.newProducer()) {
       kafkaProducer.initTransactions();
       kafkaProducer.beginTransaction();
       for (ProducerRecord<byte[], byte[]> record : records) {
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index 6ab91c9585..1aa6d5b2e3 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -100,7 +100,7 @@ public class KinesisIndexTask extends 
SeekableStreamIndexTask<String, String, By
   }
 
   @Override
-  protected KinesisRecordSupplier newTaskRecordSupplier()
+  protected KinesisRecordSupplier newTaskRecordSupplier(final TaskToolbox 
toolbox)
       throws RuntimeException
   {
     KinesisIndexTaskIOConfig ioConfig = ((KinesisIndexTaskIOConfig) 
super.ioConfig);
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 17beea1cce..b12ec56de9 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -45,6 +45,7 @@ import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.task.IndexTaskTest;
 import org.apache.druid.indexing.common.task.ParseExceptionReport;
@@ -2485,7 +2486,7 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     }
 
     @Override
-    protected KinesisRecordSupplier newTaskRecordSupplier()
+    protected KinesisRecordSupplier newTaskRecordSupplier(final TaskToolbox 
toolbox)
     {
       return localSupplier == null ? recordSupplier : localSupplier;
     }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
index f708bf95d8..42438ebd36 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
@@ -108,6 +108,12 @@ public class TaskRealtimeMetricsMonitor extends 
AbstractMonitor
       log.warn("[%,d] duplicate events!", dedup);
     }
     emitter.emit(builder.build("ingest/events/duplicate", dedup));
+    emitter.emit(
+        builder.build(
+            "ingest/input/bytes",
+            rowIngestionMetersTotals.getProcessedBytes() - 
previousRowIngestionMetersTotals.getProcessedBytes()
+        )
+    );
 
     emitter.emit(builder.build("ingest/rows/output", metrics.rowOutput() - 
previousFireDepartmentMetrics.rowOutput()));
     emitter.emit(builder.build("ingest/persists/count", metrics.numPersists() 
- previousFireDepartmentMetrics.numPersists()));
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 302099685b..3aca46fbfa 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -268,7 +268,27 @@ public abstract class 
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
 
   protected abstract SeekableStreamIndexTaskRunner<PartitionIdType, 
SequenceOffsetType, RecordType> createTaskRunner();
 
-  protected abstract RecordSupplier<PartitionIdType, SequenceOffsetType, 
RecordType> newTaskRecordSupplier();
+  /**
+   * Deprecated method for providing the {@link RecordSupplier} that connects 
with the stream. New extensions should
+   * override {@link #newTaskRecordSupplier(TaskToolbox)} instead.
+   */
+  @Deprecated
+  protected RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> 
newTaskRecordSupplier()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Subclasses must override this method to provide the {@link 
RecordSupplier} that connects with the stream.
+   *
+   * The default implementation delegates to {@link #newTaskRecordSupplier()}, 
which is deprecated, in order to support
+   * existing extensions that have implemented that older method instead of 
this newer one. New extensions should
+   * override this method, not {@link #newTaskRecordSupplier()}.
+   */
+  protected RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> 
newTaskRecordSupplier(final TaskToolbox toolbox)
+  {
+    return newTaskRecordSupplier();
+  }
 
   @VisibleForTesting
   public Appenderator getAppenderator()
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index f50159a0c0..1bfbd62bb8 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -416,8 +416,8 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     //milliseconds waited for created segments to be handed off
     long handoffWaitMs = 0L;
 
-    try (final RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> 
recordSupplier = task.newTaskRecordSupplier()) {
-
+    try (final RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> 
recordSupplier =
+             task.newTaskRecordSupplier(toolbox)) {
       if (toolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) 
{
         toolbox.getDataSegmentServerAnnouncer().announce();
         toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
index db51a416fc..0f280059e0 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
@@ -367,7 +367,7 @@ public class SeekableStreamIndexTaskRunnerAuthTest
     }
 
     @Override
-    protected RecordSupplier<String, String, ByteEntity> 
newTaskRecordSupplier()
+    protected RecordSupplier<String, String, ByteEntity> 
newTaskRecordSupplier(final TaskToolbox toolbox)
     {
       return null;
     }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index a347541a4e..24a2ed60b3 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
@@ -1297,7 +1298,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     }
 
     @Override
-    protected RecordSupplier<String, String, ByteEntity> 
newTaskRecordSupplier()
+    protected RecordSupplier<String, String, ByteEntity> 
newTaskRecordSupplier(final TaskToolbox toolbox)
     {
       return recordSupplier;
     }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
index 81c128744e..3085376b82 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java
@@ -37,11 +37,20 @@ public interface RowIngestionMeters extends InputStats
   String DETERMINE_PARTITIONS = "determinePartitions";
 
   String PROCESSED = "processed";
-  String PROCESSED_BYTES = "processedBytes";
   String PROCESSED_WITH_ERROR = "processedWithError";
   String UNPARSEABLE = "unparseable";
   String THROWN_AWAY = "thrownAway";
 
+  /**
+   * Number of bytes read by an ingestion task.
+   *
+   * Note: processedBytes is a misleading name; this generally measures size 
when data is initially read or fetched,
+   * not when it is processed by the ingest task. It's measuring a stage 
somewhat earlier in the pipeline. In other
+   * words, "processed" and "processedBytes" do not use the same definition of 
"process". A better name might be
+   * "bytesRead" or "inputBytes", although if we change it, we must consider 
compatibility with existing readers.
+   */
+  String PROCESSED_BYTES = "processedBytes";
+
   long getProcessed();
   void incrementProcessed();
 
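
Note on the newTaskRecordSupplier(TaskToolbox) change above: the Javadoc in SeekableStreamIndexTask says the new overload delegates to the deprecated no-argument method so that existing extensions keep working. A minimal, self-contained sketch of that delegation pattern follows. All names here (Toolbox, StreamTask, LegacyTask, ModernTask, NeitherTask) are hypothetical stand-ins rather than Druid classes, and a String takes the place of the RecordSupplier.

```java
// Hypothetical stand-in for the toolbox argument; not a Druid class.
final class Toolbox
{
  final String taskId;

  Toolbox(final String taskId)
  {
    this.taskId = taskId;
  }
}

// Hypothetical base class that mirrors the shape of the change in the diff.
abstract class StreamTask
{
  /** Old extension point. Throws unless a legacy subclass overrides it. */
  @Deprecated
  protected String newSupplier()
  {
    throw new UnsupportedOperationException();
  }

  /** New extension point. Falls back to the old one so legacy subclasses keep working. */
  protected String newSupplier(final Toolbox toolbox)
  {
    return newSupplier();
  }
}

// An existing extension that only knows about the old method.
class LegacyTask extends StreamTask
{
  @Override
  protected String newSupplier()
  {
    return "legacy";
  }
}

// A new extension that overrides the toolbox-aware method.
class ModernTask extends StreamTask
{
  @Override
  protected String newSupplier(final Toolbox toolbox)
  {
    return "modern:" + toolbox.taskId;
  }
}

// A subclass that overrides neither method; calls fail at runtime.
class NeitherTask extends StreamTask
{
}

class OverloadDelegationDemo
{
  public static void main(final String[] args)
  {
    final Toolbox toolbox = new Toolbox("task-1");

    // Callers always use the new overload, as the task runner does in the diff.
    System.out.println(new LegacyTask().newSupplier(toolbox));
    System.out.println(new ModernTask().newSupplier(toolbox));

    try {
      new NeitherTask().newSupplier(toolbox);
    }
    catch (UnsupportedOperationException e) {
      System.out.println("unsupported");
    }
  }
}
```

One consequence of this design, visible in the sketch, is that neither method is abstract any more, so a subclass that overrides neither one compiles but fails at runtime with UnsupportedOperationException.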



