scwhittle commented on code in PR #34367: URL: https://github.com/apache/beam/pull/34367#discussion_r2016168525
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java: ########## @@ -88,19 +105,24 @@ private WindmillStreamSender( streamingEngineStreamFactory.createDirectCommitWorkStream( connection, streamingEngineThrottleTimers.commitWorkThrottleTimer()); this.workCommitter = workCommitterFactory.apply(commitWorkStream); - this.getWorkStream = - streamingEngineStreamFactory.createDirectGetWorkStream( - connection, - withRequestBudget(getWorkRequest, getWorkBudget.get()), - streamingEngineThrottleTimers.getWorkThrottleTimer(), - FixedStreamHeartbeatSender.create(getDataStream), - getDataClientFactory.apply(getDataStream), - workCommitter, - workItemScheduler); + this.activeGetWorkStream = new AtomicReference<>(); + this.getWorkStreamFactory = + () -> + streamingEngineStreamFactory.createDirectGetWorkStream( + connection, + withRequestBudget(getWorkRequest, getWorkBudget.get()), + streamingEngineThrottleTimers.getWorkThrottleTimer(), + FixedStreamHeartbeatSender.create(getDataStream), + getDataClientFactory.apply(getDataStream), + workCommitter, + workItemScheduler); // 3 threads, 1 for each stream type (GetWork, GetData, CommitWork). this.streamStarter = Executors.newFixedThreadPool( Review Comment: how about newCachedThreadPool? it seems like 2 of these threads are just for start() and then won't be used and we might as well have them go away. 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java: ########## @@ -88,19 +105,24 @@ private WindmillStreamSender( streamingEngineStreamFactory.createDirectCommitWorkStream( connection, streamingEngineThrottleTimers.commitWorkThrottleTimer()); this.workCommitter = workCommitterFactory.apply(commitWorkStream); - this.getWorkStream = - streamingEngineStreamFactory.createDirectGetWorkStream( - connection, - withRequestBudget(getWorkRequest, getWorkBudget.get()), - streamingEngineThrottleTimers.getWorkThrottleTimer(), - FixedStreamHeartbeatSender.create(getDataStream), - getDataClientFactory.apply(getDataStream), - workCommitter, - workItemScheduler); + this.activeGetWorkStream = new AtomicReference<>(); + this.getWorkStreamFactory = + () -> + streamingEngineStreamFactory.createDirectGetWorkStream( + connection, + withRequestBudget(getWorkRequest, getWorkBudget.get()), + streamingEngineThrottleTimers.getWorkThrottleTimer(), Review Comment: should we just use the same throttle timer, heartbeat sender, and getdataclientfactory for each stream? 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java: ########## @@ -57,15 +64,26 @@ @Internal @ThreadSafe final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender { - private static final String STREAM_STARTER_THREAD_NAME = "StartWindmillStreamThread-%d"; - private final AtomicBoolean started; - private final AtomicReference<GetWorkBudget> getWorkBudget; - private final GetWorkStream getWorkStream; + private static final Logger LOG = LoggerFactory.getLogger(WindmillStreamSender.class); + private static final String STREAM_MANAGER_THREAD_NAME_FORMAT = "WindmillStreamManagerThread"; + private static final int GET_WORK_STREAM_TTL_MINUTES = 45; + + private final AtomicBoolean isRunning = new AtomicBoolean(false); private final GetDataStream getDataStream; private final CommitWorkStream commitWorkStream; private final WorkCommitter workCommitter; private final StreamingEngineThrottleTimers streamingEngineThrottleTimers; private final ExecutorService streamStarter; + private final String backendWorkerToken; + + @GuardedBy("activeGetWorkStream") + private final AtomicReference<GetWorkStream> activeGetWorkStream; + + @GuardedBy("activeGetWorkStream") + private final AtomicReference<GetWorkBudget> getWorkBudget; + + @GuardedBy("activeGetWorkStream") Review Comment: this doesn't seem like it needs to be guarded? any reason it has to be serially called? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java: ########## @@ -165,4 +201,39 @@ long getAndResetThrottleTime() { long getCurrentActiveCommitBytes() { return workCommitter.currentActiveCommitBytes(); } + + /** + * Creates, starts, and gracefully terminates {@link GetWorkStream} before the clientside deadline + * to prevent {@link org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status#DEADLINE_EXCEEDED} errors. 
+ * If at any point the server closes the stream, reconnects immediately. + */ + private void getWorkStreamLoop(CountDownLatch waitForInitialStream) { + @Nullable GetWorkStream newStream = null; + while (isRunning.get()) { + synchronized (activeGetWorkStream) { + newStream = getWorkStreamFactory.get(); Review Comment: maybe this could be outside the synchronized block? we can create the new stream before swapping it in as the active one. We could create the new stream with no budget and just set its budget once the old stream is half-closed. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java: ########## @@ -126,35 +148,49 @@ private static GetWorkRequest withRequestBudget(GetWorkRequest request, GetWorkB } synchronized void start() { - if (!started.get()) { + if (isRunning.compareAndSet(false, true)) { checkState(!streamStarter.isShutdown(), "WindmillStreamSender has already been shutdown."); - // Start these 3 streams in parallel since they each may perform blocking IO.
+ CountDownLatch waitForInitialStream = new CountDownLatch(1); + streamStarter.execute(() -> getWorkStreamLoop(waitForInitialStream)); CompletableFuture.allOf( - CompletableFuture.runAsync(getWorkStream::start, streamStarter), CompletableFuture.runAsync(getDataStream::start, streamStarter), CompletableFuture.runAsync(commitWorkStream::start, streamStarter)) .join(); + try { + waitForInitialStream.await(); + } catch (InterruptedException e) { + close(); + LOG.error("GetWorkStream to {} was never able to start.", backendWorkerToken); + throw new IllegalStateException("GetWorkStream unable to start aborting.", e); + } workCommitter.start(); - started.set(true); } } @Override public synchronized void close() { + isRunning.set(false); streamStarter.shutdownNow(); - getWorkStream.shutdown(); getDataStream.shutdown(); workCommitter.stop(); commitWorkStream.shutdown(); } @Override public void setBudget(long items, long bytes) { - GetWorkBudget budget = GetWorkBudget.builder().setItems(items).setBytes(bytes).build(); - getWorkBudget.set(budget); - if (started.get()) { - getWorkStream.setBudget(budget); + synchronized (activeGetWorkStream) { + GetWorkBudget budget = GetWorkBudget.builder().setItems(items).setBytes(bytes).build(); Review Comment: this build and setting on the atomic, could be outside synchronized block ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java: ########## @@ -57,15 +64,26 @@ @Internal @ThreadSafe final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender { - private static final String STREAM_STARTER_THREAD_NAME = "StartWindmillStreamThread-%d"; - private final AtomicBoolean started; - private final AtomicReference<GetWorkBudget> getWorkBudget; - private final GetWorkStream getWorkStream; + private static final Logger LOG = LoggerFactory.getLogger(WindmillStreamSender.class); + private static final String 
STREAM_MANAGER_THREAD_NAME_FORMAT = "WindmillStreamManagerThread"; + private static final int GET_WORK_STREAM_TTL_MINUTES = 45; + + private final AtomicBoolean isRunning = new AtomicBoolean(false); private final GetDataStream getDataStream; private final CommitWorkStream commitWorkStream; private final WorkCommitter workCommitter; private final StreamingEngineThrottleTimers streamingEngineThrottleTimers; private final ExecutorService streamStarter; + private final String backendWorkerToken; + + @GuardedBy("activeGetWorkStream") + private final AtomicReference<GetWorkStream> activeGetWorkStream; Review Comment: if this is guarded I don't think it needs to be an atomic. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java: ########## @@ -126,35 +148,49 @@ private static GetWorkRequest withRequestBudget(GetWorkRequest request, GetWorkB } synchronized void start() { - if (!started.get()) { + if (isRunning.compareAndSet(false, true)) { checkState(!streamStarter.isShutdown(), "WindmillStreamSender has already been shutdown."); - // Start these 3 streams in parallel since they each may perform blocking IO. 
+ CountDownLatch waitForInitialStream = new CountDownLatch(1); + streamStarter.execute(() -> getWorkStreamLoop(waitForInitialStream)); CompletableFuture.allOf( - CompletableFuture.runAsync(getWorkStream::start, streamStarter), CompletableFuture.runAsync(getDataStream::start, streamStarter), CompletableFuture.runAsync(commitWorkStream::start, streamStarter)) .join(); + try { + waitForInitialStream.await(); + } catch (InterruptedException e) { + close(); + LOG.error("GetWorkStream to {} was never able to start.", backendWorkerToken); + throw new IllegalStateException("GetWorkStream unable to start aborting.", e); + } workCommitter.start(); - started.set(true); } } @Override public synchronized void close() { + isRunning.set(false); streamStarter.shutdownNow(); - getWorkStream.shutdown(); getDataStream.shutdown(); workCommitter.stop(); commitWorkStream.shutdown(); } @Override public void setBudget(long items, long bytes) { - GetWorkBudget budget = GetWorkBudget.builder().setItems(items).setBytes(bytes).build(); - getWorkBudget.set(budget); - if (started.get()) { - getWorkStream.setBudget(budget); + synchronized (activeGetWorkStream) { + GetWorkBudget budget = GetWorkBudget.builder().setItems(items).setBytes(bytes).build(); + getWorkBudget.set(budget); + if (isRunning.get()) { Review Comment: remove running check and just use the null below? 
seems like if activeGetWorkStream is set it is ok to call and it's one less interleaving to think about ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java: ########## @@ -57,15 +64,26 @@ @Internal @ThreadSafe final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender { - private static final String STREAM_STARTER_THREAD_NAME = "StartWindmillStreamThread-%d"; - private final AtomicBoolean started; - private final AtomicReference<GetWorkBudget> getWorkBudget; - private final GetWorkStream getWorkStream; + private static final Logger LOG = LoggerFactory.getLogger(WindmillStreamSender.class); + private static final String STREAM_MANAGER_THREAD_NAME_FORMAT = "WindmillStreamManagerThread"; + private static final int GET_WORK_STREAM_TTL_MINUTES = 45; + + private final AtomicBoolean isRunning = new AtomicBoolean(false); private final GetDataStream getDataStream; private final CommitWorkStream commitWorkStream; private final WorkCommitter workCommitter; private final StreamingEngineThrottleTimers streamingEngineThrottleTimers; private final ExecutorService streamStarter; + private final String backendWorkerToken; + + @GuardedBy("activeGetWorkStream") + private final AtomicReference<GetWorkStream> activeGetWorkStream; + + @GuardedBy("activeGetWorkStream") + private final AtomicReference<GetWorkBudget> getWorkBudget; Review Comment: ditto if this is guarded probably doesn't need to be atomic ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java: ########## @@ -165,4 +201,39 @@ long getAndResetThrottleTime() { long getCurrentActiveCommitBytes() { return workCommitter.currentActiveCommitBytes(); } + + /** + * Creates, starts, and gracefully terminates {@link GetWorkStream} before the clientside deadline + * to prevent {@link 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status#DEADLINE_EXCEEDED} errors. + * If at any point the server closes the stream, reconnects immediately. + */ + private void getWorkStreamLoop(CountDownLatch waitForInitialStream) { + @Nullable GetWorkStream newStream = null; + while (isRunning.get()) { + synchronized (activeGetWorkStream) { + newStream = getWorkStreamFactory.get(); + newStream.start(); + waitForInitialStream.countDown(); + activeGetWorkStream.set(newStream); + } + try { + // Try to gracefully terminate the stream. + if (!newStream.awaitTermination(GET_WORK_STREAM_TTL_MINUTES, TimeUnit.MINUTES)) { + newStream.halfClose(); + } + + // If graceful termination is unsuccessful, forcefully shutdown. + if (!newStream.awaitTermination(30, TimeUnit.SECONDS)) { + newStream.shutdown(); + } + + } catch (InterruptedException e) { + // continue until !isRunning. Review Comment: Could we instead force that isRunning is set to false here? we don't expect interruptions to happen for any other reason. 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java: ########## @@ -57,15 +64,26 @@ @Internal @ThreadSafe final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender { - private static final String STREAM_STARTER_THREAD_NAME = "StartWindmillStreamThread-%d"; - private final AtomicBoolean started; - private final AtomicReference<GetWorkBudget> getWorkBudget; - private final GetWorkStream getWorkStream; + private static final Logger LOG = LoggerFactory.getLogger(WindmillStreamSender.class); + private static final String STREAM_MANAGER_THREAD_NAME_FORMAT = "WindmillStreamManagerThread"; + private static final int GET_WORK_STREAM_TTL_MINUTES = 45; Review Comment: could note that this needs to be less than the deadline in the other file ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java: ########## @@ -165,4 +201,39 @@ long getAndResetThrottleTime() { long getCurrentActiveCommitBytes() { return workCommitter.currentActiveCommitBytes(); } + + /** + * Creates, starts, and gracefully terminates {@link GetWorkStream} before the clientside deadline + * to prevent {@link org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status#DEADLINE_EXCEEDED} errors. + * If at any point the server closes the stream, reconnects immediately. + */ + private void getWorkStreamLoop(CountDownLatch waitForInitialStream) { + @Nullable GetWorkStream newStream = null; + while (isRunning.get()) { + synchronized (activeGetWorkStream) { + newStream = getWorkStreamFactory.get(); + newStream.start(); + waitForInitialStream.countDown(); + activeGetWorkStream.set(newStream); + } + try { + // Try to gracefully terminate the stream. 
+ if (!newStream.awaitTermination(GET_WORK_STREAM_TTL_MINUTES, TimeUnit.MINUTES)) { + newStream.halfClose(); + } + + // If graceful termination is unsuccessful, forcefully shutdown. + if (!newStream.awaitTermination(30, TimeUnit.SECONDS)) { Review Comment: maybe better to increase this? If we lose getwork responses then windmill worker has to retry, if we can get them to flush with a little more time that seems fine. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java: ########## @@ -189,6 +189,12 @@ private static <T extends AbstractStub<T>> T withDefaultDeadline(T stub) { return stub.withDeadlineAfter(DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS); } + private static <T extends AbstractStub<T>> T withLongDeadline(T stub) { Review Comment: withDirectPathDeadline? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java: ########## @@ -165,4 +201,39 @@ long getAndResetThrottleTime() { long getCurrentActiveCommitBytes() { return workCommitter.currentActiveCommitBytes(); } + + /** + * Creates, starts, and gracefully terminates {@link GetWorkStream} before the clientside deadline + * to prevent {@link org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status#DEADLINE_EXCEEDED} errors. + * If at any point the server closes the stream, reconnects immediately. + */ + private void getWorkStreamLoop(CountDownLatch waitForInitialStream) { + @Nullable GetWorkStream newStream = null; + while (isRunning.get()) { + synchronized (activeGetWorkStream) { + newStream = getWorkStreamFactory.get(); + newStream.start(); + waitForInitialStream.countDown(); + activeGetWorkStream.set(newStream); + } + try { + // Try to gracefully terminate the stream. 
+ if (!newStream.awaitTermination(GET_WORK_STREAM_TTL_MINUTES, TimeUnit.MINUTES)) { + newStream.halfClose(); Review Comment: I think as we half-close here we probably want to create a new stream to take over. That way we aren't idle while we're waiting for the termination. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ########## @@ -217,11 +217,11 @@ public void terminate(Throwable terminationException) { } private String constructStreamCancelledErrorMessage(long totalSecondsWaited) { - return deadlineSeconds > 0 + return inactivityTimeout > 0 ? "Waited " + totalSecondsWaited + "s which exceeds given deadline of " - + deadlineSeconds + + inactivityTimeout + "s for the outboundObserver to become ready meaning " + "that the stream deadline was not respected." Review Comment: this seems like the wrong message if it isn't the stream deadline -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org