amaliujia commented on a change in pull request #14480:
URL: https://github.com/apache/beam/pull/14480#discussion_r612805660



##########
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java
##########
@@ -235,6 +238,118 @@ public void testBasicInboundConsumerBehaviour() throws 
Exception {
     }
   }
 
+  @Test(timeout = 10000)
+  public void testClosingWithFullInboundQueue() throws Exception {
+    CountDownLatch waitForClientToConnect = new CountDownLatch(1);
+    CountDownLatch allowValueProcessing = new CountDownLatch(1);
+    final int numValues = 100;
+    CountDownLatch receiveAllValues = new CountDownLatch(numValues);
+    Collection<WindowedValue<String>> inboundValues = new 
ConcurrentLinkedQueue<>();
+    Collection<BeamFnApi.Elements> inboundServerValues = new 
ConcurrentLinkedQueue<>();
+    AtomicReference<StreamObserver<BeamFnApi.Elements>> outboundServerObserver 
=
+        new AtomicReference<>();
+    CallStreamObserver<BeamFnApi.Elements> inboundServerObserver =
+        TestStreams.withOnNext(inboundServerValues::add).build();
+
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        Endpoints.ApiServiceDescriptor.newBuilder()
+            .setUrl(this.getClass().getName() + "-" + 
UUID.randomUUID().toString())
+            .build();
+    Server server =
+        InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+            .addService(
+                new BeamFnDataGrpc.BeamFnDataImplBase() {
+                  @Override
+                  public StreamObserver<BeamFnApi.Elements> data(
+                      StreamObserver<BeamFnApi.Elements> outboundObserver) {
+                    outboundServerObserver.set(outboundObserver);
+                    waitForClientToConnect.countDown();
+                    return inboundServerObserver;
+                  }
+                })
+            .build();
+    server.start();
+    try {
+      ManagedChannel channel =
+          
InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
+
+      BeamFnDataGrpcClient clientFactory =
+          new BeamFnDataGrpcClient(
+              PipelineOptionsFactory.create(),
+              (Endpoints.ApiServiceDescriptor descriptor) -> channel,
+              OutboundObserverFactory.trivial());
+      // We want the queue to have no room when we try to close. The queue size
+      // is therefore set to numValues -1 since one of the values has been 
removed
+      // from the queue to accept it.
+      QueueingBeamFnDataClient queueingClient =
+          new QueueingBeamFnDataClient(clientFactory, numValues - 1);
+
+      final AtomicInteger currentCount = new AtomicInteger();
+      InboundDataClient inboundDataClient =
+          queueingClient.receive(
+              apiServiceDescriptor,
+              ENDPOINT_A,
+              CODER,
+              (WindowedValue<String> wv) -> {
+                if (allowValueProcessing.getCount() != 0) {
+                  LOG.info("Inbound processing blocking");
+                }
+                allowValueProcessing.await();
+                LOG.info("Received " + wv.getValue());
+                assertEquals("ABC" + currentCount.getAndIncrement(), 
wv.getValue());
+              });
+
+      waitForClientToConnect.await();
+
+      // Start draining elements, the drain will be blocked by 
allowValueProcessing.
+      Future<?> drainElementsFuture =
+          executor.submit(
+              () -> {
+                try {
+                  queueingClient.drainAndBlock();
+                } catch (Exception e) {
+                  LOG.error("Failed ", e);
+                  fail();
+                }
+              });
+
+      // We should be able to send all the elements and complete without 
blocking.
+      for (int i = 0; i < numValues; ++i) {
+        BeamFnApi.Elements element =
+            BeamFnApi.Elements.newBuilder()
+                .addData(
+                    BeamFnApi.Elements.Data.newBuilder()
+                        .setInstructionId(ENDPOINT_A.getInstructionId())
+                        .setTransformId(ENDPOINT_A.getTransformId())
+                        .setData(
+                            ByteString.copyFrom(
+                                encodeToByteArray(CODER, 
valueInGlobalWindow("ABC" + i)))))
+                .build();
+        outboundServerObserver.get().onNext(element);
+      }
+      outboundServerObserver
+          .get()
+          .onNext(
+              BeamFnApi.Elements.newBuilder()
+                  .addData(
+                      BeamFnApi.Elements.Data.newBuilder()
+                          .setInstructionId(ENDPOINT_A.getInstructionId())
+                          .setTransformId(ENDPOINT_A.getTransformId())
+                          .setIsLast(true))
+                  .build());
+      inboundDataClient.awaitCompletion();
+
+      // Allow processing to complete and verify that draining finishes.
+      LOG.info("Completed client, allowing inbound processing.");

Review comment:
       Remove?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to