arunpandianp commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1805562867
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/GlobalDataStreamSender.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming.harness; + +import java.io.Closeable; +import java.util.function.Supplier; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; +import org.apache.beam.sdk.annotations.Internal; + +@Internal +@ThreadSafe +final class GlobalDataStreamSender implements Closeable, Supplier<GetDataStream> { + private final Endpoint endpoint; + private final GetDataStream delegate; + private volatile boolean started; + + GlobalDataStreamSender(GetDataStream delegate, Endpoint endpoint) { + this.delegate = delegate; + this.started = false; + this.endpoint = endpoint; + } + + @Override + public GetDataStream get() { + if (!started) { + startStream(); Review Comment: can you add a comment on why we are lazily starting GlobalDataStreams? 
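For context on the pattern being discussed, here is a condensed, self-contained sketch of the double-checked lazy start used above (names simplified; not a line-for-line copy of the class):

```java
// Condensed sketch of GlobalDataStreamSender's lazy start: the volatile
// read is the cheap fast path, and the synchronized recheck guarantees the
// expensive stream start happens at most once under concurrent get() calls.
final class LazyStarter {
  private final Runnable startOnce; // e.g. delegate::start, may do network IO
  private volatile boolean started = false;

  LazyStarter(Runnable startOnce) {
    this.startOnce = startOnce;
  }

  void ensureStarted() {
    if (!started) { // racy volatile read, rechecked under the lock
      synchronized (this) {
        if (!started) {
          startOnce.run();
          started = true;
        }
      }
    }
  }
}
```

The ask above is for a comment documenting why the start is deferred at all; the sketch only shows how the deferral stays thread safe.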
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java: ########## @@ -114,50 +120,35 @@ private FanOutStreamingEngineWorkerHarness( ChannelCachingStubFactory channelCachingStubFactory, GetWorkBudgetDistributor getWorkBudgetDistributor, GrpcDispatcherClient dispatcherClient, - long clientId, Function<WindmillStream.CommitWorkStream, WorkCommitter> workCommitterFactory, ThrottlingGetDataMetricTracker getDataMetricTracker) { this.jobHeader = jobHeader; this.getDataMetricTracker = getDataMetricTracker; this.started = false; this.streamFactory = streamFactory; this.workItemScheduler = workItemScheduler; - this.connections = new AtomicReference<>(StreamingEngineConnectionState.EMPTY); + this.backends = new AtomicReference<>(StreamingEngineBackends.EMPTY); this.channelCachingStubFactory = channelCachingStubFactory; this.dispatcherClient = dispatcherClient; - this.isBudgetRefreshPaused = new AtomicBoolean(false); this.getWorkerMetadataThrottleTimer = new ThrottleTimer(); - this.newWorkerMetadataPublisher = - singleThreadedExecutorServiceOf(PUBLISH_NEW_WORKER_METADATA_THREAD); - this.newWorkerMetadataConsumer = - singleThreadedExecutorServiceOf(CONSUME_NEW_WORKER_METADATA_THREAD); - this.clientId = clientId; - this.lastBudgetRefresh = new AtomicReference<>(Instant.EPOCH); - this.newWindmillEndpoints = Queues.synchronizedQueue(EvictingQueue.create(1)); - this.getWorkBudgetRefresher = - new GetWorkBudgetRefresher( - isBudgetRefreshPaused::get, - () -> { - getWorkBudgetDistributor.distributeBudget( - connections.get().windmillStreams().values(), totalGetWorkBudget); - lastBudgetRefresh.set(Instant.now()); - }); - this.getWorkerMetadataStream = - Suppliers.memoize( - () -> - streamFactory.createGetWorkerMetadataStream( - dispatcherClient.getWindmillMetadataServiceStubBlocking(), - getWorkerMetadataThrottleTimer, - endpoints -> - // Run this on a separate thread than the grpc stream thread. - newWorkerMetadataPublisher.submit( - () -> newWindmillEndpoints.add(endpoints)))); + this.windmillStreamManager = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat(STREAM_MANAGER_THREAD_NAME).build()); + this.workerMetadataConsumer = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME).build()); + this.getWorkBudgetDistributor = getWorkBudgetDistributor; + this.totalGetWorkBudget = totalGetWorkBudget; + this.activeMetadataVersion = Long.MIN_VALUE; this.workCommitterFactory = workCommitterFactory; - } - - private static ExecutorService singleThreadedExecutorServiceOf(String threadName) { - return Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat(threadName).build()); + // To satisfy CheckerFramework complaining about reference to "this" in constructor. + @SuppressWarnings("methodref.receiver.bound") + Consumer<WindmillEndpoints> newEndpointsConsumer = this::consumeWorkerMetadata; + this.getWorkerMetadataStream = + streamFactory.createGetWorkerMetadataStream( + dispatcherClient::getWindmillMetadataServiceStubBlocking, + getWorkerMetadataThrottleTimer, + newEndpointsConsumer); Review Comment: Move the construction to start() and remove the @SuppressWarnings? One reason not to let `this` escape the constructor is that final fields are only guaranteed to be fully initialized and visible to other threads after construction is complete. If `createGetWorkerMetadataStream` here calls `newEndpointsConsumer` inline, there are no guarantees on what it'll get.
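To make the hazard concrete, here is a minimal, hypothetical sketch (none of these names come from the PR) of how a callback registered inside a constructor can observe a final field before its value is published:

```java
// Hypothetical sketch of the "this escapes the constructor" hazard.
interface MetadataListener {
  void register(Runnable callback);
}

final class Harness {
  private final long budget;

  Harness(MetadataListener listener) {
    // BAD: the method reference captures `this` before construction finishes.
    // If register() runs the callback inline, or publishes it to another
    // thread, onMetadata() may observe `budget` as 0 instead of 100, because
    // the JMM final-field guarantee only applies once the constructor returns.
    listener.register(this::onMetadata);
    this.budget = 100;
  }

  private void onMetadata() {
    System.out.println(budget);
  }
}
```

Registering the callback from a `start()` method invoked after the constructor returns restores the guarantee, which is why moving the construction there would also make the `@SuppressWarnings` unnecessary.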
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ########## @@ -317,29 +350,71 @@ public boolean commitWorkItem( if (!canAccept(commitRequest.getSerializedSize() + computation.length())) { return false; } - PendingRequest request = new PendingRequest(computation, commitRequest, onDone); + + PendingRequest request = PendingRequest.create(computation, commitRequest, onDone); add(idGenerator.incrementAndGet(), request); return true; } /** Flushes any pending work items to the wire. */ @Override public void flush() { - flushInternal(queue); - queuedBytes = 0; - queue.clear(); + try { + if (!isShutdown()) { + flushInternal(queue); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + queuedBytes = 0; + queue.clear(); + } } void add(long id, PendingRequest request) { - assert (canAccept(request.getBytes())); + Preconditions.checkState(canAccept(request.getBytes())); Review Comment: Ideally `checkState` calls should not throw when things are working as expected; an IllegalStateException means something unexpected happened. Can we call `request.onDone` with an error code when the stream is shut down? If the stream is shut down, it means the work is no longer valid, right?
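A self-contained sketch of that suggestion, with hypothetical stand-in types (`CommitStatus.ABORTED` is an assumption here, not taken from the PR):

```java
import java.util.function.Consumer;

// Hypothetical stand-ins for the PR's types, just to make the idea concrete.
enum CommitStatus { OK, ABORTED }

final class PendingRequest {
  private final Consumer<CommitStatus> onDone;

  PendingRequest(Consumer<CommitStatus> onDone) {
    this.onDone = onDone;
  }

  Consumer<CommitStatus> onDone() {
    return onDone;
  }
}

final class CommitQueueSketch {
  private volatile boolean shutdown;

  void shutdown() {
    shutdown = true;
  }

  // Instead of checkState throwing IllegalStateException on a shut-down
  // stream, complete the caller's callback with an error status: the work
  // is no longer valid, so report that rather than crashing the caller.
  void add(PendingRequest request) {
    if (shutdown) {
      request.onDone().accept(CommitStatus.ABORTED);
      return;
    }
    // ... enqueue the request as before ...
  }
}
```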
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -63,32 +68,48 @@ */ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements WindmillStream { - public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300; // Default gRPC streams to 2MB chunks, which has shown to be a large enough chunk size to reduce // per-chunk overhead, and small enough that we can still perform granular flow-control. protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20; - private static final Logger LOG = LoggerFactory.getLogger(AbstractWindmillStream.class); + private static final Status OK_STATUS = Status.fromCode(Status.Code.OK); + protected final AtomicBoolean clientClosed; - private final AtomicBoolean isShutdown; + protected final Sleeper sleeper; private final AtomicLong lastSendTimeMs; - private final Executor executor; + private final ExecutorService executor; private final BackOff backoff; private final AtomicLong startTimeMs; private final AtomicLong lastResponseTimeMs; + private final AtomicInteger restartCount; private final AtomicInteger errorCount; - private final AtomicReference<String> lastError; - private final AtomicReference<DateTime> lastErrorTime; + private final AtomicReference<String> lastRestartReason; + private final AtomicReference<DateTime> lastRestartTime; private final AtomicLong sleepUntil; private final CountDownLatch finishLatch; private final Set<AbstractWindmillStream<?, ?>> streamRegistry; private final int logEveryNStreamFailures; - private final Supplier<StreamObserver<RequestT>> requestObserverSupplier; - // Indicates if the current stream in requestObserver is closed by calling close() method - private final AtomicBoolean streamClosed; private final String backendWorkerToken; - private @Nullable StreamObserver<RequestT> requestObserver; + private final ResettableRequestObserver<RequestT> requestObserver; + + /** Guards {@link #start()} and {@link #shutdown()} methods. */ + private final Object shutdownLock = new Object(); + + /** Reads are lock free, writes are guarded by shutdownLock. */ + private final AtomicBoolean isShutdown; + + private final AtomicBoolean started; Review Comment: Now that we have `shutdownLock`, these can be plain volatile booleans. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ########## @@ -317,29 +350,71 @@ public boolean commitWorkItem( if (!canAccept(commitRequest.getSerializedSize() + computation.length())) { return false; } - PendingRequest request = new PendingRequest(computation, commitRequest, onDone); + + PendingRequest request = PendingRequest.create(computation, commitRequest, onDone); add(idGenerator.incrementAndGet(), request); return true; } /** Flushes any pending work items to the wire. */ @Override public void flush() { - flushInternal(queue); - queuedBytes = 0; - queue.clear(); + try { + if (!isShutdown()) { + flushInternal(queue); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + queuedBytes = 0; + queue.clear(); + } } void add(long id, PendingRequest request) { - assert (canAccept(request.getBytes())); + Preconditions.checkState(canAccept(request.getBytes())); Review Comment: Or we could remove the shutdown checks in `add` and call the callback with an error status in `flush`. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ########## @@ -302,38 +331,57 @@ public void appendSpecificHtml(PrintWriter writer) { } private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<ResponseT> parseFn) { - while (true) { + while (!isShutdown()) { request.resetResponseStream(); try { queueRequestAndWait(request); return parseFn.parse(request.getResponseStream()); - } catch (CancellationException e) { - // Retry issuing the request since the response stream was cancelled. - continue; + } catch (AppendableInputStream.InvalidInputStreamStateException + | VerifyException + | CancellationException e) { + handleShutdown(request, e); + if (!(e instanceof CancellationException)) { + throw e; + } } catch (IOException e) { LOG.error("Parsing GetData response failed: ", e); - continue; } catch (InterruptedException e) { Thread.currentThread().interrupt(); + handleShutdown(request, e); throw new RuntimeException(e); } finally { pending.remove(request.id()); } } + + // If we have exited the loop here, the stream has been shutdown. Cancel the response stream. + request.getResponseStream().cancel(); Review Comment: If it is not needed, let's remove it. To catch anything dangling, we can add a check and throw. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java: ########## @@ -131,46 +124,43 @@ private static GetWorkRequest withRequestBudget(GetWorkRequest request, GetWorkB return request.toBuilder().setMaxItems(budget.items()).setMaxBytes(budget.bytes()).build(); } - @SuppressWarnings("ReturnValueIgnored") - void startStreams() { - getWorkStream.get(); - getDataStream.get(); - commitWorkStream.get(); - workCommitter.get().start(); - // *stream.get() is all memoized in a threadsafe manner.
- started.set(true); - } - - void closeAllStreams() { - // Supplier<Stream>.get() starts the stream which is an expensive operation as it initiates the - // streaming RPCs by possibly making calls over the network. Do not close the streams unless - // they have already been started. - if (started.get()) { - getWorkStream.get().shutdown(); - getDataStream.get().shutdown(); - workCommitter.get().stop(); - commitWorkStream.get().shutdown(); + synchronized void start() { + if (!started.get()) { + // Start these 3 streams in parallel since they each may perform blocking IO. + CompletableFuture.allOf( + CompletableFuture.runAsync(getWorkStream::start, streamStarter), + CompletableFuture.runAsync(getDataStream::start, streamStarter), + CompletableFuture.runAsync(commitWorkStream::start, streamStarter)) + .join(); + workCommitter.start(); + started.set(true); + } } @Override - public void adjustBudget(long itemsDelta, long bytesDelta) { - getWorkBudget.set(getWorkBudget.get().apply(itemsDelta, bytesDelta)); + public synchronized void close() { if (started.get()) { - getWorkStream.get().adjustBudget(itemsDelta, bytesDelta); + getWorkStream.shutdown(); + getDataStream.shutdown(); + workCommitter.stop(); + commitWorkStream.shutdown(); } } @Override - public GetWorkBudget remainingBudget() { - return started.get() ? getWorkStream.get().remainingBudget() : getWorkBudget.get(); + public void setBudget(long items, long bytes) { + GetWorkBudget adjustment = GetWorkBudget.builder().setItems(items).setBytes(bytes).build(); Review Comment: ```suggestion GetWorkBudget budget = GetWorkBudget.builder().setItems(items).setBytes(bytes).build(); ``` ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/GlobalDataStreamSender.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming.harness; + +import java.io.Closeable; +import java.util.function.Supplier; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; +import org.apache.beam.sdk.annotations.Internal; + +@Internal +@ThreadSafe +final class GlobalDataStreamSender implements Closeable, Supplier<GetDataStream> { Review Comment: nit: I would put the close method on a separate `StreamSender` interface; it makes references easier to find than the generic `Closeable` interface.
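A sketch of the suggested interface (the `StreamSender` name comes from this comment; everything else here is illustrative):

```java
// Hypothetical StreamSender interface: WindmillStreamSender and
// GlobalDataStreamSender would implement this instead of java.io.Closeable,
// so an IDE "find references" on close() surfaces only stream senders.
interface StreamSender {
  void close();
}

final class GlobalDataStreamSenderSketch implements StreamSender {
  @Override
  public void close() {
    // delegate.shutdown();
  }
}
```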
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ########## @@ -204,14 +224,14 @@ private void flushInternal(Map<Long, PendingRequest> requests) { } } - private void issueSingleRequest(final long id, PendingRequest pendingRequest) { + private void issueSingleRequest(long id, PendingRequest pendingRequest) { StreamingCommitWorkRequest.Builder requestBuilder = StreamingCommitWorkRequest.newBuilder(); requestBuilder .addCommitChunkBuilder() - .setComputationId(pendingRequest.computation) + .setComputationId(pendingRequest.computationId()) .setRequestId(id) - .setShardingKey(pendingRequest.request.getShardingKey()) - .setSerializedWorkItemCommit(pendingRequest.request.toByteString()); + .setShardingKey(pendingRequest.shardingKey()) + .setSerializedWorkItemCommit(pendingRequest.serializedCommit()); StreamingCommitWorkRequest chunk = requestBuilder.build(); synchronized (this) { pending.put(id, pendingRequest); Review Comment: Can this `pending.put` execute after a call to shutdown? What happens to the pending request if it does? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ########## @@ -273,10 +288,24 @@ public void onHeartbeatResponse(List<Windmill.ComputationHeartbeatResponse> resp @Override public void sendHealthCheck() { if (hasPendingRequests()) { - send(StreamingGetDataRequest.newBuilder().build()); + send(HEALTH_CHECK_REQUEST); } } + @Override + protected void shutdownInternal() { + // Stream has been explicitly closed. Drain pending input streams and request batches. + // Future calls to send RPCs will fail. + pending.values().forEach(AppendableInputStream::cancel); Review Comment: ``` Preconditions.checkState(!isShutdown(), "Cannot send on shutdown stream."); verify(pending.put(request.id(), request.getResponseStream()) == null); ``` Thread 1 is past the `isShutdown` check and waiting before `pending.put`. Thread 2 completes `shutdownInternal`. Thread 1 will now do `pending.put()`. One way to avoid this race is to put both paths under the same mutex. We'll have to reevaluate why shutdown cannot be guarded by `this` or a different common mutex. We probably should not block for IO when holding the `this` mutex.
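A self-contained sketch of the same-mutex fix (hypothetical names; per the last point above, the blocking send would stay outside the lock):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: checking the shutdown flag and mutating `pending` under one mutex
// closes the window where shutdownInternal() can run between the
// isShutdown() check and the pending.put().
final class PendingRegistrySketch<T> {
  private final Object lock = new Object();
  private final Map<Long, T> pending = new HashMap<>();
  private boolean shutdown = false;

  // Returns false when shut down so the caller can fail the request
  // instead of leaking it into a map nobody will drain.
  boolean tryRegister(long id, T responseStream) {
    synchronized (lock) {
      if (shutdown) {
        return false;
      }
      return pending.put(id, responseStream) == null;
    }
  }

  void shutdown() {
    synchronized (lock) {
      shutdown = true;
      pending.clear(); // nothing can be registered after this point
    }
  }
}
```

The actual network send would still happen outside `lock`, so shutdown never waits on IO.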
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/GlobalDataStreamSender.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming.harness; + +import java.io.Closeable; +import java.util.function.Supplier; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; +import org.apache.beam.sdk.annotations.Internal; + +@Internal +@ThreadSafe +final class GlobalDataStreamSender implements Closeable, Supplier<GetDataStream> { Review Comment: Remove `Supplier<GetDataStream>`? Do we need it? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ########## @@ -342,29 +391,38 @@ private void queueRequestAndWait(QueuedRequest request) throws InterruptedExcept batch.addRequest(request); } if (responsibleForSend) { - if (waitForSendLatch == null) { + if (prevBatch == null) { // If there was not a previous batch wait a little while to improve // batching. - Thread.sleep(1); + sleeper.sleep(1); } else { - waitForSendLatch.await(); + prevBatch.waitForSendOrFailNotification(); } // Finalize the batch so that no additional requests will be added. Leave the batch in the // queue so that a subsequent batch will wait for its completion. synchronized (batches) { verify(batch == batches.peekFirst()); batch.markFinalized(); } + trySendBatch(batch); + } else { + // Wait for this batch to be sent before parsing the response. + batch.waitForSendOrFailNotification(); + } + } + + void trySendBatch(QueuedBatch batch) { + try { sendBatch(batch.requests()); synchronized (batches) { verify(batch == batches.pollFirst()); } // Notify all waiters with requests in this batch as well as the sender // of the next batch (if one exists). - batch.countDown(); - } else { - // Wait for this batch to be sent before parsing the response. - batch.await(); + batch.notifySent(); + } catch (Exception e) { + LOG.error("Error occurred sending batch.", e); + batch.notifyFailed(); Review Comment: Don't we have to throw after `notifyFailed` for the sending thread to know about the failure?
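A minimal sketch of that control flow (illustrative types only): waiters learn of the failure through the callback, and the exception is rethrown so the sending thread observes it too.

```java
// Sketch: notify waiters of the failure, then rethrow so the thread that
// drove the send also sees it; swallowing the exception here would leave
// the sender believing the batch went out.
final class BatchSenderSketch {
  interface Batch {
    void notifySent();
    void notifyFailed();
  }

  void trySendBatch(Batch batch, Runnable send) {
    try {
      send.run();
      batch.notifySent();
    } catch (RuntimeException e) {
      batch.notifyFailed();
      throw e;
    }
  }
}
```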
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -160,80 +179,125 @@ protected boolean isShutdown() { private StreamObserver<RequestT> requestObserver() { if (requestObserver == null) { throw new NullPointerException( - "requestObserver cannot be null. Missing a call to startStream() to initialize."); + "requestObserver cannot be null. Missing a call to start() to initialize stream."); } return requestObserver; } /** Send a request to the server. */ protected final void send(RequestT request) { - lastSendTimeMs.set(Instant.now().getMillis()); synchronized (this) { + if (isShutdown()) { + return; + } + if (streamClosed.get()) { throw new IllegalStateException("Send called on a client closed stream."); } - requestObserver().onNext(request); + try { + lastSendTimeMs.set(Instant.now().getMillis()); + requestObserver.onNext(request); + } catch (StreamObserverCancelledException e) { + if (isShutdown()) { + logger.debug("Stream was closed or shutdown during send.", e); + return; + } + + requestObserver.onError(e); + } + } + } + + @Override + public final void start() { + if (!isShutdown.get() && started.compareAndSet(false, true)) { + // start() should only be executed once during the lifetime of the stream for idempotency and + // when shutdown() has not been called. + startStream(); + } + } /** Starts the underlying stream. */ - protected final void startStream() { + private void startStream() { // Add the stream to the registry after it has been fully constructed. streamRegistry.add(this); while (true) { try { synchronized (this) { + if (isShutdown.get()) { + break; + } startTimeMs.set(Instant.now().getMillis()); lastResponseTimeMs.set(0); streamClosed.set(false); - // lazily initialize the requestObserver. Gets reset whenever the stream is reopened. - requestObserver = requestObserverSupplier.get(); + requestObserver.reset(); onNewStream(); if (clientClosed.get()) { halfClose(); } return; Review Comment: Since we return here on the happy path, adding the stream to the registry here would add it only once. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java: ########## @@ -74,69 +76,104 @@ public void onNext(T value) { while (true) { try { synchronized (lock) { + // If we awaited previously and timed out, wait for the same phase. Otherwise we're + // careful to observe the phase before observing isReady. + if (awaitPhase < 0) { + awaitPhase = isReadyNotifier.getPhase(); + // If getPhase() returns a value less than 0, the phaser has been terminated. + if (awaitPhase < 0) { + return; + } + } + // We only check isReady periodically to effectively allow for increasing the outbound // buffer periodically. This reduces the overhead of blocking while still restricting // memory because there is a limited # of streams, and we have a max messages size of 2MB. if (++messagesSinceReady <= messagesBetweenIsReadyChecks) { outboundObserver.onNext(value); return; } - // If we awaited previously and timed out, wait for the same phase. Otherwise we're - // careful to observe the phase before observing isReady. - if (awaitPhase < 0) { - awaitPhase = phaser.getPhase(); - } + if (outboundObserver.isReady()) { messagesSinceReady = 0; outboundObserver.onNext(value); return; } } + // A callback has been registered to advance the phaser whenever the observer // transitions to is ready. Since we are waiting for a phase observed before the // outboundObserver.isReady() returned false, we expect it to advance after the // channel has become ready. This doesn't always seem to be the case (despite // documentation stating otherwise) so we poll periodically and enforce an overall // timeout related to the stream deadline. - phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS); + int nextPhase = + isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS); + // If nextPhase is a value less than 0, the phaser has been terminated.
+ if (nextPhase < 0) { + return; Review Comment: > the stream has been shutdown and any requests should be dropped/ignored. Could you point me to how dropped commits get removed from the commit queue? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/GlobalDataStreamSender.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming.harness; + +import java.io.Closeable; +import java.util.function.Supplier; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; +import org.apache.beam.sdk.annotations.Internal; + +@Internal +@ThreadSafe +final class GlobalDataStreamSender implements Closeable, Supplier<GetDataStream> { + private final Endpoint endpoint; + private final GetDataStream delegate; + private volatile boolean started; + + GlobalDataStreamSender(GetDataStream delegate, Endpoint endpoint) { + this.delegate = delegate; + this.started = false; + this.endpoint = endpoint; + } + + @Override + public GetDataStream get() { + if (!started) { + startStream(); + } + + return delegate; + } + + private synchronized void startStream() { + // Check started again after we acquire the lock. + if (!started) { + started = true; + delegate.start(); + } + } + + @Override + public void close() { + delegate.shutdown(); Review Comment: do we need to check `started` here? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java: ########## @@ -209,201 +198,201 @@ static FanOutStreamingEngineWorkerHarness forTesting( stubFactory, getWorkBudgetDistributor, dispatcherClient, - clientId, workCommitterFactory, getDataMetricTracker); fanOutStreamingEngineWorkProvider.start(); return fanOutStreamingEngineWorkProvider; } - @SuppressWarnings("ReturnValueIgnored") @Override public synchronized void start() { - Preconditions.checkState(!started, "StreamingEngineClient cannot start twice."); - // Starts the stream, this value is memoized. 
- getWorkerMetadataStream.get(); - startWorkerMetadataConsumer(); - getWorkBudgetRefresher.start(); + Preconditions.checkState(!started, "FanOutStreamingEngineWorkerHarness cannot start twice."); + getWorkerMetadataStream.start(); started = true; } public ImmutableSet<HostAndPort> currentWindmillEndpoints() { - return connections.get().windmillConnections().keySet().stream() + return backends.get().windmillStreams().keySet().stream() .map(Endpoint::directEndpoint) .filter(Optional::isPresent) .map(Optional::get) - .filter( - windmillServiceAddress -> - windmillServiceAddress.getKind() != WindmillServiceAddress.Kind.IPV6) - .map( - windmillServiceAddress -> - windmillServiceAddress.getKind() == WindmillServiceAddress.Kind.GCP_SERVICE_ADDRESS - ? windmillServiceAddress.gcpServiceAddress() - : windmillServiceAddress.authenticatedGcpServiceAddress().gcpServiceAddress()) + .map(WindmillServiceAddress::getServiceAddress) .collect(toImmutableSet()); } /** - * Fetches {@link GetDataStream} mapped to globalDataKey if one exists, or defaults to {@link - * GetDataStream} pointing to dispatcher. + * Fetches {@link GetDataStream} mapped to globalDataKey if or throws {@link + * NoSuchElementException} if one is not found. */ private GetDataStream getGlobalDataStream(String globalDataKey) { - return Optional.ofNullable(connections.get().globalDataStreams().get(globalDataKey)) - .map(Supplier::get) - .orElseGet( - () -> - streamFactory.createGetDataStream( - dispatcherClient.getWindmillServiceStub(), new ThrottleTimer())); - } - - @SuppressWarnings("FutureReturnValueIgnored") - private void startWorkerMetadataConsumer() { - newWorkerMetadataConsumer.submit( - () -> { - while (true) { - Optional.ofNullable(newWindmillEndpoints.poll()) - .ifPresent(this::consumeWindmillWorkerEndpoints); - } - }); + return Optional.ofNullable(backends.get().globalDataStreams().get(globalDataKey)) + .map(GlobalDataStreamSender::get) + .orElseThrow( + () -> new NoSuchElementException("No endpoint for global data tag: " + globalDataKey)); } @VisibleForTesting @Override public synchronized void shutdown() { - Preconditions.checkState(started, "StreamingEngineClient never started."); - getWorkerMetadataStream.get().halfClose(); - getWorkBudgetRefresher.stop(); - newWorkerMetadataPublisher.shutdownNow(); - newWorkerMetadataConsumer.shutdownNow(); + Preconditions.checkState(started, "FanOutStreamingEngineWorkerHarness never started."); + Preconditions.checkNotNull(getWorkerMetadataStream).shutdown(); + workerMetadataConsumer.shutdownNow(); channelCachingStubFactory.shutdown(); } - /** - * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on - * new backend worker metadata. - */ + private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) { + synchronized (metadataLock) { + // Only process versions greater than what we currently have to prevent double processing of + // metadata. workerMetadataConsumer is single-threaded so we maintain ordering. 
+ if (windmillEndpoints.version() > pendingMetadataVersion) { + pendingMetadataVersion = windmillEndpoints.version(); + workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints)); + } + } + } + private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) { - isBudgetRefreshPaused.set(true); - LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints); - ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections = - createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints()); + // Since this is run on a single threaded executor, multiple versions of the metadata maybe + // queued up while a previous version of the windmillEndpoints were being consumed. Only consume + // the endpoints if they are the most current version. + synchronized (metadataLock) { + if (newWindmillEndpoints.version() < pendingMetadataVersion) { + return; + } + } - StreamingEngineConnectionState newConnectionsState = - StreamingEngineConnectionState.builder() - .setWindmillConnections(newWindmillConnections) - .setWindmillStreams( - closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values())) + LOG.debug( + "Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}", + newWindmillEndpoints, + activeMetadataVersion, + newWindmillEndpoints.version()); + closeStaleStreams(newWindmillEndpoints); + ImmutableMap<Endpoint, WindmillStreamSender> newStreams = + createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join(); + StreamingEngineBackends newBackends = + StreamingEngineBackends.builder() + .setWindmillStreams(newStreams) .setGlobalDataStreams( createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints())) .build(); + backends.set(newBackends); + getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget); + activeMetadataVersion = newWindmillEndpoints.version(); + } - LOG.info( - "Setting new connections: {}. Previous connections: {}.", - newConnectionsState, - connections.get()); - connections.set(newConnectionsState); - isBudgetRefreshPaused.set(false); - getWorkBudgetRefresher.requestBudgetRefresh(); + /** Close the streams that are no longer valid asynchronously. 
*/ + private void closeStaleStreams(WindmillEndpoints newWindmillEndpoints) { + StreamingEngineBackends currentBackends = backends.get(); + ImmutableMap<Endpoint, WindmillStreamSender> currentWindmillStreams = + currentBackends.windmillStreams(); + currentWindmillStreams.entrySet().stream() + .filter( + connectionAndStream -> + !newWindmillEndpoints.windmillEndpoints().contains(connectionAndStream.getKey())) + .forEach( + entry -> { + CompletableFuture<Void> unused = + CompletableFuture.runAsync( + () -> closeStreamSender(entry.getKey(), entry.getValue()), + windmillStreamManager); + }); + + Set<Endpoint> newGlobalDataEndpoints = + new HashSet<>(newWindmillEndpoints.globalDataEndpoints().values()); + currentBackends.globalDataStreams().values().stream() + .filter(sender -> !newGlobalDataEndpoints.contains(sender.endpoint())) + .forEach( + sender -> { + CompletableFuture<Void> unused = + CompletableFuture.runAsync( + () -> closeStreamSender(sender.endpoint(), sender), windmillStreamManager); + }); + } + + private void closeStreamSender(Endpoint endpoint, Closeable sender) { + LOG.debug("Closing streams to endpoint={}, sender={}", endpoint, sender); + try { + sender.close(); + endpoint.directEndpoint().ifPresent(channelCachingStubFactory::remove); + LOG.debug("Successfully closed streams to {}", endpoint); + } catch (Exception e) { + LOG.error("Error closing streams to endpoint={}, sender={}", endpoint, sender); + } + } + + private synchronized CompletableFuture<ImmutableMap<Endpoint, WindmillStreamSender>> + createAndStartNewStreams(ImmutableSet<Endpoint> newWindmillEndpoints) { + ImmutableMap<Endpoint, WindmillStreamSender> currentStreams = backends.get().windmillStreams(); + return MoreFutures.allAsList( + newWindmillEndpoints.stream() + .map(endpoint -> getOrCreateWindmillStreamSenderFuture(endpoint, currentStreams)) + .collect(Collectors.toList())) + .thenApply( + backends -> backends.stream().collect(toImmutableMap(Pair::getLeft, Pair::getRight))) + .toCompletableFuture(); + } + + private CompletionStage<Pair<Endpoint, WindmillStreamSender>> + getOrCreateWindmillStreamSenderFuture( + Endpoint endpoint, ImmutableMap<Endpoint, WindmillStreamSender> currentStreams) { + return MoreFutures.supplyAsync( Review Comment: can we return CompletableFuture.completedFuture(currentStreams.get(endpoint)) when currentStreams.get(endpoint) is not null without submitting it to the executor? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ########## @@ -302,38 +331,57 @@ public void appendSpecificHtml(PrintWriter writer) { } private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<ResponseT> parseFn) { - while (true) { + while (!isShutdown()) { request.resetResponseStream(); try { queueRequestAndWait(request); return parseFn.parse(request.getResponseStream()); - } catch (CancellationException e) { - // Retry issuing the request since the response stream was cancelled. - continue; + } catch (AppendableInputStream.InvalidInputStreamStateException + | VerifyException Review Comment: Since VerifyException is not expected when things are working as expected, why not throw it upstream as it is? Converting it to WindmillStreamShutdownException will mask the real failure and the worker will be operating in an unknown state. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org