This is an automated email from the ASF dual-hosted git repository.
gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 630ca9f7024 fix: Emit publish metrics only when tasks actually
publish. (#19395)
630ca9f7024 is described below
commit 630ca9f702482ddf1ad357f5838f780c9c47e1a1
Author: Gian Merlino <[email protected]>
AuthorDate: Tue May 12 15:08:10 2026 -0700
fix: Emit publish metrics only when tasks actually publish. (#19395)
With task replicas, this avoids double-emitting of metrics.
---
.../SeekableStreamIndexTaskRunner.java | 21 ++++------
.../SeekableStreamIndexTaskRunnerTest.java | 3 +-
.../appenderator/BaseAppenderatorDriver.java | 6 ++-
.../appenderator/SegmentsAndCommitMetadata.java | 49 +++++++++++++++++++---
.../appenderator/StreamAppenderatorDriver.java | 4 +-
.../SegmentsAndCommitMetadataTest.java | 35 ++++++++++++++++
6 files changed, 95 insertions(+), 23 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 515bafc5054..38de3dc49fe 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1079,20 +1079,15 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
},
MoreExecutors.directExecutor()
);
- // emit segment count metric:
- int segmentCount = 0;
- long totalRowCount = 0;
- if (publishedSegmentsAndCommitMetadata != null
- && publishedSegmentsAndCommitMetadata.getSegments() != null) {
- segmentCount =
publishedSegmentsAndCommitMetadata.getSegments().size();
- totalRowCount =
IndexTaskUtils.getTotalRowCount(publishedSegmentsAndCommitMetadata.getSegments());
+
+ // Emit publish metrics only when this task actually committed the
segments.
+ if (publishedSegmentsAndCommitMetadata.wasPublished()) {
+ final int segmentCount =
publishedSegmentsAndCommitMetadata.getSegments().size();
+ final long totalRowCount =
+
IndexTaskUtils.getTotalRowCount(publishedSegmentsAndCommitMetadata.getSegments());
+ task.emitMetric(toolbox.getEmitter(), "ingest/segments/count",
segmentCount);
+ task.emitMetric(toolbox.getEmitter(), "ingest/rows/published",
totalRowCount);
}
- task.emitMetric(
- toolbox.getEmitter(),
- "ingest/segments/count",
- segmentCount
- );
- task.emitMetric(toolbox.getEmitter(), "ingest/rows/published",
totalRowCount);
}
@Override
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
index 797cbffd937..f08ee84d89d 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
@@ -314,7 +314,8 @@ public class SeekableStreamIndexTaskRunnerTest
.withNumPartitions(10)
.withNumRows(1_000)
.eachOfSizeInMb(500);
- final SegmentsAndCommitMetadata commitMetadata = new
SegmentsAndCommitMetadata(segment, "offset-100");
+ final SegmentsAndCommitMetadata commitMetadata =
+ new SegmentsAndCommitMetadata(segment,
"offset-100").withWasPublished(true);
final StreamAppenderatorDriver driver =
Mockito.mock(StreamAppenderatorDriver.class);
Mockito.when(task.newDriver(any(), any(), any()))
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index 7ca7fd74c2d..257b1db7278 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -566,7 +566,7 @@ public abstract class BaseAppenderatorDriver implements
Closeable
metadata == null ? null : ((AppenderatorDriverMetadata)
metadata).getCallerMetadata(),
segmentsAndCommitMetadata.getSegmentSchemaMapping(),
segmentsAndCommitMetadata.getUpgradedSegments()
- );
+ ).withWasPublished(segmentsAndCommitMetadata.wasPublished());
},
MoreExecutors.directExecutor()
);
@@ -648,7 +648,9 @@ public abstract class BaseAppenderatorDriver implements
Closeable
}
log.info("Published segment schemas[%s].",
segmentsAndCommitMetadata.getSegmentSchemaMapping());
- return
segmentsAndCommitMetadata.withUpgradedSegments(upgradedSegments);
+ return segmentsAndCommitMetadata
+ .withUpgradedSegments(upgradedSegments)
+ .withWasPublished(true);
} else {
// Publishing didn't affirmatively succeed. However, segments
// with these IDs may have already been published:
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
index dfbfa621a23..dac9b1b8f34 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
@@ -38,12 +38,17 @@ public class SegmentsAndCommitMetadata
private final ImmutableSet<DataSegment> upgradedSegments;
+ /**
+ * Whether this object represents segments that were published to the
metadata store.
+ */
+ private final boolean wasPublished;
+
public SegmentsAndCommitMetadata(
List<DataSegment> segments,
Object commitMetadata
)
{
- this(segments, commitMetadata, null, null);
+ this(segments, commitMetadata, null, null, false);
}
public SegmentsAndCommitMetadata(
@@ -52,7 +57,7 @@ public class SegmentsAndCommitMetadata
SegmentSchemaMapping segmentSchemaMapping
)
{
- this(segments, commitMetadata, segmentSchemaMapping, null);
+ this(segments, commitMetadata, segmentSchemaMapping, null, false);
}
public SegmentsAndCommitMetadata(
@@ -61,11 +66,23 @@ public class SegmentsAndCommitMetadata
@Nullable SegmentSchemaMapping segmentSchemaMapping,
@Nullable Set<DataSegment> upgradedSegments
)
+ {
+ this(segments, commitMetadata, segmentSchemaMapping, upgradedSegments,
false);
+ }
+
+ private SegmentsAndCommitMetadata(
+ List<DataSegment> segments,
+ @Nullable Object commitMetadata,
+ @Nullable SegmentSchemaMapping segmentSchemaMapping,
+ @Nullable Set<DataSegment> upgradedSegments,
+ boolean wasPublished
+ )
{
this.segments = ImmutableList.copyOf(segments);
this.commitMetadata = commitMetadata;
this.upgradedSegments = upgradedSegments == null ? null :
ImmutableSet.copyOf(upgradedSegments);
this.segmentSchemaMapping = segmentSchemaMapping;
+ this.wasPublished = wasPublished;
}
public SegmentsAndCommitMetadata withUpgradedSegments(Set<DataSegment>
upgradedSegments)
@@ -74,7 +91,19 @@ public class SegmentsAndCommitMetadata
this.segments,
this.commitMetadata,
this.segmentSchemaMapping,
- upgradedSegments
+ upgradedSegments,
+ this.wasPublished
+ );
+ }
+
+ public SegmentsAndCommitMetadata withWasPublished(boolean wasPublished)
+ {
+ return new SegmentsAndCommitMetadata(
+ this.segments,
+ this.commitMetadata,
+ this.segmentSchemaMapping,
+ this.upgradedSegments,
+ wasPublished
);
}
@@ -103,6 +132,14 @@ public class SegmentsAndCommitMetadata
return segmentSchemaMapping;
}
+ /**
+ * @see #wasPublished
+ */
+ public boolean wasPublished()
+ {
+ return wasPublished;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -113,7 +150,8 @@ public class SegmentsAndCommitMetadata
return false;
}
SegmentsAndCommitMetadata that = (SegmentsAndCommitMetadata) o;
- return Objects.equals(commitMetadata, that.commitMetadata) &&
+ return wasPublished == that.wasPublished &&
+ Objects.equals(commitMetadata, that.commitMetadata) &&
Objects.equals(upgradedSegments, that.upgradedSegments) &&
Objects.equals(segmentSchemaMapping, that.segmentSchemaMapping) &&
Objects.equals(segments, that.segments);
@@ -122,7 +160,7 @@ public class SegmentsAndCommitMetadata
@Override
public int hashCode()
{
- return Objects.hash(commitMetadata, segments, upgradedSegments,
segmentSchemaMapping);
+ return Objects.hash(commitMetadata, segments, upgradedSegments,
segmentSchemaMapping, wasPublished);
}
@Override
@@ -133,6 +171,7 @@ public class SegmentsAndCommitMetadata
", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
", upgradedSegments=" +
SegmentUtils.commaSeparatedIdentifiers(upgradedSegments) +
", segmentSchemaMapping=" + segmentSchemaMapping +
+ ", wasPublished=" + wasPublished +
'}';
}
}
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
index 20430294d3b..53b0fcbba3b 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
@@ -341,7 +341,7 @@ public class StreamAppenderatorDriver extends
BaseAppenderatorDriver
((AppenderatorDriverMetadata) metadata).getCallerMetadata(),
segmentsAndCommitMetadata.getSegmentSchemaMapping(),
segmentsAndCommitMetadata.getUpgradedSegments()
- )
+ ).withWasPublished(segmentsAndCommitMetadata.wasPublished())
);
}
@@ -385,7 +385,7 @@ public class StreamAppenderatorDriver extends
BaseAppenderatorDriver
((AppenderatorDriverMetadata)
metadata).getCallerMetadata(),
segmentsAndCommitMetadata.getSegmentSchemaMapping(),
segmentsAndCommitMetadata.getUpgradedSegments()
- )
+
).withWasPublished(segmentsAndCommitMetadata.wasPublished())
);
}
}
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadataTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadataTest.java
new file mode 100644
index 00000000000..43df387598c
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadataTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Test;
+
+public class SegmentsAndCommitMetadataTest
+{
+ @Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(SegmentsAndCommitMetadata.class)
+ .withNonnullFields("segments")
+ .usingGetClass()
+ .verify();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]