m-trieu commented on code in PR #34539:
URL: https://github.com/apache/beam/pull/34539#discussion_r2036702184


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java:
##########
@@ -108,7 +112,7 @@ public void testRemoveAndClose() throws InterruptedException {
     CountDownLatch notifyWhenChannelClosed = new CountDownLatch(1);
     cache =
         ChannelCache.forTesting(
-            ignored -> newChannel(channelName), notifyWhenChannelClosed::countDown);
+            (a, b) -> newChannel(channelName), notifyWhenChannelClosed::countDown);

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java:
##########
@@ -45,7 +49,7 @@ public class ChannelCacheTest {
 
   private static ChannelCache newCache(
       Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
-    return ChannelCache.forTesting(channelFactory, () -> {});
+    return ChannelCache.forTesting((ignored, address) -> channelFactory.apply(address), () -> {});

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java:
##########
@@ -132,12 +136,80 @@ public void testClear() throws InterruptedException {
     CountDownLatch notifyWhenChannelClosed = new CountDownLatch(1);
     cache =
         ChannelCache.forTesting(
-            ignored -> newChannel(channelName), notifyWhenChannelClosed::countDown);
+            (a, b) -> newChannel(channelName), notifyWhenChannelClosed::countDown);
     WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class);
     ManagedChannel cachedChannel = cache.get(someAddress);
     cache.clear();
     notifyWhenChannelClosed.await();
     assertTrue(cache.isEmpty());
     assertTrue(cachedChannel.isShutdown());
   }
+
+  @Test
+  public void testConsumeFlowControlSettings() throws InterruptedException {
+    String channelName = "channel";
+    CountDownLatch notifyWhenChannelClosed = new CountDownLatch(1);
+    AtomicInteger newChannelsCreated = new AtomicInteger();
+    cache =
+        ChannelCache.forTesting(
+            (a, b) -> {
+              ManagedChannel channel = newChannel(channelName);
+              newChannelsCreated.incrementAndGet();
+              return channel;
+            },
+            notifyWhenChannelClosed::countDown);
+    WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class);
+    when(someAddress.getKind())
+        .thenReturn(WindmillServiceAddress.Kind.AUTHENTICATED_GCP_SERVICE_ADDRESS);
+    ManagedChannel cachedChannel = cache.get(someAddress);
+    cache.consumeFlowControlSettings(
+        UserWorkerGrpcFlowControlSettings.newBuilder()
+            .setEnableAutoFlowControl(true)
+            .setOnReadyThresholdBytes(1)
+            .setFlowControlWindowBytes(1)
+            .build());
+    ManagedChannel reloadedChannel = cache.get(someAddress);
+    notifyWhenChannelClosed.await();
+    assertThat(cachedChannel).isNotSameInstanceAs(reloadedChannel);
+    assertTrue(cachedChannel.isShutdown());
+    assertFalse(reloadedChannel.isShutdown());
+    assertThat(newChannelsCreated.get()).isEqualTo(2);
+    assertThat(cache.get(someAddress)).isSameInstanceAs(reloadedChannel);
+  }
+
+  @Test
+  public void testConsumeFlowControlSettings_sameFlowControlSettings() {
+    String channelName = "channel";
+    AtomicInteger newChannelsCreated = new AtomicInteger();
+    UserWorkerGrpcFlowControlSettings flowControlSettings =
+        UserWorkerGrpcFlowControlSettings.newBuilder()
+            .setEnableAutoFlowControl(true)
+            .setOnReadyThresholdBytes(1)
+            .setFlowControlWindowBytes(1)
+            .build();
+    cache =
+        ChannelCache.forTesting(
+            (a, b) -> {
+              ManagedChannel channel = newChannel(channelName);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to