This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 727844e  [BEAM-12524] Ensure that failed BundleProcessor objects are 
not re-added to the cache.
     new ed76a98  Merge pull request #15062 from [BEAM-12524] Ensure that 
failed BundleProcessor objects are not re-added to the cache.
727844e is described below

commit 727844eeac9dbfb1824a1c6cde73ac8f0ccbd984
Author: Luke Cwik <[email protected]>
AuthorDate: Tue Jun 22 10:49:23 2021 -0700

    [BEAM-12524] Ensure that failed BundleProcessor objects are not re-added to 
the cache.
    
    We had tests to cover this but the usage of ExpectedException prevented the 
assertion that the cache was "empty" from running since the #processBundle 
method was throwing an exception.
---
 .../fn/harness/control/ProcessBundleHandler.java   |  30 +++---
 .../harness/control/ProcessBundleHandlerTest.java  | 109 ++++++++++++---------
 2 files changed, 83 insertions(+), 56 deletions(-)

diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 32bc803..1f6471e 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -309,12 +309,13 @@ public class ProcessBundleHandler {
                 throw new RuntimeException(e);
               }
             });
-    PTransformFunctionRegistry startFunctionRegistry = 
bundleProcessor.getStartFunctionRegistry();
-    PTransformFunctionRegistry finishFunctionRegistry = 
bundleProcessor.getFinishFunctionRegistry();
-    ExecutionStateTracker stateTracker = bundleProcessor.getStateTracker();
-    QueueingBeamFnDataClient queueingClient = 
bundleProcessor.getQueueingClient();
-
     try {
+      PTransformFunctionRegistry startFunctionRegistry = 
bundleProcessor.getStartFunctionRegistry();
+      PTransformFunctionRegistry finishFunctionRegistry =
+          bundleProcessor.getFinishFunctionRegistry();
+      ExecutionStateTracker stateTracker = bundleProcessor.getStateTracker();
+      QueueingBeamFnDataClient queueingClient = 
bundleProcessor.getQueueingClient();
+
       try (HandleStateCallsForBundle beamFnStateClient = 
bundleProcessor.getBeamFnStateClient()) {
         try (Closeable closeTracker = stateTracker.activate()) {
           // Already in reverse topological order so we don't need to do 
anything.
@@ -354,14 +355,16 @@ public class ProcessBundleHandler {
           response.setRequiresFinalization(true);
         }
       }
+
+      // Mark the bundle processor as re-usable.
       bundleProcessorCache.release(
           request.getProcessBundle().getProcessBundleDescriptorId(), 
bundleProcessor);
+      return 
BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response);
     } catch (Exception e) {
-      bundleProcessorCache.release(
-          request.getProcessBundle().getProcessBundleDescriptorId(), 
bundleProcessor);
+      // Make sure we clean-up from the active set of bundle processors.
+      bundleProcessorCache.discard(bundleProcessor);
       throw e;
     }
-    return 
BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response);
   }
 
   public BeamFnApi.InstructionResponse.Builder 
