m-trieu commented on code in PR #34367:
URL: https://github.com/apache/beam/pull/34367#discussion_r2025699431


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -57,28 +64,42 @@
 @Internal
 @ThreadSafe
 final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender 
{
-  private static final String STREAM_STARTER_THREAD_NAME = 
"StartWindmillStreamThread-%d";
-  private final AtomicBoolean started;
-  private final AtomicReference<GetWorkBudget> getWorkBudget;
-  private final GetWorkStream getWorkStream;
+  private static final Logger LOG = 
LoggerFactory.getLogger(WindmillStreamSender.class);
+  private static final String STREAM_MANAGER_THREAD_NAME_FORMAT = 
"WindmillStreamManagerThread";
+
+  // Must be shorter than withLongDeadline() duration in 
GrpcWindmillStreamFactory to prevent
+  // DEADLINE_EXCEEDED.
+  private static final int GET_WORK_STREAM_TTL_MINUTES = 45;
+
+  private final AtomicBoolean isRunning = new AtomicBoolean(false);
   private final GetDataStream getDataStream;
   private final CommitWorkStream commitWorkStream;
   private final WorkCommitter workCommitter;
   private final StreamingEngineThrottleTimers streamingEngineThrottleTimers;
-  private final ExecutorService streamStarter;
+  private final ExecutorService streamManagerExecutor;
+  private final String backendWorkerToken;
+  private final Object getWorkStreamLock = new Object();
+
+  @GuardedBy("getWorkStreamLock")
+  private final Supplier<GetWorkStream> getWorkStreamFactory;
+
+  @GuardedBy("getWorkStreamLock")
+  private @Nullable GetWorkStream activeGetWorkStream;
+
+  @GuardedBy("getWorkStreamLock")
+  private GetWorkBudget getWorkBudget;
 
   private WindmillStreamSender(
       WindmillConnection connection,
       GetWorkRequest getWorkRequest,
-      AtomicReference<GetWorkBudget> getWorkBudget,
+      GetWorkBudget initialGetWorkBudget,
       GrpcWindmillStreamFactory streamingEngineStreamFactory,
       WorkItemScheduler workItemScheduler,
       Function<GetDataStream, GetDataClient> getDataClientFactory,
       Function<CommitWorkStream, WorkCommitter> workCommitterFactory) {
-    this.started = new AtomicBoolean(false);
-    this.getWorkBudget = getWorkBudget;
+    this.backendWorkerToken = connection.backendWorkerToken();
+    this.getWorkBudget = initialGetWorkBudget;
     this.streamingEngineThrottleTimers = 
StreamingEngineThrottleTimers.create();
-
     // Stream instances connect/reconnect internally, so we can reuse the same 
instance through the

Review Comment:
   how about adding an internal mechanism (maybe via another executor to make 
it async if we dont want the old stream closing to block the new stream 
creation/stream restart).
   
   Where we add this workflow:
   - create stream
   - start stream
   - wait for termination based on a TTL/timeout
   - loop while(stream.isRunning).
   
   we can add this in `AbstractWindmillStream` so callers are just exposed to 
`Stream.create()`, `start()`, `halfClose()/shutdown()` as an interface and not 
have to worry about the restart mechanism?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to