This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 50a3403a474 Export string sets in monitoring infos. (#31838)
50a3403a474 is described below

commit 50a3403a4742e1c9e264f57f4411969daeff4642
Author: Robert Bradshaw <[email protected]>
AuthorDate: Thu Jul 11 09:45:09 2024 -0700

    Export string sets in monitoring infos. (#31838)
---
 .../apache/beam/model/pipeline/v1/metrics.proto    | 11 +++++
 .../runners/core/metrics/MetricsContainerImpl.java | 53 +++++++++++++++++++++-
 .../core/metrics/MonitoringInfoConstants.java      |  2 +
 .../core/metrics/SimpleMonitoringInfoBuilder.java  | 11 +++++
 .../core/metrics/MetricsContainerImplTest.java     | 32 +++++++++++++
 5 files changed, 108 insertions(+), 1 deletion(-)

diff --git 
a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto 
b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
index 13c87bc1130..4ec189e4637 100644
--- 
a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
+++ 
b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
@@ -187,6 +187,17 @@ message MonitoringInfoSpecs {
       }]
     }];
 
+    // Represents a set of strings seen across bundles.
+    USER_SET_STRING = 21 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:set_string:v1",
+      type: "beam:metrics:set_string:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
     // General monitored state information which contains structured 
information
     // which does not fit into a typical metric format. See MonitoringTableData
     // for more details.
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index a2f6511d512..99cf9850850 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -19,13 +19,16 @@ package org.apache.beam.runners.core.metrics;
 
 import static 
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
 import static 
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static 
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE;
 import static 
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
 import static 
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
 import static 
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
 import static 
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+import static 
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet;
 import static 
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
 import static 
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution;
 import static 
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge;
+import static 
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import java.io.Serializable;
@@ -331,6 +334,28 @@ public class MetricsContainerImpl implements Serializable, 
MetricsContainer {
     return builder.build();
   }
 
+  /** @return The MonitoringInfo metadata (set_string type, user set_string URN) for metricKey; may be null. */
+  private @Nullable SimpleMonitoringInfoBuilder 
stringSetToMonitoringMetadata(MetricKey metricKey) {
+    return metricToMonitoringMetadata(
+        metricKey,
+        MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE,
+        MonitoringInfoConstants.Urns.USER_SET_STRING);
+  }
+
+  /**
+   * @param metricUpdate string set update; its key selects the metadata, its update value becomes the payload.
+   * @return The MonitoringInfo generated from the string set metricUpdate, or null if no metadata builder exists for its key.
+   */
+  private @Nullable MonitoringInfo stringSetUpdateToMonitoringInfo(
+      MetricUpdate<StringSetData> metricUpdate) {
+    SimpleMonitoringInfoBuilder builder = 
stringSetToMonitoringMetadata(metricUpdate.getKey());
+    if (builder == null) {
+      return null;
+    }
+    builder.setStringSetValue(metricUpdate.getUpdate());
+    return builder.build();
+  }
+
   /** Return the cumulative values for any metrics in this container as 
MonitoringInfos. */
   @Override
   public Iterable<MonitoringInfo> getMonitoringInfos() {
@@ -358,6 +383,13 @@ public class MetricsContainerImpl implements Serializable, 
MetricsContainer {
         monitoringInfos.add(mi);
       }
     }
+
+    for (MetricUpdate<StringSetData> metricUpdate : 
metricUpdates.stringSetUpdates()) {
+      MonitoringInfo mi = stringSetUpdateToMonitoringInfo(metricUpdate);
+      if (mi != null) {
+        monitoringInfos.add(mi);
+      }
+    }
     return monitoringInfos;
   }
 
@@ -391,6 +423,15 @@ public class MetricsContainerImpl implements Serializable, 
MetricsContainer {
             }
           }
         });
+    stringSets.forEach(
+        (metricName, stringSetCell) -> {
+          if (stringSetCell.getDirty().beforeCommit()) {
+            String shortId = getShortId(metricName, 
this::stringSetToMonitoringMetadata, shortIds);
+            if (shortId != null) {
+              builder.put(shortId, 
encodeStringSet(stringSetCell.getCumulative()));
+            }
+          }
+        });
     return builder.build();
   }
 
