scwhittle commented on code in PR #34367:
URL: https://github.com/apache/beam/pull/34367#discussion_r2027001535


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -57,28 +64,42 @@
 @Internal
 @ThreadSafe
 final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender {
-  private static final String STREAM_STARTER_THREAD_NAME = "StartWindmillStreamThread-%d";
-  private final AtomicBoolean started;
-  private final AtomicReference<GetWorkBudget> getWorkBudget;
-  private final GetWorkStream getWorkStream;
+  private static final Logger LOG = LoggerFactory.getLogger(WindmillStreamSender.class);
+  private static final String STREAM_MANAGER_THREAD_NAME_FORMAT = "WindmillStreamManagerThread";
+
+  // Must be shorter than withLongDeadline() duration in GrpcWindmillStreamFactory to prevent
+  // DEADLINE_EXCEEDED.
+  private static final int GET_WORK_STREAM_TTL_MINUTES = 45;
+
+  private final AtomicBoolean isRunning = new AtomicBoolean(false);
   private final GetDataStream getDataStream;
   private final CommitWorkStream commitWorkStream;
   private final WorkCommitter workCommitter;
   private final StreamingEngineThrottleTimers streamingEngineThrottleTimers;
-  private final ExecutorService streamStarter;
+  private final ExecutorService streamManagerExecutor;
+  private final String backendWorkerToken;
+  private final Object getWorkStreamLock = new Object();
+
+  @GuardedBy("getWorkStreamLock")
+  private final Supplier<GetWorkStream> getWorkStreamFactory;
+
+  @GuardedBy("getWorkStreamLock")
+  private @Nullable GetWorkStream activeGetWorkStream;
+
+  @GuardedBy("getWorkStreamLock")
+  private GetWorkBudget getWorkBudget;
 
   private WindmillStreamSender(
       WindmillConnection connection,
       GetWorkRequest getWorkRequest,
-      AtomicReference<GetWorkBudget> getWorkBudget,
+      GetWorkBudget initialGetWorkBudget,
       GrpcWindmillStreamFactory streamingEngineStreamFactory,
       WorkItemScheduler workItemScheduler,
       Function<GetDataStream, GetDataClient> getDataClientFactory,
       Function<CommitWorkStream, WorkCommitter> workCommitterFactory) {
-    this.started = new AtomicBoolean(false);
-    this.getWorkBudget = getWorkBudget;
+    this.backendWorkerToken = connection.backendWorkerToken();
+    this.getWorkBudget = initialGetWorkBudget;
     this.streamingEngineThrottleTimers = StreamingEngineThrottleTimers.create();
-
     // Stream instances connect/reconnect internally, so we can reuse the same instance through the

Review Comment:
   Did we discuss adding this functionality directly to AbstractWindmillStream before? It already has multiple physical gRPC streams beneath it and knows how to restart them if needed. I think we'd have to make some of its internal state per physical stream, but the nice thing is that it would keep the clients simple, since they just have a single logical stream.
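   A minimal sketch of that shape, assuming a much simplified model: one logical stream owns a replaceable physical stream and keeps request bookkeeping per physical stream, so clients only ever hold the logical handle. `LogicalStream`, `PhysicalStream`, `send` and `onPhysicalStreamError` are hypothetical names, not the real `AbstractWindmillStream` API, and the physical stream is a plain object standing in for a gRPC call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for one gRPC call; holds state that belongs to that call only.
final class PhysicalStream {
  final int id;
  // Requests sent on this physical stream that have not been acknowledged yet.
  final List<String> pendingRequests = new ArrayList<>();

  PhysicalStream(int id) {
    this.id = id;
  }
}

// Hypothetical logical stream: clients call send() and never see physical-stream churn.
final class LogicalStream {
  private final Supplier<PhysicalStream> physicalStreamFactory;
  private PhysicalStream current;

  LogicalStream(Supplier<PhysicalStream> physicalStreamFactory) {
    this.physicalStreamFactory = physicalStreamFactory;
    this.current = physicalStreamFactory.get();
  }

  // Only records the request; real code would also write it to the gRPC call.
  synchronized void send(String request) {
    current.pendingRequests.add(request);
  }

  // On a physical failure, open a replacement and carry over the unacknowledged
  // requests (assumed replay semantics, for illustration only).
  synchronized void onPhysicalStreamError() {
    PhysicalStream failed = current;
    current = physicalStreamFactory.get();
    current.pendingRequests.addAll(failed.pendingRequests);
  }

  synchronized int currentPhysicalStreamId() {
    return current.id;
  }

  synchronized int pendingOnCurrentPhysicalStream() {
    return current.pendingRequests.size();
  }
}
```

   Because the replacement happens inside `LogicalStream`, a caller would not need to swap streams or hold its own lock for that purpose.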
   
   Currently we have AbstractWindmillStream.halfClose(), which means we want to terminate the logical stream; we then half-close the physical stream (and any new ones that we may create). I think that if we added some timeout/recreation logic internal to the abstract stream, we could half-close a physical stream that we want to tear down without half-closing the logical stream. We'd have to maintain a set of the background streams that were half-closed but not yet terminated.
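   A sketch of that split, again with hypothetical names (`RotatingStream`, `GrpcCall`, `rotate`, `onCallTerminated`): `rotate()` half-closes only the current physical call and parks it in a draining set until the server terminates it, while `halfClose()` is the logical half-close, after which no further rotation happens. `GrpcCall.halfClose()` only sets a flag here; real code would complete the request observer.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in for one gRPC call.
final class GrpcCall {
  final int id;
  boolean halfClosed;

  GrpcCall(int id) {
    this.id = id;
  }

  // Real code would call requestObserver.onCompleted() here.
  void halfClose() {
    halfClosed = true;
  }
}

// Hypothetical logical stream that can rotate its physical call without half-closing itself.
final class RotatingStream {
  private final Supplier<GrpcCall> callFactory;
  // Calls half-closed for rotation whose server side has not terminated yet.
  private final Set<GrpcCall> draining = new HashSet<>();
  private GrpcCall active;
  private boolean logicallyHalfClosed = false;

  RotatingStream(Supplier<GrpcCall> callFactory) {
    this.callFactory = callFactory;
    this.active = callFactory.get();
  }

  // Tear down only the physical call: half-close it, keep tracking it, open a new one.
  synchronized void rotate() {
    if (logicallyHalfClosed) {
      return; // a logically half-closed stream does not open new calls
    }
    active.halfClose();
    draining.add(active);
    active = callFactory.get();
  }

  // Called when the server finishes a call; a drained call can now be forgotten.
  synchronized void onCallTerminated(GrpcCall call) {
    draining.remove(call);
  }

  // Logical half-close: the client is done, so the active call is half-closed too.
  synchronized void halfClose() {
    logicallyHalfClosed = true;
    active.halfClose();
  }

  synchronized GrpcCall active() {
    return active;
  }

  synchronized Set<GrpcCall> draining() {
    return Collections.unmodifiableSet(new HashSet<>(draining));
  }

  synchronized boolean isLogicallyHalfClosed() {
    return logicallyHalfClosed;
  }
}
```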
   
   I don't think the management threads will cause CPU issues as noted, since they are just WAITING, but since we can have a large number of Windmill workers, I'd rather not create a thread just to block if it isn't needed. We've had problems before where a high number of threads used a lot of memory or perhaps slowed down stack tracing, etc. Some options would be a scheduled future that we cancel if we observe a failure before it fires, or a maybeRestartStream method that a single thread at a higher level calls on all of the substreams, like we do for maybeSendHeartbeats.
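   Both alternatives to a blocking thread per stream are sketched below under stated assumptions: a cancellable `ScheduledFuture` on one shared `ScheduledExecutorService`, and a `maybeRestartStream(Instant)` method polled by a single sweeper thread. The class names (`ScheduledRotation`, `TtlStream`), the 45-minute TTL used in the usage, and the restart bookkeeping are illustrative only; the sweep takes `now` as a parameter so one clock can drive all streams.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Option 1 (hypothetical): a cancellable task per stream on one shared scheduler,
// instead of a dedicated thread per stream that blocks until the TTL expires.
final class ScheduledRotation {
  private final ScheduledExecutorService scheduler;
  private final Duration ttl;
  private final Runnable rotate;
  private ScheduledFuture<?> pending;

  ScheduledRotation(ScheduledExecutorService scheduler, Duration ttl, Runnable rotate) {
    this.scheduler = scheduler;
    this.ttl = ttl;
    this.rotate = rotate;
  }

  // One daemon thread that can serve every stream in the process.
  static ScheduledExecutorService sharedScheduler() {
    return Executors.newSingleThreadScheduledExecutor(
        runnable -> {
          Thread thread = new Thread(runnable, "stream-ttl-scheduler");
          thread.setDaemon(true);
          return thread;
        });
  }

  // Arm (or re-arm) the TTL timer whenever a new physical stream starts.
  synchronized void onStreamStarted() {
    if (pending != null) {
      pending.cancel(false);
    }
    pending = scheduler.schedule(rotate, ttl.toMillis(), TimeUnit.MILLISECONDS);
  }

  // A failure restarts the stream anyway, so the pending rotation is no longer needed.
  synchronized void onStreamFailed() {
    if (pending != null) {
      pending.cancel(false);
    }
  }

  synchronized ScheduledFuture<?> pending() {
    return pending;
  }
}

// Option 2 (hypothetical): streams expose maybeRestartStream(now) and one thread sweeps
// them all, in the same spirit as maybeSendHeartbeats.
final class TtlStream {
  private final Duration ttl;
  private Instant startedAt;
  private int restarts = 0;

  TtlStream(Duration ttl, Instant startedAt) {
    this.ttl = ttl;
    this.startedAt = startedAt;
  }

  // Restarts only if the current physical stream has reached its TTL; otherwise a no-op.
  synchronized boolean maybeRestartStream(Instant now) {
    if (Duration.between(startedAt, now).compareTo(ttl) < 0) {
      return false;
    }
    restarts++; // real code would half-close the old physical stream and open a new one
    startedAt = now;
    return true;
  }

  synchronized int restarts() {
    return restarts;
  }

  // Body of the single sweeper thread's loop; returns how many streams were restarted.
  static int sweep(List<TtlStream> streams, Instant now) {
    int restarted = 0;
    for (TtlStream stream : streams) {
      if (stream.maybeRestartStream(now)) {
        restarted++;
      }
    }
    return restarted;
  }
}
```

   In either option the thread count stays constant regardless of how many Windmill workers there are, and cancelling the timer on failure avoids a redundant rotation right after a restart.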
   
   
   
   



-- 
This is an automated message from the Apache Git Service.