scwhittle commented on code in PR #28428:
URL: https://github.com/apache/beam/pull/28428#discussion_r1328633216


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/connectionscache/ReadOnlyWindmillConnectionsCache.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.connectionscache;
+
+import java.util.Optional;
+import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillApplianceGrpc;
+
+/** Read only interface for {@link WindmillConnectionsCache}. */
+public interface ReadOnlyWindmillConnectionsCache {
+
+  /**
+   * Used by GetDataStream and CommitWorkStream calls in order to make sure Windmill API calls get
+   * routed to the same workers that GetWork was initiated on. If no {@link WindmillConnection} is
+   * found, it means that the windmill worker working on the key range has moved on (ranges have
+   * moved to other workers, worker crash, etc.).
+   *
+   * @see <a
+   *     href=https://medium.com/google-cloud/streaming-engine-execution-model-1eb2eef69a8e>Windmill
+   *     API</a>
+   */
+  Optional<WindmillConnection> getWindmillWorkerConnection(WindmillConnectionCacheToken token);
+
+  /**
+   * Used by GetWorkStream to initiate the Windmill API calls and fetch Work items.
+   *
+   * @see <a
+   *     href=https://medium.com/google-cloud/streaming-engine-execution-model-1eb2eef69a8e>Windmill
+   *     API</a>
+   */
+  WindmillConnection getNewWindmillWorkerConnection();
+
+  CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub getDispatcherStub();
+
+  Optional<CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub> getGlobalDataStub(
+      Windmill.GlobalDataId globalDataId);
+
+  Optional<WindmillApplianceGrpc.WindmillApplianceBlockingStub> getWindmillApplianceStub();

Review Comment:
   Does this need to be part of this interface? It seems cleaner to keep the appliance stuff separate.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStream.java:
##########
@@ -51,6 +52,30 @@ void receiveWork(
           Windmill.WorkItem workItem,
           Collection<Windmill.LatencyAttribution> getWorkStreamLatencies);
     }
+
+    @AutoValue
+    abstract class GetWorkBudget {
+      public static GetWorkBudget.Builder builder() {
+        return new AutoValue_WindmillStream_GetWorkStream_GetWorkBudget.Builder();
+      }
+
+      public GetWorkBudget consumeBudgetUpdate(long bytes, long items) {

Review Comment:
   "consume" made me think it was going to subtract.
   
   How about just "increment" or "add"? It doesn't really matter where the budget update came from.
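   A minimal sketch of how the suggested naming could read, assuming the budget is an immutable (bytes, items) pair. This is a plain-Java stand-in for the AutoValue class in the PR; the names of(), add(...) and subtract(...) are hypothetical, and clamping subtraction at zero is an assumption:

```java
import java.util.Objects;

/**
 * Hypothetical plain-Java stand-in for the PR's AutoValue GetWorkBudget: an immutable pair of
 * (bytes, items). Method names follow the review suggestion, not the PR's actual API.
 */
final class GetWorkBudget {
  private final long bytes;
  private final long items;

  private GetWorkBudget(long bytes, long items) {
    this.bytes = bytes;
    this.items = items;
  }

  static GetWorkBudget of(long bytes, long items) {
    return new GetWorkBudget(bytes, items);
  }

  long bytes() {
    return bytes;
  }

  long items() {
    return items;
  }

  /** Returns a new budget increased by the given amounts; this instance is not modified. */
  GetWorkBudget add(long bytes, long items) {
    return new GetWorkBudget(this.bytes + bytes, this.items + items);
  }

  /** Returns a new budget reduced by the given amounts (assumption: clamped at zero). */
  GetWorkBudget subtract(long bytes, long items) {
    return new GetWorkBudget(Math.max(0, this.bytes - bytes), Math.max(0, this.items - items));
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof GetWorkBudget)) {
      return false;
    }
    GetWorkBudget that = (GetWorkBudget) o;
    return bytes == that.bytes && items == that.items;
  }

  @Override
  public int hashCode() {
    return Objects.hash(bytes, items);
  }

  @Override
  public String toString() {
    return "GetWorkBudget{bytes=" + bytes + ", items=" + items + "}";
  }
}
```

   With add(...) the call site reads as an increase regardless of where the extra budget came from, while a separate subtract(...) keeps the consuming direction explicit.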



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/connectionscache/WindmillChannelFactory.java:
##########
@@ -0,0 +1,135 @@
+package org.apache.beam.runners.dataflow.worker.windmill.connectionscache;
+
+import java.net.Inet6Address;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import javax.net.ssl.SSLException;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessChannelBuilder;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.GrpcSslContexts;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NegotiationType;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyChannelBuilder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+
+/** Utilities for creating {@link Channel} for gRPC stubs. */
+final class WindmillChannelFactory {
+  static final String LOCALHOST = "localhost";
+  private static final int DEFAULT_GRPC_PORT = 443;
+
+  private WindmillChannelFactory() {}
+
+  static Channel inProcessChannel(String channelName) {
+    return InProcessChannelBuilder.forName(channelName).directExecutor().build();
+  }
+
+  static Channel localhostChannel(int port) {
+    return NettyChannelBuilder.forAddress(LOCALHOST, port)
+        .maxInboundMessageSize(Integer.MAX_VALUE)
+        .negotiationType(NegotiationType.PLAINTEXT)
+        .build();
+  }
+
+  static Channel remoteChannel(
+      WindmillServiceAddress windmillServiceAddress, int windmillServiceRpcChannelTimeoutSec) {

Review Comment:
   windmillServiceRpcChannelTimeoutSec is a little unclear (i.e. it could mean the entire channel lifetime, a connect timeout, etc.).
   
   How about channelKeepAliveTimeoutSec?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/connectionscache/ReadOnlyWindmillConnectionsCache.java:
##########
@@ -0,0 +1,55 @@
+  /**
+   * Used by GetWorkStream to initiate the Windmill API calls and fetch Work items.
+   *
+   * @see <a
+   *     href=https://medium.com/google-cloud/streaming-engine-execution-model-1eb2eef69a8e>Windmill
+   *     API</a>
+   */
+  WindmillConnection getNewWindmillWorkerConnection();

Review Comment:
   In the direct user-worker-to-Windmill-worker mode, GetWork needs to be sent to a set of endpoints, not just one.
   
   With multiple endpoints, we need to think through when we find out about them. One mechanism would be to keep the existing interface and poll it periodically, but that introduces latency. So we might want to move to a model where this connections cache either notifies something else of endpoint deltas (via a Consumer, etc.) or contains the streams itself.
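   A rough sketch of the push-based option, assuming endpoints can be represented as plain "host:port" strings and that each metadata update delivers the full current endpoint set. EndpointsDelta, PushingEndpointsCache and onMetadataUpdate are hypothetical names, not part of the PR:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical description of what changed between two endpoint metadata updates. */
final class EndpointsDelta {
  final Set<String> added;
  final Set<String> removed;

  EndpointsDelta(Set<String> added, Set<String> removed) {
    this.added = Collections.unmodifiableSet(added);
    this.removed = Collections.unmodifiableSet(removed);
  }
}

/**
 * Hypothetical push-based cache: instead of callers polling for the current endpoint set, each
 * metadata update is diffed against the previous one and the delta is handed to a listener, which
 * could open GetWork streams for added endpoints and close them for removed ones.
 */
final class PushingEndpointsCache {
  private final Object lock = new Object();
  private final Consumer<EndpointsDelta> listener;
  private Set<String> current = new HashSet<>(); // guarded by lock

  PushingEndpointsCache(Consumer<EndpointsDelta> listener) {
    this.listener = listener;
  }

  void onMetadataUpdate(Set<String> newEndpoints) {
    synchronized (lock) {
      Set<String> added = new HashSet<>(newEndpoints);
      added.removeAll(current);
      Set<String> removed = new HashSet<>(current);
      removed.removeAll(newEndpoints);
      current = new HashSet<>(newEndpoints);
      // Notifying while holding the lock keeps deltas in update order, so the listener is
      // expected to be cheap (e.g. hand the delta off to an executor).
      if (!added.isEmpty() || !removed.isEmpty()) {
        listener.accept(new EndpointsDelta(added, removed));
      }
    }
  }
}
```

   Compared with polling, the listener learns about new endpoints as soon as the metadata update is processed, and unchanged updates produce no notification.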
   
   
   



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/connectionscache/WindmillConnectionsCache.java:
##########
@@ -0,0 +1,455 @@
+package org.apache.beam.runners.dataflow.worker.windmill.connectionscache;
+
+import static org.apache.beam.runners.dataflow.worker.windmill.connectionscache.WindmillChannelFactory.LOCALHOST;
+import static org.apache.beam.runners.dataflow.worker.windmill.connectionscache.WindmillChannelFactory.localhostChannel;
+import static org.apache.beam.runners.dataflow.worker.windmill.connectionscache.WindmillChannelFactory.remoteChannel;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.auto.value.AutoValue;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillApplianceGrpc;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ThreadSafe
+final class WindmillConnectionsCache implements ReadWriteWindmillConnectionsCache {
+  private static final Logger LOG = LoggerFactory.getLogger(WindmillConnectionsCache.class);
+
+  private final Object dispatcherConnectionLock;
+
+  @GuardedBy("dispatcherConnectionLock")
+  private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
+
+  @GuardedBy("dispatcherConnectionLock")
+  private final Set<HostAndPort> dispatcherEndpoints;
+
+  @GuardedBy("dispatcherConnectionLock")
+  private final Random rand;
+
+  /**
+   * Writes are guarded by synchronization on "this". Reads are done grabbing the {@link

Review Comment:
   Is it guarded by synchronization on dispatcherConnectionLock rather than "this"?
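   For reference, a minimal sketch of the pattern the Javadoc seems to describe, written so that the comment and the code name the same monitor: writes serialize on one dedicated lock and publish an immutable snapshot through an AtomicReference, and reads are lock-free. SnapshotCache, writeLock and the String-keyed map are hypothetical stand-ins for the dispatcher state:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch: every write synchronizes on the same dedicated lock (not "this") and
 * publishes a new immutable snapshot; reads never lock and just see the latest snapshot.
 */
final class SnapshotCache {
  private final Object writeLock = new Object();
  private final AtomicReference<Map<String, String>> snapshot =
      new AtomicReference<>(Collections.emptyMap());

  /** Copy-on-write under writeLock so that concurrent writers cannot lose each other's updates. */
  void put(String key, String value) {
    synchronized (writeLock) {
      Map<String, String> next = new HashMap<>(snapshot.get());
      next.put(key, value);
      snapshot.set(Collections.unmodifiableMap(next));
    }
  }

  /** Lock-free read: observes either the previous or the new snapshot, never a partial one. */
  Optional<String> get(String key) {
    return Optional.ofNullable(snapshot.get().get(key));
  }
}
```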



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/connectionscache/WindmillConnectionCacheToken.java:
##########
@@ -0,0 +1,34 @@
+package org.apache.beam.runners.dataflow.worker.windmill.connectionscache;
+
+import com.google.auto.value.AutoValue;
+import java.util.UUID;
+
+@AutoValue
+public abstract class WindmillConnectionCacheToken {

Review Comment:
   Do we need the tokens for Java? With garbage collection, perhaps we can just hand out per-endpoint objects to associate with work items. Then we don't have to go back to the cache to look up the endpoint by token for subsequent GetData/Commit calls; we just have the endpoint object itself.
   
   The cache can keep the same endpoint objects as metadata updates come in, as long as the endpoint remains. If the endpoint is removed, we could note that in the object, and subsequent calls using it could fail fast.
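   A sketch of the per-endpoint object idea, assuming work items keep a reference to the handle they were fetched from. EndpointHandle, EndpointHandleCache and checkUsable() are hypothetical names; a real version would hold the gRPC stubs for the endpoint rather than just an address string:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical per-endpoint handle that a work item keeps instead of a cache token. */
final class EndpointHandle {
  private final String address;
  private volatile boolean removed = false;

  EndpointHandle(String address) {
    this.address = address;
  }

  String address() {
    return address;
  }

  void markRemoved() {
    removed = true;
  }

  /** Fails fast once a metadata update has dropped this endpoint. */
  void checkUsable() {
    if (removed) {
      throw new IllegalStateException("Endpoint " + address + " was removed");
    }
  }
}

/** Hypothetical cache that reuses handles for endpoints that survive a metadata update. */
final class EndpointHandleCache {
  private final Map<String, EndpointHandle> handles = new HashMap<>(); // guarded by this

  synchronized void onMetadataUpdate(Set<String> newAddresses) {
    // Mark and drop handles whose endpoint is no longer present.
    Iterator<Map.Entry<String, EndpointHandle>> it = handles.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, EndpointHandle> entry = it.next();
      if (!newAddresses.contains(entry.getKey())) {
        entry.getValue().markRemoved();
        it.remove();
      }
    }
    // Existing handles are kept as-is; only newly seen endpoints get new handles.
    for (String address : newAddresses) {
      handles.computeIfAbsent(address, EndpointHandle::new);
    }
  }

  synchronized Optional<EndpointHandle> handleFor(String address) {
    return Optional.ofNullable(handles.get(address));
  }
}
```

   Because a surviving endpoint keeps the same handle across updates, in-flight work items need no lookup, and work items whose endpoint disappeared fail fast on their next GetData or CommitWork call.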



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/connectionscache/VendoredCredentialsAdapter.java:
##########
@@ -0,0 +1,110 @@
+package org.apache.beam.runners.dataflow.worker.windmill.connectionscache;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import javax.annotation.Nullable;
+import org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.Credentials;
+import org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+
+/**
+ * Create a wrapper around credentials that delegates to the underlying {@link
+ * com.google.auth.Credentials}. Note that this class should override every method that is not final
+ * and not static and call the delegate directly.
+ *
+ * <p>TODO: Replace this with an auto generated proxy which calls the underlying implementation

Review Comment:
   I think it means some sort of annotation that generates a proxy class, so that if new methods are added to Credentials, this class doesn't have to be updated.
   
   That said, from just looking here I don't see why this class is needed at all. It doesn't seem to modify any behavior; it just wraps the underlying Credentials and forwards calls to it.
   
   You could try removing it and see if it works (this can be a separate PR), or look through the history more; perhaps it used to do something and should have been removed when that functionality changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
