This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 50a3403a474 Export string sets in monitoring infos. (#31838)
50a3403a474 is described below
commit 50a3403a4742e1c9e264f57f4411969daeff4642
Author: Robert Bradshaw <[email protected]>
AuthorDate: Thu Jul 11 09:45:09 2024 -0700
Export string sets in monitoring infos. (#31838)
---
.../apache/beam/model/pipeline/v1/metrics.proto | 11 +++++
.../runners/core/metrics/MetricsContainerImpl.java | 53 +++++++++++++++++++++-
.../core/metrics/MonitoringInfoConstants.java | 2 +
.../core/metrics/SimpleMonitoringInfoBuilder.java | 11 +++++
.../core/metrics/MetricsContainerImplTest.java | 32 +++++++++++++
5 files changed, 108 insertions(+), 1 deletion(-)
diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
index 13c87bc1130..4ec189e4637 100644
--- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
+++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
@@ -187,6 +187,17 @@ message MonitoringInfoSpecs {
}]
}];
+ // Represents a set of strings seen across bundles.
+ USER_SET_STRING = 21 [(monitoring_info_spec) = {
+ urn: "beam:metric:user:set_string:v1",
+ type: "beam:metrics:set_string:v1",
+ required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+ annotations: [{
+ key: "description",
+ value: "URN utilized to report user metric."
+ }]
+ }];
+
// General monitored state information which contains structured
information
// which does not fit into a typical metric format. See MonitoringTableData
// for more details.
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index a2f6511d512..99cf9850850 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -19,13 +19,16 @@ package org.apache.beam.runners.core.metrics;
import static
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
import static
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE;
import static
org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
import static
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
import static
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
import static
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+import static
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet;
import static
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
import static
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution;
import static
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge;
+import static
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import java.io.Serializable;
@@ -331,6 +334,28 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
return builder.build();
}
+ /** @return The MonitoringInfo metadata from the string set metric. */
+ private @Nullable SimpleMonitoringInfoBuilder
stringSetToMonitoringMetadata(MetricKey metricKey) {
+ return metricToMonitoringMetadata(
+ metricKey,
+ MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE,
+ MonitoringInfoConstants.Urns.USER_SET_STRING);
+ }
+
+ /**
+ * @param metricUpdate
+ * @return The MonitoringInfo generated from the string set metricUpdate.
+ */
+ private @Nullable MonitoringInfo stringSetUpdateToMonitoringInfo(
+ MetricUpdate<StringSetData> metricUpdate) {
+ SimpleMonitoringInfoBuilder builder =
stringSetToMonitoringMetadata(metricUpdate.getKey());
+ if (builder == null) {
+ return null;
+ }
+ builder.setStringSetValue(metricUpdate.getUpdate());
+ return builder.build();
+ }
+
/** Return the cumulative values for any metrics in this container as
MonitoringInfos. */
@Override
public Iterable<MonitoringInfo> getMonitoringInfos() {
@@ -358,6 +383,13 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
monitoringInfos.add(mi);
}
}
+
+ for (MetricUpdate<StringSetData> metricUpdate :
metricUpdates.stringSetUpdates()) {
+ MonitoringInfo mi = stringSetUpdateToMonitoringInfo(metricUpdate);
+ if (mi != null) {
+ monitoringInfos.add(mi);
+ }
+ }
return monitoringInfos;
}
@@ -391,6 +423,15 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
}
}
});
+ stringSets.forEach(
+ (metricName, stringSetCell) -> {
+ if (stringSetCell.getDirty().beforeCommit()) {
+ String shortId = getShortId(metricName,
this::stringSetToMonitoringMetadata, shortIds);
+ if (shortId != null) {
+ builder.put(shortId,
encodeStringSet(stringSetCell.getCumulative()));
+ }
+ }
+ });
return builder.build();
}
@@ -418,7 +459,7 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
}
/**
- * Mark all the updates that were retrieved with the latest call to {@link
#getUpdates()} as
+ * Mark all of the updates that were retrieved with the latest call to
{@link #getUpdates()} as
* committed.
*/
public void commitUpdates() {
@@ -480,6 +521,12 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
gauge.update(decodeInt64Gauge(monitoringInfo.getPayload()));
}
+ private void updateForStringSetType(MonitoringInfo monitoringInfo) {
+ MetricName metricName = MonitoringInfoMetricName.of(monitoringInfo);
+ StringSetCell stringSet = getStringSet(metricName);
+ stringSet.update(decodeStringSet(monitoringInfo.getPayload()));
+ }
+
/** Update values of this {@link MetricsContainerImpl} by reading from
{@code monitoringInfos}. */
public void update(Iterable<MonitoringInfo> monitoringInfos) {
for (MonitoringInfo monitoringInfo : monitoringInfos) {
@@ -500,6 +547,10 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
updateForLatestInt64Type(monitoringInfo);
break;
+ case SET_STRING_TYPE:
+ updateForStringSetType(monitoringInfo);
+ break;
+
default:
LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
index 697fc8487c6..2bb935111d3 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
@@ -52,6 +52,8 @@ public final class MonitoringInfoConstants {
extractUrn(MonitoringInfoSpecs.Enum.USER_DISTRIBUTION_INT64);
public static final String USER_DISTRIBUTION_DOUBLE =
extractUrn(MonitoringInfoSpecs.Enum.USER_DISTRIBUTION_DOUBLE);
+ public static final String USER_SET_STRING =
+ extractUrn(MonitoringInfoSpecs.Enum.USER_SET_STRING);
public static final String SAMPLED_BYTE_SIZE =
extractUrn(MonitoringInfoSpecs.Enum.SAMPLED_BYTE_SIZE);
public static final String WORK_COMPLETED =
extractUrn(MonitoringInfoSpecs.Enum.WORK_COMPLETED);
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
index c44a2621ee6..e0f5092e6b1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
@@ -23,6 +23,7 @@ import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encod
import static
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
import static
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution;
import static
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge;
+import static
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import java.util.HashMap;
@@ -148,6 +149,16 @@ public class SimpleMonitoringInfoBuilder {
return this;
}
+ /**
+ * Encodes the value and sets the type to {@link
+ * MonitoringInfoConstants.TypeUrns#SET_STRING_TYPE}.
+ */
+ public SimpleMonitoringInfoBuilder setStringSetValue(StringSetData value) {
+ this.builder.setPayload(encodeStringSet(value));
+ this.builder.setType(MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE);
+ return this;
+ }
+
/** Sets the MonitoringInfo label to the given name and value. */
public SimpleMonitoringInfoBuilder setLabel(String labelName, String
labelValue) {
this.builder.putLabels(labelName, labelValue);
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
index 809919f611b..5b3d71f4873 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
@@ -270,6 +270,38 @@ public class MetricsContainerImplTest {
assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(),
builder2.build()));
}
+ @Test
+ public void testMonitoringInfosArePopulatedForUserStringSets() {
+ MetricsContainerImpl testObject = new MetricsContainerImpl("step1");
+ StringSetCell stringSetCellA =
testObject.getStringSet(MetricName.named("ns", "nameA"));
+ StringSetCell stringSetCellB =
testObject.getStringSet(MetricName.named("ns", "nameB"));
+ stringSetCellA.add("A");
+ stringSetCellB.add("BBB");
+
+ SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder();
+ builder1
+ .setUrn(MonitoringInfoConstants.Urns.USER_SET_STRING)
+ .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
+ .setLabel(MonitoringInfoConstants.Labels.NAME, "nameA")
+ .setStringSetValue(stringSetCellA.getCumulative())
+ .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");
+
+ SimpleMonitoringInfoBuilder builder2 = new SimpleMonitoringInfoBuilder();
+ builder2
+ .setUrn(MonitoringInfoConstants.Urns.USER_SET_STRING)
+ .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
+ .setLabel(MonitoringInfoConstants.Labels.NAME, "nameB")
+ .setStringSetValue(stringSetCellB.getCumulative())
+ .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");
+
+ List<MonitoringInfo> actualMonitoringInfos = new ArrayList<>();
+ for (MonitoringInfo mi : testObject.getMonitoringInfos()) {
+ actualMonitoringInfos.add(mi);
+ }
+
+ assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(),
builder2.build()));
+ }
+
@Test
public void testMonitoringInfosArePopulatedForSystemDistributions() {
MetricsContainerImpl testObject = new MetricsContainerImpl("step1");