This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 48ad919  [BEAM-11994] Add HarnessMonitoringInfosRequest/Response InstructionRequest and MonitoringInfosMetadataRequest/Response support to Java SDK.
     new a60aeba  Merge pull request #14805 from [BEAM-11994] Add HarnessMonitoringInfosRequest/Response InstructionRequest and MonitoringInfosMetadataRequest/Response support to Java SDK.
48ad919 is described below

commit 48ad9195205d1f1c1229a4469858e58e2b8145bc
Author: Alex Amato <[email protected]>
AuthorDate: Thu Apr 8 20:56:01 2021 -0700

    [BEAM-11994] Add HarnessMonitoringInfosRequest/Response InstructionRequest and MonitoringInfosMetadataRequest/Response support to Java SDK.
---
 .../runners/core/construction/Environments.java    |  1 +
 .../runners/core/metrics/MetricsContainerImpl.java | 26 ++++++++-
 .../core/metrics/MetricsContainerImplTest.java     | 10 ++++
 .../apache/beam/sdk/metrics/MetricsContainer.java  |  6 ++
 .../java/org/apache/beam/fn/harness/FnHarness.java | 11 ++++
 .../HarnessMonitoringInfosInstructionHandler.java  | 54 ++++++++++++++++++
 ...rnessMonitoringInfosInstructionHandlerTest.java | 66 ++++++++++++++++++++++
 7 files changed, 172 insertions(+), 2 deletions(-)

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
index 473ae4e..e743e68 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
@@ -386,6 +386,7 @@ public class Environments {
     capabilities.addAll(ModelCoders.urns());
     
capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.MULTI_CORE_BUNDLE_PROCESSING));
     
capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.PROGRESS_REPORTING));
+    
capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.HARNESS_MONITORING_INFOS));
     capabilities.add("beam:version:sdk_base:" + 
JAVA_SDK_HARNESS_CONTAINER_URL);
     
