scwhittle commented on code in PR #30233:
URL: https://github.com/apache/beam/pull/30233#discussion_r1481353403


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java:
##########
@@ -59,28 +60,66 @@ public class MetricTrackingWindmillServerStub {
   private final MemoryMonitor gcThrashingMonitor;
   private final boolean useStreamingRequests;
 
+  private final WindmillStreamPool<GetDataStream> getDataStreamPool;
+
+  // May be the same instance as getDataStreamPool based upon options.
+  private final WindmillStreamPool<GetDataStream> heartbeatStreamPool;
+
   @GuardedBy("this")
   private final List<ReadBatch> pendingReadBatches;
 
   @GuardedBy("this")
   private int activeReadThreads = 0;
 
-  private WindmillStreamPool<GetDataStream> streamPool;
+  @Internal
+  @AutoBuilder(ofClass = MetricTrackingWindmillServerStub.class)
+  public abstract static class Builder {
+
+    abstract Builder setServer(WindmillServerStub server);
+
+    abstract Builder setGcThrashingMonitor(MemoryMonitor gcThrashingMonitor);
+
+    abstract Builder setUseStreamingRequests(boolean useStreamingRequests);
 
-  public MetricTrackingWindmillServerStub(
-      WindmillServerStub server, MemoryMonitor gcThrashingMonitor, boolean 
useStreamingRequests) {
+    abstract Builder setUseSeparateHeartbeatStreams(boolean 
useSeparateHeartbeatStreams);
+
+    abstract Builder setNumGetDataStreams(int numGetDataStreams);
+
+    abstract MetricTrackingWindmillServerStub build();
+  }
+
+  public static Builder builder(WindmillServerStub server, MemoryMonitor 
gcThrashingMonitor) {
+    return new AutoBuilder_MetricTrackingWindmillServerStub_Builder()
+        .setServer(server)
+        .setGcThrashingMonitor(gcThrashingMonitor)
+        .setUseStreamingRequests(false)
+        .setUseSeparateHeartbeatStreams(false)
+        .setNumGetDataStreams(1);
+  }
+
+  MetricTrackingWindmillServerStub(
+      WindmillServerStub server,
+      MemoryMonitor gcThrashingMonitor,
+      boolean useStreamingRequests,
+      boolean useSeparateHeartbeatStreams,
+      int numGetDataStreams) {
     this.server = server;
     this.gcThrashingMonitor = gcThrashingMonitor;
-    // This is used as a queue but is expected to be less than 10 batches.
-    this.pendingReadBatches = new ArrayList<>();
     this.useStreamingRequests = useStreamingRequests;
-  }
-
-  public void start() {
     if (useStreamingRequests) {
-      streamPool =
-          WindmillStreamPool.create(NUM_STREAMS, STREAM_TIMEOUT, 
this.server::getDataStream);
+      getDataStreamPool =
+          WindmillStreamPool.create(numGetDataStreams, STREAM_TIMEOUT, 
this.server::getDataStream);

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to