TheNeuralBit commented on a change in pull request #15807:
URL: https://github.com/apache/beam/pull/15807#discussion_r738850651



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -685,11 +715,27 @@ public void afterBundleCommit(Instant callbackExpiry, Callback callback) {
           finishFunctionRegistry,
           resetFunctions::add,
           tearDownFunctions::add,
+          (apiServiceDescriptor, dataEndpoint) -> {
+            if (!bundleProcessor
+                .getInboundEndpointApiServiceDescriptors()
+                .contains(apiServiceDescriptor)) {
+              bundleProcessor.getInboundEndpointApiServiceDescriptors().add(apiServiceDescriptor);
+            }
+            bundleProcessor.getInboundDataEndpoints().add(dataEndpoint);
+          },
+          (timerEndpoint) -> {
+            if (!bundleDescriptor.hasTimerApiServiceDescriptor()) {
+              throw FailAllTimerRegistrations.fail(processBundleRequest);
+            }
+            bundleProcessor.getTimerEndpoints().add(timerEndpoint);
+          },
           progressRequestCallbacks::add,
           splitListener,
           bundleFinalizer,
           bundleProcessor.getChannelRoots());
     }
+    bundleProcessor.finish();

Review comment:
       This finish logic feels error-prone. Couldn't you create all the members 
first, and then construct the `BundleProcessor`, fully realized?
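
A rough sketch of the shape suggested here, not the actual Beam code: `SketchBundleProcessor` and `SketchBundleSetup` are hypothetical stand-ins, and plain strings take the place of the real `DataEndpoint` and `TimerEndpoint` types. The registration callbacks append to local lists, and the processor is constructed only once those lists are complete, so no separate `finish()` step is needed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for BundleProcessor (not the real Beam class).
final class SketchBundleProcessor {
  private final List<String> inboundDataEndpoints;
  private final List<String> timerEndpoints;

  SketchBundleProcessor(List<String> inboundDataEndpoints, List<String> timerEndpoints) {
    // Defensive copies wrapped as unmodifiable views: the processor cannot
    // be observed in a half-built state and cannot be mutated afterwards.
    this.inboundDataEndpoints =
        Collections.unmodifiableList(new ArrayList<>(inboundDataEndpoints));
    this.timerEndpoints = Collections.unmodifiableList(new ArrayList<>(timerEndpoints));
  }

  List<String> getInboundDataEndpoints() {
    return inboundDataEndpoints;
  }

  List<String> getTimerEndpoints() {
    return timerEndpoints;
  }
}

// Hypothetical setup code: collect everything first, construct last.
final class SketchBundleSetup {
  static SketchBundleProcessor createProcessor() {
    List<String> dataEndpoints = new ArrayList<>();
    List<String> timerEndpoints = new ArrayList<>();

    // In the real handler these adds would happen inside the callbacks
    // passed to the runner-creation code (assumption for illustration).
    dataEndpoints.add("data-endpoint-1");
    timerEndpoints.add("timer-endpoint-1");

    return new SketchBundleProcessor(dataEndpoints, timerEndpoints);
  }
}
```

With this shape the getters can return read-only lists, which would also settle whether the lists need to stay mutable after creation.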

##########
File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java
##########
@@ -49,7 +49,10 @@
  *
  * <p>TODO: Add support for multiplexing over multiple outbound observers by stickying the output
  * location with a specific outbound observer.
+ *
+ * @deprecated Migrate to {@link BeamFnDataGrpcMultiplexer2}.

Review comment:
       Are there other usages of these classes that prevent us from going ahead 
and removing them?

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PTransformRunnerFactoryTestContext.java
##########
@@ -210,6 +222,34 @@ public void accept(T input) throws Exception {
     };
   }
 
+  public abstract Map<ApiServiceDescriptor, List<DataEndpoint<?>>> getIncomingDataEndpoints();
+
+  @Override
+  public <T> void addIncomingDataEndpoint(
+      ApiServiceDescriptor apiServiceDescriptor, Coder<T> coder, FnDataReceiver<T> receiver) {
+    getIncomingDataEndpoints()
+        .computeIfAbsent(apiServiceDescriptor, (unused) -> new ArrayList<>())
+        .add(DataEndpoint.create(getPTransformId(), coder, receiver));
+  }
+
+  public abstract List<TimerEndpoint<?>> getIncomingTimerEndpoints();
+
+  public <T> TimerEndpoint<T> getIncomingTimerEndpoint(String timerFamilyId) {
+    for (TimerEndpoint<?> timerEndpoint : getIncomingTimerEndpoints()) {
+      if (timerFamilyId.equals(timerEndpoint.getTimerFamilyId())) {
+        return (TimerEndpoint<T>) timerEndpoint;
+      }
+    }

Review comment:
       I guess this is just a test class, but why not store the timer endpoints 
in a map rather than iterating over them?
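
For illustration only, a map-based lookup might look roughly like the following. `SketchTimerEndpoint` and `SketchTimerEndpointRegistry` are hypothetical stand-ins rather than the real test context classes, and throwing on an unknown timer family is an assumption, since the quoted hunk does not show what happens after the loop.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TimerEndpoint<T>; only the family id is modeled.
final class SketchTimerEndpoint<T> {
  private final String timerFamilyId;

  SketchTimerEndpoint(String timerFamilyId) {
    this.timerFamilyId = timerFamilyId;
  }

  String getTimerFamilyId() {
    return timerFamilyId;
  }
}

// Hypothetical registry that keys endpoints by timer family id,
// replacing the linear scan with a single map lookup.
final class SketchTimerEndpointRegistry {
  private final Map<String, SketchTimerEndpoint<?>> byFamilyId = new HashMap<>();

  void add(SketchTimerEndpoint<?> endpoint) {
    byFamilyId.put(endpoint.getTimerFamilyId(), endpoint);
  }

  @SuppressWarnings("unchecked")
  <T> SketchTimerEndpoint<T> get(String timerFamilyId) {
    SketchTimerEndpoint<?> endpoint = byFamilyId.get(timerFamilyId);
    if (endpoint == null) {
      // Assumed behavior for a missing id; the original may differ.
      throw new IllegalArgumentException("Unknown timer family: " + timerFamilyId);
    }
    return (SketchTimerEndpoint<T>) endpoint;
  }
}
```

One caveat: a map keeps a single endpoint per timer family id, whereas a list can hold duplicates, so this only fits if the ids are unique within a transform.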

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -858,7 +909,11 @@ public static BundleProcessor create(
 
     abstract HandleStateCallsForBundle getBeamFnStateClient();
 
-    abstract QueueingBeamFnDataClient getQueueingClient();
+    abstract List<Endpoints.ApiServiceDescriptor> getInboundEndpointApiServiceDescriptors();
+
+    abstract List<DataEndpoint<?>> getInboundDataEndpoints();
+
+    abstract List<TimerEndpoint<?>> getTimerEndpoints();

Review comment:
       Do these need to be mutable lists (i.e. will they be modified after 
creation), or are they just mutable because of the setup and `finish()` logic?





