This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bca71070e9 Add some metrics for CoGBK profiling. (#30979)
6bca71070e9 is described below

commit 6bca71070e96b56b781600e8833a72cea329b1a1
Author: Robert Bradshaw <[email protected]>
AuthorDate: Tue Apr 16 14:51:29 2024 -0700

    Add some metrics for CoGBK profiling. (#30979)
---
 .../org/apache/beam/sdk/transforms/join/CoGbkResult.java  | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 8f7898fc428..2e26d13da54 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -32,6 +32,8 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.util.common.Reiterator;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
@@ -73,6 +75,10 @@ public class CoGbkResult {
 
   private static final Logger LOG = LoggerFactory.getLogger(CoGbkResult.class);
 
+  private Counter keyCount = Metrics.counter(CoGbkResult.class, "cogbk-keys");
+
+  private Counter largeKeyCount = Metrics.counter(CoGbkResult.class, "cogbk-large-keys");
+
   /**
    * A row in the {@link PCollection} resulting from a {@link CoGroupByKey} transform. Currently,
    * this row must fit into memory.
@@ -91,6 +97,7 @@ public class CoGbkResult {
       int inMemoryElementCount,
       int minElementsPerTag) {
     this.schema = schema;
+    keyCount.inc();
     List<List<Object>> valuesByTag = new ArrayList<>();
     for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
       valuesByTag.add(new ArrayList<>());
@@ -103,6 +110,7 @@ public class CoGbkResult {
     while (taggedIter.hasNext()) {
       if (elementCount++ >= inMemoryElementCount) {
         // Let the tails be lazy.
+        largeKeyCount.inc();
         break;
       }
       RawUnionValue value = taggedIter.next();
@@ -636,6 +644,10 @@ public class CoGbkResult {
     }
 
     void finish() {
+      Metrics.counter(
+              CoGbkResult.class,
+              this.tail == null ? "cogbk-small-iterables" : "cogbk-large-iterables")
+          .inc();
       finished = true;
     }
 
@@ -838,8 +850,11 @@ public class CoGbkResult {
       // We got to the end of the iterable, update the shared set of values with those sets that
       // were small enough to cache.
       if (!sharedSeenEnd[0]) {
+        Counter smallIterablesCount = Metrics.counter(CoGbkResult.class, "cogbk-small-iterables");
+        Counter largeIterablesCount = Metrics.counter(CoGbkResult.class, "cogbk-large-iterables");
         for (int i = 0; i < sharedValueMap.size(); i++) {
           List<Object> localValues = localValueMap.get(i);
+          (localValues == null ? largeIterablesCount : smallIterablesCount).inc();
           sharedValueMap.set(
               i, localValues != null ? localValues : simpleFilteringIterable(taggedIterable, i));
         }

Reply via email to