scwhittle commented on code in PR #37847:
URL: https://github.com/apache/beam/pull/37847#discussion_r2929976378


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java:
##########
@@ -59,12 +59,12 @@ public final class StreamingEngineWorkCommitter implements 
WorkCommitter {
   private final AtomicBoolean isRunning;
 
   StreamingEngineWorkCommitter(
-      Supplier<CloseableStream<CommitWorkStream>> commitWorkStreamFactory,
+      Supplier<Supplier<CloseableStream<CommitWorkStream>>> 
commitWorkStreamFactoryFactory,

Review Comment:
   it seems like this could remain as is since we are just calling get once
   
   if the caller wants to have a factoryfactory and get a new instance to pass 
in here each time, they can do so without changing this class.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -462,17 +454,14 @@ private StreamingWorkerHarnessFactoryOutput 
createSingleSourceWorkerHarness(
         new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
     HeartbeatSender heartbeatSender =
         createStreamingEngineHeartbeatSender(
-            options,
-            windmillServer,
-            getDataStreamPool,
-            checkNotNull(configFetcher).getGlobalConfigHandle());
-    @SuppressWarnings("methodref.receiver.bound")
+            options, windmillServer, getDataStreamPool, 
configFetcher.getGlobalConfigHandle());
     WorkCommitter workCommitter =
         StreamingEngineWorkCommitter.builder()
-            .setCommitWorkStreamFactory(
-                WindmillStreamPool.create(
-                        numCommitThreads, COMMIT_STREAM_TIMEOUT, 
windmillServer::commitWorkStream)
-                    ::getCloseableStream)
+            .setCommitWorkStreamFactoryFactory(
+                () ->
+                    WindmillStreamPool.create(
+                            1, COMMIT_STREAM_TIMEOUT, 
windmillServer::commitWorkStream)
+                        ::getCloseableStream)

Review Comment:
   ie here do
   () -> new WindmillStreamPool.create(...).getCloseableStream()
   to use a new pool for each call.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to