parveensania commented on code in PR #34148: URL: https://github.com/apache/beam/pull/34148#discussion_r2114495165
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java: ########## @@ -17,152 +17,16 @@ */ package org.apache.beam.runners.dataflow.worker.streaming.harness; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; -import javax.annotation.concurrent.ThreadSafe; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillConnection; -import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; -import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; -import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; -import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter; -import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory; -import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.StreamingEngineThrottleTimers; -import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemScheduler; -import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetSpender; -import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.FixedStreamHeartbeatSender; -import org.apache.beam.sdk.annotations.Internal; -import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; - -/** - * Owns and maintains a set of streams used to communicate with a specific Windmill worker. - * - * <p>Once started, the underlying streams are "alive" until they are manually closed via {@link - * #close()}. - * - * <p>If closed, it means that the backend endpoint is no longer in the worker set. Once closed, - * these instances are not reused. - * - * @implNote Does not manage streams for fetching {@link - * org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalData} for side inputs. - */ -@Internal -@ThreadSafe -final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender { - private static final String STREAM_STARTER_THREAD_NAME = "StartWindmillStreamThread-%d"; - private final AtomicBoolean started; - private final AtomicReference<GetWorkBudget> getWorkBudget; - private final GetWorkStream getWorkStream; - private final GetDataStream getDataStream; - private final CommitWorkStream commitWorkStream; - private final WorkCommitter workCommitter; - private final StreamingEngineThrottleTimers streamingEngineThrottleTimers; - private final ExecutorService streamStarter; - - private WindmillStreamSender( - WindmillConnection connection, - GetWorkRequest getWorkRequest, - AtomicReference<GetWorkBudget> getWorkBudget, - GrpcWindmillStreamFactory streamingEngineStreamFactory, - WorkItemScheduler workItemScheduler, - Function<GetDataStream, GetDataClient> getDataClientFactory, - Function<CommitWorkStream, WorkCommitter> workCommitterFactory) { - this.started = new AtomicBoolean(false); - this.getWorkBudget = getWorkBudget; - this.streamingEngineThrottleTimers = StreamingEngineThrottleTimers.create(); - // Stream instances connect/reconnect internally, so we can reuse the same instance through the - // entire lifecycle of WindmillStreamSender. 
- this.getDataStream = - streamingEngineStreamFactory.createDirectGetDataStream( - connection, streamingEngineThrottleTimers.getDataThrottleTimer()); - this.commitWorkStream = - streamingEngineStreamFactory.createDirectCommitWorkStream( - connection, streamingEngineThrottleTimers.commitWorkThrottleTimer()); - this.workCommitter = workCommitterFactory.apply(commitWorkStream); - this.getWorkStream = - streamingEngineStreamFactory.createDirectGetWorkStream( - connection, - withRequestBudget(getWorkRequest, getWorkBudget.get()), - streamingEngineThrottleTimers.getWorkThrottleTimer(), - FixedStreamHeartbeatSender.create(getDataStream), - getDataClientFactory.apply(getDataStream), - workCommitter, - workItemScheduler); - // 3 threads, 1 for each stream type (GetWork, GetData, CommitWork). - this.streamStarter = - Executors.newFixedThreadPool( - 3, new ThreadFactoryBuilder().setNameFormat(STREAM_STARTER_THREAD_NAME).build()); - } - - static WindmillStreamSender create( - WindmillConnection connection, - GetWorkRequest getWorkRequest, - GetWorkBudget getWorkBudget, - GrpcWindmillStreamFactory streamingEngineStreamFactory, - WorkItemScheduler workItemScheduler, - Function<GetDataStream, GetDataClient> getDataClientFactory, - Function<CommitWorkStream, WorkCommitter> workCommitterFactory) { - return new WindmillStreamSender( - connection, - getWorkRequest, - new AtomicReference<>(getWorkBudget), - streamingEngineStreamFactory, - workItemScheduler, - getDataClientFactory, - workCommitterFactory); - } - - private static GetWorkRequest withRequestBudget(GetWorkRequest request, GetWorkBudget budget) { - return request.toBuilder().setMaxItems(budget.items()).setMaxBytes(budget.bytes()).build(); - } - - synchronized void start() { - if (!started.get()) { - checkState(!streamStarter.isShutdown(), "WindmillStreamSender has already been shutdown."); - - // Start these 3 streams in parallel since they each may perform blocking IO. 
- CompletableFuture.allOf( - CompletableFuture.runAsync(getWorkStream::start, streamStarter), - CompletableFuture.runAsync(getDataStream::start, streamStarter), - CompletableFuture.runAsync(commitWorkStream::start, streamStarter)) - .join(); - workCommitter.start(); - started.set(true); - } - } - - @Override - public synchronized void close() { - streamStarter.shutdownNow(); - getWorkStream.shutdown(); - getDataStream.shutdown(); - workCommitter.stop(); - commitWorkStream.shutdown(); - } +/** Superclass for stream senders used to communicate with Windmill */ +public interface WindmillStreamSender extends GetWorkBudgetSpender { Review Comment: Done ########## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java: ########## @@ -296,6 +327,69 @@ public void testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers() assertTrue(currentBackends.globalDataStreams().isEmpty()); } + @Test + public void testOnNewWorkerMetadata_endpointTypeChanged() throws InterruptedException { + GetWorkBudgetDistributor getWorkBudgetDistributor = mock(GetWorkBudgetDistributor.class); + fanOutStreamingEngineWorkProvider = + newFanOutStreamingEngineWorkerHarness( + GetWorkBudget.builder().setItems(1).setBytes(1).build(), + getWorkBudgetDistributor, + noOpProcessWorkItemFn()); + + String workerToken = "workerToken1"; + String workerToken2 = "workerToken2"; + + WorkerMetadataResponse firstWorkerMetadata = + WorkerMetadataResponse.newBuilder() + .setMetadataVersion(1) + .addWorkEndpoints( + WorkerMetadataResponse.Endpoint.newBuilder() + .setBackendWorkerToken(workerToken) + .build()) + .addWorkEndpoints( + WorkerMetadataResponse.Endpoint.newBuilder() + .setBackendWorkerToken(workerToken2) + .build()) + .setExternalEndpoint(AUTHENTICATING_SERVICE) + .setEndpointType(EndpointType.DIRECTPATH) + .putAllGlobalDataEndpoints(DEFAULT) + .build(); + + WorkerMetadataResponse secondWorkerMetadata = 
+ WorkerMetadataResponse.newBuilder() + .setMetadataVersion(2) + .addWorkEndpoints( + WorkerMetadataResponse.Endpoint.newBuilder() + .setDirectEndpoint( + DEFAULT_WINDMILL_SERVICE_ADDRESS.gcpServiceAddress().toString()) + .build()) + .setExternalEndpoint(AUTHENTICATING_SERVICE) + .setEndpointType(EndpointType.CLOUDPATH) + .build(); + + fakeGetWorkerMetadataStub.injectWorkerMetadata(firstWorkerMetadata); + StreamingEngineBackends currentBackends = fanOutStreamingEngineWorkProvider.currentBackends(); + assertEquals(2, currentBackends.windmillStreams().size()); + Set<String> workerTokens = + fanOutStreamingEngineWorkProvider.currentBackends().windmillStreams().keySet().stream() + .map(endpoint -> endpoint.workerToken().orElseThrow(IllegalStateException::new)) + .collect(Collectors.toSet()); + assertTrue(workerTokens.contains(workerToken)); + assertTrue(workerTokens.contains(workerToken2)); + + fakeGetWorkerMetadataStub.injectWorkerMetadata(secondWorkerMetadata); + currentBackends = fanOutStreamingEngineWorkProvider.currentBackends(); + assertEquals(1, currentBackends.windmillStreams().size()); + Set<String> directEndpointStrings = + fanOutStreamingEngineWorkProvider.currentBackends().windmillStreams().keySet().stream() + .filter(endpoint -> endpoint.directEndpoint().isPresent()) + .map(endpoint -> endpoint.directEndpoint().get()) + .map(serviceAddress -> serviceAddress.getServiceAddress().toString()) + .collect(Collectors.toSet()); + assert (directEndpointStrings.contains( Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@beam.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org