lukecwik commented on a change in pull request #15807:
URL: https://github.com/apache/beam/pull/15807#discussion_r738921411



##########
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##########
@@ -776,6 +790,142 @@ public void testPTransformStartExceptionsArePropagated() {
     
assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"), 
empty());
   }
 
+  @Test
+  public void testInstructionIsUnregisteredFromBeamFnDataClientOnSuccess() 
throws Exception {
+    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+        BeamFnApi.ProcessBundleDescriptor.newBuilder()
+            .putTransforms(
+                "2L",
+                RunnerApi.PTransform.newBuilder()
+                    
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+                    .build())
+            .build();
+    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", 
processBundleDescriptor);
+
+    Mockito.doAnswer(
+            (invocation) -> {
+              String instructionId = invocation.getArgument(0, String.class);
+              CloseableFnDataReceiver<BeamFnApi.Elements> data =
+                  invocation.getArgument(2, CloseableFnDataReceiver.class);
+              data.accept(
+                  BeamFnApi.Elements.newBuilder()
+                      .addData(
+                          BeamFnApi.Elements.Data.newBuilder()
+                              .setInstructionId(instructionId)
+                              .setTransformId("2L")
+                              .setIsLast(true))
+                      .build());
+              return null;
+            })
+        .when(beamFnDataClient)
+        .registerReceiver(any(), any(), any());
+
+    ProcessBundleHandler handler =
+        new ProcessBundleHandler(
+            PipelineOptionsFactory.create(),
+            Collections.emptySet(),
+            fnApiRegistry::get,
+            beamFnDataClient,
+            null /* beamFnStateGrpcClientCache */,
+            null /* finalizeBundleHandler */,
+            new ShortIdMap(),
+            ImmutableMap.of(
+                DATA_INPUT_URN,
+                (PTransformRunnerFactory<Object>)
+                    (context) -> {
+                      context.addIncomingDataEndpoint(
+                          ApiServiceDescriptor.getDefaultInstance(),
+                          StringUtf8Coder.of(),
+                          (input) -> {});
+                      return null;
+                    }),
+            new BundleProcessorCache());
+    handler.processBundle(
+        BeamFnApi.InstructionRequest.newBuilder()
+            .setInstructionId("instructionId")
+            .setProcessBundle(
+                
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
+            .build());
+
+    // Ensure that we unregister during successful processing
+    verify(beamFnDataClient).registerReceiver(eq("instructionId"), any(), 
any());
+    verify(beamFnDataClient).unregisterReceiver(eq("instructionId"), any());
+    verifyNoMoreInteractions(beamFnDataClient);
+  }
+
+  @Test
+  public void testDataProcessingExceptionsArePropagated() throws Exception {
+    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+        BeamFnApi.ProcessBundleDescriptor.newBuilder()
+            .putTransforms(
+                "2L",
+                RunnerApi.PTransform.newBuilder()
+                    
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+                    .build())
+            .build();
+    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", 
processBundleDescriptor);
+
+    Mockito.doAnswer(
+            (invocation) -> {
+              ByteString.Output encodedData = ByteString.newOutput();
+              StringUtf8Coder.of().encode("A", encodedData);
+              String instructionId = invocation.getArgument(0, String.class);
+              CloseableFnDataReceiver<BeamFnApi.Elements> data =
+                  invocation.getArgument(2, CloseableFnDataReceiver.class);
+              data.accept(
+                  BeamFnApi.Elements.newBuilder()
+                      .addData(
+                          BeamFnApi.Elements.Data.newBuilder()
+                              .setInstructionId(instructionId)
+                              .setTransformId("2L")
+                              .setData(encodedData.toByteString())
+                              .setIsLast(true))
+                      .build());
+
+              return null;
+            })
+        .when(beamFnDataClient)
+        .registerReceiver(any(), any(), any());
+
+    ProcessBundleHandler handler =
+        new ProcessBundleHandler(
+            PipelineOptionsFactory.create(),
+            Collections.emptySet(),
+            fnApiRegistry::get,
+            beamFnDataClient,
+            null /* beamFnStateGrpcClientCache */,
+            null /* finalizeBundleHandler */,
+            new ShortIdMap(),
+            ImmutableMap.of(
+                DATA_INPUT_URN,
+                (PTransformRunnerFactory<Object>)
+                    (context) -> {
+                      context.addIncomingDataEndpoint(
+                          ApiServiceDescriptor.getDefaultInstance(),
+                          StringUtf8Coder.of(),
+                          (input) -> {
+                            throw new IllegalStateException("TestException");
+                          });
+                      return null;
+                    }),
+            new BundleProcessorCache());
+    assertThrows(
+        "TestException",
+        IllegalStateException.class,
+        () ->
+            handler.processBundle(
+                BeamFnApi.InstructionRequest.newBuilder()
+                    .setInstructionId("instructionId")
+                    .setProcessBundle(
+                        BeamFnApi.ProcessBundleRequest.newBuilder()
+                            .setProcessBundleDescriptorId("1L"))
+                    .build()));
+
+    // Ensure that we unregister during successful processing

Review comment:
       We don't want to unregister during bundle processing failure because 
that would remove the consumer for that instruction id, which would cause 
processing to get stuck since the runner could still be sending us data (for 
example, the data could still be sitting in a network buffer).
   
   We rely on `BeamFnDataInboundObserver2#awaitCompletion` to close its 
internal queue if an exception is detected during processing in the bundle 
processing thread (before it is rethrown and causes bundle processing to 
fail), which makes the failure visible to the gRPC read thread if it ever 
enqueues anything. Once the failure is visible to the gRPC read thread, it can 
mark the instruction id as bad. Similarly, if the gRPC read thread fails for 
some reason or is closed, the queue is closed, allowing bundle processing 
threads to stop waiting and fail with an exception.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to