@@ -418,7 +459,7 @@ public class MetricsContainerImpl implements Serializable, 
MetricsContainer {
   }
 
   /**
-   * Mark all the updates that were retrieved with the latest call to {@link 
#getUpdates()} as
+   * Mark all of the updates that were retrieved with the latest call to 
{@link #getUpdates()} as
    * committed.
    */
   public void commitUpdates() {
@@ -480,6 +521,12 @@ public class MetricsContainerImpl implements Serializable, 
MetricsContainer {
     gauge.update(decodeInt64Gauge(monitoringInfo.getPayload()));
   }
 
+  private void updateForStringSetType(MonitoringInfo monitoringInfo) {
+    MetricName metricName = MonitoringInfoMetricName.of(monitoringInfo);
+    StringSetCell stringSet = getStringSet(metricName);
+    stringSet.update(decodeStringSet(monitoringInfo.getPayload()));
+  }
+
   /** Update values of this {@link MetricsContainerImpl} by reading from 
{@code monitoringInfos}. */
   public void update(Iterable<MonitoringInfo> monitoringInfos) {
     for (MonitoringInfo monitoringInfo : monitoringInfos) {
@@ -500,6 +547,10 @@ public class MetricsContainerImpl implements Serializable, 
MetricsContainer {
           updateForLatestInt64Type(monitoringInfo);
           break;
 
+        case SET_STRING_TYPE:
+          updateForStringSetType(monitoringInfo);
+          break;
+
         default:
           LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
       }
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
index 697fc8487c6..2bb935111d3 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
@@ -52,6 +52,8 @@ public final class MonitoringInfoConstants {
         extractUrn(MonitoringInfoSpecs.Enum.USER_DISTRIBUTION_INT64);
     public static final String USER_DISTRIBUTION_DOUBLE =
         extractUrn(MonitoringInfoSpecs.Enum.USER_DISTRIBUTION_DOUBLE);
+    public static final String USER_SET_STRING =
+        extractUrn(MonitoringInfoSpecs.Enum.USER_SET_STRING);
     public static final String SAMPLED_BYTE_SIZE =
         extractUrn(MonitoringInfoSpecs.Enum.SAMPLED_BYTE_SIZE);
     public static final String WORK_COMPLETED = 
extractUrn(MonitoringInfoSpecs.Enum.WORK_COMPLETED);
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
index c44a2621ee6..e0f5092e6b1 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
@@ -23,6 +23,7 @@ import static 
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encod
 import static 
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
 import static 
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution;
 import static 
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge;
+import static 
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
 import java.util.HashMap;
@@ -148,6 +149,16 @@ public class SimpleMonitoringInfoBuilder {
     return this;
   }
 
+  /**
+   * Encodes the value as the payload and sets the type to {@link
+   * MonitoringInfoConstants.TypeUrns#SET_STRING_TYPE}; returns this builder for chaining.
+   */
+  public SimpleMonitoringInfoBuilder setStringSetValue(StringSetData value) {
+    this.builder.setPayload(encodeStringSet(value));
+    this.builder.setType(MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE);
+    return this;
+  }
+
   /** Sets the MonitoringInfo label to the given name and value. */
   public SimpleMonitoringInfoBuilder setLabel(String labelName, String 
labelValue) {
     this.builder.putLabels(labelName, labelValue);
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
index 809919f611b..5b3d71f4873 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
@@ -270,6 +270,38 @@ public class MetricsContainerImplTest {
     assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), 
builder2.build()));
   }
 
+  @Test
+  public void testMonitoringInfosArePopulatedForUserStringSets() {
+    MetricsContainerImpl testObject = new MetricsContainerImpl("step1");
+    StringSetCell stringSetCellA = 
testObject.getStringSet(MetricName.named("ns", "nameA"));
+    StringSetCell stringSetCellB = 
testObject.getStringSet(MetricName.named("ns", "nameB"));
+    stringSetCellA.add("A");
+    stringSetCellB.add("BBB");
+
+    SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder();
+    builder1
+        .setUrn(MonitoringInfoConstants.Urns.USER_SET_STRING)
+        .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
+        .setLabel(MonitoringInfoConstants.Labels.NAME, "nameA")
+        .setStringSetValue(stringSetCellA.getCumulative())
+        .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");
+
+    SimpleMonitoringInfoBuilder builder2 = new SimpleMonitoringInfoBuilder();
+    builder2
+        .setUrn(MonitoringInfoConstants.Urns.USER_SET_STRING)
+        .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
+        .setLabel(MonitoringInfoConstants.Labels.NAME, "nameB")
+        .setStringSetValue(stringSetCellB.getCumulative())
+        .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");
+
+    List<MonitoringInfo> actualMonitoringInfos = new ArrayList<>();
+    for (MonitoringInfo mi : testObject.getMonitoringInfos()) {
+      actualMonitoringInfos.add(mi);
+    }
+
+    assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), 
builder2.build()));
+  }
+
   @Test
   public void testMonitoringInfosArePopulatedForSystemDistributions() {
     MetricsContainerImpl testObject = new MetricsContainerImpl("step1");

Reply via email to