m-trieu commented on code in PR #30089:
URL: https://github.com/apache/beam/pull/30089#discussion_r1464100234
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java:
##########
@@ -178,4 +192,109 @@ public static Iterable<CounterUpdate>
extractMetricUpdates(
public static void setEnablePerWorkerMetrics(Boolean enablePerWorkerMetrics)
{
StreamingStepMetricsContainer.enablePerWorkerMetrics =
enablePerWorkerMetrics;
}
+
+ /**
+ * Updates {@code perWorkerCountersByFirstStaleTime} with the current
zero-valued metrics and
+ * removes metrics that have been stale for longer than {@code
maximumPerWorkerCounterStaleness}
+ * from {@code perWorkerCounters}.
+ *
+ * @param currentZeroValuedCounters Current zero-valued perworker counters.
+ * @param extractionTime Time {@code currentZeroValuedCounters} were
discovered to be zero-valued.
+ */
+ private void deleteStaleCounters(
+ Set<MetricName> currentZeroValuedCounters, Instant extractionTime) {
+ // perWorkerCountersByFirstStaleTime should only contain metrics that are
currently zero-valued.
+
perWorkerCountersByFirstStaleTime.keySet().retainAll(currentZeroValuedCounters);
+
+ // Delete metrics that have been longer than
'maximumPerWorkerCounterStaleness'.
+ Set<MetricName> deletedMetricNames = new HashSet<MetricName>();
+ for (Entry<MetricName, Instant> entry :
perWorkerCountersByFirstStaleTime.entrySet()) {
+ if (Duration.between(entry.getValue(), extractionTime)
+ .compareTo(maximumPerWorkerCounterStaleness)
+ > 0) {
+ RemoveSafeDeltaCounterCell cell =
+ new RemoveSafeDeltaCounterCell(entry.getKey(), perWorkerCounters);
+ cell.deleteIfZero();
+ deletedMetricNames.add(entry.getKey());
+ }
+ }
+
+ // Insert new zero-valued metrics into `perWorkerCountersByFirstStaleTime`.
+ currentZeroValuedCounters.forEach(
+ name -> perWorkerCountersByFirstStaleTime.putIfAbsent(name,
extractionTime));
+
+ // Metrics in 'deletedMetricNames' have either been removed from
'perWorkerCounters' or are no
+ // longer zero-valued.
+ perWorkerCountersByFirstStaleTime.keySet().removeAll(deletedMetricNames);
+ }
+
+ /**
+ * Extracts metric updates for all PerWorker metrics that have changed in
this Container since the
+ * last time this function was called. Additionally, deletes any PerWorker
counters that have been
+ * zero valued for more than {@code maximumPerWorkerCounterStaleness}.
+ */
+ private Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
+ ConcurrentHashMap<MetricName, Long> counters = new
ConcurrentHashMap<MetricName, Long>();
+ ConcurrentHashMap<MetricName, HistogramData> histograms =
+ new ConcurrentHashMap<MetricName, HistogramData>();
+ HashSet<MetricName> currentZeroValuedCounters = new HashSet<MetricName>();
+
+ // Extract metrics updates.
+ perWorkerCounters.forEach(
+ (k, v) -> {
+ Long val = v.getAndSet(0);
+ if (val == 0) {
+ currentZeroValuedCounters.add(k);
+ return;
+ }
+ counters.put(k, val);
+ });
+ perWorkerHistograms.forEach(
+ (k, v) -> {
+ HistogramData val = v.getCumulative().getAndReset();
+ if (val.getTotalCount() == 0) {
+ return;
+ }
+ histograms.put(k.getKey(), val);
+ });
+
+ deleteStaleCounters(currentZeroValuedCounters, Instant.now(clock));
+
+ return MetricsToPerStepNamespaceMetricsConverter.convert(stepName,
counters, histograms);
+ }
+
+ /**
+ * @param metricsContainerRegistry Metrics will be extracted for all
containers in this registry.
+ * @return An iterable of {@link PerStepNamespaceMetrics} representing the
changes to all
+ * PerWorkerMetrics that have changed since the last time this function
was invoked.
+ */
+ public static Iterable<PerStepNamespaceMetrics>
extractPerWorkerMetricUpdates(
+ MetricsContainerRegistry<StreamingStepMetricsContainer>
metricsContainerRegistry) {
+ return metricsContainerRegistry
+ .getContainers()
+
.transformAndConcat(StreamingStepMetricsContainer::extractPerWorkerMetricUpdates);
+ }
+
+ // The following methods are used to test that zero-valued perWorkerMetrics
metrics are properly
+ // deleted.
+ @VisibleForTesting
+ Map<MetricName, AtomicLong> getPerWorkerCounters() {
Review Comment:
Is it intended that these can be mutated by callers? If not, it is recommended
to use ImmutableMap
https://engdoc.corp.google.com/eng/doc/devguide/java/practices/choosing-collections.md?cl=head#immutable
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java:
##########
@@ -68,6 +74,14 @@ public class StreamingStepMetricsContainer implements
MetricsContainer {
private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell>
perWorkerHistograms =
new MetricsMap<>(HistogramCell::new);
+ private Map<MetricName, Instant> perWorkerCountersByFirstStaleTime = new
ConcurrentHashMap<>();
Review Comment:
make final unless it can be reassigned
`private final Map<MetricName, Instant> perWorkerCountersByFirstStaleTime =
new ConcurrentHashMap<>();`
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java:
##########
@@ -178,4 +192,109 @@ public static Iterable<CounterUpdate>
extractMetricUpdates(
public static void setEnablePerWorkerMetrics(Boolean enablePerWorkerMetrics)
{
StreamingStepMetricsContainer.enablePerWorkerMetrics =
enablePerWorkerMetrics;
}
+
+ /**
+ * Updates {@code perWorkerCountersByFirstStaleTime} with the current
zero-valued metrics and
+ * removes metrics that have been stale for longer than {@code
maximumPerWorkerCounterStaleness}
+ * from {@code perWorkerCounters}.
+ *
+ * @param currentZeroValuedCounters Current zero-valued perworker counters.
+ * @param extractionTime Time {@code currentZeroValuedCounters} were
discovered to be zero-valued.
+ */
+ private void deleteStaleCounters(
+ Set<MetricName> currentZeroValuedCounters, Instant extractionTime) {
+ // perWorkerCountersByFirstStaleTime should only contain metrics that are
currently zero-valued.
+
perWorkerCountersByFirstStaleTime.keySet().retainAll(currentZeroValuedCounters);
+
+ // Delete metrics that have been longer than
'maximumPerWorkerCounterStaleness'.
+ Set<MetricName> deletedMetricNames = new HashSet<MetricName>();
+ for (Entry<MetricName, Instant> entry :
perWorkerCountersByFirstStaleTime.entrySet()) {
+ if (Duration.between(entry.getValue(), extractionTime)
+ .compareTo(maximumPerWorkerCounterStaleness)
+ > 0) {
+ RemoveSafeDeltaCounterCell cell =
+ new RemoveSafeDeltaCounterCell(entry.getKey(), perWorkerCounters);
+ cell.deleteIfZero();
+ deletedMetricNames.add(entry.getKey());
+ }
+ }
+
+ // Insert new zero-valued metrics into `perWorkerCountersByFirstStaleTime`.
+ currentZeroValuedCounters.forEach(
+ name -> perWorkerCountersByFirstStaleTime.putIfAbsent(name,
extractionTime));
+
+ // Metrics in 'deletedMetricNames' have either been removed from
'perWorkerCounters' or are no
+ // longer zero-valued.
+ perWorkerCountersByFirstStaleTime.keySet().removeAll(deletedMetricNames);
+ }
+
+ /**
+ * Extracts metric updates for all PerWorker metrics that have changed in
this Container since the
+ * last time this function was called. Additionally, deletes any PerWorker
counters that have been
+ * zero valued for more than {@code maximumPerWorkerCounterStaleness}.
+ */
+ private Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
+ ConcurrentHashMap<MetricName, Long> counters = new
ConcurrentHashMap<MetricName, Long>();
+ ConcurrentHashMap<MetricName, HistogramData> histograms =
+ new ConcurrentHashMap<MetricName, HistogramData>();
+ HashSet<MetricName> currentZeroValuedCounters = new HashSet<MetricName>();
+
+ // Extract metrics updates.
+ perWorkerCounters.forEach(
+ (k, v) -> {
+ Long val = v.getAndSet(0);
+ if (val == 0) {
+ currentZeroValuedCounters.add(k);
+ return;
+ }
+ counters.put(k, val);
+ });
+ perWorkerHistograms.forEach(
+ (k, v) -> {
+ HistogramData val = v.getCumulative().getAndReset();
+ if (val.getTotalCount() == 0) {
+ return;
+ }
+ histograms.put(k.getKey(), val);
+ });
+
+ deleteStaleCounters(currentZeroValuedCounters, Instant.now(clock));
+
+ return MetricsToPerStepNamespaceMetricsConverter.convert(stepName,
counters, histograms);
+ }
+
+ /**
+ * @param metricsContainerRegistry Metrics will be extracted for all
containers in this registry.
+ * @return An iterable of {@link PerStepNamespaceMetrics} representing the
changes to all
+ * PerWorkerMetrics that have changed since the last time this function
was invoked.
+ */
+ public static Iterable<PerStepNamespaceMetrics>
extractPerWorkerMetricUpdates(
+ MetricsContainerRegistry<StreamingStepMetricsContainer>
metricsContainerRegistry) {
+ return metricsContainerRegistry
+ .getContainers()
+
.transformAndConcat(StreamingStepMetricsContainer::extractPerWorkerMetricUpdates);
+ }
+
+ // The following methods are used to test that zero-valued perWorkerMetrics
metrics are properly
+ // deleted.
+ @VisibleForTesting
+ Map<MetricName, AtomicLong> getPerWorkerCounters() {
+ return perWorkerCounters;
+ }
+
+ @VisibleForTesting
+ Map<MetricName, Instant> getPerWorkerCountersByFirstStaleTime() {
Review Comment:
ditto to above
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java:
##########
@@ -68,6 +74,14 @@ public class StreamingStepMetricsContainer implements
MetricsContainer {
private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell>
perWorkerHistograms =
new MetricsMap<>(HistogramCell::new);
+ private Map<MetricName, Instant> perWorkerCountersByFirstStaleTime = new
ConcurrentHashMap<>();
+
+ // PerWorkerCounters that have been longer than this value will be removed
from the underlying
+ // metrics map.
+ private Duration maximumPerWorkerCounterStaleness = Duration.ofMinutes(5);
+
+ private Clock clock = Clock.systemUTC();
Review Comment:
See the testing comment below: if you inject these, then they can all be final
since they won't need to be reassigned.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java:
##########
@@ -178,4 +192,109 @@ public static Iterable<CounterUpdate>
extractMetricUpdates(
public static void setEnablePerWorkerMetrics(Boolean enablePerWorkerMetrics)
{
StreamingStepMetricsContainer.enablePerWorkerMetrics =
enablePerWorkerMetrics;
}
+
+ /**
+ * Updates {@code perWorkerCountersByFirstStaleTime} with the current
zero-valued metrics and
+ * removes metrics that have been stale for longer than {@code
maximumPerWorkerCounterStaleness}
+ * from {@code perWorkerCounters}.
+ *
+ * @param currentZeroValuedCounters Current zero-valued perworker counters.
+ * @param extractionTime Time {@code currentZeroValuedCounters} were
discovered to be zero-valued.
+ */
+ private void deleteStaleCounters(
+ Set<MetricName> currentZeroValuedCounters, Instant extractionTime) {
+ // perWorkerCountersByFirstStaleTime should only contain metrics that are
currently zero-valued.
+
perWorkerCountersByFirstStaleTime.keySet().retainAll(currentZeroValuedCounters);
+
+ // Delete metrics that have been longer than
'maximumPerWorkerCounterStaleness'.
+ Set<MetricName> deletedMetricNames = new HashSet<MetricName>();
+ for (Entry<MetricName, Instant> entry :
perWorkerCountersByFirstStaleTime.entrySet()) {
+ if (Duration.between(entry.getValue(), extractionTime)
+ .compareTo(maximumPerWorkerCounterStaleness)
+ > 0) {
+ RemoveSafeDeltaCounterCell cell =
+ new RemoveSafeDeltaCounterCell(entry.getKey(), perWorkerCounters);
+ cell.deleteIfZero();
+ deletedMetricNames.add(entry.getKey());
+ }
+ }
+
+ // Insert new zero-valued metrics into `perWorkerCountersByFirstStaleTime`.
+ currentZeroValuedCounters.forEach(
+ name -> perWorkerCountersByFirstStaleTime.putIfAbsent(name,
extractionTime));
+
+ // Metrics in 'deletedMetricNames' have either been removed from
'perWorkerCounters' or are no
+ // longer zero-valued.
+ perWorkerCountersByFirstStaleTime.keySet().removeAll(deletedMetricNames);
+ }
+
+ /**
+ * Extracts metric updates for all PerWorker metrics that have changed in
this Container since the
+ * last time this function was called. Additionally, deletes any PerWorker
counters that have been
+ * zero valued for more than {@code maximumPerWorkerCounterStaleness}.
+ */
+ private Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
+ ConcurrentHashMap<MetricName, Long> counters = new
ConcurrentHashMap<MetricName, Long>();
+ ConcurrentHashMap<MetricName, HistogramData> histograms =
+ new ConcurrentHashMap<MetricName, HistogramData>();
+ HashSet<MetricName> currentZeroValuedCounters = new HashSet<MetricName>();
+
+ // Extract metrics updates.
+ perWorkerCounters.forEach(
+ (k, v) -> {
+ Long val = v.getAndSet(0);
+ if (val == 0) {
+ currentZeroValuedCounters.add(k);
+ return;
+ }
+ counters.put(k, val);
+ });
+ perWorkerHistograms.forEach(
+ (k, v) -> {
+ HistogramData val = v.getCumulative().getAndReset();
+ if (val.getTotalCount() == 0) {
+ return;
+ }
+ histograms.put(k.getKey(), val);
+ });
+
+ deleteStaleCounters(currentZeroValuedCounters, Instant.now(clock));
+
+ return MetricsToPerStepNamespaceMetricsConverter.convert(stepName,
counters, histograms);
+ }
+
+ /**
+ * @param metricsContainerRegistry Metrics will be extracted for all
containers in this registry.
+ * @return An iterable of {@link PerStepNamespaceMetrics} representing the
changes to all
+ * PerWorkerMetrics that have changed since the last time this function
was invoked.
+ */
+ public static Iterable<PerStepNamespaceMetrics>
extractPerWorkerMetricUpdates(
+ MetricsContainerRegistry<StreamingStepMetricsContainer>
metricsContainerRegistry) {
+ return metricsContainerRegistry
+ .getContainers()
+
.transformAndConcat(StreamingStepMetricsContainer::extractPerWorkerMetricUpdates);
+ }
+
+ // The following methods are used to test that zero-valued perWorkerMetrics
metrics are properly
+ // deleted.
+ @VisibleForTesting
+ Map<MetricName, AtomicLong> getPerWorkerCounters() {
+ return perWorkerCounters;
+ }
+
+ @VisibleForTesting
+ Map<MetricName, Instant> getPerWorkerCountersByFirstStaleTime() {
Review Comment:
If these are only visible for testing, consider having a constructor or
factory method where you can inject these members (`perWorkerCounters`,
`perWorkerCountersByFirstStaleTime`, and `clock`)
and annotating that method as `@VisibleForTesting`.
Example:
```
@VisibleForTesting
static StreamingStepMetricsContainer forTesting(perWorkerCounters,
perWorkerCountersByFirstStaleTime, clock, //...testing params) {
return new StreamingStepMetricsContainer (//....testing params );
}
```
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java:
##########
@@ -50,10 +57,9 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class StreamingStepMetricsContainer implements MetricsContainer {
-
private final String stepName;
- private static Boolean enablePerWorkerMetrics;
+ private static Boolean enablePerWorkerMetrics = false;
Review Comment:
unless this is nullable, use the primitive type (`boolean`)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]