scwhittle commented on code in PR #30046:
URL: https://github.com/apache/beam/pull/30046#discussion_r1484094625


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -142,55 +141,43 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
     return ActivateWorkResult.QUEUED;
   }
 
-  @AutoValue
-  public abstract static class FailedTokens {
-    public static Builder newBuilder() {
-      return new AutoValue_ActiveWorkState_FailedTokens.Builder();
-    }
-
-    public abstract long workToken();
-
-    public abstract long cacheToken();
-
-    @AutoValue.Builder
-    public abstract static class Builder {
-      public abstract Builder setWorkToken(long value);
-
-      public abstract Builder setCacheToken(long value);
-
-      public abstract FailedTokens build();
-    }
-  }
-
   /**
    * Fails any active work matching an element of the input Map.
    *
    * @param failedWork a map from sharding_key to tokens for the corresponding work.
    */
-  synchronized void failWorkForKey(Map<Long, List<FailedTokens>> failedWork) {
+  synchronized void failWorkForKey(Map<Long, List<FailedWorkToken>> failedWork) {
     // Note we can't construct a ShardedKey and look it up in activeWork directly since
     // HeartbeatResponse doesn't include the user key.
     for (Entry<ShardedKey, Deque<Work>> entry : activeWork.entrySet()) {
-      List<FailedTokens> failedTokens = failedWork.get(entry.getKey().shardingKey());
-      if (failedTokens == null) continue;
-      for (FailedTokens failedToken : failedTokens) {
-        for (Work queuedWork : entry.getValue()) {
-          WorkItem workItem = queuedWork.getWorkItem();
-          if (workItem.getWorkToken() == failedToken.workToken()
-              && workItem.getCacheToken() == failedToken.cacheToken()) {
-            LOG.debug(
-                "Failing work "
-                    + computationStateCache.getComputation()
-                    + " "
-                    + entry.getKey().shardingKey()
-                    + " "
-                    + failedToken.workToken()
-                    + " "
-                    + failedToken.cacheToken()
-                    + ". The work will be retried and is not lost.");
-            queuedWork.setFailed();
-            break;
-          }
+      ShardedKey shardedKey = entry.getKey();
+      Optional.ofNullable(failedWork.get(shardedKey.shardingKey()))
+          .ifPresent(
+              failedWorkTokens ->

Review Comment:
   nit: I find this harder to read than just storing the result of failedWork.get in a variable and using an if statement; it takes a little more effort to see that this block is processing the result of failedWork.get.
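   
   Roughly the shape I mean, as a sketch of just the loop body (names taken from the diff above; the per-token failing logic stays whatever the current lambda does):
   
   ```java
   ShardedKey shardedKey = entry.getKey();
   List<FailedWorkToken> failedWorkTokens = failedWork.get(shardedKey.shardingKey());
   if (failedWorkTokens == null) {
     continue;
   }
   // ... fail the matching queued work for this key using failedWorkTokens, as today ...
   ```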
   
   



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java:
##########
@@ -109,10 +113,13 @@
 })
 public class GrpcWindmillServerTest {
   @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
+  @Rule public GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+  @Rule public ErrorCollector errorCollector = new ErrorCollector();
+
   private static final Logger LOG = LoggerFactory.getLogger(GrpcWindmillServerTest.class);
   private static final int STREAM_CHUNK_SIZE = 2 << 20;
+  private final long clientId = new Random().nextLong();

Review Comment:
   nit: just hard-code something?
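   
   e.g. any fixed value would do and keeps the test deterministic:
   
   ```java
   private final long clientId = 10L;
   ```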



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -504,14 +548,23 @@ public static void main(String[] args) throws Exception {
     worker.start();
   }
 
-  public static StreamingDataflowWorker fromOptions(StreamingDataflowWorkerOptions options)
-      throws IOException {
-
+  public static StreamingDataflowWorker fromOptions(StreamingDataflowWorkerOptions options) {
+    StreamingDataflowWorkerOptions streamingOptions =
+        options.as(StreamingDataflowWorkerOptions.class);

Review Comment:
   no longer needed; options is already the right option type
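   
   i.e. just use the parameter directly (sketch only; the rest of the method body is unchanged):
   
   ```java
   // 'options' already has the right type, so no .as(...) conversion is needed:
   public static StreamingDataflowWorker fromOptions(StreamingDataflowWorkerOptions options) {
     // ... construct the worker directly from 'options', in place of 'streamingOptions' ...
   }
   ```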



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkHeartbeatProcessor.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationHeartbeatResponse;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatResponse;
+import org.apache.beam.sdk.annotations.Internal;
+
+@Internal
+public final class WorkHeartbeatProcessor implements Consumer<List<ComputationHeartbeatResponse>> {
+  private final Function<String, Optional<ComputationState>> computationStateFetcher;
+
+  public WorkHeartbeatProcessor(
+      Function<String, Optional<ComputationState>> computationStateFetcher) {

Review Comment:
   comment on what the String is
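   
   e.g. a short Javadoc on the parameter; assuming the String key is the computation id:
   
   ```java
   /**
    * @param computationStateFetcher returns the {@link ComputationState} for a given
    *     computation id, or {@link Optional#empty()} if the computation is not known.
    */
   ```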



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -44,93 +47,173 @@ class GrpcDispatcherClient {
   private static final Logger LOG = LoggerFactory.getLogger(GrpcDispatcherClient.class);
   private final WindmillStubFactory windmillStubFactory;
 
-  @GuardedBy("this")
-  private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
-
-  @GuardedBy("this")
-  private final Set<HostAndPort> dispatcherEndpoints;
+  /**
+   * Current dispatcher endpoints and stubs used to communicate with Windmill Dispatcher.
+   *
+   * @implNote Reads are lock free, writes are synchronized.
+   */
+  private final AtomicReference<DispatcherStubs> dispatcherStubs;
 
   @GuardedBy("this")
   private final Random rand;
 
   private GrpcDispatcherClient(
       WindmillStubFactory windmillStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
-      Set<HostAndPort> dispatcherEndpoints,
+      DispatcherStubs initialDispatcherStubs,
       Random rand) {
     this.windmillStubFactory = windmillStubFactory;
-    this.dispatcherStubs = dispatcherStubs;
-    this.dispatcherEndpoints = dispatcherEndpoints;
     this.rand = rand;
+    this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
   }
 
   static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) {
-    return new GrpcDispatcherClient(
-        windmillStubFactory, new ArrayList<>(), new HashSet<>(), new Random());
+    return new GrpcDispatcherClient(windmillStubFactory, DispatcherStubs.empty(), new Random());
   }
 
   @VisibleForTesting
   static GrpcDispatcherClient forTesting(
       WindmillStubFactory windmillGrpcStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
+      List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
+      List<CloudWindmillMetadataServiceV1Alpha1Stub> windmillMetadataServiceStubs,
       Set<HostAndPort> dispatcherEndpoints) {
-    Preconditions.checkArgument(dispatcherEndpoints.size() == dispatcherStubs.size());
+    Preconditions.checkArgument(
+        dispatcherEndpoints.size() == windmillServiceStubs.size()
+            && windmillServiceStubs.size() == windmillMetadataServiceStubs.size());
     return new GrpcDispatcherClient(
-        windmillGrpcStubFactory, dispatcherStubs, dispatcherEndpoints, new Random());
+        windmillGrpcStubFactory,
+        DispatcherStubs.create(
+            dispatcherEndpoints, windmillServiceStubs, windmillMetadataServiceStubs),
+        new Random());
+  }
+
+  CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
+    ImmutableList<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs =
+        dispatcherStubs.get().windmillServiceStubs();
+    Preconditions.checkState(
+        !windmillServiceStubs.isEmpty(), "windmillServiceEndpoint has not been set");
+
+    return (windmillServiceStubs.size() == 1
+        ? windmillServiceStubs.get(0)
+        : randomlySelectNextStub(windmillServiceStubs));
   }
 
-  synchronized CloudWindmillServiceV1Alpha1Stub getDispatcherStub() {
+  CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStub() {
+    ImmutableList<CloudWindmillMetadataServiceV1Alpha1Stub> windmillMetadataServiceStubs =
+        dispatcherStubs.get().windmillMetadataServiceStubs();
     Preconditions.checkState(
-        !dispatcherStubs.isEmpty(), "windmillServiceEndpoint has not been set");
+        !windmillMetadataServiceStubs.isEmpty(), "windmillServiceEndpoint has not been set");
+
+    return (windmillMetadataServiceStubs.size() == 1
+        ? windmillMetadataServiceStubs.get(0)
+        : randomlySelectNextStub(windmillMetadataServiceStubs));
+  }
 
-    return (dispatcherStubs.size() == 1
-        ? dispatcherStubs.get(0)
-        : dispatcherStubs.get(rand.nextInt(dispatcherStubs.size())));
+  private synchronized <T> T randomlySelectNextStub(List<T> stubs) {
+    return stubs.get(rand.nextInt(stubs.size()));
   }
 
-  synchronized boolean isReady() {
-    return !dispatcherStubs.isEmpty();
+  boolean isReady() {

Review Comment:
   ping
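   
   (For reference, a minimal self-contained sketch of the lock-free-read / synchronized-write pattern the @implNote above describes; SnapshotHolder is a made-up name, not the actual DispatcherStubs:)
   
   ```java
   import java.util.concurrent.atomic.AtomicReference;
   
   final class SnapshotHolder<T> {
     // Readers dereference the current immutable snapshot without taking a lock.
     private final AtomicReference<T> current;
   
     SnapshotHolder(T initial) {
       this.current = new AtomicReference<>(initial);
     }
   
     T read() {
       return current.get(); // lock-free read
     }
   
     // Writers are synchronized so only one thread publishes a new snapshot at a time.
     synchronized void replace(T next) {
       current.set(next);
     }
   }
   ```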



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
