This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 48ad919 [BEAM-11994] Add HarnessMonitoringInfosRequest/Response
InstructionRequest and MonitoringInfosMetadataRequest/Response support to Java
SDK.
new a60aeba Merge pull request #14805 from [BEAM-11994] Add
HarnessMonitoringInfosRequest/Response InstructionRequest and
MonitoringInfosMetadataRequest/Response support to Java SDK.
48ad919 is described below
commit 48ad9195205d1f1c1229a4469858e58e2b8145bc
Author: Alex Amato <[email protected]>
AuthorDate: Thu Apr 8 20:56:01 2021 -0700
[BEAM-11994] Add HarnessMonitoringInfosRequest/Response InstructionRequest
and MonitoringInfosMetadataRequest/Response support to Java SDK.
---
.../runners/core/construction/Environments.java | 1 +
.../runners/core/metrics/MetricsContainerImpl.java | 26 ++++++++-
.../core/metrics/MetricsContainerImplTest.java | 10 ++++
.../apache/beam/sdk/metrics/MetricsContainer.java | 6 ++
.../java/org/apache/beam/fn/harness/FnHarness.java | 11 ++++
.../HarnessMonitoringInfosInstructionHandler.java | 54 ++++++++++++++++++
...rnessMonitoringInfosInstructionHandlerTest.java | 66 ++++++++++++++++++++++
7 files changed, 172 insertions(+), 2 deletions(-)
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
index 473ae4e..e743e68 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
@@ -386,6 +386,7 @@ public class Environments {
capabilities.addAll(ModelCoders.urns());
capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.MULTI_CORE_BUNDLE_PROCESSING));
capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.PROGRESS_REPORTING));
+ capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.HARNESS_MONITORING_INFOS));
capabilities.add("beam:version:sdk_base:" +
JAVA_SDK_HARNESS_CONTAINER_URL);
capabilities.add(BeamUrns.getUrn(SplittableParDoComponents.TRUNCATE_SIZED_RESTRICTION));
capabilities.add(BeamUrns.getUrn(Primitives.TO_STRING));
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index b8aaa73..4d7c37a 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -74,6 +74,8 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
protected final @Nullable String stepName;
+ private final boolean isProcessWide;
+
private MetricsMap<MetricName, CounterCell> counters = new
MetricsMap<>(CounterCell::new);
private MetricsMap<MetricName, DistributionCell> distributions =
@@ -84,9 +86,25 @@ public class MetricsContainerImpl implements Serializable,
MetricsContainer {
private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell>
histograms =
new MetricsMap<>(HistogramCell::new);
- /** Create a new {@link MetricsContainerImpl} associated with the given {@code stepName}. */
- public MetricsContainerImpl(@Nullable String stepName) {
+ private MetricsContainerImpl(@Nullable String stepName, boolean isProcessWide) {
this.stepName = stepName;
+ this.isProcessWide = isProcessWide;
+ }
+
+ /**
+ * Create a new {@link MetricsContainerImpl} associated with the given {@code stepName}. If
+ * stepName is null, this MetricsContainer is not bound to a step.
+ */
+ public MetricsContainerImpl(@Nullable String stepName) {
+ this(stepName, false);
+ }
+
+ /**
+ * Create a new {@link MetricsContainerImpl} associated with the entire process. Used for
+ * collecting processWide metrics for HarnessMonitoringInfosRequest/Response.
+ */
+ public static MetricsContainerImpl createProcessWideContainer() {
+ return new MetricsContainerImpl(null, true);
}
@edu.umd.cs.findbugs.annotations.SuppressFBWarnings(
@@ -96,6 +114,9 @@ public class MetricsContainerImpl implements Serializable,
MetricsContainer {
/** Reset the metrics. */
public void reset() {
+ if (this.isProcessWide) {
+ throw new RuntimeException("Process Wide metric containers must not be reset");
+ }
reset(counters);
reset(distributions);
reset(gauges);
@@ -270,6 +291,7 @@ public class MetricsContainerImpl implements Serializable,
MetricsContainer {
}
/** Return the cumulative values for any metrics in this container as MonitoringInfos. */
+ @Override
public Iterable<MonitoringInfo> getMonitoringInfos() {
// Extract user metrics and store as MonitoringInfos.
ArrayList<MonitoringInfo> monitoringInfos = new
ArrayList<MonitoringInfo>();
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
index 837432f..c20ee5d 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.emptyIterable;
import static
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
@@ -268,6 +269,15 @@ public class MetricsContainerImplTest {
}
@Test
+ public void testProcessWideMetricContainerThrowsWhenReset() {
+ MetricsContainerImpl testObject = MetricsContainerImpl.createProcessWideContainer();
+ CounterCell c1 = testObject.getCounter(MetricName.named("ns", "name1"));
+ c1.inc(2L);
+
+ assertThrows(RuntimeException.class, () -> testObject.reset());
+ }
+
+ @Test
public void testEquals() {
MetricsContainerImpl metricsContainerImpl = new
MetricsContainerImpl("stepName");
MetricsContainerImpl equal = new MetricsContainerImpl("stepName");
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
index 54c28ff..f10fccd 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.metrics;
import java.io.Serializable;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.util.HistogramData;
@@ -54,4 +55,9 @@ public interface MetricsContainer extends Serializable {
default Histogram getHistogram(MetricName metricName,
HistogramData.BucketType bucketType) {
throw new RuntimeException("Histogram metric is not supported yet.");
}
+
+ /** Return the cumulative values for any metrics in this container as MonitoringInfos. */
+ default Iterable<MetricsApi.MonitoringInfo> getMonitoringInfos() {
+ throw new RuntimeException("getMonitoringInfos is not implemented on this MetricsContainer.");
+ }
}
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index b687876..dee0b3d 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
import org.apache.beam.fn.harness.control.AddHarnessIdInterceptor;
import org.apache.beam.fn.harness.control.BeamFnControlClient;
import org.apache.beam.fn.harness.control.FinalizeBundleHandler;
+import org.apache.beam.fn.harness.control.HarnessMonitoringInfosInstructionHandler;
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
@@ -43,6 +44,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.fn.IdGenerator;
@@ -52,6 +54,7 @@ import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.TextFormat;
@@ -262,6 +265,8 @@ public class FnHarness {
}
});
+ MetricsEnvironment.setProcessWideContainer(MetricsContainerImpl.createProcessWideContainer());
+
ProcessBundleHandler processBundleHandler =
new ProcessBundleHandler(
options,
@@ -317,6 +322,12 @@ public class FnHarness {
Collectors.toMap(
Function.identity(),
metricsShortIds::get)))));
+ HarnessMonitoringInfosInstructionHandler processWideHandler =
+ new HarnessMonitoringInfosInstructionHandler(metricsShortIds);
+ handlers.put(
+ InstructionRequest.RequestCase.HARNESS_MONITORING_INFOS,
+ processWideHandler::harnessMonitoringInfos);
+
JvmInitializers.runBeforeProcessing(options);
String samplingPeriodMills =
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/HarnessMonitoringInfosInstructionHandler.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/HarnessMonitoringInfosInstructionHandler.java
new file mode 100644
index 0000000..5749455
--- /dev/null
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/HarnessMonitoringInfosInstructionHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.control;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.ShortIdMap;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+
+/**
+ * Processes {@link BeamFnApi.InstructionRequest}'s {@link BeamFnApi.HarnessMonitoringInfosResponse}
+ *
+ * <p>These instructions are not associated with the currently processed bundles. They return
+ * MonitoringInfos payloads for "process-wide" metrics, which return metric values calculated over
+ * the life of the process.
+ */
+public class HarnessMonitoringInfosInstructionHandler {
+
+ private final ShortIdMap metricsShortIds;
+
+ public HarnessMonitoringInfosInstructionHandler(ShortIdMap metricsShortIds) {
+ this.metricsShortIds = metricsShortIds;
+ }
+
+ public BeamFnApi.InstructionResponse.Builder harnessMonitoringInfos(
+ BeamFnApi.InstructionRequest request) {
+ BeamFnApi.HarnessMonitoringInfosResponse.Builder response =
+ BeamFnApi.HarnessMonitoringInfosResponse.newBuilder();
+ MetricsContainer container = MetricsEnvironment.getProcessWideContainer();
+ if (container != null) {
+ for (MetricsApi.MonitoringInfo info : container.getMonitoringInfos()) {
+ response.putMonitoringData(
+ this.metricsShortIds.getOrCreateShortId(info), info.getPayload());
+ }
+ }
+ return BeamFnApi.InstructionResponse.newBuilder().setHarnessMonitoringInfos(response);
+ }
+}
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/HarnessMonitoringInfosInstructionHandlerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/HarnessMonitoringInfosInstructionHandlerTest.java
new file mode 100644
index 0000000..ac69ed2
--- /dev/null
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/HarnessMonitoringInfosInstructionHandlerTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.control;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.runners.core.metrics.LabeledMetrics;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
+import org.apache.beam.runners.core.metrics.ShortIdMap;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.junit.Test;
+
+public class HarnessMonitoringInfosInstructionHandlerTest {
+
+ @Test
+ public void testReturnsProcessWideMonitoringInfos() {
+ MetricsEnvironment.setProcessWideContainer(MetricsContainerImpl.createProcessWideContainer());
+
+ HashMap<String, String> labels = new HashMap<String, String>();
+ labels.put(MonitoringInfoConstants.Labels.SERVICE, "service");
+ labels.put(MonitoringInfoConstants.Labels.METHOD, "method");
+ labels.put(MonitoringInfoConstants.Labels.RESOURCE, "resource");
+ labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "transform");
+ labels.put(MonitoringInfoConstants.Labels.STATUS, "ok");
+ MonitoringInfoMetricName name =
+ MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, labels);
+ Counter counter = LabeledMetrics.counter(name, true);
+ counter.inc(7);
+
+ ShortIdMap metricsShortIds = new ShortIdMap();
+ HarnessMonitoringInfosInstructionHandler testObject =
+ new HarnessMonitoringInfosInstructionHandler(metricsShortIds);
+
+ BeamFnApi.InstructionRequest.Builder builder = BeamFnApi.InstructionRequest.newBuilder();
+ BeamFnApi.InstructionResponse.Builder responseBuilder =
+ testObject.harnessMonitoringInfos(builder.build());
+
+ BeamFnApi.InstructionResponse response = responseBuilder.build();
+ assertEquals(1, response.getHarnessMonitoringInfos().getMonitoringDataMap().size());
+
+ // Expect a payload to be set for "metric0".
+ assertTrue(
+ !response.getHarnessMonitoringInfos().getMonitoringDataMap().get("metric0").isEmpty());
+ }
+}