lukecwik commented on a change in pull request #15807:
URL: https://github.com/apache/beam/pull/15807#discussion_r738921411
##########
File path:
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##########
@@ -776,6 +790,142 @@ public void testPTransformStartExceptionsArePropagated() {
assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"),
empty());
}
+ @Test
+ public void testInstructionIsUnregisteredFromBeamFnDataClientOnSuccess()
throws Exception {
+ BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+ BeamFnApi.ProcessBundleDescriptor.newBuilder()
+ .putTransforms(
+ "2L",
+ RunnerApi.PTransform.newBuilder()
+
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+ .build())
+ .build();
+ Map<String, Message> fnApiRegistry = ImmutableMap.of("1L",
processBundleDescriptor);
+
+ Mockito.doAnswer(
+ (invocation) -> {
+ String instructionId = invocation.getArgument(0, String.class);
+ CloseableFnDataReceiver<BeamFnApi.Elements> data =
+ invocation.getArgument(2, CloseableFnDataReceiver.class);
+ data.accept(
+ BeamFnApi.Elements.newBuilder()
+ .addData(
+ BeamFnApi.Elements.Data.newBuilder()
+ .setInstructionId(instructionId)
+ .setTransformId("2L")
+ .setIsLast(true))
+ .build());
+ return null;
+ })
+ .when(beamFnDataClient)
+ .registerReceiver(any(), any(), any());
+
+ ProcessBundleHandler handler =
+ new ProcessBundleHandler(
+ PipelineOptionsFactory.create(),
+ Collections.emptySet(),
+ fnApiRegistry::get,
+ beamFnDataClient,
+ null /* beamFnStateGrpcClientCache */,
+ null /* finalizeBundleHandler */,
+ new ShortIdMap(),
+ ImmutableMap.of(
+ DATA_INPUT_URN,
+ (PTransformRunnerFactory<Object>)
+ (context) -> {
+ context.addIncomingDataEndpoint(
+ ApiServiceDescriptor.getDefaultInstance(),
+ StringUtf8Coder.of(),
+ (input) -> {});
+ return null;
+ }),
+ new BundleProcessorCache());
+ handler.processBundle(
+ BeamFnApi.InstructionRequest.newBuilder()
+ .setInstructionId("instructionId")
+ .setProcessBundle(
+
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
+ .build());
+
+ // Ensure that we unregister during successful processing
+ verify(beamFnDataClient).registerReceiver(eq("instructionId"), any(),
any());
+ verify(beamFnDataClient).unregisterReceiver(eq("instructionId"), any());
+ verifyNoMoreInteractions(beamFnDataClient);
+ }
+
+ @Test
+ public void testDataProcessingExceptionsArePropagated() throws Exception {
+ BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+ BeamFnApi.ProcessBundleDescriptor.newBuilder()
+ .putTransforms(
+ "2L",
+ RunnerApi.PTransform.newBuilder()
+
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+ .build())
+ .build();
+ Map<String, Message> fnApiRegistry = ImmutableMap.of("1L",
processBundleDescriptor);
+
+ Mockito.doAnswer(
+ (invocation) -> {
+ ByteString.Output encodedData = ByteString.newOutput();
+ StringUtf8Coder.of().encode("A", encodedData);
+ String instructionId = invocation.getArgument(0, String.class);
+ CloseableFnDataReceiver<BeamFnApi.Elements> data =
+ invocation.getArgument(2, CloseableFnDataReceiver.class);
+ data.accept(
+ BeamFnApi.Elements.newBuilder()
+ .addData(
+ BeamFnApi.Elements.Data.newBuilder()
+ .setInstructionId(instructionId)
+ .setTransformId("2L")
+ .setData(encodedData.toByteString())
+ .setIsLast(true))
+ .build());
+
+ return null;
+ })
+ .when(beamFnDataClient)
+ .registerReceiver(any(), any(), any());
+
+ ProcessBundleHandler handler =
+ new ProcessBundleHandler(
+ PipelineOptionsFactory.create(),
+ Collections.emptySet(),
+ fnApiRegistry::get,
+ beamFnDataClient,
+ null /* beamFnStateGrpcClientCache */,
+ null /* finalizeBundleHandler */,
+ new ShortIdMap(),
+ ImmutableMap.of(
+ DATA_INPUT_URN,
+ (PTransformRunnerFactory<Object>)
+ (context) -> {
+ context.addIncomingDataEndpoint(
+ ApiServiceDescriptor.getDefaultInstance(),
+ StringUtf8Coder.of(),
+ (input) -> {
+ throw new IllegalStateException("TestException");
+ });
+ return null;
+ }),
+ new BundleProcessorCache());
+ assertThrows(
+ "TestException",
+ IllegalStateException.class,
+ () ->
+ handler.processBundle(
+ BeamFnApi.InstructionRequest.newBuilder()
+ .setInstructionId("instructionId")
+ .setProcessBundle(
+ BeamFnApi.ProcessBundleRequest.newBuilder()
+ .setProcessBundleDescriptorId("1L"))
+ .build()));
+
+ // Ensure that we unregister during successful processing
Review comment:
We don't want to unregister during bundle processing failure because
that will remove the consumer for that instruction id which would cause
stuckness since the runner could still be sending us data (for example it could
be in a network buffer).
We rely on the `BeamFnDataInboundObserver2#awaitCompletion` to close its
internal queue if an exception during processing in the bundle process thread
is detected (before being rethrown and causing bundle processing to fail) which
will make the failure visible to the gRPC read thread if it ever enqueues
anything. If that becomes visible to the gRPC read thread it can mark the
instruction id as bad. Similarly if the gRPC read thread fails for some reason
or is closed, the queue is closed allowing bundle processing threads to stop
waiting and fail with an exception.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]