m-trieu commented on code in PR #34367:
URL: https://github.com/apache/beam/pull/34367#discussion_r2035958287


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -57,28 +64,42 @@
 @Internal
 @ThreadSafe
 final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender {
-  private static final String STREAM_STARTER_THREAD_NAME = "StartWindmillStreamThread-%d";
-  private final AtomicBoolean started;
-  private final AtomicReference<GetWorkBudget> getWorkBudget;
-  private final GetWorkStream getWorkStream;
+  private static final Logger LOG = LoggerFactory.getLogger(WindmillStreamSender.class);
+  private static final String STREAM_MANAGER_THREAD_NAME_FORMAT = "WindmillStreamManagerThread";
+
+  // Must be shorter than withLongDeadline() duration in GrpcWindmillStreamFactory to prevent
+  // DEADLINE_EXCEEDED.
+  private static final int GET_WORK_STREAM_TTL_MINUTES = 45;
+
+  private final AtomicBoolean isRunning = new AtomicBoolean(false);
   private final GetDataStream getDataStream;
   private final CommitWorkStream commitWorkStream;
   private final WorkCommitter workCommitter;
   private final StreamingEngineThrottleTimers streamingEngineThrottleTimers;
-  private final ExecutorService streamStarter;
+  private final ExecutorService streamManagerExecutor;
+  private final String backendWorkerToken;
+  private final Object getWorkStreamLock = new Object();
+
+  @GuardedBy("getWorkStreamLock")
+  private final Supplier<GetWorkStream> getWorkStreamFactory;
+
+  @GuardedBy("getWorkStreamLock")
+  private @Nullable GetWorkStream activeGetWorkStream;
+
+  @GuardedBy("getWorkStreamLock")
+  private GetWorkBudget getWorkBudget;
 
   private WindmillStreamSender(
       WindmillConnection connection,
       GetWorkRequest getWorkRequest,
-      AtomicReference<GetWorkBudget> getWorkBudget,
+      GetWorkBudget initialGetWorkBudget,
       GrpcWindmillStreamFactory streamingEngineStreamFactory,
       WorkItemScheduler workItemScheduler,
       Function<GetDataStream, GetDataClient> getDataClientFactory,
       Function<CommitWorkStream, WorkCommitter> workCommitterFactory) {
-    this.started = new AtomicBoolean(false);
-    this.getWorkBudget = getWorkBudget;
+    this.backendWorkerToken = connection.backendWorkerToken();
+    this.getWorkBudget = initialGetWorkBudget;
     this.streamingEngineThrottleTimers = StreamingEngineThrottleTimers.create();
-
     // Stream instances connect/reconnect internally, so we can reuse the same instance through the

Review Comment:
   Following up from our chat conversation:
   
   I opted to use `ResettableThrowingStreamObserver` to close the streams:
   - A background thread in `AbstractWindmillStream` calls `AbstractWindmillStream.restart()` based on the stream TTL that is passed in.
   - `AbstractWindmillStream.restart()` calls `ResettableThrowingStreamObserver.release()` to close the stream asynchronously, then `AbstractWindmillStream.startStream()` to reset the `ResettableThrowingStreamObserver` and call `onNewStream()`, which sends headers and flushes any pending messages.
   - `ResettableThrowingStreamObserver.release()` offloads the delegate to a queue; a background thread polls the queue and calls `delegate.onError(InternalStreamTimeout())`.
   - `AbstractWindmillStream.ResponseObserver.onError(...)` (which is the delegate) treats `InternalStreamTimeout` differently from other errors: it skips the call to `executor.execute(AbstractWindmillStream.this::startStream)`, since the stream was already restarted in `AbstractWindmillStream.restart()`.
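
To make the flow above easier to follow, here is a minimal, simplified sketch of it. Every class name ending in `Sketch` is a hypothetical stand-in and not the real Beam class; the real code hands work to executors and background threads, whereas this sketch runs the "closer" step synchronously so the ordering is visible.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical marker error used only for TTL-driven closes.
final class InternalStreamTimeoutSketch extends RuntimeException {}

// Hypothetical stand-in for the response observer delegate.
interface ObserverSketch {
  void onError(Throwable t);
}

// Hypothetical stand-in for ResettableThrowingStreamObserver.
final class ResettableObserverSketch {
  private final Queue<ObserverSketch> released = new ConcurrentLinkedQueue<>();
  private ObserverSketch delegate;

  synchronized void reset(ObserverSketch newDelegate) {
    delegate = newDelegate;
  }

  // Hands the current delegate to the queue so it can be closed asynchronously.
  synchronized void release() {
    if (delegate != null) {
      released.add(delegate);
      delegate = null;
    }
  }

  // One iteration of what the background closer thread would do.
  void closeOneReleased() {
    ObserverSketch toClose = released.poll();
    if (toClose != null) {
      toClose.onError(new InternalStreamTimeoutSketch());
    }
  }
}

// Hypothetical stand-in for AbstractWindmillStream.
final class StreamSketch {
  final AtomicInteger starts = new AtomicInteger();
  final ResettableObserverSketch observer = new ResettableObserverSketch();

  void startStream() {
    starts.incrementAndGet();
    observer.reset(this::onError);
  }

  // TTL-driven restart: release the old delegate, then start a new stream.
  void restart() {
    observer.release();
    startStream();
  }

  void onError(Throwable t) {
    if (t instanceof InternalStreamTimeoutSketch) {
      return; // restart() already started the replacement stream
    }
    startStream(); // the real code would submit this to an executor
  }
}
```

In this sketch, a TTL-driven `restart()` followed by the closer step leaves the start count at two, while any other error triggers one more start.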
   
   Now `WindmillStreamSender` and its callers can just call `start()` and `shutdown()` to manage the streams.
   
   I only added the directpath changes in this PR; the older implementations in cloudpath still use the `start()/awaitTermination()/halfClose()` pattern. The older code just submits a streamTTL with a negative time value, which `AbstractWindmillStream` skips when scheduling the background threads.
   
   I also added a way to retain a reference to the restart task via its future. I realized we don't want fixed intervals for stream restarts; we want the restart time plus the timeout, which changes every time we call `startStream()`. I think this should prevent a bunch of restarts from piling up, and/or a restart from being called right after the stream has already restarted via other means.
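
The rescheduling idea could look roughly like the sketch below, which uses the standard `ScheduledExecutorService` API. `RestartSchedulerSketch`, `onStreamStarted()`, and the millisecond TTL parameter are hypothetical names for illustration, not the PR's actual code. Treating a zero TTL as disabled is also an assumption; the comment above only mentions negative values.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: keeps at most one pending restart, measured from the latest start.
final class RestartSchedulerSketch {
  private final ScheduledExecutorService scheduler;
  private final long ttlMillis;
  private final Runnable restart;
  private ScheduledFuture<?> pendingRestart; // guarded by "this"

  RestartSchedulerSketch(ScheduledExecutorService scheduler, long ttlMillis, Runnable restart) {
    this.scheduler = scheduler;
    this.ttlMillis = ttlMillis;
    this.restart = restart;
  }

  // Intended to be called each time the stream (re)starts, for any reason.
  // Returns the newly scheduled task, or null when restarts are disabled.
  synchronized ScheduledFuture<?> onStreamStarted() {
    if (ttlMillis <= 0) {
      return null; // negative TTL (and, by assumption, zero): no restart is scheduled
    }
    if (pendingRestart != null) {
      pendingRestart.cancel(false); // drop the stale deadline so restarts cannot pile up
    }
    pendingRestart = scheduler.schedule(restart, ttlMillis, TimeUnit.MILLISECONDS);
    return pendingRestart;
  }
}
```

Because the previous future is cancelled on every start, a stream that was restarted by other means gets a full TTL before the next scheduled restart, rather than being restarted again on a fixed cadence.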