progress(BeamFnApi.InstructionRequest request)
@@ -648,13 +651,12 @@ public class ProcessBundleHandler {
     }
 
     /**
-     * Add a {@link BundleProcessor} to cache. The {@link BundleProcessor} 
will be reset before
-     * being added to the cache and will be marked as inactive.
+     * Add a {@link BundleProcessor} to cache. The {@link BundleProcessor} 
will be marked as
+     * inactive and reset before being added to the cache.
      */
     void release(String bundleDescriptorId, BundleProcessor bundleProcessor) {
       activeBundleProcessors.remove(bundleProcessor.getInstructionId());
       try {
-        bundleProcessor.setInstructionId(null);
         bundleProcessor.reset();
         cachedBundleProcessors.get(bundleDescriptorId).add(bundleProcessor);
       } catch (Exception e) {
@@ -665,6 +667,11 @@ public class ProcessBundleHandler {
       }
     }
 
+    /** Discard an active {@link BundleProcessor} instead of being re-used. */
+    void discard(BundleProcessor bundleProcessor) {
+      activeBundleProcessors.remove(bundleProcessor.getInstructionId());
+    }
+
     /** Shutdown all the cached {@link BundleProcessor}s, running the 
tearDown() functions. */
     void shutdown() throws Exception {
       cachedBundleProcessors.invalidateAll();
@@ -742,6 +749,7 @@ public class ProcessBundleHandler {
     }
 
     void reset() throws Exception {
+      this.instructionId = null;
       getStartFunctionRegistry().reset();
       getFinishFunctionRegistry().reset();
       getSplitListener().clear();
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 65947f5..f947cd6 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -22,10 +22,12 @@ import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.collection.IsEmptyCollection.empty;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.argThat;
@@ -99,9 +101,7 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
 import org.joda.time.Instant;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.ArgumentCaptor;
@@ -119,8 +119,6 @@ public class ProcessBundleHandlerTest {
   private static final String DATA_INPUT_URN = "beam:runner:source:v1";
   private static final String DATA_OUTPUT_URN = "beam:runner:sink:v1";
 
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
   @Mock private BeamFnDataClient beamFnDataClient;
   @Captor private ArgumentCaptor<ThrowingConsumer<Exception, 
WindowedValue<String>>> consumerCaptor;
 
@@ -644,6 +642,14 @@ public class ProcessBundleHandlerTest {
     // After it is released, ensure the bundle processor is no longer found
     cache.release("descriptorId", bundleProcessor);
     assertNull(cache.find("known"));
+
+    // Once it is active, ensure the bundle processor is found
+    cache.get("descriptorId", "known", () -> bundleProcessor);
+    assertSame(bundleProcessor, cache.find("known"));
+
+    // After it is discarded, ensure the bundle processor is no longer found
+    cache.discard(bundleProcessor);
+    assertNull(cache.find("known"));
   }
 
   @Test
@@ -676,6 +682,7 @@ public class ProcessBundleHandlerTest {
             bundleFinalizationCallbacks);
 
     bundleProcessor.reset();
+    assertNull(bundleProcessor.getInstructionId());
     verify(startFunctionRegistry, times(1)).reset();
     verify(finishFunctionRegistry, times(1)).reset();
     verify(splitListener, times(1)).clear();
@@ -727,16 +734,19 @@ public class ProcessBundleHandlerTest {
                     addProgressRequestCallback,
                     splitListener,
                     bundleFinalizer) -> {
-                  thrown.expect(IllegalStateException.class);
-                  thrown.expectMessage("TestException");
                   throw new IllegalStateException("TestException");
                 }),
             new BundleProcessorCache());
-    handler.processBundle(
-        BeamFnApi.InstructionRequest.newBuilder()
-            .setProcessBundle(
-                
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
-            .build());
+    assertThrows(
+        "TestException",
+        IllegalStateException.class,
+        () ->
+            handler.processBundle(
+                BeamFnApi.InstructionRequest.newBuilder()
+                    .setProcessBundle(
+                        BeamFnApi.ProcessBundleRequest.newBuilder()
+                            .setProcessBundleDescriptorId("1L"))
+                    .build()));
   }
 
   @Test
@@ -813,7 +823,7 @@ public class ProcessBundleHandlerTest {
   }
 
   @Test
-  public void testPTransformStartExceptionsArePropagated() throws Exception {
+  public void testPTransformStartExceptionsArePropagated() {
     BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
         BeamFnApi.ProcessBundleDescriptor.newBuilder()
             .putTransforms(
@@ -854,23 +864,24 @@ public class ProcessBundleHandlerTest {
                         addProgressRequestCallback,
                         splitListener,
                         bundleFinalizer) -> {
-                      thrown.expect(IllegalStateException.class);
-                      thrown.expectMessage("TestException");
                       startFunctionRegistry.register(
                           pTransformId, 
ProcessBundleHandlerTest::throwException);
                       return null;
                     }),
             new BundleProcessorCache());
-    handler.processBundle(
-        BeamFnApi.InstructionRequest.newBuilder()
-            .setProcessBundle(
-                
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
-            .build());
-
+    assertThrows(
+        "TestException",
+        IllegalStateException.class,
+        () ->
+            handler.processBundle(
+                BeamFnApi.InstructionRequest.newBuilder()
+                    .setProcessBundle(
+                        BeamFnApi.ProcessBundleRequest.newBuilder()
+                            .setProcessBundleDescriptorId("1L"))
+                    .build()));
     // BundleProcessor is not re-added back to the BundleProcessorCache in 
case of an exception
     // during bundle processing
-    assertThat(
-        handler.bundleProcessorCache.getCachedBundleProcessors(), 
equalTo(Collections.EMPTY_MAP));
+    
assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"), 
empty());
   }
 
   @Test
@@ -915,23 +926,25 @@ public class ProcessBundleHandlerTest {
                         addProgressRequestCallback,
                         splitListener,
                         bundleFinalizer) -> {
-                      thrown.expect(IllegalStateException.class);
-                      thrown.expectMessage("TestException");
                       finishFunctionRegistry.register(
                           pTransformId, 
ProcessBundleHandlerTest::throwException);
                       return null;
                     }),
             new BundleProcessorCache());
