pabloem commented on a change in pull request #14490:
URL: https://github.com/apache/beam/pull/14490#discussion_r613568006



##########
File path: runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricCell.java
##########
@@ -36,4 +38,9 @@
 
   /** Reset this metric. */
   void reset();
+
+  /** Return the cumulative values for any metrics in this container as MonitoringInfos. */
+  default @Nullable DateTime getStartTime() {
+    return null;

Review comment:
       Would it make sense to drop the default and make this non-nullable (i.e. force every implementer to define a non-null start time)?
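
       A minimal sketch of that alternative, assuming a stripped-down interface: `StartTimedCell` and `CounterCellSketch` are hypothetical names, `java.time.Instant` stands in for the Joda `DateTime` used in the PR so the snippet compiles without extra dependencies, and restarting the clock in `reset()` is my assumption rather than something the PR specifies.

```java
import java.time.Instant;

// Hypothetical stand-in for MetricCell; the real interface has more methods
// and uses Joda DateTime rather than java.time.Instant.
interface StartTimedCell {
  /** Reset this metric. */
  void reset();

  /** Return the time at which this cell started accumulating. No default, so never null. */
  Instant getStartTime();
}

// Hypothetical implementer: records its start time at construction and again on reset.
final class CounterCellSketch implements StartTimedCell {
  private long value = 0;
  private Instant startTime = Instant.now();

  void inc() {
    value++;
  }

  long getValue() {
    return value;
  }

  @Override
  public void reset() {
    value = 0;
    startTime = Instant.now(); // assumption: a reset also restarts the clock
  }

  @Override
  public Instant getStartTime() {
    return startTime;
  }
}
```

       With no default, a cell that forgets to track a start time fails to compile instead of reporting null at runtime.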

##########
File path: runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java
##########
@@ -98,7 +98,8 @@ public void flush(boolean async) {
       UpdateT value = cell.getValue();
       if (value != null) {
         MetricKey key = MetricKey.create(stepName, cell.getName());
-        MetricUpdates.MetricUpdate<UpdateT> update = MetricUpdates.MetricUpdate.create(key, value);
+        MetricUpdates.MetricUpdate<UpdateT> update =
+            MetricUpdates.MetricUpdate.create(key, value, null);

Review comment:
       Does it make sense to get the start time from the cell instead of passing null?

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/MonitoringInfoShortIdCache.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.control;
+
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+
+public class MonitoringInfoShortIdCache implements AutoCloseable {
+
+  private int lastShortId = 0;
+
+  private HashMap<MonitoringInfoMetricName, String> infoKeyToShortId = new HashMap<>();
+
+  private HashMap<String, MetricsApi.MonitoringInfo> shortIdToInfo = new HashMap<>();
+
+  private static final AtomicReference<MonitoringInfoShortIdCache> SHORT_ID_CACHE =
+      new AtomicReference<>(new MonitoringInfoShortIdCache());
+
+  private MonitoringInfoShortIdCache() {}
+
+  /** Return the {@link MetricsContainer} for the current process. */
+  public static MonitoringInfoShortIdCache getShortIdCache() {
+    return SHORT_ID_CACHE.get();
+  }
+
+  /**
+   * Initialize and assign the {@link MonitoringInfoShortIdCache} to the current process. Usage: try
+   * (MonitoringInfoShortIdCache cache = MonitoringInfoShortIdCache.newShortIdCache()) { ; } catch
+   * (Exception e) { // Handle error when AutoClosure calls close() }
+   *
+   * <p>Instead of setting this statically, we require that it be initialized. This allows tests to
+   * set and clear the ShortIdCache appropriately.
+   *
+   * @return The ShortIdCache for the current process.
+   */
+  public static MonitoringInfoShortIdCache newShortIdCache() {
+    return SHORT_ID_CACHE.getAndSet(new MonitoringInfoShortIdCache());

Review comment:
       Maybe we should note that this returns the previous cache, not the new one? Perhaps it's worth not returning anything, or renaming the function to convey that it returns the previous cache.
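
       To illustrate the concern, here is a small sketch with a hypothetical `ShortIdCacheSketch` class standing in for the real cache. `swapReturningPrevious` mirrors the `getAndSet` call above, and `installNew` is one possible alternative (an assumption on my part, not something in the PR) that returns the instance it just installed.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, stripped-down stand-in for MonitoringInfoShortIdCache.
final class ShortIdCacheSketch {
  private static final AtomicReference<ShortIdCacheSketch> CACHE =
      new AtomicReference<>(new ShortIdCacheSketch());

  private ShortIdCacheSketch() {}

  /** Return the cache currently installed for the process. */
  static ShortIdCacheSketch get() {
    return CACHE.get();
  }

  /** Mirrors the code under review: getAndSet hands back the OLD cache. */
  static ShortIdCacheSketch swapReturningPrevious() {
    return CACHE.getAndSet(new ShortIdCacheSketch());
  }

  /** Alternative: install a fresh cache and return that same instance. */
  static ShortIdCacheSketch installNew() {
    ShortIdCacheSketch fresh = new ShortIdCacheSketch();
    CACHE.set(fresh);
    return fresh;
  }
}
```

       With the first variant, a caller that writes to the returned object is writing to a cache that `get()` no longer returns.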

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessWideInstructionHandler.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.control;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+
+/**
+ * Processes {@link BeamFnApi.InstructionRequest}'s and {@link
+ * BeamFnApi.MonitoringInfosMetadataRequest} and {@link BeamFnApi.HarnessMonitoringInfosResponse}
+ *
+ * <p>These instructions are not associated with the currently processed bundles. They return
+ * MonitoringInfos payloads and MonitoringInfo metadata for "process-wide" metrics, which return
+ * metric values calculated over the life of the process.
+ */
+public class ProcessWideInstructionHandler {

Review comment:
       Maybe rename this to something like MonitoringInfoMetadataInstructionHandler, since it handles both process-wide and non-process-wide MonitoringInfos?

##########
File path: runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricCell.java
##########
@@ -36,4 +38,9 @@
 
   /** Reset this metric. */
   void reset();
+
+  /** Return the cumulative values for any metrics in this container as MonitoringInfos. */

Review comment:
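       Is this docstring right? It describes returning cumulative values as MonitoringInfos, but the method it documents is getStartTime().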

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
##########
@@ -54,4 +55,15 @@
   default Histogram getHistogram(MetricName metricName, HistogramData.BucketType bucketType) {
     throw new RuntimeException("Histogram metric is not supported yet.");
   }
+
+  /** Return the cumulative values for any metrics in this container as MonitoringInfos. */
+  default Iterable<MetricsApi.MonitoringInfo> getMonitoringInfos() {
+    throw new RuntimeException("getMonitoringInfos is not implemented on this MetricsContainer.");
+  }
+
+  default boolean isProcessWide() {
+    return false;
+  }
+
+  default void setIsProcessWide(boolean procesWide) {}

Review comment:
       If the setter does nothing, maybe we should throw an error instead of silently ignoring the call?
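
       Roughly what I have in mind, as a sketch only: `ContainerSketch` and `ProcessWideContainerSketch` are hypothetical stand-ins for `MetricsContainer` and an implementation that supports the flag, and `UnsupportedOperationException` is my choice of exception, not something the PR prescribes.

```java
// Hypothetical stand-in for MetricsContainer, reduced to the two methods in question.
interface ContainerSketch {
  default boolean isProcessWide() {
    return false;
  }

  /** Fail loudly unless the implementation actually stores the flag. */
  default void setIsProcessWide(boolean processWide) {
    throw new UnsupportedOperationException(
        "setIsProcessWide is not implemented on this MetricsContainer.");
  }
}

// Hypothetical implementation that does support the flag.
final class ProcessWideContainerSketch implements ContainerSketch {
  private boolean processWide = false;

  @Override
  public boolean isProcessWide() {
    return processWide;
  }

  @Override
  public void setIsProcessWide(boolean processWide) {
    this.processWide = processWide;
  }
}
```

       This matches how `getMonitoringInfos` above already throws when a container does not implement it.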

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/MonitoringInfoShortIdCache.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.control;
+
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+
+public class MonitoringInfoShortIdCache implements AutoCloseable {
+
+  private int lastShortId = 0;
+
+  private HashMap<MonitoringInfoMetricName, String> infoKeyToShortId = new HashMap<>();
+
+  private HashMap<String, MetricsApi.MonitoringInfo> shortIdToInfo = new HashMap<>();
+

Review comment:
       I wonder whether these maps should be Guava caches with automatic invalidation instead?

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessWideInstructionHandler.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.control;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+
+/**
+ * Processes {@link BeamFnApi.InstructionRequest}'s and {@link
+ * BeamFnApi.MonitoringInfosMetadataRequest} and {@link BeamFnApi.HarnessMonitoringInfosResponse}
+ *
+ * <p>These instructions are not associated with the currently processed bundles. They return
+ * MonitoringInfos payloads and MonitoringInfo metadata for "process-wide" metrics, which return
+ * metric values calculated over the life of the process.
+ */
+public class ProcessWideInstructionHandler {
+  public ProcessWideInstructionHandler() {}
+
+  public static BeamFnApi.InstructionResponse.Builder monitoringInfoMetadata(
+      BeamFnApi.InstructionRequest request) {
+    BeamFnApi.MonitoringInfosMetadataResponse.Builder response =
+        BeamFnApi.MonitoringInfosMetadataResponse.newBuilder();
+    response.putAllMonitoringInfo(
+        MonitoringInfoShortIdCache.getShortIdCache()
+            .getInfos(request.getMonitoringInfos().getMonitoringInfoIdList()));

Review comment:
       It seems that the only time a MonitoringInfo gets added to the ShortIdCache is when we call `getShortId` on it, right? I don't see `getShortId` being called for `monitoringInfoMetadata`, only for `harnessMonitoringInfos`, so I wonder how this call gets the monitoring infos from the other containers.

##########
File path: runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
##########
@@ -302,7 +325,7 @@ public void commitUpdates() {
     ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
     for (Map.Entry<MetricName, CellT> cell : cells.entries()) {
       UpdateT update = checkNotNull(cell.getValue().getCumulative());
-      updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()), update));
+      updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()), update, null));

Review comment:
       Should the start time here be the current time instead of null? I'm not sure either way.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

