scwhittle commented on code in PR #34367: URL: https://github.com/apache/beam/pull/34367#discussion_r2027001535
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java: ########## @@ -57,28 +64,42 @@ @Internal @ThreadSafe final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender { - private static final String STREAM_STARTER_THREAD_NAME = "StartWindmillStreamThread-%d"; - private final AtomicBoolean started; - private final AtomicReference<GetWorkBudget> getWorkBudget; - private final GetWorkStream getWorkStream; + private static final Logger LOG = LoggerFactory.getLogger(WindmillStreamSender.class); + private static final String STREAM_MANAGER_THREAD_NAME_FORMAT = "WindmillStreamManagerThread"; + + // Must be shorter than withLongDeadline() duration in GrpcWindmillStreamFactory to prevent + // DEADLINE_EXCEEDED. + private static final int GET_WORK_STREAM_TTL_MINUTES = 45; + + private final AtomicBoolean isRunning = new AtomicBoolean(false); private final GetDataStream getDataStream; private final CommitWorkStream commitWorkStream; private final WorkCommitter workCommitter; private final StreamingEngineThrottleTimers streamingEngineThrottleTimers; - private final ExecutorService streamStarter; + private final ExecutorService streamManagerExecutor; + private final String backendWorkerToken; + private final Object getWorkStreamLock = new Object(); + + @GuardedBy("getWorkStreamLock") + private final Supplier<GetWorkStream> getWorkStreamFactory; + + @GuardedBy("getWorkStreamLock") + private @Nullable GetWorkStream activeGetWorkStream; + + @GuardedBy("getWorkStreamLock") + private GetWorkBudget getWorkBudget; private WindmillStreamSender( WindmillConnection connection, GetWorkRequest getWorkRequest, - AtomicReference<GetWorkBudget> getWorkBudget, + GetWorkBudget initialGetWorkBudget, GrpcWindmillStreamFactory streamingEngineStreamFactory, WorkItemScheduler workItemScheduler, Function<GetDataStream, GetDataClient> getDataClientFactory, Function<CommitWorkStream, 
WorkCommitter> workCommitterFactory) { - this.started = new AtomicBoolean(false); - this.getWorkBudget = getWorkBudget; + this.backendWorkerToken = connection.backendWorkerToken(); + this.getWorkBudget = initialGetWorkBudget; this.streamingEngineThrottleTimers = StreamingEngineThrottleTimers.create(); - // Stream instances connect/reconnect internally, so we can reuse the same instance through the Review Comment: Did we previously discuss adding this functionality directly to AbstractWindmillStream? It already has multiple physical gRPC streams beneath it and knows how to restart things if needed. I think we'd have to make some of its internal state per physical stream, but the nice thing is that it would keep the clients simple, as they would just have a single logical stream. Currently we have AbstractWindmillStream.halfClose(), which means we want to terminate the logical stream, and then we half-close the physical stream (and any new ones that we may create). I think that if we added some timeout/recreation internal to AbstractWindmillStream, we could half-close only the physical stream that we want to tear down, while the logical stream itself is not half-closed. We'd have to maintain a set of the background streams that were half-closed but not yet terminated. I don't think the management threads will cause CPU issues, as noted, since they are just WAITING, but since we can have a large number of windmill workers, I'd rather not create a thread just to block if it is not needed. We've had problems before where a high number of threads uses a lot of memory or perhaps slows down stack tracing, etc. Some options would be some kind of scheduled future that we cancel if we observe a failure before then, or some maybeRestartStream method that a single thread at a higher level calls on all of the substreams, like we do for maybeSendHeartbeats. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org