This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e9fa621  Release bundle processor when any exceptions during processing.
     new 060b083  Merge pull request #13568 from [BEAM-3245] Release bundle processor when any exceptions during processing.
e9fa621 is described below

commit e9fa621a10ce6a84179fba1d54b413535a1a46a7
Author: Boyuan Zhang <[email protected]>
AuthorDate: Wed Dec 16 10:44:57 2020 -0800

    Release bundle processor when any exceptions during processing.
---
 .../fn/harness/control/ProcessBundleHandler.java   | 87 ++++++++++++----------
 1 file changed, 46 insertions(+), 41 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index e93c67b..4ecb5f5 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -293,55 +293,60 @@ public class ProcessBundleHandler {
     ExecutionStateTracker stateTracker = bundleProcessor.getStateTracker();
     QueueingBeamFnDataClient queueingClient = 
bundleProcessor.getQueueingClient();
 
-    try (HandleStateCallsForBundle beamFnStateClient = 
bundleProcessor.getBeamFnStateClient()) {
-      try (Closeable closeTracker = stateTracker.activate()) {
-        // Already in reverse topological order so we don't need to do 
anything.
-        for (ThrowingRunnable startFunction : 
startFunctionRegistry.getFunctions()) {
-          LOG.debug("Starting function {}", startFunction);
-          startFunction.run();
-        }
+    try {
+      try (HandleStateCallsForBundle beamFnStateClient = 
bundleProcessor.getBeamFnStateClient()) {
+        try (Closeable closeTracker = stateTracker.activate()) {
+          // Already in reverse topological order so we don't need to do 
anything.
+          for (ThrowingRunnable startFunction : 
startFunctionRegistry.getFunctions()) {
+            LOG.debug("Starting function {}", startFunction);
+            startFunction.run();
+          }
 
-        queueingClient.drainAndBlock();
+          queueingClient.drainAndBlock();
 
-        // Need to reverse this since we want to call finish in topological 
order.
-        for (ThrowingRunnable finishFunction :
-            Lists.reverse(finishFunctionRegistry.getFunctions())) {
-          LOG.debug("Finishing function {}", finishFunction);
-          finishFunction.run();
+          // Need to reverse this since we want to call finish in topological 
order.
+          for (ThrowingRunnable finishFunction :
+              Lists.reverse(finishFunctionRegistry.getFunctions())) {
+            LOG.debug("Finishing function {}", finishFunction);
+            finishFunction.run();
+          }
         }
-      }
 
-      // Add all checkpointed residuals to the response.
-      
response.addAllResidualRoots(bundleProcessor.getSplitListener().getResidualRoots());
-
-      // TODO(BEAM-6597): This should be reporting monitoring infos using the 
short id system.
-      // Get start bundle Execution Time Metrics.
-      response.addAllMonitoringInfos(
-          
bundleProcessor.getStartFunctionRegistry().getExecutionTimeMonitoringInfos());
-      // Get process bundle Execution Time Metrics.
-      response.addAllMonitoringInfos(
-          
bundleProcessor.getpCollectionConsumerRegistry().getExecutionTimeMonitoringInfos());
-      // Get finish bundle Execution Time Metrics.
-      response.addAllMonitoringInfos(
-          
bundleProcessor.getFinishFunctionRegistry().getExecutionTimeMonitoringInfos());
-      // Extract MonitoringInfos that come from the metrics container registry.
-      response.addAllMonitoringInfos(
-          bundleProcessor.getMetricsContainerRegistry().getMonitoringInfos());
-      // Add any additional monitoring infos that the "runners" report 
explicitly.
-      for (ProgressRequestCallback progressRequestCallback :
-          bundleProcessor.getProgressRequestCallbacks()) {
-        
response.addAllMonitoringInfos(progressRequestCallback.getMonitoringInfos());
-      }
+        // Add all checkpointed residuals to the response.
+        
response.addAllResidualRoots(bundleProcessor.getSplitListener().getResidualRoots());
+
+        // TODO(BEAM-6597): This should be reporting monitoring infos using 
the short id system.
+        // Get start bundle Execution Time Metrics.
+        response.addAllMonitoringInfos(
+            
bundleProcessor.getStartFunctionRegistry().getExecutionTimeMonitoringInfos());
+        // Get process bundle Execution Time Metrics.
+        response.addAllMonitoringInfos(
+            
bundleProcessor.getpCollectionConsumerRegistry().getExecutionTimeMonitoringInfos());
+        // Get finish bundle Execution Time Metrics.
+        response.addAllMonitoringInfos(
+            
bundleProcessor.getFinishFunctionRegistry().getExecutionTimeMonitoringInfos());
+        // Extract MonitoringInfos that come from the metrics container 
registry.
+        response.addAllMonitoringInfos(
+            
bundleProcessor.getMetricsContainerRegistry().getMonitoringInfos());
+        // Add any additional monitoring infos that the "runners" report 
explicitly.
+        for (ProgressRequestCallback progressRequestCallback :
+            bundleProcessor.getProgressRequestCallbacks()) {
+          
response.addAllMonitoringInfos(progressRequestCallback.getMonitoringInfos());
+        }
 
-      if 
(!bundleProcessor.getBundleFinalizationCallbackRegistrations().isEmpty()) {
-        finalizeBundleHandler.registerCallbacks(
-            bundleProcessor.getInstructionId(),
-            
ImmutableList.copyOf(bundleProcessor.getBundleFinalizationCallbackRegistrations()));
-        response.setRequiresFinalization(true);
+        if 
(!bundleProcessor.getBundleFinalizationCallbackRegistrations().isEmpty()) {
+          finalizeBundleHandler.registerCallbacks(
+              bundleProcessor.getInstructionId(),
+              
ImmutableList.copyOf(bundleProcessor.getBundleFinalizationCallbackRegistrations()));
+          response.setRequiresFinalization(true);
+        }
       }
-
       bundleProcessorCache.release(
           request.getProcessBundle().getProcessBundleDescriptorId(), 
bundleProcessor);
+    } catch (Exception e) {
+      bundleProcessorCache.release(
+          request.getProcessBundle().getProcessBundleDescriptorId(), 
bundleProcessor);
+      throw e;
     }
     return 
BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response);
   }

Reply via email to