scwhittle commented on code in PR #34367: URL: https://github.com/apache/beam/pull/34367#discussion_r2025120871
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java: ########## @@ -57,28 +64,42 @@ @Internal @ThreadSafe final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender { - private static final String STREAM_STARTER_THREAD_NAME = "StartWindmillStreamThread-%d"; - private final AtomicBoolean started; - private final AtomicReference<GetWorkBudget> getWorkBudget; - private final GetWorkStream getWorkStream; + private static final Logger LOG = LoggerFactory.getLogger(WindmillStreamSender.class); + private static final String STREAM_MANAGER_THREAD_NAME_FORMAT = "WindmillStreamManagerThread"; + + // Must be shorter than withLongDeadline() duration in GrpcWindmillStreamFactory to prevent + // DEADLINE_EXCEEDED. + private static final int GET_WORK_STREAM_TTL_MINUTES = 45; + + private final AtomicBoolean isRunning = new AtomicBoolean(false); private final GetDataStream getDataStream; private final CommitWorkStream commitWorkStream; private final WorkCommitter workCommitter; private final StreamingEngineThrottleTimers streamingEngineThrottleTimers; - private final ExecutorService streamStarter; + private final ExecutorService streamManagerExecutor; + private final String backendWorkerToken; + private final Object getWorkStreamLock = new Object(); + + @GuardedBy("getWorkStreamLock") + private final Supplier<GetWorkStream> getWorkStreamFactory; + + @GuardedBy("getWorkStreamLock") + private @Nullable GetWorkStream activeGetWorkStream; + + @GuardedBy("getWorkStreamLock") + private GetWorkBudget getWorkBudget; private WindmillStreamSender( WindmillConnection connection, GetWorkRequest getWorkRequest, - AtomicReference<GetWorkBudget> getWorkBudget, + GetWorkBudget initialGetWorkBudget, GrpcWindmillStreamFactory streamingEngineStreamFactory, WorkItemScheduler workItemScheduler, Function<GetDataStream, GetDataClient> getDataClientFactory, Function<CommitWorkStream, 
WorkCommitter> workCommitterFactory) { - this.started = new AtomicBoolean(false); - this.getWorkBudget = getWorkBudget; + this.backendWorkerToken = connection.backendWorkerToken(); + this.getWorkBudget = initialGetWorkBudget; this.streamingEngineThrottleTimers = StreamingEngineThrottleTimers.create(); - // Stream instances connect/reconnect internally, so we can reuse the same instance through the Review Comment: Don't we have the same problem, in that the getDataStream and commitWorkStream will also eventually fail with DEADLINE_EXCEEDED? I'm wondering if it would be better to either improve StreamPool or have a more reusable class like StreamPool for internally reconnecting an AbstractWindmillStream. It would also be nice if we didn't need an additional thread per stream just waiting for it to terminate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org