arunpandianp commented on code in PR #32511:
URL: https://github.com/apache/beam/pull/32511#discussion_r1768316160
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java:
##########
@@ -27,19 +30,52 @@
/** StreamingEngine stream pool based implementation of {@link
HeartbeatSender}. */
@Internal
public final class StreamPoolHeartbeatSender implements HeartbeatSender {
+
private static final Logger LOG =
LoggerFactory.getLogger(StreamPoolHeartbeatSender.class);
- private final WindmillStreamPool<WindmillStream.GetDataStream>
heartbeatStreamPool;
+ @Nonnull
+ private final
AtomicReference<WindmillStreamPool<WindmillStream.GetDataStream>>
+ heartbeatStreamPool = new AtomicReference<>();
- public StreamPoolHeartbeatSender(
+ private StreamPoolHeartbeatSender(
WindmillStreamPool<WindmillStream.GetDataStream> heartbeatStreamPool) {
- this.heartbeatStreamPool = heartbeatStreamPool;
+ this.heartbeatStreamPool.set(heartbeatStreamPool);
+ }
+
+ public static StreamPoolHeartbeatSender Create(
+ @Nonnull WindmillStreamPool<WindmillStream.GetDataStream>
heartbeatStreamPool) {
+ return new StreamPoolHeartbeatSender(heartbeatStreamPool);
+ }
+
+ /**
+ * Creates StreamPoolHeartbeatSender that switches between the passed in
stream pools depending on
+ * global config.
+ *
+ * @param heartbeatStreamPool stream to use when using separate streams for
heartbeat is enabled.
+ * @param getDataPool stream to use when using separate streams for
heartbeat is disabled.
+ */
+ public static StreamPoolHeartbeatSender Create(
+ @Nonnull WindmillStreamPool<WindmillStream.GetDataStream>
heartbeatStreamPool,
+ @Nonnull WindmillStreamPool<WindmillStream.GetDataStream> getDataPool,
+ @Nonnull StreamingGlobalConfigHandle configHandle) {
+ // Use getDataPool as the default, settings callback will
+ // switch to the separate pool if enabled before processing any elements
are processed.
+ StreamPoolHeartbeatSender heartbeatSender = new
StreamPoolHeartbeatSender(heartbeatStreamPool);
+ configHandle.registerConfigObserver(
+ streamingGlobalConfig ->
+ heartbeatSender.heartbeatStreamPool.set(
+ streamingGlobalConfig
+ .userWorkerJobSettings()
+ .getUseSeparateWindmillHeartbeatStreams()
+ ? heartbeatStreamPool
+ : getDataPool));
+ return heartbeatSender;
}
@Override
public void sendHeartbeats(Heartbeats heartbeats) {
try (CloseableStream<WindmillStream.GetDataStream> closeableStream =
- heartbeatStreamPool.getCloseableStream()) {
+ heartbeatStreamPool.get().getCloseableStream()) {
Review Comment:
The settings callback has a reference to both the pools and it switches
between them whenever the setting changes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]