parveensania commented on code in PR #34148:
URL: https://github.com/apache/beam/pull/34148#discussion_r2114495165


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -17,152 +17,16 @@
  */
 package org.apache.beam.runners.dataflow.worker.streaming.harness;
 
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-import javax.annotation.concurrent.ThreadSafe;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
-import org.apache.beam.runners.dataflow.worker.windmill.WindmillConnection;
-import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
-import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
-import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
-import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
-import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient;
-import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
-import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.StreamingEngineThrottleTimers;
-import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemScheduler;
-import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
 import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetSpender;
-import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.FixedStreamHeartbeatSender;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * Owns and maintains a set of streams used to communicate with a specific Windmill worker.
- *
- * <p>Once started, the underlying streams are "alive" until they are manually closed via {@link
- * #close()}.
- *
- * <p>If closed, it means that the backend endpoint is no longer in the worker set. Once closed,
- * these instances are not reused.
- *
- * @implNote Does not manage streams for fetching {@link
- *     org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalData} for side inputs.
- */
-@Internal
-@ThreadSafe
-final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender {
-  private static final String STREAM_STARTER_THREAD_NAME = "StartWindmillStreamThread-%d";
-  private final AtomicBoolean started;
-  private final AtomicReference<GetWorkBudget> getWorkBudget;
-  private final GetWorkStream getWorkStream;
-  private final GetDataStream getDataStream;
-  private final CommitWorkStream commitWorkStream;
-  private final WorkCommitter workCommitter;
-  private final StreamingEngineThrottleTimers streamingEngineThrottleTimers;
-  private final ExecutorService streamStarter;
-
-  private WindmillStreamSender(
-      WindmillConnection connection,
-      GetWorkRequest getWorkRequest,
-      AtomicReference<GetWorkBudget> getWorkBudget,
-      GrpcWindmillStreamFactory streamingEngineStreamFactory,
-      WorkItemScheduler workItemScheduler,
-      Function<GetDataStream, GetDataClient> getDataClientFactory,
-      Function<CommitWorkStream, WorkCommitter> workCommitterFactory) {
-    this.started = new AtomicBoolean(false);
-    this.getWorkBudget = getWorkBudget;
-    this.streamingEngineThrottleTimers = StreamingEngineThrottleTimers.create();
 
-    // Stream instances connect/reconnect internally, so we can reuse the same instance through the
-    // entire lifecycle of WindmillStreamSender.
-    this.getDataStream =
-        streamingEngineStreamFactory.createDirectGetDataStream(
-            connection, streamingEngineThrottleTimers.getDataThrottleTimer());
-    this.commitWorkStream =
-        streamingEngineStreamFactory.createDirectCommitWorkStream(
-            connection, streamingEngineThrottleTimers.commitWorkThrottleTimer());
-    this.workCommitter = workCommitterFactory.apply(commitWorkStream);
-    this.getWorkStream =
-        streamingEngineStreamFactory.createDirectGetWorkStream(
-            connection,
-            withRequestBudget(getWorkRequest, getWorkBudget.get()),
-            streamingEngineThrottleTimers.getWorkThrottleTimer(),
-            FixedStreamHeartbeatSender.create(getDataStream),
-            getDataClientFactory.apply(getDataStream),
-            workCommitter,
-            workItemScheduler);
-    // 3 threads, 1 for each stream type (GetWork, GetData, CommitWork).
-    this.streamStarter =
-        Executors.newFixedThreadPool(
-            3, new ThreadFactoryBuilder().setNameFormat(STREAM_STARTER_THREAD_NAME).build());
-  }
-
-  static WindmillStreamSender create(
-      WindmillConnection connection,
-      GetWorkRequest getWorkRequest,
-      GetWorkBudget getWorkBudget,
-      GrpcWindmillStreamFactory streamingEngineStreamFactory,
-      WorkItemScheduler workItemScheduler,
-      Function<GetDataStream, GetDataClient> getDataClientFactory,
-      Function<CommitWorkStream, WorkCommitter> workCommitterFactory) {
-    return new WindmillStreamSender(
-        connection,
-        getWorkRequest,
-        new AtomicReference<>(getWorkBudget),
-        streamingEngineStreamFactory,
-        workItemScheduler,
-        getDataClientFactory,
-        workCommitterFactory);
-  }
-
-  private static GetWorkRequest withRequestBudget(GetWorkRequest request, GetWorkBudget budget) {
-    return request.toBuilder().setMaxItems(budget.items()).setMaxBytes(budget.bytes()).build();
-  }
-
-  synchronized void start() {
-    if (!started.get()) {
-      checkState(!streamStarter.isShutdown(), "WindmillStreamSender has already been shutdown.");
-
-      // Start these 3 streams in parallel since they each may perform blocking IO.
-      CompletableFuture.allOf(
-              CompletableFuture.runAsync(getWorkStream::start, streamStarter),
-              CompletableFuture.runAsync(getDataStream::start, streamStarter),
-              CompletableFuture.runAsync(commitWorkStream::start, streamStarter))
-          .join();
-      workCommitter.start();
-      started.set(true);
-    }
-  }
-
-  @Override
-  public synchronized void close() {
-    streamStarter.shutdownNow();
-    getWorkStream.shutdown();
-    getDataStream.shutdown();
-    workCommitter.stop();
-    commitWorkStream.shutdown();
-  }
+/** Superclass for stream senders used to communicate with Windmill */
+public interface WindmillStreamSender extends GetWorkBudgetSpender {

Review Comment:
   Done
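
   For context while reading this diff: the concrete, stream-owning final class above is replaced by a WindmillStreamSender interface extending GetWorkBudgetSpender, so the harness can hold different sender implementations side by side (for example one per endpoint type, as exercised in the new test further down). Below is a standalone sketch of that shape only; every name in it is hypothetical and it is not code from this PR or from Beam.

    // Standalone sketch (hypothetical names, not Beam code) of the class-to-interface
    // refactor: callers program against a small sender interface and the harness picks
    // the concrete implementation, e.g. direct-path vs. cloud-path.
    interface SenderSketch extends AutoCloseable {
      void start();           // open the underlying streams (may block)
      @Override void close(); // shut the streams down; closed instances are not reused
    }

    final class DirectPathSenderSketch implements SenderSketch {
      @Override public void start() { System.out.println("start direct-path streams"); }
      @Override public void close() { System.out.println("close direct-path streams"); }
    }

    final class CloudPathSenderSketch implements SenderSketch {
      @Override public void start() { System.out.println("start cloud-path stream"); }
      @Override public void close() { System.out.println("close cloud-path stream"); }
    }

    class SenderSketchDemo {
      static SenderSketch forEndpointType(boolean directPath) {
        return directPath ? new DirectPathSenderSketch() : new CloudPathSenderSketch();
      }

      public static void main(String[] args) {
        SenderSketch sender = forEndpointType(true);
        sender.start();
        sender.close();
      }
    }

   The actual interface in the PR keeps Beam's GetWorkBudgetSpender contract; the sketch only illustrates why widening the type from a final class to an interface is useful.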



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java:
##########
@@ -296,6 +327,69 @@ public void testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers()
     assertTrue(currentBackends.globalDataStreams().isEmpty());
   }
 
+  @Test
+  public void testOnNewWorkerMetadata_endpointTypeChanged() throws InterruptedException {
+    GetWorkBudgetDistributor getWorkBudgetDistributor = mock(GetWorkBudgetDistributor.class);
+    fanOutStreamingEngineWorkProvider =
+        newFanOutStreamingEngineWorkerHarness(
+            GetWorkBudget.builder().setItems(1).setBytes(1).build(),
+            getWorkBudgetDistributor,
+            noOpProcessWorkItemFn());
+
+    String workerToken = "workerToken1";
+    String workerToken2 = "workerToken2";
+
+    WorkerMetadataResponse firstWorkerMetadata =
+        WorkerMetadataResponse.newBuilder()
+            .setMetadataVersion(1)
+            .addWorkEndpoints(
+                WorkerMetadataResponse.Endpoint.newBuilder()
+                    .setBackendWorkerToken(workerToken)
+                    .build())
+            .addWorkEndpoints(
+                WorkerMetadataResponse.Endpoint.newBuilder()
+                    .setBackendWorkerToken(workerToken2)
+                    .build())
+            .setExternalEndpoint(AUTHENTICATING_SERVICE)
+            .setEndpointType(EndpointType.DIRECTPATH)
+            .putAllGlobalDataEndpoints(DEFAULT)
+            .build();
+
+    WorkerMetadataResponse secondWorkerMetadata =
+        WorkerMetadataResponse.newBuilder()
+            .setMetadataVersion(2)
+            .addWorkEndpoints(
+                WorkerMetadataResponse.Endpoint.newBuilder()
+                    .setDirectEndpoint(
+                        DEFAULT_WINDMILL_SERVICE_ADDRESS.gcpServiceAddress().toString())
+                    .build())
+            .setExternalEndpoint(AUTHENTICATING_SERVICE)
+            .setEndpointType(EndpointType.CLOUDPATH)
+            .build();
+
+    fakeGetWorkerMetadataStub.injectWorkerMetadata(firstWorkerMetadata);
+    StreamingEngineBackends currentBackends = fanOutStreamingEngineWorkProvider.currentBackends();
+    assertEquals(2, currentBackends.windmillStreams().size());
+    Set<String> workerTokens =
+        fanOutStreamingEngineWorkProvider.currentBackends().windmillStreams().keySet().stream()
+            .map(endpoint -> endpoint.workerToken().orElseThrow(IllegalStateException::new))
+            .collect(Collectors.toSet());
+    assertTrue(workerTokens.contains(workerToken));
+    assertTrue(workerTokens.contains(workerToken2));
+
+    fakeGetWorkerMetadataStub.injectWorkerMetadata(secondWorkerMetadata);
+    currentBackends = fanOutStreamingEngineWorkProvider.currentBackends();
+    assertEquals(1, currentBackends.windmillStreams().size());
+    Set<String> directEndpointStrings =
+        fanOutStreamingEngineWorkProvider.currentBackends().windmillStreams().keySet().stream()
+            .filter(endpoint -> endpoint.directEndpoint().isPresent())
+            .map(endpoint -> endpoint.directEndpoint().get())
+            .map(serviceAddress -> serviceAddress.getServiceAddress().toString())
+            .collect(Collectors.toSet());
+    assert (directEndpointStrings.contains(

Review Comment:
   Done
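
   The quoted test ends mid-statement at the review anchor, on a bare Java assert, so the rest of that check is not shown here. One general note for readers of the test: the assert statement is a no-op unless the JVM runs with -ea, which is why JUnit's assertTrue is usually preferred in tests. A minimal standalone illustration of that difference follows (assumes JUnit 4 on the classpath; it is not the Beam test itself):

    // Standalone illustration, not Beam code: Java's assert statement is silently
    // skipped unless assertions are enabled (-ea), while JUnit's assertTrue always runs.
    import static org.junit.Assert.assertTrue;

    import java.util.Set;

    public class AssertVsAssertTrueSketch {
      public static void main(String[] args) {
        Set<String> endpoints = Set.of("direct-endpoint-1");
        assert endpoints.contains("direct-endpoint-1");     // ignored without -ea
        assertTrue(endpoints.contains("direct-endpoint-1")); // always evaluated
      }
    }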


