This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 47bcc3f9df1 Add new metric `ingest/rows/published` to fix flaky `KinesisFaultToleranceTest` (#19177)
47bcc3f9df1 is described below
commit 47bcc3f9df129aaeeab7975724a5835f0fbb5647
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Mar 20 21:05:25 2026 +0530
Add new metric `ingest/rows/published` to fix flaky `KinesisFaultToleranceTest` (#19177)
---
docs/operations/metrics.md | 1 +
.../embedded/indexing/StreamIndexTestBase.java | 7 +-
.../src/main/resources/defaultMetrics.json | 1 +
.../main/resources/defaultMetricDimensions.json | 1 +
.../druid/indexing/common/task/IndexTask.java | 5 +
.../druid/indexing/common/task/IndexTaskUtils.java | 17 ++
.../parallel/ParallelIndexSupervisorTask.java | 2 +-
.../SeekableStreamIndexTaskRunner.java | 4 +
.../SeekableStreamIndexTaskRunnerTest.java | 144 +++++++++++-
.../org/apache/druid/msq/exec/ControllerImpl.java | 4 +
processing/src/main/resources/defaultMetrics.json | 1 +
processing/src/test/resources/defaultMetrics.json | 251 ++++++++++++++++++++-
12 files changed, 425 insertions(+), 13 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 582a8d60877..e5715e821ae 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -222,6 +222,7 @@ If SQL is enabled, the Broker will emit the following metrics for SQL.
|------|-----------|----------|------------|
|`ingest/count`|Count of `1` every time an ingestion job runs (includes compaction jobs). Aggregate using dimensions. | `dataSource`, `taskId`, `taskType`, `groupId`, `taskIngestionMode`, `tags` |Always `1`.|
|`ingest/segments/count`|Count of final segments created by job (includes tombstones). | `dataSource`, `taskId`, `taskType`, `groupId`, `taskIngestionMode`, `tags` |At least `1`.|
+|`ingest/rows/published`|Number of rows successfully published by the job. | `dataSource`, `taskId`, `taskType`, `groupId`, `taskIngestionMode`, `tags` |At least `1`.|
|`ingest/tombstones/count`|Count of tombstones created by job. | `dataSource`, `taskId`, `taskType`, `groupId`, `taskIngestionMode`, `tags` |Zero or more for replace. Always zero for non-replace tasks (always zero for legacy replace, see below).|
The `taskIngestionMode` dimension includes the following modes:
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
index c9ec82e60e5..772d1bcc9b0 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
@@ -129,19 +129,20 @@ public abstract class StreamIndexTestBase extends EmbeddedClusterTestBase
}
/**
- * Waits until number of processed events matches {@code expectedRowCount}.
+ * Waits until the total row count of successfully published segments matches
+ * {@code expectedRowCount}.
*/
protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
{
indexer.latchableEmitter().waitForEventAggregate(
- event -> event.hasMetricName("ingest/events/processed")
+ event -> event.hasMetricName("ingest/rows/published")
.hasDimension(DruidMetrics.DATASOURCE, dataSource),
agg -> agg.hasSumAtLeast(expectedRowCount)
);
final int totalEventsProcessed = indexer
.latchableEmitter()
- .getMetricValues("ingest/events/processed", Map.of(DruidMetrics.DATASOURCE, dataSource))
+ .getMetricValues("ingest/rows/published", Map.of(DruidMetrics.DATASOURCE, dataSource))
.stream()
.mapToInt(Number::intValue)
.sum();
diff --git a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
index 79be4527447..247686a838b 100644
--- a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
+++ b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
@@ -101,6 +101,7 @@
"ingest/count" : { "dimensions" : ["dataSource", "taskType"], "type" :
"count", "help": "Count of 1 every time an ingestion job runs (includes
compaction jobs). Aggregate using dimensions." },
"ingest/segments/count" : { "dimensions" : ["dataSource", "taskType"],
"type" : "count", "help": "Count of final segments created by job (includes
tombstones)." },
"ingest/tombstones/count" : { "dimensions" : ["dataSource", "taskType"],
"type" : "count", "help": "Count of tombstones created by job." },
+ "ingest/rows/published": { "dimensions" : ["dataSource", "taskType"], "type"
: "count", "help": "Number of rows successfully published by the job." },
"ingest/kafka/lag" : { "dimensions" : ["dataSource", "stream"], "type" :
"gauge", "help": "Total lag between the offsets consumed by the Kafka indexing
tasks and latest offsets in Kafka brokers across all partitions. Minimum
emission period for this metric is a minute."},
"ingest/kafka/maxLag" : { "dimensions" : ["dataSource", "stream"], "type" :
"gauge", "help": "Max lag between the offsets consumed by the Kafka indexing
tasks and latest offsets in Kafka brokers across all partitions. Minimum
emission period for this metric is a minute."},
diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
index d8074cb2909..828031cfb11 100644
--- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
+++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
@@ -54,6 +54,7 @@
"ingest/merge/time" : { "dimensions" : ["dataSource"], "type" : "timer" },
"ingest/merge/cpu" : { "dimensions" : ["dataSource"], "type" : "timer" },
"ingest/segments/count" : { "dimensions" : ["dataSource"], "type" : "count"
},
+ "ingest/rows/published": { "dimensions" : ["dataSource"], "type" : "count" },
"ingest/kafka/lag" : { "dimensions" : ["dataSource", "stream"], "type" :
"gauge" },
"ingest/kafka/maxLag" : { "dimensions" : ["dataSource", "stream"], "type" :
"gauge" },
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index abbbf26400f..eb3b5c0e84f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -986,6 +986,11 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler, Pe
emitMetric(toolbox.getEmitter(), "ingest/segments/count",
published.getSegments().size() + tombStones.size()
);
+ emitMetric(
+ toolbox.getEmitter(),
+ "ingest/rows/published",
+ IndexTaskUtils.getTotalRowCount(published.getSegments())
+ );
log.debugSegments(published.getSegments(), "Published segments");
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
index e08c096d393..29692e1aa98 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
@@ -28,7 +28,9 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.timeline.DataSegment;
+import java.util.Collection;
import java.util.Map;
+import java.util.Objects;
public class IndexTaskUtils
{
@@ -102,4 +104,19 @@ public class IndexTaskUtils
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1));
}
}
+
+ /**
+ * Gets total row count of the given segments. Legacy segments do not have the
+ * row count populated in the metadata and thus do not contribute to the row
+ * count.
+ */
+ public static long getTotalRowCount(Collection<DataSegment> segments)
+ {
+ return segments
+ .stream()
+ .map(DataSegment::getTotalRows)
+ .filter(Objects::nonNull)
+ .mapToLong(Integer::longValue)
+ .sum();
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index c4f37192ee1..8d5e9a85e95 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -1220,7 +1220,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
// segment metrics:
emitMetric(toolbox.getEmitter(), "ingest/tombstones/count", tombStones.size());
emitMetric(toolbox.getEmitter(), "ingest/segments/count", newSegments.size());
-
+ emitMetric(toolbox.getEmitter(), "ingest/rows/published", IndexTaskUtils.getTotalRowCount(newSegments));
} else {
throw new ISE("Failed to publish segments");
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 0ab9f613cbe..0f986dfc9c4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -65,6 +65,7 @@ import org.apache.druid.indexing.common.actions.SegmentLockAcquireAction;
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.input.InputRowSchemas;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
@@ -1094,15 +1095,18 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
);
// emit segment count metric:
int segmentCount = 0;
+ long totalRowCount = 0;
if (publishedSegmentsAndCommitMetadata != null
&& publishedSegmentsAndCommitMetadata.getSegments() != null) {
segmentCount = publishedSegmentsAndCommitMetadata.getSegments().size();
+ totalRowCount = IndexTaskUtils.getTotalRowCount(publishedSegmentsAndCommitMetadata.getSegments());
}
task.emitMetric(
toolbox.getEmitter(),
"ingest/segments/count",
segmentCount
);
+ task.emitMetric(toolbox.getEmitter(), "ingest/rows/published", totalRowCount);
}
@Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
index 29dcc9ea37b..e1968a457c4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
@@ -22,45 +22,86 @@ package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+import org.apache.druid.client.DruidServer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.discovery.DataNodeService;
+import org.apache.druid.discovery.DiscoveryDruidNode;
+import org.apache.druid.discovery.DruidNodeAnnouncer;
+import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.granularity.UniformGranularitySpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
+import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.InputRowFilterResult;
+import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
+import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
+import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
+import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
+import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import static org.mockito.ArgumentMatchers.any;
+
@RunWith(MockitoJUnitRunner.class)
public class SeekableStreamIndexTaskRunnerTest
{
+ @Rule
+ public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
@Mock
private InputRow row;
@Mock
private SeekableStreamIndexTask task;
+ private StubServiceEmitter emitter;
+
+ @Before
+ public void setup()
+ {
+ emitter = new StubServiceEmitter();
+ }
+
@Test
public void testWithinMinMaxTime()
{
@@ -101,7 +142,7 @@ public class SeekableStreamIndexTaskRunnerTest
Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
- TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(
+ TestSeekableStreamIndexTaskRunner runner = new TestSeekableStreamIndexTaskRunner(
task,
LockGranularity.TIME_CHUNK
);
@@ -156,7 +197,7 @@ public class SeekableStreamIndexTaskRunnerTest
Mockito.when(task.getDataSchema()).thenReturn(schema);
Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
- TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(
+ TestSeekableStreamIndexTaskRunner runner = new TestSeekableStreamIndexTaskRunner(
task,
LockGranularity.TIME_CHUNK
);
@@ -209,7 +250,7 @@ public class SeekableStreamIndexTaskRunnerTest
Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
- TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(
+ TestSeekableStreamIndexTaskRunner runner = new TestSeekableStreamIndexTaskRunner(
task,
LockGranularity.TIME_CHUNK
);
@@ -218,7 +259,7 @@ public class SeekableStreamIndexTaskRunnerTest
}
@Test
- public void testGetSupervisorId()
+ public void test_run_emitsRowCountAndSegmentCount_onSuccessfulPublish()
{
DimensionsSpec dimensionsSpec = new DimensionsSpec(
Arrays.asList(
@@ -252,18 +293,105 @@ public class SeekableStreamIndexTaskRunnerTest
Mockito.when(task.getDataSchema()).thenReturn(schema);
Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
-
+ Mockito.when(task.getId()).thenReturn("task1");
Mockito.when(task.getSupervisorId()).thenReturn("supervisorId");
- TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(
+ TestSeekableStreamIndexTaskRunner runner = new TestSeekableStreamIndexTaskRunner(
task,
LockGranularity.TIME_CHUNK
);
Assert.assertEquals("supervisorId", runner.getSupervisorId());
+
+ // Set up the task to return a RecordSupplier, StreamAppenderatorDriver, and Appenderator
+ final RecordSupplier<?, ?, ?> recordSupplier = Mockito.mock(RecordSupplier.class);
+ Mockito.when(task.newTaskRecordSupplier(any()))
+ .thenReturn(recordSupplier);
+
+ final StreamAppenderator appenderator = Mockito.mock(StreamAppenderator.class);
+ Mockito.when(task.newAppenderator(any(), any(), any(), any()))
+ .thenReturn(appenderator);
+
+ final List<DataSegment> segment = CreateDataSegments
+ .ofDatasource(schema.getDataSource())
+ .withNumPartitions(10)
+ .withNumRows(1_000)
+ .eachOfSizeInMb(500);
+ final SegmentsAndCommitMetadata commitMetadata = new SegmentsAndCommitMetadata(segment, "offset-100");
+
+ final StreamAppenderatorDriver driver = Mockito.mock(StreamAppenderatorDriver.class);
+ Mockito.when(task.newDriver(any(), any(), any()))
+ .thenReturn(driver);
+ Mockito.when(driver.publish(any(), any(), any()))
+ .thenReturn(Futures.immediateFuture(commitMetadata));
+ Mockito.when(driver.registerHandoff(any()))
+ .thenReturn(Futures.immediateFuture(commitMetadata));
+
+ Mockito.doAnswer(invocation -> {
+ final String metricName = invocation.getArgument(1);
+ final Number value = invocation.getArgument(2);
+ emitter.emit(ServiceMetricEvent.builder().setMetric(metricName, value).build("test", "localhost"));
+ return null;
+ }).when(task).emitMetric(any(), any(), any());
+
+ runner.run(createTaskToolbox());
+ emitter.verifyValue("ingest/segments/count", 10);
+ emitter.verifyValue("ingest/rows/published", 10_000L);
+ }
+
+ private TaskToolbox createTaskToolbox()
+ {
+ final TestUtils testUtils = new TestUtils();
+ final File taskWorkDir = createTaskWorkDirectory();
+ return new TaskToolbox
+ .Builder()
+ .indexIO(TestHelper.getTestIndexIO())
+ .taskWorkDir(taskWorkDir)
+ .taskReportFileWriter(new NoopTestTaskReportFileWriter())
+ .authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER)
+ .rowIngestionMetersFactory(NoopRowIngestionMeters::new)
+ .indexMerger(testUtils.getIndexMergerV9Factory().create(true))
+ .chatHandlerProvider(new NoopChatHandlerProvider())
+ .dataNodeService(new DataNodeService(DruidServer.DEFAULT_TIER, 100L, null, ServerType.HISTORICAL, 1))
+ .lookupNodeService(new LookupNodeService(DruidServer.DEFAULT_TIER))
+ .appenderatorsManager(new TestAppenderatorsManager())
+ .serverAnnouncer(new DataSegmentServerAnnouncer.Noop())
+ .druidNodeAnnouncer(new NoopDruidNodeAnnouncer())
+ .jsonMapper(TestHelper.JSON_MAPPER)
+ .emitter(emitter)
+ .build();
+ }
+
+ private File createTaskWorkDirectory()
+ {
+ try {
+ final File taskWorkDir = temporaryFolder.newFolder();
+ FileUtils.mkdirp(taskWorkDir);
+ FileUtils.mkdirp(new File(taskWorkDir, "persist"));
+ return taskWorkDir;
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static class NoopDruidNodeAnnouncer implements DruidNodeAnnouncer
+ {
+
+ @Override
+ public void announce(DiscoveryDruidNode discoveryDruidNode)
+ {
+
+ }
+
+ @Override
+ public void unannounce(DiscoveryDruidNode discoveryDruidNode)
+ {
+
+ }
}
- static class TestasbleSeekableStreamIndexTaskRunner extends SeekableStreamIndexTaskRunner
+ static class TestSeekableStreamIndexTaskRunner extends SeekableStreamIndexTaskRunner
{
- public TestasbleSeekableStreamIndexTaskRunner(
+ public TestSeekableStreamIndexTaskRunner(
SeekableStreamIndexTask task,
LockGranularity lockGranularityToUse
)
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 6a064c82c5d..86223a20ac2 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -76,6 +76,7 @@ import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceActio
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.TooManyBucketsException;
import org.apache.druid.indexing.common.task.batch.parallel.TombstoneHelper;
@@ -1595,6 +1596,9 @@ public class ControllerImpl implements Controller
// Include tombstones in the reported segments count
metricBuilder.setMetric("ingest/segments/count", segmentsWithTombstones.size());
context.emitMetric(metricBuilder);
+
+ metricBuilder.setMetric("ingest/rows/published", IndexTaskUtils.getTotalRowCount(segmentsWithTombstones));
+ context.emitMetric(metricBuilder);
}
private static TaskAction<SegmentPublishResult> createAppendAction(
diff --git a/processing/src/main/resources/defaultMetrics.json b/processing/src/main/resources/defaultMetrics.json
index cad4dcf1bfd..c4647a459c1 100644
--- a/processing/src/main/resources/defaultMetrics.json
+++ b/processing/src/main/resources/defaultMetrics.json
@@ -37,6 +37,7 @@
"ingest/persists/time": [],
"ingest/rows/output": [],
"ingest/segments/count": [],
+ "ingest/rows/published": [],
"ingest/sink/count": [],
"ingest/tombstones/count": [],
"interval/compacted/count": [],
diff --git a/processing/src/test/resources/defaultMetrics.json b/processing/src/test/resources/defaultMetrics.json
index 22a9d0329fc..9ec51afff2d 100644
--- a/processing/src/test/resources/defaultMetrics.json
+++ b/processing/src/test/resources/defaultMetrics.json
@@ -1 +1,250 @@
-{"compact/segmentAnalyzer/fetchAndProcessMillis":[],"compact/task/count":[],"compactTask/availableSlot/count":[],"compactTask/maxSlot/count":[],"coordinator/global/time":[],"coordinator/time":[],"groupBy/maxMergeDictionarySize":[],"groupBy/maxSpilledBytes":[],"groupBy/mergeDictionarySize":[],"groupBy/spilledBytes":[],"groupBy/spilledQueries":[],"ingest/count":[],"ingest/events/duplicate":[],"ingest/events/messageGap":[],"ingest/events/processed":[],"ingest/events/processedWithError":[],"
[...]
+{
+ "compact/segmentAnalyzer/fetchAndProcessMillis": [],
+ "compact/task/count": [],
+ "compactTask/availableSlot/count": [],
+ "compactTask/maxSlot/count": [],
+ "coordinator/global/time": [],
+ "coordinator/time": [],
+ "groupBy/maxMergeDictionarySize": [],
+ "groupBy/maxSpilledBytes": [],
+ "groupBy/mergeDictionarySize": [],
+ "groupBy/spilledBytes": [],
+ "groupBy/spilledQueries": [],
+ "ingest/count": [],
+ "ingest/events/duplicate": [],
+ "ingest/events/messageGap": [],
+ "ingest/events/processed": [],
+ "ingest/events/processedWithError": [],
+ "ingest/events/thrownAway": [],
+ "ingest/events/unparseable": [],
+ "ingest/handoff/count": [],
+ "ingest/handoff/failed": [],
+ "ingest/handoff/time": [],
+ "ingest/input/bytes": [],
+ "ingest/kafka/avgLag": [],
+ "ingest/kafka/lag": [],
+ "ingest/kafka/maxLag": [],
+ "ingest/kafka/partitionLag": [],
+ "ingest/merge/cpu": [],
+ "ingest/merge/time": [],
+ "ingest/notices/queueSize": [],
+ "ingest/notices/time": [],
+ "ingest/pause/time": [],
+ "ingest/persists/backPressure": [],
+ "ingest/persists/count": [],
+ "ingest/persists/cpu": [],
+ "ingest/persists/failed": [],
+ "ingest/persists/time": [],
+ "ingest/rows/output": [],
+ "ingest/segments/count": [],
+ "ingest/rows/published": [],
+ "ingest/sink/count": [],
+ "ingest/tombstones/count": [],
+ "interval/compacted/count": [],
+ "interval/skipCompact/count": [],
+ "interval/waitCompact/count": [],
+ "jetty/numOpenConnections": [],
+ "jetty/threadPool/busy": [],
+ "jetty/threadPool/idle": [],
+ "jetty/threadPool/isLowOnThreads": [],
+ "jetty/threadPool/max": [],
+ "jetty/threadPool/min": [],
+ "jetty/threadPool/queueSize": [],
+ "jetty/threadPool/ready": [],
+ "jetty/threadPool/total": [],
+ "jetty/threadPool/utilizationRate": [],
+ "jetty/threadPool/utilized": [],
+ "jvm/bufferpool/capacity": [],
+ "jvm/bufferpool/count": [],
+ "jvm/bufferpool/used": [],
+ "jvm/gc/count": [],
+ "jvm/gc/cpu": [],
+ "jvm/mem/committed": [],
+ "jvm/mem/init": [],
+ "jvm/mem/max": [],
+ "jvm/mem/used": [],
+ "jvm/pool/committed": [],
+ "jvm/pool/init": [],
+ "jvm/pool/max": [],
+ "jvm/pool/used": [],
+ "kafka/consumer/bytesConsumed": [],
+ "kafka/consumer/fetch": [],
+ "kafka/consumer/fetchLatencyAvg": [],
+ "kafka/consumer/fetchLatencyMax": [],
+ "kafka/consumer/fetchRate": [],
+ "kafka/consumer/fetchSizeAvg": [],
+ "kafka/consumer/fetchSizeMax": [],
+ "kafka/consumer/incomingBytes": [],
+ "kafka/consumer/outgoingBytes": [],
+ "kafka/consumer/recordsConsumed": [],
+ "kafka/consumer/recordsLag": [],
+ "kafka/consumer/recordsPerRequestAvg": [],
+ "kill/eligibleUnusedSegments/count": [],
+ "kill/pendingSegments/count": [],
+ "kill/task/count": [],
+ "killTask/availableSlot/count": [],
+ "killTask/maxSlot/count": [],
+ "mergeBuffer/acquisitionTimeNs": [],
+ "mergeBuffer/maxAcquisitionTimeNs": [],
+ "mergeBuffer/pendingRequests": [],
+ "mergeBuffer/queries": [],
+ "mergeBuffer/used": [],
+ "metadata/kill/audit/count": [],
+ "metadata/kill/compaction/count": [],
+ "metadata/kill/datasource/count": [],
+ "metadata/kill/rule/count": [],
+ "metadata/kill/supervisor/count": [],
+ "metadatacache/backfill/count": [],
+ "metadatacache/init/time": [],
+ "metadatacache/refresh/count": [],
+ "metadatacache/refresh/time": [],
+ "metadatacache/schemaPoll/count": [],
+ "metadatacache/schemaPoll/failed": [],
+ "metadatacache/schemaPoll/time": [],
+ "query/bytes": [],
+ "query/cache/delta/averageBytes": [],
+ "query/cache/delta/errors": [],
+ "query/cache/delta/evictions": [],
+ "query/cache/delta/hitRate": [],
+ "query/cache/delta/hits": [],
+ "query/cache/delta/misses": [],
+ "query/cache/delta/numEntries": [],
+ "query/cache/delta/put/error": [],
+ "query/cache/delta/put/ok": [],
+ "query/cache/delta/put/oversized": [],
+ "query/cache/delta/sizeBytes": [],
+ "query/cache/delta/timeouts": [],
+ "query/cache/total/averageBytes": [],
+ "query/cache/total/errors": [],
+ "query/cache/total/evictions": [],
+ "query/cache/total/hitRate": [],
+ "query/cache/total/hits": [],
+ "query/cache/total/misses": [],
+ "query/cache/total/numEntries": [],
+ "query/cache/total/put/error": [],
+ "query/cache/total/put/ok": [],
+ "query/cache/total/put/oversized": [],
+ "query/cache/total/sizeBytes": [],
+ "query/cache/total/timeouts": [],
+ "query/count": [],
+ "query/cpu/time": [],
+ "query/failed/count": [],
+ "query/interrupted/count": [],
+ "query/node/bytes": [],
+ "query/node/time": [],
+ "query/node/ttfb": [],
+ "query/segment/time": [],
+ "query/segmentAndCache/time": [],
+ "query/success/count": [],
+ "query/time": [],
+ "query/timeout/count": [],
+ "query/wait/time": [],
+ "schemacache/finalizedSchemaPayload/count": [],
+ "schemacache/finalizedSegmentMetadata/count": [],
+ "schemacache/inTransitSMQPublishedResults/count": [],
+ "schemacache/inTransitSMQResults/count": [],
+ "schemacache/realtime/count": [],
+ "segment/added/bytes": [],
+ "segment/assignSkipped/count": [],
+ "segment/assigned/count": [],
+ "segment/availableDeepStorageOnly/count": [],
+ "segment/compacted/bytes": [],
+ "segment/compacted/count": [],
+ "segment/count": [],
+ "segment/deleted/count": [],
+ "segment/dropQueue/count": [],
+ "segment/dropSkipped/count": [],
+ "segment/dropped/count": [],
+ "segment/loadQueue/assigned": [],
+ "segment/loadQueue/cancelled": [],
+ "segment/loadQueue/count": [],
+ "segment/loadQueue/failed": [],
+ "segment/loadQueue/size": [],
+ "segment/loadQueue/success": [],
+ "segment/max": [],
+ "segment/moveSkipped/count": [],
+ "segment/moved/bytes": [],
+ "segment/moved/count": [],
+ "segment/nuked/bytes": [],
+ "segment/overShadowed/count": [],
+ "segment/pendingDelete": [],
+ "segment/scan/active": [],
+ "segment/scan/pending": [],
+ "segment/size": [],
+ "segment/skipCompact/bytes": [],
+ "segment/skipCompact/count": [],
+ "segment/unavailable/count": [],
+ "segment/underReplicated/count": [],
+ "segment/unneeded/count": [],
+ "segment/unneededEternityTombstone/count": [],
+ "segment/used": [],
+ "segment/usedPercent": [],
+ "segment/waitCompact/bytes": [],
+ "segment/waitCompact/count": [],
+ "serverview/init/time": [],
+ "serverview/sync/healthy": [],
+ "serverview/sync/unstableTime": [],
+ "service/heartbeat": [],
+ "sqlQuery/bytes": [],
+ "sqlQuery/planningTimeMs": [],
+ "sqlQuery/time": [],
+ "sys/cpu": [],
+ "sys/disk/queue": [],
+ "sys/disk/read/count": [],
+ "sys/disk/read/size": [],
+ "sys/disk/transferTime": [],
+ "sys/disk/write/count": [],
+ "sys/disk/write/size": [],
+ "sys/fs/files/count": [],
+ "sys/fs/files/free": [],
+ "sys/fs/max": [],
+ "sys/fs/used": [],
+ "sys/mem/free": [],
+ "sys/mem/max": [],
+ "sys/mem/used": [],
+ "sys/net/read/dropped": [],
+ "sys/net/read/errors": [],
+ "sys/net/read/packets": [],
+ "sys/net/read/size": [],
+ "sys/net/write/collisions": [],
+ "sys/net/write/errors": [],
+ "sys/net/write/packets": [],
+ "sys/net/write/size": [],
+ "sys/storage/used": [],
+ "sys/swap/free": [],
+ "sys/swap/max": [],
+ "sys/swap/pageIn": [],
+ "sys/swap/pageOut": [],
+ "sys/tcpv4/activeOpens": [],
+ "sys/tcpv4/attemptFails": [],
+ "sys/tcpv4/estabResets": [],
+ "sys/tcpv4/in/errs": [],
+ "sys/tcpv4/in/segs": [],
+ "sys/tcpv4/out/rsts": [],
+ "sys/tcpv4/out/segs": [],
+ "sys/tcpv4/passiveOpens": [],
+ "sys/tcpv4/retrans/segs": [],
+ "sys/uptime": [],
+ "task/action/batch/attempts": [],
+ "task/action/batch/queueTime": [],
+ "task/action/batch/runTime": [],
+ "task/action/batch/size": [],
+ "task/action/failed/count": [],
+ "task/action/run/time": [],
+ "task/action/success/count": [],
+ "task/autoScaler/requiredCount": [],
+ "task/failed/count": [],
+ "task/pending/count": [],
+ "task/pending/time": [],
+ "task/run/time": [],
+ "task/running/count": [],
+ "task/segmentAvailability/wait/time": [],
+ "task/success/count": [],
+ "task/waiting/count": [],
+ "tier/historical/count": [],
+ "tier/replication/factor": [],
+ "tier/required/capacity": [],
+ "tier/total/capacity": [],
+ "zk/connected": [],
+ "zk/reconnect/time": []
+}
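
For readers skimming the patch, the core of the change is the null-safe row-count aggregation in IndexTaskUtils.getTotalRowCount, reused by the batch, parallel, streaming, and MSQ publish paths. Below is a minimal standalone sketch of that technique, not the committed source: Segment is a hypothetical stand-in for org.apache.druid.timeline.DataSegment, introduced only to keep the example self-contained and runnable.

// Sketch: sum per-segment row counts, skipping legacy segments
// whose row count is unknown (null) in the segment metadata.
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;

public class RowCountSketch
{
  // Hypothetical stand-in for DataSegment; only the nullable row count matters here.
  record Segment(Integer totalRows) {}

  static long getTotalRowCount(Collection<Segment> segments)
  {
    return segments
        .stream()
        .map(Segment::totalRows)
        .filter(Objects::nonNull)       // legacy segments contribute nothing
        .mapToLong(Integer::longValue)
        .sum();
  }

  public static void main(String[] args)
  {
    // 1000 + 500 known rows, one legacy (null) segment ignored: prints 1500
    System.out.println(getTotalRowCount(Arrays.asList(
        new Segment(1000), new Segment(500), new Segment(null)
    )));
  }
}

Filtering nulls rather than failing keeps the metric emission safe on clusters that still contain legacy segments, matching the javadoc added to IndexTaskUtils in this commit.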
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]