gemini-code-assist[bot] commented on code in PR #37031:
URL: https://github.com/apache/beam/pull/37031#discussion_r2596468510


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java:
##########
@@ -843,32 +883,43 @@ public void afterBundleCommit(Instant callbackExpiry, Callback callback) {
             bundleFinalizationCallbackRegistrations,
             runnerCapabilities);
 
-    // Create a BeamFnStateClient
-    for (Map.Entry<String, PTransform> entry : bundleDescriptor.getTransformsMap().entrySet()) {
+    // Build components once for this descriptor.
+    final RunnerApi.Components components =
+        RunnerApi.Components.newBuilder()
+            .putAllCoders(bundleDescriptor.getCodersMap())
+            .putAllPcollections(bundleDescriptor.getPcollectionsMap())
+            .putAllWindowingStrategies(bundleDescriptor.getWindowingStrategiesMap())
+            .build();
 
-      // Skip anything which isn't a root.
-      // Also force data output transforms to be unconditionally instantiated (see BEAM-10450).
-      // TODO: Remove source as a root and have it be triggered by the Runner.
-      if (!DATA_INPUT_URN.equals(entry.getValue().getSpec().getUrn())
-          && !DATA_OUTPUT_URN.equals(entry.getValue().getSpec().getUrn())
-          && !JAVA_SOURCE_URN.equals(entry.getValue().getSpec().getUrn())
-          && !PTransformTranslation.READ_TRANSFORM_URN.equals(
-              entry.getValue().getSpec().getUrn())) {
-        continue;
-      }
+    Iterable<String> transformIds;
+    // Use cached topological order when available. Fall back to descriptor order on error.
+    try {
+      transformIds = topologicalOrderCache.get(bundleId);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The current implementation results in `fnApiRegistry.apply(bundleId)` being 
called twice in the success path: once at the beginning of 
`createBundleProcessor` (line 813) and once inside the 
`topologicalOrderCache`'s loader. This is redundant.
   
   To avoid this, you could change the `topologicalOrderCache` to store not 
just the list of transform IDs, but also the `ProcessBundleDescriptor` itself. 
For example, it could cache a wrapper object like 
`TopoOrderResult(ProcessBundleDescriptor, ImmutableList<String>)`.
   
   Then, `createBundleProcessor` would get this object from the cache. On a 
cache hit, it would have both the descriptor and the topological order without 
a second fetch. On a cache miss, the loader would fetch the descriptor once. On 
a loader failure, you would fall back to fetching the descriptor and using 
descriptor order, as you do now.
   
   This would make the `fnApiRegistry` call happen only once per descriptor, 
either in the cache loader or in the fallback path. The test 
`testProcessBundleCreatesRunnersForAllTransformsUsingTopologicalCache` would 
then need to be updated to assert one call instead of two.
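
The caching idea above can be sketched in isolation. This is a minimal illustration, not the Beam implementation: `Descriptor`, `Handler`, `resolve`, and `orderFn` are hypothetical stand-ins, and a `ConcurrentHashMap` is assumed in place of the Guava cache used in the PR. It only shows how storing the descriptor next to its ordering lets the success path reach the registry once per descriptor ID.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class TopoOrderCacheSketch {
  /** Hypothetical stand-in for ProcessBundleDescriptor: transform IDs in descriptor order. */
  static final class Descriptor {
    final List<String> transformIds;

    Descriptor(List<String> transformIds) {
      this.transformIds = transformIds;
    }
  }

  /** Wrapper cached per descriptor ID: the descriptor plus its transform ordering. */
  static final class TopoOrderResult {
    final Descriptor descriptor;
    final List<String> order;

    TopoOrderResult(Descriptor descriptor, List<String> order) {
      this.descriptor = descriptor;
      this.order = order;
    }
  }

  /** Hypothetical handler; the registry and ordering function are injected for illustration. */
  static final class Handler {
    private final Function<String, Descriptor> registry;
    private final Function<Descriptor, List<String>> orderFn;
    private final Map<String, TopoOrderResult> cache = new ConcurrentHashMap<>();

    Handler(
        Function<String, Descriptor> registry, Function<Descriptor, List<String>> orderFn) {
      this.registry = registry;
      this.orderFn = orderFn;
    }

    TopoOrderResult resolve(String bundleId) {
      try {
        // Cache miss: fetch the descriptor once and keep it with its ordering.
        // Cache hit: both values come back without touching the registry.
        return cache.computeIfAbsent(
            bundleId,
            id -> {
              Descriptor descriptor = registry.apply(id);
              return new TopoOrderResult(descriptor, orderFn.apply(descriptor));
            });
      } catch (RuntimeException e) {
        // Loader failure: nothing is cached, so fetch the descriptor here
        // and fall back to descriptor order.
        Descriptor descriptor = registry.apply(bundleId);
        return new TopoOrderResult(descriptor, descriptor.transformIds);
      }
    }
  }
}
```

With this shape, a test in the spirit of `testProcessBundleCreatesRunnersForAllTransformsUsingTopologicalCache` would expect a single registry call on the success path. Note that in this sketch a failure inside the ordering step still leads to a second fetch in the fallback branch.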



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java:
##########
@@ -220,6 +223,43 @@ public ProcessBundleHandler(
     this.processWideCache = processWideCache;
     this.bundleProcessorCache = bundleProcessorCache;
     this.dataSampler = dataSampler;
+
+    // Initialize topological-order cache. Use same timeout idiom as BundleProcessorCache.
+    CacheBuilder<Object, Object> topoBuilder = CacheBuilder.newBuilder();
+    Duration topoTimeout = options.as(SdkHarnessOptions.class).getBundleProcessorCacheTimeout();
+    if (topoTimeout.compareTo(Duration.ZERO) > 0) {
+      topoBuilder = topoBuilder.expireAfterAccess(topoTimeout);
+    }
+    this.topologicalOrderCache =
+        topoBuilder.build(
+            new CacheLoader<String, ImmutableList<String>>() {
+              @Override
+              public ImmutableList<String> load(String descriptorId) throws Exception {
+                ProcessBundleDescriptor desc = fnApiRegistry.apply(descriptorId);
+                RunnerApi.Components comps =
+                    RunnerApi.Components.newBuilder()
+                        .putAllCoders(desc.getCodersMap())
+                        .putAllPcollections(desc.getPcollectionsMap())
+                        .putAllTransforms(desc.getTransformsMap())
+                        .putAllWindowingStrategies(desc.getWindowingStrategiesMap())
+                        .build();
+                QueryablePipeline qp =
+                    QueryablePipeline.forTransforms(desc.getTransformsMap().keySet(), comps);
+                ImmutableList.Builder<String> ids = ImmutableList.builder();
+                for (PipelineNode.PTransformNode node : qp.getTopologicallyOrderedTransforms()) {
+                  ids.add(node.getId());
+                }
+                ImmutableList<String> topo = ids.build();

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This loop can be made more concise by using Java Streams to collect the 
transform IDs.
   
   ```suggestion
                   ImmutableList<String> topo = qp.getTopologicallyOrderedTransforms().stream().map(PipelineNode.PTransformNode::getId).collect(ImmutableList.toImmutableList());
   ```
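
One caveat worth checking before applying the suggestion: the loop in the diff only requires that `getTopologicallyOrderedTransforms()` return something iterable. If its declared return type is `Iterable` rather than a `Collection` (not confirmed in this thread), `.stream()` is not available directly and the stream has to be built from the spliterator. A minimal sketch of that variant, using only the JDK and a hypothetical `Node` type in place of `PipelineNode.PTransformNode`:

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

final class TransformIdsSketch {
  /** Hypothetical stand-in for PipelineNode.PTransformNode. */
  static final class Node {
    private final String id;

    Node(String id) {
      this.id = id;
    }

    String getId() {
      return id;
    }
  }

  /** Collects node IDs in iteration order into an unmodifiable list. */
  static List<String> idsInOrder(Iterable<Node> nodes) {
    return Collections.unmodifiableList(
        // Iterable has no stream() method, so build a sequential stream from its spliterator.
        StreamSupport.stream(nodes.spliterator(), false)
            .map(Node::getId)
            .collect(Collectors.toList()));
  }
}
```

In the PR itself the final collector could remain `ImmutableList.toImmutableList()`; the JDK collector is used here only to keep the sketch free of dependencies.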



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
