scwhittle commented on code in PR #32511:
URL: https://github.com/apache/beam/pull/32511#discussion_r1768222126
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java:
##########
@@ -27,19 +30,52 @@
/** StreamingEngine stream pool based implementation of {@link
HeartbeatSender}. */
@Internal
public final class StreamPoolHeartbeatSender implements HeartbeatSender {
+
private static final Logger LOG =
LoggerFactory.getLogger(StreamPoolHeartbeatSender.class);
- private final WindmillStreamPool<WindmillStream.GetDataStream>
heartbeatStreamPool;
+ @Nonnull
+ private final
AtomicReference<WindmillStreamPool<WindmillStream.GetDataStream>>
+ heartbeatStreamPool = new AtomicReference<>();
- public StreamPoolHeartbeatSender(
+ private StreamPoolHeartbeatSender(
WindmillStreamPool<WindmillStream.GetDataStream> heartbeatStreamPool) {
- this.heartbeatStreamPool = heartbeatStreamPool;
+ this.heartbeatStreamPool.set(heartbeatStreamPool);
+ }
+
+ public static StreamPoolHeartbeatSender Create(
+ @Nonnull WindmillStreamPool<WindmillStream.GetDataStream>
heartbeatStreamPool) {
+ return new StreamPoolHeartbeatSender(heartbeatStreamPool);
+ }
+
+ /**
+ * Creates StreamPoolHeartbeatSender that switches between the passed in
stream pools depending on
+ * global config.
+ *
+ * @param heartbeatStreamPool stream to use when using separate streams for
heartbeat is enabled.
+ * @param getDataPool stream to use when using separate streams for
heartbeat is disabled.
+ */
+ public static StreamPoolHeartbeatSender Create(
+ @Nonnull WindmillStreamPool<WindmillStream.GetDataStream>
heartbeatStreamPool,
+ @Nonnull WindmillStreamPool<WindmillStream.GetDataStream> getDataPool,
+ @Nonnull StreamingGlobalConfigHandle configHandle) {
+ // Use getDataPool as the default, settings callback will
+ // switch to the separate pool if enabled before processing any elements
are processed.
+ StreamPoolHeartbeatSender heartbeatSender = new
StreamPoolHeartbeatSender(heartbeatStreamPool);
+ configHandle.registerConfigObserver(
+ streamingGlobalConfig ->
+ heartbeatSender.heartbeatStreamPool.set(
+ streamingGlobalConfig
+ .userWorkerJobSettings()
+ .getUseSeparateWindmillHeartbeatStreams()
+ ? heartbeatStreamPool
+ : getDataPool));
+ return heartbeatSender;
}
@Override
public void sendHeartbeats(Heartbeats heartbeats) {
try (CloseableStream<WindmillStream.GetDataStream> closeableStream =
- heartbeatStreamPool.getCloseableStream()) {
+ heartbeatStreamPool.get().getCloseableStream()) {
Review Comment:
We could store both pools and switch between them here if we want the choice
to be dynamic rather than made only at worker startup.
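A minimal sketch of that idea, not the PR's code. `FakePool` and `DualPoolSender` are hypothetical stand-ins for `WindmillStreamPool` and `StreamPoolHeartbeatSender`, and the `BooleanSupplier` stands in for reading the current global config. The sender keeps both pools and picks one on every call:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Illustrative only: none of these types are Beam's real classes. */
public class DualPoolHeartbeatSketch {

  /** Hypothetical stand-in for a stream pool; records what was sent through it. */
  static final class FakePool {
    final List<String> sent = new ArrayList<>();

    void send(String heartbeat) {
      sent.add(heartbeat);
    }
  }

  /** Holds both pools and selects one per call instead of caching a single reference. */
  static final class DualPoolSender {
    private final FakePool heartbeatPool;
    private final FakePool getDataPool;
    // Assumption: the setting can be read cheaply on each call.
    private final BooleanSupplier useSeparateStreams;

    DualPoolSender(
        FakePool heartbeatPool, FakePool getDataPool, BooleanSupplier useSeparateStreams) {
      this.heartbeatPool = heartbeatPool;
      this.getDataPool = getDataPool;
      this.useSeparateStreams = useSeparateStreams;
    }

    void sendHeartbeats(String heartbeat) {
      // The pool is chosen at send time, so a config change takes effect on the next call.
      FakePool pool = useSeparateStreams.getAsBoolean() ? heartbeatPool : getDataPool;
      pool.send(heartbeat);
    }
  }

  public static void main(String[] args) {
    AtomicBoolean flag = new AtomicBoolean(false);
    FakePool heartbeatPool = new FakePool();
    FakePool getDataPool = new FakePool();
    DualPoolSender sender = new DualPoolSender(heartbeatPool, getDataPool, flag::get);

    sender.sendHeartbeats("a"); // flag is off: goes to the get-data pool
    flag.set(true);
    sender.sendHeartbeats("b"); // flag is on: goes to the separate heartbeat pool

    System.out.println(getDataPool.sent); // prints [a]
    System.out.println(heartbeatPool.sent); // prints [b]
  }
}
```

Compared with swapping an `AtomicReference` from a config observer, this keeps the selection logic in one place, at the cost of reading the setting on every send.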
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -253,12 +255,29 @@ private StreamingDataflowWorker(
GET_DATA_STREAM_TIMEOUT,
windmillServer::getDataStream);
getDataClient = new StreamPoolGetDataClient(getDataMetricTracker,
getDataStreamPool);
- heartbeatSender =
- new StreamPoolHeartbeatSender(
- options.getUseSeparateWindmillHeartbeatStreams()
- ? WindmillStreamPool.create(
- 1, GET_DATA_STREAM_TIMEOUT,
windmillServer::getDataStream)
- : getDataStreamPool);
+ // Experiment gates the logic till backend changes are rollback safe
+ if (DataflowRunner.hasExperiment(
+ options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL)) {
Review Comment:
Do we need the experiment? If the backend is rolled back, shouldn't it just
default to off? (If not, should we change the default behavior?)
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -253,12 +255,29 @@ private StreamingDataflowWorker(
GET_DATA_STREAM_TIMEOUT,
windmillServer::getDataStream);
getDataClient = new StreamPoolGetDataClient(getDataMetricTracker,
getDataStreamPool);
- heartbeatSender =
- new StreamPoolHeartbeatSender(
- options.getUseSeparateWindmillHeartbeatStreams()
- ? WindmillStreamPool.create(
- 1, GET_DATA_STREAM_TIMEOUT,
windmillServer::getDataStream)
- : getDataStreamPool);
+ // Experiment gates the logic till backend changes are rollback safe
+ if (DataflowRunner.hasExperiment(
+ options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL)) {
+ heartbeatSender =
+ // If the setting is explicitly passed in via PipelineOptions use
it,
+ // else rely on the global config
+ options.getUseSeparateWindmillHeartbeatStreams() != null
+ ? StreamPoolHeartbeatSender.Create(
+ options.getUseSeparateWindmillHeartbeatStreams()
+ ? separateHeartbeatPool(windmillServer)
+ : getDataStreamPool)
+ : StreamPoolHeartbeatSender.Create(
+ separateHeartbeatPool(windmillServer),
+ getDataStreamPool,
+ configFetcher.getGlobalConfigHandle());
+ } else {
+ heartbeatSender =
Review Comment:
I think you can share some of the cases:
```
if (!hasExperiment || options.getUseSeparateWindmillHeartbeatStreams() != null) {
  heartbeatSender =
      StreamPoolHeartbeatSender.Create(
          Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
              ? separateHeartbeatPool(windmillServer)
              : getDataStreamPool);
} else {
  heartbeatSender =
      StreamPoolHeartbeatSender.Create(
          separateHeartbeatPool(windmillServer),
          getDataStreamPool,
          configFetcher.getGlobalConfigHandle());
}
```
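To check which inputs land in which branch of the suggestion above, here is a small self-contained sketch. The `Choice` enum and `choose` method are hypothetical names used only for illustration. They mirror the branch structure, not the real pool construction. `Boolean.TRUE.equals(...)` treats an unset (`null`) option as off without unboxing it, so no `NullPointerException` is possible in the first branch.

```java
/** Illustrative only: models the branch selection, not Beam's actual wiring. */
public class HeartbeatPoolChoiceSketch {

  enum Choice {
    SEPARATE_POOL, // fixed: dedicated heartbeat pool
    GET_DATA_POOL, // fixed: share the get-data pool
    CONFIG_DRIVEN // switch between pools based on global config
  }

  /**
   * @param hasExperiment whether the gating experiment is enabled
   * @param useSeparateStreams the pipeline option, or null if not set explicitly
   */
  static Choice choose(boolean hasExperiment, Boolean useSeparateStreams) {
    if (!hasExperiment || useSeparateStreams != null) {
      // Null-safe: Boolean.TRUE.equals(null) is false, so an unset option means "off".
      return Boolean.TRUE.equals(useSeparateStreams)
          ? Choice.SEPARATE_POOL
          : Choice.GET_DATA_POOL;
    }
    return Choice.CONFIG_DRIVEN;
  }

  public static void main(String[] args) {
    System.out.println(choose(false, null)); // prints GET_DATA_POOL
    System.out.println(choose(false, true)); // prints SEPARATE_POOL
    System.out.println(choose(true, false)); // prints GET_DATA_POOL
    System.out.println(choose(true, null)); // prints CONFIG_DRIVEN
  }
}
```

Only the combination "experiment on, option unset" reaches the config-driven path. Every other combination resolves to a fixed pool at startup.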