arunpandianp commented on code in PR #32511:
URL: https://github.com/apache/beam/pull/32511#discussion_r1768316425


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java:
##########
@@ -27,19 +30,52 @@
 /** StreamingEngine stream pool based implementation of {@link 
HeartbeatSender}. */
 @Internal
 public final class StreamPoolHeartbeatSender implements HeartbeatSender {
+
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamPoolHeartbeatSender.class);
 
-  private final WindmillStreamPool<WindmillStream.GetDataStream> 
heartbeatStreamPool;
+  @Nonnull
+  private final 
AtomicReference<WindmillStreamPool<WindmillStream.GetDataStream>>
+      heartbeatStreamPool = new AtomicReference<>();
 
-  public StreamPoolHeartbeatSender(
+  private StreamPoolHeartbeatSender(
       WindmillStreamPool<WindmillStream.GetDataStream> heartbeatStreamPool) {
-    this.heartbeatStreamPool = heartbeatStreamPool;
+    this.heartbeatStreamPool.set(heartbeatStreamPool);
+  }
+
+  public static StreamPoolHeartbeatSender Create(
+      @Nonnull WindmillStreamPool<WindmillStream.GetDataStream> 
heartbeatStreamPool) {
+    return new StreamPoolHeartbeatSender(heartbeatStreamPool);
+  }
+
+  /**
+   * Creates StreamPoolHeartbeatSender that switches between the passed in 
stream pools depending on
+   * global config.
+   *
+   * @param heartbeatStreamPool stream to use when using separate streams for 
heartbeat is enabled.
+   * @param getDataPool stream to use when using separate streams for 
heartbeat is disabled.
+   */
+  public static StreamPoolHeartbeatSender Create(
+      @Nonnull WindmillStreamPool<WindmillStream.GetDataStream> 
heartbeatStreamPool,
+      @Nonnull WindmillStreamPool<WindmillStream.GetDataStream> getDataPool,
+      @Nonnull StreamingGlobalConfigHandle configHandle) {
+    // Use getDataPool as the default, settings callback will
+    // switch to the separate pool if enabled before processing any elements 
are processed.
+    StreamPoolHeartbeatSender heartbeatSender = new 
StreamPoolHeartbeatSender(heartbeatStreamPool);
+    configHandle.registerConfigObserver(
+        streamingGlobalConfig ->
+            heartbeatSender.heartbeatStreamPool.set(
+                streamingGlobalConfig
+                        .userWorkerJobSettings()
+                        .getUseSeparateWindmillHeartbeatStreams()
+                    ? heartbeatStreamPool
+                    : getDataPool));
+    return heartbeatSender;
   }
 
   @Override
   public void sendHeartbeats(Heartbeats heartbeats) {
     try (CloseableStream<WindmillStream.GetDataStream> closeableStream =
-        heartbeatStreamPool.getCloseableStream()) {
+        heartbeatStreamPool.get().getCloseableStream()) {

Review Comment:
   renamed variables to make it more clear.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to