capabilities.add(BeamUrns.getUrn(SplittableParDoComponents.TRUNCATE_SIZED_RESTRICTION));
     capabilities.add(BeamUrns.getUrn(Primitives.TO_STRING));
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index b8aaa73..4d7c37a 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -74,6 +74,8 @@ public class MetricsContainerImpl implements Serializable, 
MetricsContainer {
 
   protected final @Nullable String stepName;
 
+  private final boolean isProcessWide;
+
   private MetricsMap<MetricName, CounterCell> counters = new 
MetricsMap<>(CounterCell::new);
 
   private MetricsMap<MetricName, DistributionCell> distributions =
@@ -84,9 +86,25 @@ public class MetricsContainerImpl implements Serializable, 
MetricsContainer {
   private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> 
histograms =
       new MetricsMap<>(HistogramCell::new);
 
-  /** Create a new {@link MetricsContainerImpl} associated with the given 
{@code stepName}. */
-  public MetricsContainerImpl(@Nullable String stepName) {
+  private MetricsContainerImpl(@Nullable String stepName, boolean 
isProcessWide) {
     this.stepName = stepName;
+    this.isProcessWide = isProcessWide;
+  }
+
+  /**
+   * Create a new {@link MetricsContainerImpl} associated with the given 
{@code stepName}. If
+   * stepName is null, this MetricsContainer is not bound to a step.
+   */
+  public MetricsContainerImpl(@Nullable String stepName) {
+    this(stepName, false);
+  }
+
+  /**
+   * Create a new {@link MetricsContainerImpl} associated with the entire 
process. Used for
+   * collecting process-wide metrics for HarnessMonitoringInfosRequest/Response.
+   */
+  public static MetricsContainerImpl createProcessWideContainer() {
+    return new MetricsContainerImpl(null, true);
   }
 
   @edu.umd.cs.findbugs.annotations.SuppressFBWarnings(
@@ -96,6 +114,9 @@ public class MetricsContainerImpl implements Serializable, 
MetricsContainer {
 
   /** Reset the metrics. */
   public void reset() {
+    if (this.isProcessWide) {
+      throw new RuntimeException("Process Wide metric containers must not be 
reset");
+    }
     reset(counters);
     reset(distributions);
     reset(gauges);
@@ -270,6 +291,7 @@ public class MetricsContainerImpl implements Serializable, 
MetricsContainer {
   }
 
   /** Return the cumulative values for any metrics in this container as 
MonitoringInfos. */
+  @Override
   public Iterable<MonitoringInfo> getMonitoringInfos() {
     // Extract user metrics and store as MonitoringInfos.
     ArrayList<MonitoringInfo> monitoringInfos = new 
ArrayList<MonitoringInfo>();
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
index 837432f..c20ee5d 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.emptyIterable;
 import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
@@ -268,6 +269,15 @@ public class MetricsContainerImplTest {
   }
 
   @Test
+  public void testProcessWideMetricContainerThrowsWhenReset() {
+    MetricsContainerImpl testObject = 
MetricsContainerImpl.createProcessWideContainer();
+    CounterCell c1 = testObject.getCounter(MetricName.named("ns", "name1"));
+    c1.inc(2L);
+
+    assertThrows(RuntimeException.class, () -> testObject.reset());
+  }
+
+  @Test
   public void testEquals() {
     MetricsContainerImpl metricsContainerImpl = new 
MetricsContainerImpl("stepName");
     MetricsContainerImpl equal = new MetricsContainerImpl("stepName");
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
index 54c28ff..f10fccd 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.metrics;
 
 import java.io.Serializable;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.util.HistogramData;
@@ -54,4 +55,9 @@ public interface MetricsContainer extends Serializable {
   default Histogram getHistogram(MetricName metricName, 
HistogramData.BucketType bucketType) {
     throw new RuntimeException("Histogram metric is not supported yet.");
   }
+
+  /** Return the cumulative values for any metrics in this container as 
MonitoringInfos. */
+  default Iterable<MetricsApi.MonitoringInfo> getMonitoringInfos() {
+    throw new RuntimeException("getMonitoringInfos is not implemented on this 
MetricsContainer.");
+  }
 }
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index b687876..dee0b3d 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.fn.harness.control.AddHarnessIdInterceptor;
 import org.apache.beam.fn.harness.control.BeamFnControlClient;
 import org.apache.beam.fn.harness.control.FinalizeBundleHandler;
+import 
org.apache.beam.fn.harness.control.HarnessMonitoringInfosInstructionHandler;
 import org.apache.beam.fn.harness.control.ProcessBundleHandler;
 import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
 import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
@@ -43,6 +44,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.metrics.ShortIdMap;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.fn.IdGenerator;
@@ -52,6 +54,7 @@ import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
 import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
 import org.apache.beam.sdk.function.ThrowingFunction;
 import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.TextFormat;
@@ -262,6 +265,8 @@ public class FnHarness {
                     }
                   });
 
+      
MetricsEnvironment.setProcessWideContainer(MetricsContainerImpl.createProcessWideContainer());
+
       ProcessBundleHandler processBundleHandler =
           new ProcessBundleHandler(
               options,
@@ -317,6 +322,12 @@ public class FnHarness {
                                       Collectors.toMap(
                                           Function.identity(), 
metricsShortIds::get)))));
 
+      HarnessMonitoringInfosInstructionHandler processWideHandler =
+          new HarnessMonitoringInfosInstructionHandler(metricsShortIds);
+      handlers.put(
+          InstructionRequest.RequestCase.HARNESS_MONITORING_INFOS,
+          processWideHandler::harnessMonitoringInfos);
+
       JvmInitializers.runBeforeProcessing(options);
 
       String samplingPeriodMills =
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/HarnessMonitoringInfosInstructionHandler.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/HarnessMonitoringInfosInstructionHandler.java
new file mode 100644
index 0000000..5749455
--- /dev/null
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/HarnessMonitoringInfosInstructionHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.control;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.ShortIdMap;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+
+/**
+ * Processes {@link BeamFnApi.InstructionRequest}'s {@link 
BeamFnApi.HarnessMonitoringInfosResponse}
+ *
+ * <p>These instructions are not associated with the currently processed 
bundles. They return
+ * MonitoringInfos payloads for "process-wide" metrics, which report metric 
values calculated over
+ * the life of the process.
+ */
+public class HarnessMonitoringInfosInstructionHandler {
+
+  private final ShortIdMap metricsShortIds;
+
+  public HarnessMonitoringInfosInstructionHandler(ShortIdMap metricsShortIds) {
+    this.metricsShortIds = metricsShortIds;
+  }
+
+  public BeamFnApi.InstructionResponse.Builder harnessMonitoringInfos(
+      BeamFnApi.InstructionRequest request) {
+    BeamFnApi.HarnessMonitoringInfosResponse.Builder response =
+        BeamFnApi.HarnessMonitoringInfosResponse.newBuilder();
+    MetricsContainer container = MetricsEnvironment.getProcessWideContainer();
+    if (container != null) {
+      for (MetricsApi.MonitoringInfo info : container.getMonitoringInfos()) {
+        response.putMonitoringData(
+            this.metricsShortIds.getOrCreateShortId(info), info.getPayload());
+      }
+    }
+    return 
BeamFnApi.InstructionResponse.newBuilder().setHarnessMonitoringInfos(response);
+  }
+}
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/HarnessMonitoringInfosInstructionHandlerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/HarnessMonitoringInfosInstructionHandlerTest.java
new file mode 100644
index 0000000..ac69ed2
--- /dev/null
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/HarnessMonitoringInfosInstructionHandlerTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.control;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.runners.core.metrics.LabeledMetrics;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
+import org.apache.beam.runners.core.metrics.ShortIdMap;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.junit.Test;
+
+public class HarnessMonitoringInfosInstructionHandlerTest {
+
+  @Test
+  public void testReturnsProcessWideMonitoringInfos() {
+    
MetricsEnvironment.setProcessWideContainer(MetricsContainerImpl.createProcessWideContainer());
+
+    HashMap<String, String> labels = new HashMap<String, String>();
+    labels.put(MonitoringInfoConstants.Labels.SERVICE, "service");
+    labels.put(MonitoringInfoConstants.Labels.METHOD, "method");
+    labels.put(MonitoringInfoConstants.Labels.RESOURCE, "resource");
+    labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "transform");
+    labels.put(MonitoringInfoConstants.Labels.STATUS, "ok");
+    MonitoringInfoMetricName name =
+        
MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, 
labels);
+    Counter counter = LabeledMetrics.counter(name, true);
+    counter.inc(7);
+
+    ShortIdMap metricsShortIds = new ShortIdMap();
+    HarnessMonitoringInfosInstructionHandler testObject =
+        new HarnessMonitoringInfosInstructionHandler(metricsShortIds);
+
+    BeamFnApi.InstructionRequest.Builder builder = 
BeamFnApi.InstructionRequest.newBuilder();
+    BeamFnApi.InstructionResponse.Builder responseBuilder =
+        testObject.harnessMonitoringInfos(builder.build());
+
+    BeamFnApi.InstructionResponse response = responseBuilder.build();
+    assertEquals(1, 
response.getHarnessMonitoringInfos().getMonitoringDataMap().size());
+
+    // Expect a payload to be set for "metric0".
+    assertTrue(
+        
!response.getHarnessMonitoringInfos().getMonitoringDataMap().get("metric0").isEmpty());
+  }
+}

Reply via email to