JayajP commented on code in PR #30089:
URL: https://github.com/apache/beam/pull/30089#discussion_r1465361704
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java:
##########
@@ -178,4 +192,109 @@ public static Iterable<CounterUpdate>
extractMetricUpdates(
public static void setEnablePerWorkerMetrics(Boolean enablePerWorkerMetrics)
{
StreamingStepMetricsContainer.enablePerWorkerMetrics =
enablePerWorkerMetrics;
}
+
+ /**
+ * Updates {@code perWorkerCountersByFirstStaleTime} with the current
zero-valued metrics and
+ * removes metrics that have been stale for longer than {@code
maximumPerWorkerCounterStaleness}
+ * from {@code perWorkerCounters}.
+ *
+ * @param currentZeroValuedCounters Current zero-valued perworker counters.
+ * @param extractionTime Time {@code currentZeroValuedCounters} were
discovered to be zero-valued.
+ */
+ private void deleteStaleCounters(
+ Set<MetricName> currentZeroValuedCounters, Instant extractionTime) {
+ // perWorkerCountersByFirstStaleTime should only contain metrics that are
currently zero-valued.
+
perWorkerCountersByFirstStaleTime.keySet().retainAll(currentZeroValuedCounters);
+
+ // Delete metrics that have been stale for longer than
'maximumPerWorkerCounterStaleness'.
+ Set<MetricName> deletedMetricNames = new HashSet<MetricName>();
+ for (Entry<MetricName, Instant> entry :
perWorkerCountersByFirstStaleTime.entrySet()) {
+ if (Duration.between(entry.getValue(), extractionTime)
+ .compareTo(maximumPerWorkerCounterStaleness)
+ > 0) {
+ RemoveSafeDeltaCounterCell cell =
+ new RemoveSafeDeltaCounterCell(entry.getKey(), perWorkerCounters);
+ cell.deleteIfZero();
+ deletedMetricNames.add(entry.getKey());
+ }
+ }
+
+ // Insert new zero-valued metrics into `perWorkerCountersByFirstStaleTime`.
+ currentZeroValuedCounters.forEach(
+ name -> perWorkerCountersByFirstStaleTime.putIfAbsent(name,
extractionTime));
+
+ // Metrics in 'deletedMetricNames' have either been removed from
'perWorkerCounters' or are no
+ // longer zero-valued.
+ perWorkerCountersByFirstStaleTime.keySet().removeAll(deletedMetricNames);
+ }
+
+ /**
+ * Extracts metric updates for all PerWorker metrics that have changed in
this Container since the
+ * last time this function was called. Additionally, deletes any PerWorker
counters that have been
+ * zero valued for more than {@code maximumPerWorkerCounterStaleness}.
+ */
+ private Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
+ ConcurrentHashMap<MetricName, Long> counters = new
ConcurrentHashMap<MetricName, Long>();
+ ConcurrentHashMap<MetricName, HistogramData> histograms =
+ new ConcurrentHashMap<MetricName, HistogramData>();
+ HashSet<MetricName> currentZeroValuedCounters = new HashSet<MetricName>();
+
+ // Extract metrics updates.
+ perWorkerCounters.forEach(
+ (k, v) -> {
+ Long val = v.getAndSet(0);
+ if (val == 0) {
+ currentZeroValuedCounters.add(k);
+ return;
+ }
+ counters.put(k, val);
+ });
+ perWorkerHistograms.forEach(
+ (k, v) -> {
+ HistogramData val = v.getCumulative().getAndReset();
+ if (val.getTotalCount() == 0) {
+ return;
+ }
+ histograms.put(k.getKey(), val);
+ });
+
+ deleteStaleCounters(currentZeroValuedCounters, Instant.now(clock));
+
+ return MetricsToPerStepNamespaceMetricsConverter.convert(stepName,
counters, histograms);
+ }
+
+ /**
+ * @param metricsContainerRegistry Metrics will be extracted for all
containers in this registry.
+ * @return An iterable of {@link PerStepNamespaceMetrics} representing the
changes to all
+ * PerWorkerMetrics that have changed since the last time this function
was invoked.
+ */
+ public static Iterable<PerStepNamespaceMetrics>
extractPerWorkerMetricUpdates(
+ MetricsContainerRegistry<StreamingStepMetricsContainer>
metricsContainerRegistry) {
+ return metricsContainerRegistry
+ .getContainers()
+
.transformAndConcat(StreamingStepMetricsContainer::extractPerWorkerMetricUpdates);
+ }
+
+ // The following methods are used to test that zero-valued PerWorker
metrics are properly
+ // deleted.
+ @VisibleForTesting
+ Map<MetricName, AtomicLong> getPerWorkerCounters() {
+ return perWorkerCounters;
+ }
+
+ @VisibleForTesting
+ Map<MetricName, Instant> getPerWorkerCountersByFirstStaleTime() {
Review Comment:
Done. Two points of note:
- It looks like there isn't a standard `TestClock`, so I created a barebones
class for use in the unit test.
- I also needed to mark the member function
`StreamingStepMetricsContainer::extractPerWorkerMetricUpdates` as
`@VisibleForTesting`; otherwise I would also need to make modifications to the
`MetricsContainerRegistry`, which seems unnecessarily complex.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]