This is an automated email from the ASF dual-hosted git repository.

yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new aa17857  [BEAM-12294] Implement close function for BeamFnStatusClient 
to shutdown managed channel
     new 79212a6  Merge pull request #14741 from y1chi/beam-12294
aa17857 is described below

commit aa178574ecbe5eede06b4fa7b0cba3678b1b3fd5
Author: Yichi Zhang <[email protected]>
AuthorDate: Wed May 5 17:40:31 2021 -0700

    [BEAM-12294] Implement close function for BeamFnStatusClient to shutdown 
managed channel
---
 .../java/org/apache/beam/fn/harness/FnHarness.java | 15 ++++++----
 .../beam/fn/harness/status/BeamFnStatusClient.java | 35 ++++++++++++++++++----
 2 files changed, 40 insertions(+), 10 deletions(-)

diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 231d2b8..0ca4581 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -272,12 +272,14 @@ public class FnHarness {
               finalizeBundleHandler,
               metricsShortIds);
 
+      BeamFnStatusClient beamFnStatusClient = null;
       if (statusApiServiceDescriptor != null) {
-        new BeamFnStatusClient(
-            statusApiServiceDescriptor,
-            channelFactory::forDescriptor,
-            processBundleHandler.getBundleProcessorCache(),
-            options);
+        beamFnStatusClient =
+            new BeamFnStatusClient(
+                statusApiServiceDescriptor,
+                channelFactory::forDescriptor,
+                processBundleHandler.getBundleProcessorCache(),
+                options);
       }
 
       // TODO(BEAM-9729): Remove once runners no longer send this instruction.
@@ -337,6 +339,9 @@ public class FnHarness {
               executorService,
               handlers);
       control.waitForTermination();
+      if (beamFnStatusClient != null) {
+        beamFnStatusClient.close();
+      }
       processBundleHandler.shutdown();
     } finally {
       System.out.println("Shutting SDK harness down.");
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java
index 4c01c04..e059471 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.StringJoiner;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessor;
 import 
org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessorCache;
@@ -42,9 +44,12 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BeamFnStatusClient {
+public class BeamFnStatusClient implements AutoCloseable {
+  private static final Object COMPLETED = new Object();
   private final StreamObserver<WorkerStatusResponse> outboundObserver;
   private final BundleProcessorCache processBundleCache;
+  private final ManagedChannel channel;
+  private final CompletableFuture<Object> inboundObserverCompletion;
   private static final Logger LOG = 
LoggerFactory.getLogger(BeamFnStatusClient.class);
   private final MemoryMonitor memoryMonitor;
 
@@ -53,11 +58,12 @@ public class BeamFnStatusClient {
       Function<ApiServiceDescriptor, ManagedChannel> channelFactory,
       BundleProcessorCache processBundleCache,
       PipelineOptions options) {
-    BeamFnWorkerStatusGrpc.BeamFnWorkerStatusStub stub =
-        
BeamFnWorkerStatusGrpc.newStub(channelFactory.apply(apiServiceDescriptor));
-    this.outboundObserver = stub.workerStatus(new InboundObserver());
+    this.channel = channelFactory.apply(apiServiceDescriptor);
+    this.outboundObserver =
+        BeamFnWorkerStatusGrpc.newStub(channel).workerStatus(new 
InboundObserver());
     this.processBundleCache = processBundleCache;
     this.memoryMonitor = MemoryMonitor.fromOptions(options);
+    this.inboundObserverCompletion = new CompletableFuture<>();
     Thread thread = new Thread(memoryMonitor);
     thread.setDaemon(true);
     thread.setPriority(Thread.MIN_PRIORITY);
@@ -65,6 +71,22 @@ public class BeamFnStatusClient {
     thread.start();
   }
 
+  @Override
+  public void close() throws Exception {
+    try {
+      Object completion = inboundObserverCompletion.get(1, TimeUnit.MINUTES);
+      if (completion != COMPLETED) {
+        LOG.warn("InboundObserver for BeamFnStatusClient completed with 
exception.");
+      }
+    } finally {
+      // Shut the channel down
+      channel.shutdown();
+      if (!channel.awaitTermination(10, TimeUnit.SECONDS)) {
+        channel.shutdownNow();
+      }
+    }
+  }
+
   /**
    * Class representing the execution state of a thread.
    *
@@ -222,9 +244,12 @@ public class BeamFnStatusClient {
     @Override
     public void onError(Throwable t) {
       LOG.error("Error getting SDK harness status", t);
+      inboundObserverCompletion.completeExceptionally(t);
     }
 
     @Override
-    public void onCompleted() {}
+    public void onCompleted() {
+      inboundObserverCompletion.complete(COMPLETED);
+    }
   }
 }

Reply via email to