m-trieu commented on code in PR #31784:
URL: https://github.com/apache/beam/pull/31784#discussion_r1696154122


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/ThrottlingGetDataMetricTracker.java:
##########
@@ -26,121 +26,84 @@
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 
 /**
- * Wraps GetData calls that tracks metrics for the number of in-flight 
requests and throttles
- * requests when memory pressure is high.
+ * Wraps GetData calls to track metrics for the number of in-flight requests 
and throttles requests
+ * when memory pressure is high.
  */
 @Internal
 @ThreadSafe
 public final class ThrottlingGetDataMetricTracker {
+  private static final String GET_STATE_DATA_RESOURCE_CONTEXT = "GetStateData";
+  private static final String GET_SIDE_INPUT_RESOURCE_CONTEXT = 
"GetSideInputData";
+
   private final MemoryMonitor gcThrashingMonitor;
-  private final GetDataMetrics getDataMetrics;
+  private final AtomicInteger activeStateReads;
+  private final AtomicInteger activeSideInputs;
+  private final AtomicInteger activeHeartbeats;
 
   public ThrottlingGetDataMetricTracker(MemoryMonitor gcThrashingMonitor) {
     this.gcThrashingMonitor = gcThrashingMonitor;
-    this.getDataMetrics = GetDataMetrics.create();
+    this.activeStateReads = new AtomicInteger();
+    this.activeSideInputs = new AtomicInteger();
+    this.activeHeartbeats = new AtomicInteger();
+  }
+
+  /**
+   * Tracks a state data fetch. If there is memory pressure, may throttle 
requests. Returns an
+   * {@link AutoCloseable} that will decrement the metric after the call is 
finished.
+   */
+  AutoCloseable trackStateDataFetchWithThrottling() {
+    gcThrashingMonitor.waitForResources(GET_STATE_DATA_RESOURCE_CONTEXT);
+    activeStateReads.getAndIncrement();
+    return activeStateReads::getAndDecrement;
   }
 
   /**
-   * Tracks a GetData call. If there is memory pressure, may throttle 
requests. Returns an {@link
-   * AutoCloseable} that will decrement the metric after the call is finished.
+   * Tracks a side input fetch. If there is memory pressure, may throttle 
requests. Returns an
+   * {@link AutoCloseable} that will decrement the metric after the call is 
finished.
    */
-  public AutoCloseable trackSingleCallWithThrottling(Type callType) {
-    gcThrashingMonitor.waitForResources(callType.debugName);
-    AtomicInteger getDataMetricTracker = getDataMetrics.getMetricFor(callType);
-    getDataMetricTracker.getAndIncrement();
-    return getDataMetricTracker::getAndDecrement;
+  AutoCloseable trackSideInputFetchWithThrottling() {
+    gcThrashingMonitor.waitForResources(GET_SIDE_INPUT_RESOURCE_CONTEXT);
+    activeSideInputs.getAndIncrement();
+    return activeSideInputs::getAndDecrement;
   }
 
   /**
    * Tracks heartbeat request metrics. Returns an {@link AutoCloseable} that 
will decrement the
    * metric after the call is finished.
    */
   public AutoCloseable trackHeartbeats(int numHeartbeats) {
-    getDataMetrics
-        .activeHeartbeats()
-        .getAndUpdate(currentActiveHeartbeats -> currentActiveHeartbeats + 
numHeartbeats);
-    return () ->
-        getDataMetrics.activeHeartbeats().getAndUpdate(existing -> existing - 
numHeartbeats);
+    activeHeartbeats.getAndUpdate(

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to