y1chi commented on a change in pull request #15807:
URL: https://github.com/apache/beam/pull/15807#discussion_r738858366
##########
File path:
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##########
@@ -776,6 +790,142 @@ public void testPTransformStartExceptionsArePropagated() {
assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"),
empty());
}
+ @Test
+ public void testInstructionIsUnregisteredFromBeamFnDataClientOnSuccess()
throws Exception {
+ BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+ BeamFnApi.ProcessBundleDescriptor.newBuilder()
+ .putTransforms(
+ "2L",
+ RunnerApi.PTransform.newBuilder()
+
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+ .build())
+ .build();
+ Map<String, Message> fnApiRegistry = ImmutableMap.of("1L",
processBundleDescriptor);
+
+ Mockito.doAnswer(
+ (invocation) -> {
+ String instructionId = invocation.getArgument(0, String.class);
+ CloseableFnDataReceiver<BeamFnApi.Elements> data =
+ invocation.getArgument(2, CloseableFnDataReceiver.class);
+ data.accept(
+ BeamFnApi.Elements.newBuilder()
+ .addData(
+ BeamFnApi.Elements.Data.newBuilder()
+ .setInstructionId(instructionId)
+ .setTransformId("2L")
+ .setIsLast(true))
+ .build());
+ return null;
+ })
+ .when(beamFnDataClient)
+ .registerReceiver(any(), any(), any());
+
+ ProcessBundleHandler handler =
+ new ProcessBundleHandler(
+ PipelineOptionsFactory.create(),
+ Collections.emptySet(),
+ fnApiRegistry::get,
+ beamFnDataClient,
+ null /* beamFnStateGrpcClientCache */,
+ null /* finalizeBundleHandler */,
+ new ShortIdMap(),
+ ImmutableMap.of(
+ DATA_INPUT_URN,
+ (PTransformRunnerFactory<Object>)
+ (context) -> {
+ context.addIncomingDataEndpoint(
+ ApiServiceDescriptor.getDefaultInstance(),
+ StringUtf8Coder.of(),
+ (input) -> {});
+ return null;
+ }),
+ new BundleProcessorCache());
+ handler.processBundle(
+ BeamFnApi.InstructionRequest.newBuilder()
+ .setInstructionId("instructionId")
+ .setProcessBundle(
+
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
+ .build());
+
+ // Ensure that we unregister during successful processing
+ verify(beamFnDataClient).registerReceiver(eq("instructionId"), any(),
any());
+ verify(beamFnDataClient).unregisterReceiver(eq("instructionId"), any());
+ verifyNoMoreInteractions(beamFnDataClient);
+ }
+
+ @Test
+ public void testDataProcessingExceptionsArePropagated() throws Exception {
+ BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+ BeamFnApi.ProcessBundleDescriptor.newBuilder()
+ .putTransforms(
+ "2L",
+ RunnerApi.PTransform.newBuilder()
+
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+ .build())
+ .build();
+ Map<String, Message> fnApiRegistry = ImmutableMap.of("1L",
processBundleDescriptor);
+
+ Mockito.doAnswer(
+ (invocation) -> {
+ ByteString.Output encodedData = ByteString.newOutput();
+ StringUtf8Coder.of().encode("A", encodedData);
+ String instructionId = invocation.getArgument(0, String.class);
+ CloseableFnDataReceiver<BeamFnApi.Elements> data =
+ invocation.getArgument(2, CloseableFnDataReceiver.class);
+ data.accept(
+ BeamFnApi.Elements.newBuilder()
+ .addData(
+ BeamFnApi.Elements.Data.newBuilder()
+ .setInstructionId(instructionId)
+ .setTransformId("2L")
+ .setData(encodedData.toByteString())
+ .setIsLast(true))
+ .build());
+
+ return null;
+ })
+ .when(beamFnDataClient)
+ .registerReceiver(any(), any(), any());
+
+ ProcessBundleHandler handler =
+ new ProcessBundleHandler(
+ PipelineOptionsFactory.create(),
+ Collections.emptySet(),
+ fnApiRegistry::get,
+ beamFnDataClient,
+ null /* beamFnStateGrpcClientCache */,
+ null /* finalizeBundleHandler */,
+ new ShortIdMap(),
+ ImmutableMap.of(
+ DATA_INPUT_URN,
+ (PTransformRunnerFactory<Object>)
+ (context) -> {
+ context.addIncomingDataEndpoint(
+ ApiServiceDescriptor.getDefaultInstance(),
+ StringUtf8Coder.of(),
+ (input) -> {
+ throw new IllegalStateException("TestException");
+ });
+ return null;
+ }),
+ new BundleProcessorCache());
+ assertThrows(
+ "TestException",
+ IllegalStateException.class,
+ () ->
+ handler.processBundle(
+ BeamFnApi.InstructionRequest.newBuilder()
+ .setInstructionId("instructionId")
+ .setProcessBundle(
+ BeamFnApi.ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L"))
+ .build()));
+
+ // Ensure that we unregister during successful processing
Review comment:
ensure we don't unregister when there's an exception?
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
##########
@@ -34,28 +33,36 @@
*/
public interface BeamFnDataClient {
/**
- * Registers the following inbound receiver for the provided instruction id
and target.
+ * Registers the following inbound receiver for the provided instruction id.
*
* <p>The provided coder is used to decode inbound elements. The decoded
elements are passed to
* the provided receiver. Any failure during decoding or processing of the
element will complete
* the returned future exceptionally. On successful termination of the
stream, the returned future
* is completed successfully.
*
* <p>The receiver is not required to be thread safe.
+ *
+ * <p>Receivers for successfully processed bundles must be unregistered. See
{@link
+ * #unregisterReceiver} for details.
Review comment:
Do we need to expose both the register and unregister functions? Why not have
the receive function handle registering, waiting for completion or an exception,
and unregistering internally?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]