-    handler.processBundle(
-        BeamFnApi.InstructionRequest.newBuilder()
-            .setProcessBundle(
-                
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
-            .build());
+    assertThrows(
+        "TestException",
+        IllegalStateException.class,
+        () ->
+            handler.processBundle(
+                BeamFnApi.InstructionRequest.newBuilder()
+                    .setProcessBundle(
+                        BeamFnApi.ProcessBundleRequest.newBuilder()
+                            .setProcessBundleDescriptorId("1L"))
+                    .build()));
 
     // BundleProcessor is not re-added back to the BundleProcessorCache in 
case of an exception
     // during bundle processing
-    assertThat(
-        handler.bundleProcessorCache.getCachedBundleProcessors(), 
equalTo(Collections.EMPTY_MAP));
+    
assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"), 
empty());
   }
 
   @Test
@@ -1089,19 +1102,22 @@ public class ProcessBundleHandlerTest {
                   }
 
                   private void doStateCalls(BeamFnStateClient 
beamFnStateClient) {
-                    thrown.expect(IllegalStateException.class);
-                    thrown.expectMessage("State API calls are unsupported");
                     beamFnStateClient.handle(
                         StateRequest.newBuilder().setInstructionId("SUCCESS"),
                         new CompletableFuture<>());
                   }
                 }),
             new BundleProcessorCache());
-    handler.processBundle(
-        BeamFnApi.InstructionRequest.newBuilder()
-            .setProcessBundle(
-                
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
-            .build());
+    assertThrows(
+        "State API calls are unsupported",
+        IllegalStateException.class,
+        () ->
+            handler.processBundle(
+                BeamFnApi.InstructionRequest.newBuilder()
+                    .setProcessBundle(
+                        BeamFnApi.ProcessBundleRequest.newBuilder()
+                            .setProcessBundleDescriptorId("1L"))
+                    .build()));
   }
 
   @Test
@@ -1155,8 +1171,6 @@ public class ProcessBundleHandlerTest {
                   }
 
                   private void doTimerRegistrations(BeamFnTimerClient 
beamFnTimerClient) {
-                    thrown.expect(IllegalStateException.class);
-                    thrown.expectMessage("Timers are unsupported");
                     beamFnTimerClient.register(
                         LogicalEndpoint.timer("1L", "2L", "Timer"),
                         Timer.Coder.of(StringUtf8Coder.of(), 
GlobalWindow.Coder.INSTANCE),
@@ -1164,11 +1178,16 @@ public class ProcessBundleHandlerTest {
                   }
                 }),
             new BundleProcessorCache());
-    handler.processBundle(
-        BeamFnApi.InstructionRequest.newBuilder()
-            .setProcessBundle(
-                
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
-            .build());
+    assertThrows(
+        "Timers are unsupported",
+        IllegalStateException.class,
+        () ->
+            handler.processBundle(
+                BeamFnApi.InstructionRequest.newBuilder()
+                    .setProcessBundle(
+                        BeamFnApi.ProcessBundleRequest.newBuilder()
+                            .setProcessBundleDescriptorId("1L"))
+                    .build()));
   }
 
   private static void throwException() {

Reply via email to