JayajP commented on code in PR #30089:
URL: https://github.com/apache/beam/pull/30089#discussion_r1465361704


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java:
##########
@@ -178,4 +192,109 @@ public static Iterable<CounterUpdate> 
extractMetricUpdates(
   public static void setEnablePerWorkerMetrics(Boolean enablePerWorkerMetrics) 
{
     StreamingStepMetricsContainer.enablePerWorkerMetrics = 
enablePerWorkerMetrics;
   }
+
+  /**
+   * Updates {@code perWorkerCountersByFirstStaleTime} with the current 
zero-valued metrics and
+   * removes metrics that have been stale for longer than {@code 
maximumPerWorkerCounterStaleness}
+   * from {@code perWorkerCounters}.
+   *
+   * @param currentZeroValuedCounters Current zero-valued perworker counters.
+   * @param extractionTime Time {@code currentZeroValuedCounters} were 
discovered to be zero-valued.
+   */
+  private void deleteStaleCounters(
+      Set<MetricName> currentZeroValuedCounters, Instant extractionTime) {
+    // perWorkerCountersByFirstStaleTime should only contain metrics that are 
currently zero-valued.
+    
perWorkerCountersByFirstStaleTime.keySet().retainAll(currentZeroValuedCounters);
+
+    // Delete metrics that have been longer than 
'maximumPerWorkerCounterStaleness'.
+    Set<MetricName> deletedMetricNames = new HashSet<MetricName>();
+    for (Entry<MetricName, Instant> entry : 
perWorkerCountersByFirstStaleTime.entrySet()) {
+      if (Duration.between(entry.getValue(), extractionTime)
+              .compareTo(maximumPerWorkerCounterStaleness)
+          > 0) {
+        RemoveSafeDeltaCounterCell cell =
+            new RemoveSafeDeltaCounterCell(entry.getKey(), perWorkerCounters);
+        cell.deleteIfZero();
+        deletedMetricNames.add(entry.getKey());
+      }
+    }
+
+    // Insert new zero-valued metrics into `perWorkerCountersByFirstStaleTime`.
+    currentZeroValuedCounters.forEach(
+        name -> perWorkerCountersByFirstStaleTime.putIfAbsent(name, 
extractionTime));
+
+    // Metrics in 'deletedMetricNames' have either been removed from 
'perWorkerCounters' or are no
+    // longer zero-valued.
+    perWorkerCountersByFirstStaleTime.keySet().removeAll(deletedMetricNames);
+  }
+
+  /**
+   * Extracts metric updates for all PerWorker metrics that have changed in 
this Container since the
+   * last time this function was called. Additionally, deletes any PerWorker 
counters that have been
+   * zero valued for more than {@code maximumPerWorkerCounterStaleness}.
+   */
+  private Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
+    ConcurrentHashMap<MetricName, Long> counters = new 
ConcurrentHashMap<MetricName, Long>();
+    ConcurrentHashMap<MetricName, HistogramData> histograms =
+        new ConcurrentHashMap<MetricName, HistogramData>();
+    HashSet<MetricName> currentZeroValuedCounters = new HashSet<MetricName>();
+
+    // Extract metrics updates.
+    perWorkerCounters.forEach(
+        (k, v) -> {
+          Long val = v.getAndSet(0);
+          if (val == 0) {
+            currentZeroValuedCounters.add(k);
+            return;
+          }
+          counters.put(k, val);
+        });
+    perWorkerHistograms.forEach(
+        (k, v) -> {
+          HistogramData val = v.getCumulative().getAndReset();
+          if (val.getTotalCount() == 0) {
+            return;
+          }
+          histograms.put(k.getKey(), val);
+        });
+
+    deleteStaleCounters(currentZeroValuedCounters, Instant.now(clock));
+
+    return MetricsToPerStepNamespaceMetricsConverter.convert(stepName, 
counters, histograms);
+  }
+
+  /**
+   * @param metricsContainerRegistry Metrics will be extracted for all 
containers in this registry.
+   * @return An iterable of {@link PerStepNamespaceMetrics} representing the 
changes to all
+   *     PerWorkerMetrics that have changed since the last time this function 
was invoked.
+   */
+  public static Iterable<PerStepNamespaceMetrics> 
extractPerWorkerMetricUpdates(
+      MetricsContainerRegistry<StreamingStepMetricsContainer> 
metricsContainerRegistry) {
+    return metricsContainerRegistry
+        .getContainers()
+        
.transformAndConcat(StreamingStepMetricsContainer::extractPerWorkerMetricUpdates);
+  }
+
+  // The following methods are used to test that zero-valued perWorkerMetrics 
metrics are properly
+  // deleted.
+  @VisibleForTesting
+  Map<MetricName, AtomicLong> getPerWorkerCounters() {
+    return perWorkerCounters;
+  }
+
+  @VisibleForTesting
+  Map<MetricName, Instant> getPerWorkerCountersByFirstStaleTime() {

Review Comment:
   Done. Two points of note:
   - There doesn't seem to be a standard `TestClock`, so I created a barebones 
one for use in the unit test.
   - I also needed to mark the member function 
`StreamingStepMetricsContainer::extractPerWorkerMetrics` as 
`@VisibleForTesting`; otherwise I would also have had to modify 
`MetricsContainerRegistry`, which seems unnecessarily complex. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to