ajamato commented on a change in pull request #14490:
URL: https://github.com/apache/beam/pull/14490#discussion_r615111991



##########
File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricCell.java
##########
@@ -36,4 +38,9 @@
 
   /** Reset this metric. */
   void reset();
+
+  /** Return the cumulative values for any metrics in this container as MonitoringInfos. */
+  default @Nullable DateTime getStartTime() {
+    return null;

Review comment:
       I didn't want to add this to all the metrics, because I didn't want to 
add unnecessary calls to the system to get clock times.
   
   Let's add it to just this place first, and consider adding more if they are 
needed for another project.
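
   A minimal sketch of the trade-off described above, not the PR's actual code. All names here are hypothetical, and `java.time.Instant` stands in for the Joda `DateTime` in the diff so the example needs only the standard library. The interface default returns null, so most cells never read the clock; a cell that needs a start time overrides the method and reads the clock once.

```java
import java.time.Instant;

// Hypothetical stand-in for a metric cell interface (assumption, not Beam's MetricCell).
interface Cell<T> {
  T getCumulative();

  /** Start time of this cell, or null if the cell does not track one. */
  default Instant getStartTime() {
    return null; // default path: no clock read at all
  }
}

/** A plain counter: inherits the null default, so creating it never touches the clock. */
final class CounterCell implements Cell<Long> {
  private long value;

  void inc() {
    value++;
  }

  @Override
  public Long getCumulative() {
    return value;
  }
}

/** A counter that opts in: it reads the clock exactly once, at construction. */
final class TimedCounterCell implements Cell<Long> {
  private final Instant startTime = Instant.now();
  private long value;

  void inc() {
    value++;
  }

  @Override
  public Long getCumulative() {
    return value;
  }

  @Override
  public Instant getStartTime() {
    return startTime;
  }
}
```

   Callers then have to treat a null start time as "not tracked" rather than as an error.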

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessWideInstructionHandler.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.control;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+
+/**
+ * Processes {@link BeamFnApi.InstructionRequest}'s and {@link
+ * BeamFnApi.MonitoringInfosMetadataRequest} and {@link BeamFnApi.HarnessMonitoringInfosResponse}
+ *
+ * <p>These instructions are not associated with the currently processed bundles. They return
+ * MonitoringInfos payloads and MonitoringInfo metadata for "process-wide" metrics, which return
+ * metric values calculated over the life of the process.
+ */
+public class ProcessWideInstructionHandler {

Review comment:
       Renamed to HarnessMonitoringInfosInstructionHandler, which is all this 
does now.

##########
File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
##########
@@ -302,7 +325,7 @@ public void commitUpdates() {
     ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
     for (Map.Entry<MetricName, CellT> cell : cells.entries()) {
       UpdateT update = checkNotNull(cell.getValue().getCumulative());
-      updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()), update));
+      updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()), update, null));

Review comment:
       Done

##########
File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricCell.java
##########
@@ -36,4 +38,9 @@
 
   /** Reset this metric. */
   void reset();
+
+  /** Return the cumulative values for any metrics in this container as MonitoringInfos. */

Review comment:
       Update the comment

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
##########
@@ -54,4 +55,15 @@
   default Histogram getHistogram(MetricName metricName, HistogramData.BucketType bucketType) {
     throw new RuntimeException("Histogram metric is not supported yet.");
   }
+
+  /** Return the cumulative values for any metrics in this container as MonitoringInfos. */
+  default Iterable<MetricsApi.MonitoringInfo> getMonitoringInfos() {
+    throw new RuntimeException("getMonitoringInfos is not implemented on this MetricsContainer.");
+  }
+
+  default boolean isProcessWide() {
+    return false;
+  }
+
+  default void setIsProcessWide(boolean procesWide) {}

Review comment:
       Done

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/MonitoringInfoShortIdCache.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.control;
+
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+
+public class MonitoringInfoShortIdCache implements AutoCloseable {
+
+  private int lastShortId = 0;
+
+  private HashMap<MonitoringInfoMetricName, String> infoKeyToShortId = new HashMap<>();
+
+  private HashMap<String, MetricsApi.MonitoringInfo> shortIdToInfo = new HashMap<>();
+

Review comment:
       This file was deleted, as @robertwb checked in a different 
implementation in the last few days.

##########
File path: 
runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java
##########
@@ -98,7 +98,8 @@ public void flush(boolean async) {
       UpdateT value = cell.getValue();
       if (value != null) {
         MetricKey key = MetricKey.create(stepName, cell.getName());
-        MetricUpdates.MetricUpdate<UpdateT> update = MetricUpdates.MetricUpdate.create(key, value);
+        MetricUpdates.MetricUpdate<UpdateT> update =
+            MetricUpdates.MetricUpdate.create(key, value, null);

Review comment:
       No, this is actually not the same MetricCell class, so I can't pull a 
startTime off it.
   
   It's actually a CellT extends AbstractMetric<UpdateT>.
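
   A rough illustration of why null is passed here, using hypothetical stand-in types rather than the Jet runner's or Beam's real classes (`java.time.Instant` again replaces Joda `DateTime`). When the flush code only knows its cell as a subtype of a base class that exposes a value and nothing else, there is no start time to read, so the update is built with a null one.

```java
import java.time.Instant;

// Hypothetical base type: exposes a value only, no start time (assumption).
abstract class AbstractMetricSketch<T> {
  abstract T getValue();
}

// Hypothetical update record whose start time is optional.
final class UpdateSketch<T> {
  final String key;
  final T value;
  final Instant startTime; // null when the producing cell does not track one

  private UpdateSketch(String key, T value, Instant startTime) {
    this.key = key;
    this.value = value;
    this.startTime = startTime;
  }

  static <T> UpdateSketch<T> create(String key, T value, Instant startTime) {
    return new UpdateSketch<>(key, value, startTime);
  }
}

final class FlushSketch {
  // C is bounded only by the base type, so getValue() is all that is visible here.
  static <T, C extends AbstractMetricSketch<T>> UpdateSketch<T> toUpdate(String key, C cell) {
    return UpdateSketch.create(key, cell.getValue(), null);
  }
}
```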

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessWideInstructionHandler.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.control;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+
+/**
+ * Processes {@link BeamFnApi.InstructionRequest}'s and {@link
+ * BeamFnApi.MonitoringInfosMetadataRequest} and {@link BeamFnApi.HarnessMonitoringInfosResponse}
+ *
+ * <p>These instructions are not associated with the currently processed bundles. They return
+ * MonitoringInfos payloads and MonitoringInfo metadata for "process-wide" metrics, which return
+ * metric values calculated over the life of the process.
+ */
+public class ProcessWideInstructionHandler {
+  public ProcessWideInstructionHandler() {}
+
+  public static BeamFnApi.InstructionResponse.Builder monitoringInfoMetadata(
+      BeamFnApi.InstructionRequest request) {
+    BeamFnApi.MonitoringInfosMetadataResponse.Builder response =
+        BeamFnApi.MonitoringInfosMetadataResponse.newBuilder();
+    response.putAllMonitoringInfo(
+        MonitoringInfoShortIdCache.getShortIdCache()
+            .getInfos(request.getMonitoringInfos().getMonitoringInfoIdList()));

Review comment:
       That wasn't implemented when I sent you this PR, though Robert's change 
did introduce it.
   I have updated the PR to merge with his implementation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

