lukecwik commented on a change in pull request #15807:
URL: https://github.com/apache/beam/pull/15807#discussion_r738918901



##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -685,11 +715,27 @@ public void afterBundleCommit(Instant callbackExpiry, 
Callback callback) {
           finishFunctionRegistry,
           resetFunctions::add,
           tearDownFunctions::add,
+          (apiServiceDescriptor, dataEndpoint) -> {
+            if (!bundleProcessor
+                .getInboundEndpointApiServiceDescriptors()
+                .contains(apiServiceDescriptor)) {
+              
bundleProcessor.getInboundEndpointApiServiceDescriptors().add(apiServiceDescriptor);
+            }
+            bundleProcessor.getInboundDataEndpoints().add(dataEndpoint);
+          },
+          (timerEndpoint) -> {
+            if (!bundleDescriptor.hasTimerApiServiceDescriptor()) {
+              throw FailAllTimerRegistrations.fail(processBundleRequest);
+            }
+            bundleProcessor.getTimerEndpoints().add(timerEndpoint);
+          },
           progressRequestCallbacks::add,
           splitListener,
           bundleFinalizer,
           bundleProcessor.getChannelRoots());
     }
+    bundleProcessor.finish();

Review comment:
       I didn't want to take on a mostly unrelated refactoring of a big chunk 
of the ProcessBundleHandler within this PR as I had already done one 
refactoring to migrate the PTransformRunnerFactory to use a Context object 
instead of passing in all the arguments.
   
   The BundleProcessor::getInstructionId is passed in to the 
PTransformRunnerFactory context so it needs to exist beforehand but the 
BeamFnDataInboundObserver2 can only be constructed once all the endpoints have 
been registered which is why a lot of the lists and internal objects have 
certain mutable aspects which aren't really needed to be mutable after all the 
PTransform runners are created.
   
   Overall the internal workings of the ProcessBundleHandler code related to 
creating the associated PTransforms and  BundleProcessor likely need to move to 
a builder pattern where we have one object responsible for creating the 
transforms and a different object responsible for executing bundles.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to