m-trieu commented on code in PR #31784:
URL: https://github.com/apache/beam/pull/31784#discussion_r1696154122
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/ThrottlingGetDataMetricTracker.java:
##########
@@ -26,121 +26,84 @@
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
/**
- * Wraps GetData calls that tracks metrics for the number of in-flight
requests and throttles
- * requests when memory pressure is high.
+ * Wraps GetData calls to track metrics for the number of in-flight requests
and throttle requests
+ * when memory pressure is high.
*/
@Internal
@ThreadSafe
public final class ThrottlingGetDataMetricTracker {
+ private static final String GET_STATE_DATA_RESOURCE_CONTEXT = "GetStateData";
+ private static final String GET_SIDE_INPUT_RESOURCE_CONTEXT =
"GetSideInputData";
+
private final MemoryMonitor gcThrashingMonitor;
- private final GetDataMetrics getDataMetrics;
+ private final AtomicInteger activeStateReads;
+ private final AtomicInteger activeSideInputs;
+ private final AtomicInteger activeHeartbeats;
public ThrottlingGetDataMetricTracker(MemoryMonitor gcThrashingMonitor) {
this.gcThrashingMonitor = gcThrashingMonitor;
- this.getDataMetrics = GetDataMetrics.create();
+ this.activeStateReads = new AtomicInteger();
+ this.activeSideInputs = new AtomicInteger();
+ this.activeHeartbeats = new AtomicInteger();
+ }
+
+ /**
+ * Tracks a state data fetch. If there is memory pressure, may throttle
requests. Returns an
+ * {@link AutoCloseable} that will decrement the metric after the call is
finished.
+ */
+ AutoCloseable trackStateDataFetchWithThrottling() {
+ gcThrashingMonitor.waitForResources(GET_STATE_DATA_RESOURCE_CONTEXT);
+ activeStateReads.getAndIncrement();
+ return activeStateReads::getAndDecrement;
}
/**
- * Tracks a GetData call. If there is memory pressure, may throttle
requests. Returns an {@link
- * AutoCloseable} that will decrement the metric after the call is finished.
+ * Tracks a side input fetch. If there is memory pressure, may throttle
requests. Returns an
+ * {@link AutoCloseable} that will decrement the metric after the call is
finished.
*/
- public AutoCloseable trackSingleCallWithThrottling(Type callType) {
- gcThrashingMonitor.waitForResources(callType.debugName);
- AtomicInteger getDataMetricTracker = getDataMetrics.getMetricFor(callType);
- getDataMetricTracker.getAndIncrement();
- return getDataMetricTracker::getAndDecrement;
+ AutoCloseable trackSideInputFetchWithThrottling() {
+ gcThrashingMonitor.waitForResources(GET_SIDE_INPUT_RESOURCE_CONTEXT);
+ activeSideInputs.getAndIncrement();
+ return activeSideInputs::getAndDecrement;
}
/**
* Tracks heartbeat request metrics. Returns an {@link AutoCloseable} that
will decrement the
* metric after the call is finished.
*/
public AutoCloseable trackHeartbeats(int numHeartbeats) {
- getDataMetrics
- .activeHeartbeats()
- .getAndUpdate(currentActiveHeartbeats -> currentActiveHeartbeats +
numHeartbeats);
- return () ->
- getDataMetrics.activeHeartbeats().getAndUpdate(existing -> existing -
numHeartbeats);
+ activeHeartbeats.getAndUpdate(
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]