scwhittle commented on code in PR #30425:
URL: https://github.com/apache/beam/pull/30425#discussion_r1520151250


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.io.PrintWriter;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows <a
+ * href=https://grpc.io/docs/guides/performance/#java>gRPC recommendations</a> for re-using channels
+ * when possible.
+ *
+ * @implNote Backed by {@link LoadingCache} which is thread-safe.
+ */
+@ThreadSafe
+public final class ChannelCache implements StatusDataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(ChannelCache.class);
+  private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache;
+
+  private ChannelCache(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory,
+      RemovalListener<WindmillServiceAddress, ManagedChannel> onChannelRemoved) {
+    this.channelCache =
+        Caffeine.newBuilder().removalListener(onChannelRemoved).build(channelFactory::apply);
+  }
+
+  public static ChannelCache create(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
+    return new ChannelCache(
+        channelFactory,
+        // Shutdown the channels as they get removed from the cache, so they do not leak.
+        (address, channel, cause) -> shutdownChannel(channel));
+  }
+
+  @VisibleForTesting
+  static ChannelCache forTesting(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory, Runnable onChannelShutdown) {
+    return new ChannelCache(
+        channelFactory,
+        // Shutdown the channels as they get removed from the cache, so they do not leak.
+        // Add hook for testing so that we don't have to sleep/wait for arbitrary time in test.
+        (address, channel, cause) -> {
+          shutdownChannel(channel);
+          onChannelShutdown.run();
+        });
+  }
+
+  private static void shutdownChannel(ManagedChannel channel) {
+    channel.shutdown();
+    try {
+      channel.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("Couldn't close gRPC channel={}", channel, e);
+    }
+    channel.shutdownNow();
+  }
+
+  public ManagedChannel get(WindmillServiceAddress windmillServiceAddress) {
+    return channelCache.get(windmillServiceAddress);
+  }
+
+  public void remove(WindmillServiceAddress windmillServiceAddress) {
+    channelCache.invalidate(windmillServiceAddress);
+  }
+
+  public void clear() {
+    channelCache.invalidateAll();
+  }
+
+  @VisibleForTesting
+  boolean isEmpty() {
+    // Perform any pending removal/insert operations first.
+    channelCache.cleanUp();

Review Comment:
   Does this block on removal listeners if they run on an executor? Please add a comment.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.io.PrintWriter;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache of gRPC channels for {@link WindmillServiceAddress}. Follows <a
+ * href=https://grpc.io/docs/guides/performance/#java>gRPC recommendations</a> for re-using channels
+ * when possible.
+ *
+ * @implNote Backed by {@link LoadingCache} which is thread-safe.
+ */
+@ThreadSafe
+public final class ChannelCache implements StatusDataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(ChannelCache.class);
+  private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache;
+
+  private ChannelCache(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory,
+      RemovalListener<WindmillServiceAddress, ManagedChannel> onChannelRemoved) {
+    this.channelCache =
+        Caffeine.newBuilder().removalListener(onChannelRemoved).build(channelFactory::apply);

Review Comment:
   I think you need to set an executor for the removal listener to run asynchronously.
   
   You could perhaps add a test that onChannelRemoved happens asynchronously: have the removal listener block on something (e.g. a latch) that the main thread notifies/counts down after the remove call returns.
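
The test shape suggested above could look roughly like the following. This is only a sketch: `TinyCache` is a hypothetical stand-in (not Caffeine and not `ChannelCache`) that hands its removal callback to an `Executor`, and all names in it are assumptions for illustration. The listener blocks on a latch that the main thread counts down only after `remove()` has returned; the wait is bounded so the check fails rather than hangs if the listener were run inline.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

final class AsyncRemovalSketch {

  /** Hypothetical stand-in cache: runs the removal listener on the given executor. */
  static final class TinyCache<K, V> {
    private final Map<K, V> entries = new ConcurrentHashMap<>();
    private final Executor listenerExecutor;
    private final BiConsumer<K, V> onRemoved;

    TinyCache(Executor listenerExecutor, BiConsumer<K, V> onRemoved) {
      this.listenerExecutor = listenerExecutor;
      this.onRemoved = onRemoved;
    }

    void put(K key, V value) {
      entries.put(key, value);
    }

    void remove(K key) {
      V removed = entries.remove(key);
      if (removed != null) {
        // Hand the callback off so remove() does not wait for it.
        listenerExecutor.execute(() -> onRemoved.accept(key, removed));
      }
    }
  }

  /** Returns true if remove() returned while the listener was still blocked. */
  static boolean removeDoesNotBlockOnListener() throws InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch releaseListener = new CountDownLatch(1);
    CountDownLatch listenerFinished = new CountDownLatch(1);
    TinyCache<String, String> cache =
        new TinyCache<>(
            executor,
            (key, value) -> {
              try {
                // Bounded wait: an inline listener delays remove() instead of hanging it.
                releaseListener.await(5, TimeUnit.SECONDS);
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              } finally {
                listenerFinished.countDown();
              }
            });
    try {
      cache.put("address", "channel");
      cache.remove("address");
      // The listener cannot have finished yet unless it ran inline and timed out.
      boolean listenerStillBlocked = listenerFinished.getCount() == 1;
      releaseListener.countDown();
      boolean listenerCompleted = listenerFinished.await(5, TimeUnit.SECONDS);
      return listenerStillBlocked && listenerCompleted;
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(removeDoesNotBlockOnListener());
  }
}
```

In the real test the same latch pair would presumably be wired through the removal hook that `forTesting` already accepts, but that wiring is not shown here.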



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCachingStubFactory.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+
+public interface ChannelCachingStubFactory extends WindmillStubFactory {
+
+  /**
+   * Remove and close the gRPC channel used to communicate with the given {@link
+   * WindmillServiceAddress}.

Review Comment:
   Document that subsequent get calls for the address will get a new channel instance, and note that users of the previously vended channel will start to get errors.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ChannelCacheTest {
+
+  private ChannelCache cache;
+
+  private static ChannelCache newCache(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
+    return ChannelCache.forTesting(channelFactory, () -> {});
+  }
+
+  @After
+  public void cleanUp() {
+    if (cache != null) {
+      cache.clear();
+    }
+  }
+
+  private ManagedChannel newChannel(String channelName) {
+    return WindmillChannelFactory.inProcessChannel(channelName);
+  }
+
+  @Test
+  public void testLoadingCacheReturnsExistingChannel() {
+    String channelName = "existingChannel";
+    ManagedChannel channel = newChannel(channelName);
+    Function<WindmillServiceAddress, ManagedChannel> channelFactory =
+        spy(
+            new Function<WindmillServiceAddress, ManagedChannel>() {
+              @Override
+              public ManagedChannel apply(WindmillServiceAddress windmillServiceAddress) {
+                return channel;
+              }
+            });
+
+    cache = newCache(channelFactory);
+    WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class);
+    // Initial call to load the cache.
+    cache.get(someAddress);
+
+    ManagedChannel cachedChannel = cache.get(someAddress);
+    assertSame(channel, cachedChannel);
+    verify(channelFactory, times(1)).apply(eq(someAddress));
+  }
+
+  @Test
+  public void testLoadingCacheReturnsLoadsChannelWhenNotPresent() {
+    String channelName = "existingChannel";
+    ManagedChannel channel = newChannel(channelName);
+    Function<WindmillServiceAddress, ManagedChannel> channelFactory =
+        spy(
+            new Function<WindmillServiceAddress, ManagedChannel>() {
+              @Override
+              public ManagedChannel apply(WindmillServiceAddress windmillServiceAddress) {
+                return channel;
+              }
+            });
+
+    cache = newCache(channelFactory);
+    WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class);
+    ManagedChannel cachedChannel = cache.get(someAddress);
+    assertSame(channel, cachedChannel);
+    verify(channelFactory, times(1)).apply(eq(someAddress));
+  }
+
+  @Test
+  public void testRemoveAndClose() throws InterruptedException {

Review Comment:
   See other comments:
   - test that a new call for the address gets a different channel that is not closed
   - test that the removal fn is async and doesn't block remove()
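
The first of these checks could be sketched as below. `FakeChannel` and `FakeChannelCache` are hypothetical stand-ins invented for illustration (they are not gRPC's `ManagedChannel` or the `ChannelCache` under review), and the stand-in shuts channels down inline for simplicity; the point is only the assertion shape: after `remove`, a `get` for the same address yields a different instance that has not been shut down, while the previously vended one has.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

final class RemoveThenGetSketch {

  /** Hypothetical channel stand-in that only tracks whether it was shut down. */
  static final class FakeChannel {
    private final AtomicBoolean shutdown = new AtomicBoolean(false);

    void shutdown() {
      shutdown.set(true);
    }

    boolean isShutdown() {
      return shutdown.get();
    }
  }

  /** Hypothetical cache stand-in: loads on miss, shuts down the channel on remove. */
  static final class FakeChannelCache {
    private final Map<String, FakeChannel> channels = new ConcurrentHashMap<>();
    private final Function<String, FakeChannel> channelFactory;

    FakeChannelCache(Function<String, FakeChannel> channelFactory) {
      this.channelFactory = channelFactory;
    }

    FakeChannel get(String address) {
      return channels.computeIfAbsent(address, channelFactory);
    }

    void remove(String address) {
      FakeChannel removed = channels.remove(address);
      if (removed != null) {
        removed.shutdown();
      }
    }
  }

  /** Returns true if a get after remove yields a fresh, open channel. */
  static boolean removedAddressGetsFreshOpenChannel() {
    FakeChannelCache cache = new FakeChannelCache(address -> new FakeChannel());
    FakeChannel first = cache.get("address");
    cache.remove("address");
    FakeChannel second = cache.get("address");
    return first != second && first.isShutdown() && !second.isShutdown();
  }

  public static void main(String[] args) {
    System.out.println(removedAddressGetsFreshOpenChannel());
  }
}
```

Note that the factory here creates a new object per call; a test factory that always returns the same channel instance could not demonstrate this property.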



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ChannelCacheTest {
+
+  private ChannelCache cache;
+
+  private static ChannelCache newCache(
+      Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
+    return ChannelCache.forTesting(channelFactory, () -> {});
+  }
+
+  @After
+  public void cleanUp() {
+    if (cache != null) {
+      cache.clear();
+    }
+  }
+
+  private ManagedChannel newChannel(String channelName) {
+    return WindmillChannelFactory.inProcessChannel(channelName);
+  }
+
+  @Test
+  public void testLoadingCacheReturnsExistingChannel() {
+    String channelName = "existingChannel";
+    ManagedChannel channel = newChannel(channelName);
+    Function<WindmillServiceAddress, ManagedChannel> channelFactory =
+        spy(
+            new Function<WindmillServiceAddress, ManagedChannel>() {
+              @Override
+              public ManagedChannel apply(WindmillServiceAddress windmillServiceAddress) {
+                return channel;
+              }
+            });
+
+    cache = newCache(channelFactory);
+    WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class);
+    // Initial call to load the cache.
+    cache.get(someAddress);

Review Comment:
   Might as well assert that this is the same as channel.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -468,10 +472,18 @@ private StreamingDataflowWorker(
   public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) {
     ConcurrentMap<String, ComputationState> computationMap = new ConcurrentHashMap<>();
     long clientId = clientIdGenerator.nextLong();
+    ChannelCache channelCache =

Review Comment:
   I think it's ok for them to have their own caches internally. They will not generally overlap in direct path mode, and in non-direct path mode a separate channel for the metadata is ok.
   
   We wouldn't want a shared cache if the StreamingEngineClient, for example, invalidates a channel that is being used by the GrpcDispatcherClient.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/IsolationChannel.java:
##########
@@ -44,7 +44,7 @@
  * that each active rpc has its own channel.
  */
 @Internal
-class IsolationChannel extends ManagedChannel {
+public class IsolationChannel extends ManagedChannel {

Review Comment:
   Does this still need to be public?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
