This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6bca71070e9 Add some metrics for CoGBK profiling. (#30979)
6bca71070e9 is described below
commit 6bca71070e96b56b781600e8833a72cea329b1a1
Author: Robert Bradshaw <[email protected]>
AuthorDate: Tue Apr 16 14:51:29 2024 -0700
Add some metrics for CoGBK profiling. (#30979)
---
.../org/apache/beam/sdk/transforms/join/CoGbkResult.java | 15 +++++++++++++++
1 file changed, 15 insertions(+)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 8f7898fc428..2e26d13da54 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -32,6 +32,8 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.util.common.Reiterator;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
@@ -73,6 +75,10 @@ public class CoGbkResult {
private static final Logger LOG = LoggerFactory.getLogger(CoGbkResult.class);
+ private Counter keyCount = Metrics.counter(CoGbkResult.class, "cogbk-keys");
+
+ private Counter largeKeyCount = Metrics.counter(CoGbkResult.class,
"cogbk-large-keys");
+
/**
* A row in the {@link PCollection} resulting from a {@link CoGroupByKey} transform. Currently,
* this row must fit into memory.
@@ -91,6 +97,7 @@ public class CoGbkResult {
int inMemoryElementCount,
int minElementsPerTag) {
this.schema = schema;
+ keyCount.inc();
List<List<Object>> valuesByTag = new ArrayList<>();
for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
valuesByTag.add(new ArrayList<>());
@@ -103,6 +110,7 @@ public class CoGbkResult {
while (taggedIter.hasNext()) {
if (elementCount++ >= inMemoryElementCount) {
// Let the tails be lazy.
+ largeKeyCount.inc();
break;
}
RawUnionValue value = taggedIter.next();
@@ -636,6 +644,10 @@ public class CoGbkResult {
}
void finish() {
+ Metrics.counter(
+ CoGbkResult.class,
+ this.tail == null ? "cogbk-small-iterables" :
"cogbk-large-iterables")
+ .inc();
finished = true;
}
@@ -838,8 +850,11 @@ public class CoGbkResult {
// We got to the end of the iterable, update the shared set of values with those sets that
// were small enough to cache.
if (!sharedSeenEnd[0]) {
+ Counter smallIterablesCount = Metrics.counter(CoGbkResult.class,
"cogbk-small-iterables");
+ Counter largeIterablesCount = Metrics.counter(CoGbkResult.class,
"cogbk-large-iterables");
for (int i = 0; i < sharedValueMap.size(); i++) {
List<Object> localValues = localValueMap.get(i);
+ (localValues == null ? largeIterablesCount :
smallIterablesCount).inc();
sharedValueMap.set(
i, localValues != null ? localValues :
simpleFilteringIterable(taggedIterable, i));
}