m-trieu commented on code in PR #34367:
URL: https://github.com/apache/beam/pull/34367#discussion_r2035958287
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -57,28 +64,42 @@
 @Internal
 @ThreadSafe
 final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender {
-  private static final String STREAM_STARTER_THREAD_NAME = "StartWindmillStreamThread-%d";
-  private final AtomicBoolean started;
-  private final AtomicReference<GetWorkBudget> getWorkBudget;
-  private final GetWorkStream getWorkStream;
+  private static final Logger LOG = LoggerFactory.getLogger(WindmillStreamSender.class);
+  private static final String STREAM_MANAGER_THREAD_NAME_FORMAT = "WindmillStreamManagerThread";
+
+  // Must be shorter than withLongDeadline() duration in GrpcWindmillStreamFactory to prevent
+  // DEADLINE_EXCEEDED.
+  private static final int GET_WORK_STREAM_TTL_MINUTES = 45;
+
+  private final AtomicBoolean isRunning = new AtomicBoolean(false);
   private final GetDataStream getDataStream;
   private final CommitWorkStream commitWorkStream;
   private final WorkCommitter workCommitter;
   private final StreamingEngineThrottleTimers streamingEngineThrottleTimers;
-  private final ExecutorService streamStarter;
+  private final ExecutorService streamManagerExecutor;
+  private final String backendWorkerToken;
+  private final Object getWorkStreamLock = new Object();
+
+  @GuardedBy("getWorkStreamLock")
+  private final Supplier<GetWorkStream> getWorkStreamFactory;
+
+  @GuardedBy("getWorkStreamLock")
+  private @Nullable GetWorkStream activeGetWorkStream;
+
+  @GuardedBy("getWorkStreamLock")
+  private GetWorkBudget getWorkBudget;
 
   private WindmillStreamSender(
       WindmillConnection connection,
       GetWorkRequest getWorkRequest,
-      AtomicReference<GetWorkBudget> getWorkBudget,
+      GetWorkBudget initialGetWorkBudget,
       GrpcWindmillStreamFactory streamingEngineStreamFactory,
       WorkItemScheduler workItemScheduler,
       Function<GetDataStream, GetDataClient> getDataClientFactory,
       Function<CommitWorkStream, WorkCommitter> workCommitterFactory) {
-    this.started = new AtomicBoolean(false);
-    this.getWorkBudget = getWorkBudget;
+    this.backendWorkerToken = connection.backendWorkerToken();
+    this.getWorkBudget = initialGetWorkBudget;
     this.streamingEngineThrottleTimers = StreamingEngineThrottleTimers.create();
-    // Stream instances connect/reconnect internally, so we can reuse the same instance through the

Review Comment:
From the chat conversation:

I opted to use `ResettableThrowingStreamObserver` to close the streams.

- A background thread in `AbstractWindmillStream` calls `AbstractWindmillStream.restart()` based on the stream TTL that is passed in.
- `AbstractWindmillStream.restart()` calls `ResettableThrowingStreamObserver.release()` to close the old stream asynchronously, then calls `AbstractWindmillStream.startStream()` to reset the `ResettableThrowingStreamObserver` and call `onNewStream()`, which sends the headers and flushes any pending messages.
- `ResettableThrowingStreamObserver.release()` offloads the delegate to a queue that is polled by a background thread, which calls `delegate.onError(InternalStreamTimeout())`.
- `AbstractWindmillStream.ResponseObserver.onError(...)` (the delegate) treats `InternalStreamTimeout` differently from other errors: it skips the call to `executor.execute(AbstractWindmillStream.this::startStream)`, because the stream was already restarted in `AbstractWindmillStream.restart()`.

Now `WindmillStreamSender` and its callers only need to call `start()` and `shutdown()` to manage the streams.

I only added the directpath changes in this PR; the older cloudpath implementations still use the `start()/awaitTermination()/halfClose()` pattern. The older code just passes a stream TTL with a negative value, which `AbstractWindmillStream` skips when scheduling the background threads.

I also added a way to retain a reference to the restart task via its future.
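The restart flow in the bullets above can be modeled in a single file as a rough sketch. Everything here is hypothetical: `SimpleObserver`, `ResettableObserverSketch`, `StreamSketch`, and the local `InternalStreamTimeout` class are simplified stand-ins, not Beam's `ResettableThrowingStreamObserver`/`AbstractWindmillStream` or grpc-java's `StreamObserver`. It also assumes error-driven restarts run inline, whereas the comment says the real code goes through `executor.execute(...)`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a gRPC StreamObserver; only onError matters here. */
interface SimpleObserver {
  void onError(Throwable t);
}

/** Hypothetical marker error: "closed on purpose because the stream TTL elapsed". */
final class InternalStreamTimeout extends RuntimeException {
  InternalStreamTimeout() { super("stream TTL elapsed"); }
}

/** Simplified release()/reset() holder; not Beam's ResettableThrowingStreamObserver. */
final class ResettableObserverSketch {
  private final BlockingQueue<SimpleObserver> released = new LinkedBlockingQueue<>();
  private SimpleObserver delegate; // guarded by this

  ResettableObserverSketch() {
    // Background thread that closes released delegates off the caller's thread.
    Thread closer = new Thread(() -> {
      try {
        while (true) released.take().onError(new InternalStreamTimeout());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, "ReleasedStreamCloser");
    closer.setDaemon(true);
    closer.start();
  }

  synchronized void reset(SimpleObserver newDelegate) { delegate = newDelegate; }

  /** Detaches the current delegate and queues it to be closed asynchronously. */
  synchronized void release() {
    if (delegate != null) released.add(delegate);
    delegate = null;
  }
}

/** Hypothetical stream contrasting TTL restarts with error-driven restarts. */
final class StreamSketch {
  final AtomicInteger started = new AtomicInteger();
  final AtomicInteger closedByTtl = new AtomicInteger();
  final AtomicInteger restartedOnError = new AtomicInteger();
  private final ResettableObserverSketch requestObserver = new ResettableObserverSketch();
  private SimpleObserver current; // guarded by this

  synchronized void startStream() {
    started.incrementAndGet();
    current = new ResponseObserver();
    requestObserver.reset(current);
    // onNewStream() would send headers and flush pending messages here.
  }

  /** TTL-driven restart: close the old stream asynchronously, open the new one now. */
  synchronized void restart() {
    requestObserver.release();
    startStream();
  }

  /** Simulates a real transport error (e.g. UNAVAILABLE) on the current stream. */
  void failCurrentStream() {
    SimpleObserver target;
    synchronized (this) { target = current; }
    target.onError(new RuntimeException("UNAVAILABLE"));
  }

  /** Demo helper: sleeps until n TTL closes were observed or timeoutMs elapses. */
  boolean awaitTtlCloses(int n, long timeoutMs) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    try {
      while (closedByTtl.get() < n && System.nanoTime() < deadline) Thread.sleep(5);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return closedByTtl.get() >= n;
  }

  final class ResponseObserver implements SimpleObserver {
    @Override
    public void onError(Throwable t) {
      if (t instanceof InternalStreamTimeout) {
        closedByTtl.incrementAndGet(); // restart() already opened a replacement stream
        return;
      }
      restartedOnError.incrementAndGet();
      startStream(); // the real code hands this to an executor instead
    }
  }
}

final class StreamSketchDemo {
  public static void main(String[] args) {
    StreamSketch s = new StreamSketch();
    s.startStream();
    s.restart(); // TTL fired: the old stream is closed in the background
    s.failCurrentStream(); // real error: ResponseObserver starts a new stream itself
    System.out.println("TTL close seen: " + s.awaitTtlCloses(1, 2000) + ", starts: " + s.started.get());
  }
}
```

The key property is that a TTL close produces exactly one new stream: `restart()` opens it synchronously, and the `InternalStreamTimeout` branch in `onError` declines to open a second one.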
I realized that we don't want fixed intervals for stream restarts. We want the restart to fire at the latest start time plus the timeout, and that time changes every time we call `startStream()`. This should keep restarts from piling up, and keep a restart from firing right after the stream has already been restarted by other means.
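A minimal sketch of that scheduling change, assuming a `ScheduledExecutorService`: each `startStream()` cancels the pending restart and arms a one-shot `schedule(...)` for (now + TTL), instead of a `scheduleAtFixedRate(...)` that would keep firing on the original cadence. `TtlRestartScheduler` and its methods are hypothetical. The 45-minute TTL in the usage below mirrors `GET_WORK_STREAM_TTL_MINUTES` from the diff, and treating a non-positive TTL as "never restart" is an assumption modeled on the negative TTL the older cloudpath code passes.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stream wrapper that arms a one-shot restart at (latest start + TTL). */
final class TtlRestartScheduler {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(
          runnable -> {
            Thread thread = new Thread(runnable, "StreamTtlRestarter");
            thread.setDaemon(true);
            return thread;
          });
  private final Duration ttl;
  private final AtomicInteger starts = new AtomicInteger();
  private long generation; // guarded by this; bumped on every start
  private ScheduledFuture<?> restartTask; // guarded by this; null when none is pending

  TtlRestartScheduler(Duration ttl) {
    this.ttl = ttl;
  }

  /** Opens a new stream (elided) and re-arms the restart relative to this start. */
  synchronized void startStream() {
    long thisGeneration = ++generation;
    starts.incrementAndGet();
    if (restartTask != null) {
      restartTask.cancel(false); // the previous stream's deadline no longer applies
      restartTask = null;
    }
    // Assumption: a non-positive TTL means "never restart".
    if (!ttl.isNegative() && !ttl.isZero()) {
      restartTask =
          scheduler.schedule(
              () -> restartIfCurrent(thisGeneration), ttl.toMillis(), TimeUnit.MILLISECONDS);
    }
  }

  /** Skips a restart that fired after the stream was already restarted by other means. */
  private synchronized void restartIfCurrent(long expectedGeneration) {
    if (expectedGeneration != generation) {
      return;
    }
    // Real code would release the old stream before starting the new one.
    startStream();
  }

  /** The retained future for the pending restart, or null if none is scheduled. */
  synchronized ScheduledFuture<?> pendingRestart() {
    return restartTask;
  }

  int starts() {
    return starts.get();
  }

  void shutdown() {
    scheduler.shutdownNow();
  }
}

final class TtlRestartDemo {
  public static void main(String[] args) throws InterruptedException {
    TtlRestartScheduler stream = new TtlRestartScheduler(Duration.ofMillis(100));
    stream.startStream();
    Thread.sleep(350); // roughly three TTL restarts, each timed from the previous start
    System.out.println("starts so far: " + stream.starts());
    stream.shutdown();
  }
}
```

Re-arming a one-shot task from `startStream()` is what makes the deadline track the latest start. The generation check is an extra guard not described in the comment: `cancel(false)` cannot stop a restart task that is already running, so the task itself checks whether its stream is still the current one before restarting.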