This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 727844e [BEAM-12524] Ensure that failed BundleProcessor objects are
not re-added to the cache.
new ed76a98 Merge pull request #15062 from [BEAM-12524] Ensure that
failed BundleProcessor objects are not re-added to the cache.
727844e is described below
commit 727844eeac9dbfb1824a1c6cde73ac8f0ccbd984
Author: Luke Cwik <[email protected]>
AuthorDate: Tue Jun 22 10:49:23 2021 -0700
[BEAM-12524] Ensure that failed BundleProcessor objects are not re-added to
the cache.
We had tests to cover this, but the use of ExpectedException prevented the
assertion that the cache was "empty" from running: the #processBundle call
threw the expected exception, which ended the test method before the assertion.
---
.../fn/harness/control/ProcessBundleHandler.java | 30 +++---
.../harness/control/ProcessBundleHandlerTest.java | 109 ++++++++++++---------
2 files changed, 83 insertions(+), 56 deletions(-)
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 32bc803..1f6471e 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -309,12 +309,13 @@ public class ProcessBundleHandler {
throw new RuntimeException(e);
}
});
- PTransformFunctionRegistry startFunctionRegistry =
bundleProcessor.getStartFunctionRegistry();
- PTransformFunctionRegistry finishFunctionRegistry =
bundleProcessor.getFinishFunctionRegistry();
- ExecutionStateTracker stateTracker = bundleProcessor.getStateTracker();
- QueueingBeamFnDataClient queueingClient =
bundleProcessor.getQueueingClient();
-
try {
+ PTransformFunctionRegistry startFunctionRegistry =
bundleProcessor.getStartFunctionRegistry();
+ PTransformFunctionRegistry finishFunctionRegistry =
+ bundleProcessor.getFinishFunctionRegistry();
+ ExecutionStateTracker stateTracker = bundleProcessor.getStateTracker();
+ QueueingBeamFnDataClient queueingClient =
bundleProcessor.getQueueingClient();
+
try (HandleStateCallsForBundle beamFnStateClient =
bundleProcessor.getBeamFnStateClient()) {
try (Closeable closeTracker = stateTracker.activate()) {
// Already in reverse topological order so we don't need to do
anything.
@@ -354,14 +355,16 @@ public class ProcessBundleHandler {
response.setRequiresFinalization(true);
}
}
+
+ // Mark the bundle processor as re-usable.
bundleProcessorCache.release(
request.getProcessBundle().getProcessBundleDescriptorId(),
bundleProcessor);
+ return
BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response);
} catch (Exception e) {
- bundleProcessorCache.release(
- request.getProcessBundle().getProcessBundleDescriptorId(),
bundleProcessor);
+ // Make sure we clean-up from the active set of bundle processors.
+ bundleProcessorCache.discard(bundleProcessor);
throw e;
}
- return
BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response);
}
public BeamFnApi.InstructionResponse.Builder
progress(BeamFnApi.InstructionRequest request)
@@ -648,13 +651,12 @@ public class ProcessBundleHandler {
}
/**
- * Add a {@link BundleProcessor} to cache. The {@link BundleProcessor}
will be reset before
- * being added to the cache and will be marked as inactive.
+ * Add a {@link BundleProcessor} to cache. The {@link BundleProcessor}
will be marked as
+ * inactive and reset before being added to the cache.
*/
void release(String bundleDescriptorId, BundleProcessor bundleProcessor) {
activeBundleProcessors.remove(bundleProcessor.getInstructionId());
try {
- bundleProcessor.setInstructionId(null);
bundleProcessor.reset();
cachedBundleProcessors.get(bundleDescriptorId).add(bundleProcessor);
} catch (Exception e) {
@@ -665,6 +667,11 @@ public class ProcessBundleHandler {
}
}
+ /** Discard an active {@link BundleProcessor} instead of being re-used. */
+ void discard(BundleProcessor bundleProcessor) {
+ activeBundleProcessors.remove(bundleProcessor.getInstructionId());
+ }
+
/** Shutdown all the cached {@link BundleProcessor}s, running the
tearDown() functions. */
void shutdown() throws Exception {
cachedBundleProcessors.invalidateAll();
@@ -742,6 +749,7 @@ public class ProcessBundleHandler {
}
void reset() throws Exception {
+ this.instructionId = null;
getStartFunctionRegistry().reset();
getFinishFunctionRegistry().reset();
getSplitListener().clear();
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 65947f5..f947cd6 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -22,10 +22,12 @@ import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.collection.IsEmptyCollection.empty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.argThat;
@@ -99,9 +101,7 @@ import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.joda.time.Instant;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
@@ -119,8 +119,6 @@ public class ProcessBundleHandlerTest {
private static final String DATA_INPUT_URN = "beam:runner:source:v1";
private static final String DATA_OUTPUT_URN = "beam:runner:sink:v1";
- @Rule public ExpectedException thrown = ExpectedException.none();
-
@Mock private BeamFnDataClient beamFnDataClient;
@Captor private ArgumentCaptor<ThrowingConsumer<Exception,
WindowedValue<String>>> consumerCaptor;
@@ -644,6 +642,14 @@ public class ProcessBundleHandlerTest {
// After it is released, ensure the bundle processor is no longer found
cache.release("descriptorId", bundleProcessor);
assertNull(cache.find("known"));
+
+ // Once it is active, ensure the bundle processor is found
+ cache.get("descriptorId", "known", () -> bundleProcessor);
+ assertSame(bundleProcessor, cache.find("known"));
+
+ // After it is discarded, ensure the bundle processor is no longer found
+ cache.discard(bundleProcessor);
+ assertNull(cache.find("known"));
}
@Test
@@ -676,6 +682,7 @@ public class ProcessBundleHandlerTest {
bundleFinalizationCallbacks);
bundleProcessor.reset();
+ assertNull(bundleProcessor.getInstructionId());
verify(startFunctionRegistry, times(1)).reset();
verify(finishFunctionRegistry, times(1)).reset();
verify(splitListener, times(1)).clear();
@@ -727,16 +734,19 @@ public class ProcessBundleHandlerTest {
addProgressRequestCallback,
splitListener,
bundleFinalizer) -> {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("TestException");
throw new IllegalStateException("TestException");
}),
new BundleProcessorCache());
- handler.processBundle(
- BeamFnApi.InstructionRequest.newBuilder()
- .setProcessBundle(
-
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
- .build());
+ assertThrows(
+ "TestException",
+ IllegalStateException.class,
+ () ->
+ handler.processBundle(
+ BeamFnApi.InstructionRequest.newBuilder()
+ .setProcessBundle(
+ BeamFnApi.ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L"))
+ .build()));
}
@Test
@@ -813,7 +823,7 @@ public class ProcessBundleHandlerTest {
}
@Test
- public void testPTransformStartExceptionsArePropagated() throws Exception {
+ public void testPTransformStartExceptionsArePropagated() {
BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
BeamFnApi.ProcessBundleDescriptor.newBuilder()
.putTransforms(
@@ -854,23 +864,24 @@ public class ProcessBundleHandlerTest {
addProgressRequestCallback,
splitListener,
bundleFinalizer) -> {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("TestException");
startFunctionRegistry.register(
pTransformId,
ProcessBundleHandlerTest::throwException);
return null;
}),
new BundleProcessorCache());
- handler.processBundle(
- BeamFnApi.InstructionRequest.newBuilder()
- .setProcessBundle(
-
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
- .build());
-
+ assertThrows(
+ "TestException",
+ IllegalStateException.class,
+ () ->
+ handler.processBundle(
+ BeamFnApi.InstructionRequest.newBuilder()
+ .setProcessBundle(
+ BeamFnApi.ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L"))
+ .build()));
// BundleProcessor is not re-added back to the BundleProcessorCache in
case of an exception
// during bundle processing
- assertThat(
- handler.bundleProcessorCache.getCachedBundleProcessors(),
equalTo(Collections.EMPTY_MAP));
+
assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"),
empty());
}
@Test
@@ -915,23 +926,25 @@ public class ProcessBundleHandlerTest {
addProgressRequestCallback,
splitListener,
bundleFinalizer) -> {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("TestException");
finishFunctionRegistry.register(
pTransformId,
ProcessBundleHandlerTest::throwException);
return null;
}),
new BundleProcessorCache());
- handler.processBundle(
- BeamFnApi.InstructionRequest.newBuilder()
- .setProcessBundle(
-
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
- .build());
+ assertThrows(
+ "TestException",
+ IllegalStateException.class,
+ () ->
+ handler.processBundle(
+ BeamFnApi.InstructionRequest.newBuilder()
+ .setProcessBundle(
+ BeamFnApi.ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L"))
+ .build()));
// BundleProcessor is not re-added back to the BundleProcessorCache in
case of an exception
// during bundle processing
- assertThat(
- handler.bundleProcessorCache.getCachedBundleProcessors(),
equalTo(Collections.EMPTY_MAP));
+
assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"),
empty());
}
@Test
@@ -1089,19 +1102,22 @@ public class ProcessBundleHandlerTest {
}
private void doStateCalls(BeamFnStateClient
beamFnStateClient) {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("State API calls are unsupported");
beamFnStateClient.handle(
StateRequest.newBuilder().setInstructionId("SUCCESS"),
new CompletableFuture<>());
}
}),
new BundleProcessorCache());
- handler.processBundle(
- BeamFnApi.InstructionRequest.newBuilder()
- .setProcessBundle(
-
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
- .build());
+ assertThrows(
+ "State API calls are unsupported",
+ IllegalStateException.class,
+ () ->
+ handler.processBundle(
+ BeamFnApi.InstructionRequest.newBuilder()
+ .setProcessBundle(
+ BeamFnApi.ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L"))
+ .build()));
}
@Test
@@ -1155,8 +1171,6 @@ public class ProcessBundleHandlerTest {
}
private void doTimerRegistrations(BeamFnTimerClient
beamFnTimerClient) {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Timers are unsupported");
beamFnTimerClient.register(
LogicalEndpoint.timer("1L", "2L", "Timer"),
Timer.Coder.of(StringUtf8Coder.of(),
GlobalWindow.Coder.INSTANCE),
@@ -1164,11 +1178,16 @@ public class ProcessBundleHandlerTest {
}
}),
new BundleProcessorCache());
- handler.processBundle(
- BeamFnApi.InstructionRequest.newBuilder()
- .setProcessBundle(
-
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
- .build());
+ assertThrows(
+ "Timers are unsupported",
+ IllegalStateException.class,
+ () ->
+ handler.processBundle(
+ BeamFnApi.InstructionRequest.newBuilder()
+ .setProcessBundle(
+ BeamFnApi.ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L"))
+ .build()));
}
private static void throwException() {