This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5034e40fa0f [Dataflow Streaming] Remove nullness suppression of StreamingDataflowWorker (#37797)
5034e40fa0f is described below

commit 5034e40fa0f1bd82f06ffa337a8f2db3d19d00aa
Author: Sam Whittle <[email protected]>
AuthorDate: Tue Mar 10 13:05:33 2026 +0000

    [Dataflow Streaming] Remove nullness suppression of StreamingDataflowWorker (#37797)
---
 .../dataflow/worker/StreamingDataflowWorker.java   | 51 ++++++++++++++--------
 1 file changed, 33 insertions(+), 18 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index aad27b86986..98f596141ee 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow.worker;
 
 import static 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels.remoteChannel;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.api.services.dataflow.model.MapTask;
 import com.google.auto.value.AutoValue;
@@ -119,6 +120,8 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheSta
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.checkerframework.checker.initialization.qual.UnderInitialization;
+import org.checkerframework.checker.initialization.qual.UnknownInitialization;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -129,9 +132,6 @@ import org.slf4j.LoggerFactory;
  *
  * <p>Implements a Streaming Dataflow worker.
  */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
 @Internal
 public final class StreamingDataflowWorker {
 
@@ -189,7 +189,7 @@ public final class StreamingDataflowWorker {
   private final StreamingWorkerStatusReporter workerStatusReporter;
   private final int numCommitThreads;
   private final Supplier<Instant> clock;
-  private final GrpcDispatcherClient dispatcherClient;
+  private final @Nullable GrpcDispatcherClient dispatcherClient;
   private final ExecutorService harnessSwitchExecutor;
   private final long clientId;
   private final WindmillServerStub windmillServer;
@@ -271,7 +271,7 @@ public final class StreamingDataflowWorker {
                 streamingWorkScheduler,
                 getDataMetricTracker,
                 memoryMonitor,
-                this.dispatcherClient);
+                checkNotNull(this.dispatcherClient));
       } else {
         harnessFactoryOutput =
             createSingleSourceWorkerHarness(
@@ -330,6 +330,8 @@ public final class StreamingDataflowWorker {
   }
 
   private StreamingWorkerHarnessFactoryOutput createApplianceWorkerHarness(
+      @UnderInitialization()
+          StreamingDataflowWorker this, // Use receiver parameter syntax to allow annotation.
       long clientId,
       DataflowWorkerHarnessOptions options,
       WindmillServerStub windmillServer,
@@ -345,6 +347,7 @@ public final class StreamingDataflowWorker {
 
     GetDataClient getDataClient = new ApplianceGetDataClient(windmillServer, 
getDataMetricTracker);
     HeartbeatSender heartbeatSender = new 
ApplianceHeartbeatSender(windmillServer::getData);
+    @SuppressWarnings("methodref.receiver.bound")
     WorkCommitter workCommitter =
         StreamingApplianceWorkCommitter.create(windmillServer::commitWork, 
this::onCompleteCommit);
     GetWorkSender getWorkSender = GetWorkSender.forAppliance(() -> 
windmillServer.getWork(request));
@@ -355,7 +358,7 @@ public final class StreamingDataflowWorker {
                 .setStreamingWorkScheduler(streamingWorkScheduler)
                 .setWorkCommitter(workCommitter)
                 .setGetDataClient(getDataClient)
-                .setComputationStateFetcher(this.computationStateCache::get)
+                
.setComputationStateFetcher(checkNotNull(this.computationStateCache)::get)
                 .setWaitForResources(() -> 
memoryMonitor.waitForResources("GetWork"))
                 .setHeartbeatSender(heartbeatSender)
                 .setGetWorkSender(getWorkSender)
@@ -368,6 +371,8 @@ public final class StreamingDataflowWorker {
   }
 
   private StreamingWorkerHarnessFactoryOutput 
createFanOutStreamingEngineWorkerHarness(
+      @UnknownInitialization()
+          StreamingDataflowWorker this, // Use receiver parameter syntax to allow annotation.
       long clientId,
       DataflowWorkerHarnessOptions options,
       GrpcWindmillStreamFactory windmillStreamFactory,
@@ -376,7 +381,8 @@ public final class StreamingDataflowWorker {
       MemoryMonitor memoryMonitor,
       GrpcDispatcherClient dispatcherClient) {
     WeightedSemaphore<Commit> maxCommitByteSemaphore = 
Commits.maxCommitByteSemaphore();
-    ChannelCache channelCache = createChannelCache(options, configFetcher);
+    ChannelCache channelCache = createChannelCache(options, 
checkNotNull(configFetcher));
+    @SuppressWarnings("methodref.receiver.bound")
     FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
         FanOutStreamingEngineWorkerHarness.create(
             createJobHeader(options, clientId),
@@ -391,7 +397,7 @@ public final class StreamingDataflowWorker {
                 processingContext,
                 drainMode,
                 getWorkStreamLatencies) ->
-                computationStateCache
+                checkNotNull(computationStateCache)
                     .get(processingContext.computationId())
                     .ifPresent(
                         computationState -> {
@@ -407,7 +413,7 @@ public final class StreamingDataflowWorker {
                         }),
             ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), 
channelCache),
             GetWorkBudgetDistributors.distributeEvenly(),
-            Preconditions.checkNotNull(dispatcherClient),
+            checkNotNull(dispatcherClient),
             commitWorkStream ->
                 StreamingEngineWorkCommitter.builder()
                     // Share the commitByteSemaphore across all created workCommitters.
@@ -433,6 +439,8 @@ public final class StreamingDataflowWorker {
   }
 
   private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness(
+      @UnknownInitialization()
+          StreamingDataflowWorker this, // Use receiver parameter syntax to allow annotation.
       long clientId,
       DataflowWorkerHarnessOptions options,
       WindmillServerStub windmillServer,
@@ -454,7 +462,11 @@ public final class StreamingDataflowWorker {
         new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
     HeartbeatSender heartbeatSender =
         createStreamingEngineHeartbeatSender(
-            options, windmillServer, getDataStreamPool, 
configFetcher.getGlobalConfigHandle());
+            options,
+            windmillServer,
+            getDataStreamPool,
+            checkNotNull(configFetcher).getGlobalConfigHandle());
+    @SuppressWarnings("methodref.receiver.bound")
     WorkCommitter workCommitter =
         StreamingEngineWorkCommitter.builder()
             .setCommitWorkStreamFactory(
@@ -476,7 +488,7 @@ public final class StreamingDataflowWorker {
                 .setStreamingWorkScheduler(streamingWorkScheduler)
                 .setWorkCommitter(workCommitter)
                 .setGetDataClient(getDataClient)
-                .setComputationStateFetcher(this.computationStateCache::get)
+                
.setComputationStateFetcher(checkNotNull(this.computationStateCache)::get)
                 .setWaitForResources(() -> 
memoryMonitor.waitForResources("GetWork"))
                 .setHeartbeatSender(heartbeatSender)
                 .setGetWorkSender(getWorkSender)
@@ -489,17 +501,20 @@ public final class StreamingDataflowWorker {
   }
 
   private void switchStreamingWorkerHarness(ConnectivityType connectivityType) 
{
-    if ((connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH
+    if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DEFAULT) {
+      return;
+    }
+    boolean directPath = connectivityType == 
ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH;
+    if ((directPath
             && this.streamingWorkerHarness.get() instanceof 
FanOutStreamingEngineWorkerHarness)
-        || (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH
-            && streamingWorkerHarness.get() instanceof 
SingleSourceWorkerHarness)) {
+        || (!directPath && streamingWorkerHarness.get() instanceof 
SingleSourceWorkerHarness)) {
       return;
     }
     // Stop the current status pages before switching the harness.
     this.statusPages.get().stop();
     LOG.debug("Stopped StreamingWorkerStatusPages before switching 
connectivity type.");
-    StreamingWorkerHarnessFactoryOutput newHarnessFactoryOutput = null;
-    if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) {
+    StreamingWorkerHarnessFactoryOutput newHarnessFactoryOutput;
+    if (directPath) {
       // If dataflow experiment `enable_windmill_service_direct_path` is not set for
       // the job, do not switch to FanOutStreamingEngineWorkerHarness. This is because
       // `enable_windmill_service_direct_path` is tied to SDK version and is only
@@ -524,11 +539,11 @@ public final class StreamingDataflowWorker {
               this.streamingWorkScheduler,
               this.getDataMetricTracker,
               this.memoryMonitor.memoryMonitor(),
-              this.dispatcherClient);
+              checkNotNull(this.dispatcherClient));
       
this.streamingWorkerHarness.set(newHarnessFactoryOutput.streamingWorkerHarness());
       streamingWorkerHarness.get().start();
       LOG.debug("Started FanOutStreamingEngineWorkerHarness");
-    } else if (connectivityType == 
ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH) {
+    } else {
       LOG.info("Switching connectivity type from DIRECTPATH to CLOUDPATH");
       LOG.debug("Shutting down FanOutStreamingEngineWorkerHarness");
       streamingWorkerHarness.get().shutdown();

Reply via email to