gemini-code-assist[bot] commented on code in PR #37031:
URL: https://github.com/apache/beam/pull/37031#discussion_r2594801545
##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java:
##########
@@ -1929,4 +1930,154 @@ public void
testTimerRegistrationsFailIfNoTimerApiServiceDescriptorSpecified() t
private static void throwException() {
throw new IllegalStateException("TestException");
}
+
+ public void testTopologicalOrderRespectsDependency() throws Exception {
Review Comment:

This test method is missing the `@Test` annotation, so it will not be
executed by the test runner. Please add the annotation to ensure this test case
is run.
```suggestion
@Test
public void testTopologicalOrderRespectsDependency() throws Exception {
```
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java:
##########
@@ -843,68 +882,126 @@ public void afterBundleCommit(Instant callbackExpiry,
Callback callback) {
bundleFinalizationCallbackRegistrations,
runnerCapabilities);
- // Create a BeamFnStateClient
- for (Map.Entry<String, PTransform> entry :
bundleDescriptor.getTransformsMap().entrySet()) {
+ // Build components once for this descriptor.
+ final RunnerApi.Components components =
+ RunnerApi.Components.newBuilder()
+ .putAllCoders(bundleDescriptor.getCodersMap())
+ .putAllPcollections(bundleDescriptor.getPcollectionsMap())
+
.putAllWindowingStrategies(bundleDescriptor.getWindowingStrategiesMap())
+ .build();
- // Skip anything which isn't a root.
- // Also force data output transforms to be unconditionally instantiated
(see BEAM-10450).
- // TODO: Remove source as a root and have it be triggered by the Runner.
- if (!DATA_INPUT_URN.equals(entry.getValue().getSpec().getUrn())
- && !DATA_OUTPUT_URN.equals(entry.getValue().getSpec().getUrn())
- && !JAVA_SOURCE_URN.equals(entry.getValue().getSpec().getUrn())
- && !PTransformTranslation.READ_TRANSFORM_URN.equals(
- entry.getValue().getSpec().getUrn())) {
- continue;
+ // Use cached topological order when available. Fall back to descriptor
order on error.
+ try {
+ ImmutableList<String> topo = topologicalOrderCache.get(bundleId);
+ for (String transformId : topo) {
+ PTransform pTransform =
bundleDescriptor.getTransformsMap().get(transformId);
+ if (pTransform == null) {
+ continue; // defensive
+ }
+ if (!DATA_INPUT_URN.equals(pTransform.getSpec().getUrn())
+ && !DATA_OUTPUT_URN.equals(pTransform.getSpec().getUrn())
+ && !JAVA_SOURCE_URN.equals(pTransform.getSpec().getUrn())
+ &&
!PTransformTranslation.READ_TRANSFORM_URN.equals(pTransform.getSpec().getUrn()))
{
+ continue;
+ }
+ addRunnerAndConsumersForPTransformRecursively(
+ beamFnStateClient,
+ beamFnDataClient,
+ transformId,
+ pTransform,
+ bundleProcessor::getInstructionId,
+ bundleProcessor::getCacheTokens,
+ bundleProcessor::getBundleCache,
+ bundleDescriptor,
+ components,
+ pCollectionIdsToConsumingPTransforms,
+ pCollectionConsumerRegistry,
+ processedPTransformIds,
+ startFunctionRegistry,
+ finishFunctionRegistry,
+ resetFunctions::add,
+ tearDownFunctions::add,
+ (apiServiceDescriptor, dataEndpoint) -> {
+ if (!bundleProcessor
+ .getInboundEndpointApiServiceDescriptors()
+ .contains(apiServiceDescriptor)) {
+
bundleProcessor.getInboundEndpointApiServiceDescriptors().add(apiServiceDescriptor);
+ }
+ bundleProcessor.getInboundDataEndpoints().add(dataEndpoint);
+ },
+ (timerEndpoint) -> {
+ if (!bundleDescriptor.hasTimerApiServiceDescriptor()) {
+ throw new IllegalStateException(
+ String.format(
+ "Timers are unsupported because the "
+ + "ProcessBundleRequest %s does not provide a
timer ApiServiceDescriptor.",
+ bundleId));
+ }
+ bundleProcessor.getTimerEndpoints().add(timerEndpoint);
+ },
+ bundleProgressReporterAndRegistrar::register,
+ splitListener,
+ bundleFinalizer,
+ bundleProcessor.getChannelRoots(),
+ bundleProcessor.getOutboundAggregators(),
+ bundleProcessor.getRunnerCapabilities());
+ }
+ } catch (Exception e) {
+ LOG.warn(
+ "Topological ordering failed for descriptor {}. Falling back to
descriptor order. Cause: {}",
+ bundleId,
+ e.toString());
Review Comment:

The exception is logged via `e.toString()`, which hides the stack trace.
This can make debugging difficult. It's better to pass the exception object
directly to the logger to include the full stack trace.
```java
LOG.warn(
"Topological ordering failed for descriptor {}. Falling back to descriptor order.",
bundleId,
e);
```
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java:
##########
@@ -843,68 +882,126 @@ public void afterBundleCommit(Instant callbackExpiry,
Callback callback) {
bundleFinalizationCallbackRegistrations,
runnerCapabilities);
- // Create a BeamFnStateClient
- for (Map.Entry<String, PTransform> entry :
bundleDescriptor.getTransformsMap().entrySet()) {
+ // Build components once for this descriptor.
+ final RunnerApi.Components components =
+ RunnerApi.Components.newBuilder()
+ .putAllCoders(bundleDescriptor.getCodersMap())
+ .putAllPcollections(bundleDescriptor.getPcollectionsMap())
+
.putAllWindowingStrategies(bundleDescriptor.getWindowingStrategiesMap())
+ .build();
- // Skip anything which isn't a root.
- // Also force data output transforms to be unconditionally instantiated
(see BEAM-10450).
- // TODO: Remove source as a root and have it be triggered by the Runner.
- if (!DATA_INPUT_URN.equals(entry.getValue().getSpec().getUrn())
- && !DATA_OUTPUT_URN.equals(entry.getValue().getSpec().getUrn())
- && !JAVA_SOURCE_URN.equals(entry.getValue().getSpec().getUrn())
- && !PTransformTranslation.READ_TRANSFORM_URN.equals(
- entry.getValue().getSpec().getUrn())) {
- continue;
+ // Use cached topological order when available. Fall back to descriptor
order on error.
+ try {
+ ImmutableList<String> topo = topologicalOrderCache.get(bundleId);
+ for (String transformId : topo) {
+ PTransform pTransform =
bundleDescriptor.getTransformsMap().get(transformId);
+ if (pTransform == null) {
+ continue; // defensive
+ }
+ if (!DATA_INPUT_URN.equals(pTransform.getSpec().getUrn())
+ && !DATA_OUTPUT_URN.equals(pTransform.getSpec().getUrn())
+ && !JAVA_SOURCE_URN.equals(pTransform.getSpec().getUrn())
+ &&
!PTransformTranslation.READ_TRANSFORM_URN.equals(pTransform.getSpec().getUrn()))
{
+ continue;
+ }
+ addRunnerAndConsumersForPTransformRecursively(
+ beamFnStateClient,
+ beamFnDataClient,
+ transformId,
+ pTransform,
+ bundleProcessor::getInstructionId,
+ bundleProcessor::getCacheTokens,
+ bundleProcessor::getBundleCache,
+ bundleDescriptor,
+ components,
+ pCollectionIdsToConsumingPTransforms,
+ pCollectionConsumerRegistry,
+ processedPTransformIds,
+ startFunctionRegistry,
+ finishFunctionRegistry,
+ resetFunctions::add,
+ tearDownFunctions::add,
+ (apiServiceDescriptor, dataEndpoint) -> {
+ if (!bundleProcessor
+ .getInboundEndpointApiServiceDescriptors()
+ .contains(apiServiceDescriptor)) {
+
bundleProcessor.getInboundEndpointApiServiceDescriptors().add(apiServiceDescriptor);
+ }
+ bundleProcessor.getInboundDataEndpoints().add(dataEndpoint);
+ },
+ (timerEndpoint) -> {
+ if (!bundleDescriptor.hasTimerApiServiceDescriptor()) {
+ throw new IllegalStateException(
+ String.format(
+ "Timers are unsupported because the "
+ + "ProcessBundleRequest %s does not provide a
timer ApiServiceDescriptor.",
+ bundleId));
+ }
+ bundleProcessor.getTimerEndpoints().add(timerEndpoint);
+ },
+ bundleProgressReporterAndRegistrar::register,
+ splitListener,
+ bundleFinalizer,
+ bundleProcessor.getChannelRoots(),
+ bundleProcessor.getOutboundAggregators(),
+ bundleProcessor.getRunnerCapabilities());
+ }
+ } catch (Exception e) {
+ LOG.warn(
+ "Topological ordering failed for descriptor {}. Falling back to
descriptor order. Cause: {}",
+ bundleId,
+ e.toString());
+ // Fallback: previous descriptor-order iteration.
+ for (Map.Entry<String, PTransform> entry :
bundleDescriptor.getTransformsMap().entrySet()) {
+ if (!DATA_INPUT_URN.equals(entry.getValue().getSpec().getUrn())
+ && !DATA_OUTPUT_URN.equals(entry.getValue().getSpec().getUrn())
+ && !JAVA_SOURCE_URN.equals(entry.getValue().getSpec().getUrn())
+ && !PTransformTranslation.READ_TRANSFORM_URN.equals(
+ entry.getValue().getSpec().getUrn())) {
+ continue;
+ }
+ addRunnerAndConsumersForPTransformRecursively(
+ beamFnStateClient,
+ beamFnDataClient,
+ entry.getKey(),
+ entry.getValue(),
+ bundleProcessor::getInstructionId,
+ bundleProcessor::getCacheTokens,
+ bundleProcessor::getBundleCache,
+ bundleDescriptor,
+ components,
+ pCollectionIdsToConsumingPTransforms,
+ pCollectionConsumerRegistry,
+ processedPTransformIds,
+ startFunctionRegistry,
+ finishFunctionRegistry,
+ resetFunctions::add,
+ tearDownFunctions::add,
+ (apiServiceDescriptor, dataEndpoint) -> {
+ if (!bundleProcessor
+ .getInboundEndpointApiServiceDescriptors()
+ .contains(apiServiceDescriptor)) {
+
bundleProcessor.getInboundEndpointApiServiceDescriptors().add(apiServiceDescriptor);
+ }
+ bundleProcessor.getInboundDataEndpoints().add(dataEndpoint);
+ },
+ (timerEndpoint) -> {
+ if (!bundleDescriptor.hasTimerApiServiceDescriptor()) {
+ throw new IllegalStateException(
+ String.format(
+ "Timers are unsupported because the "
+ + "ProcessBundleRequest %s does not provide a
timer ApiServiceDescriptor.",
+ bundleId));
+ }
+ bundleProcessor.getTimerEndpoints().add(timerEndpoint);
+ },
+ bundleProgressReporterAndRegistrar::register,
+ splitListener,
+ bundleFinalizer,
+ bundleProcessor.getChannelRoots(),
+ bundleProcessor.getOutboundAggregators(),
+ bundleProcessor.getRunnerCapabilities());
}
-
- RunnerApi.Components components =
- RunnerApi.Components.newBuilder()
- .putAllCoders(bundleDescriptor.getCodersMap())
- .putAllPcollections(bundleDescriptor.getPcollectionsMap())
-
.putAllWindowingStrategies(bundleDescriptor.getWindowingStrategiesMap())
- .build();
-
- addRunnerAndConsumersForPTransformRecursively(
- beamFnStateClient,
- beamFnDataClient,
- entry.getKey(),
- entry.getValue(),
- bundleProcessor::getInstructionId,
- bundleProcessor::getCacheTokens,
- bundleProcessor::getBundleCache,
- bundleDescriptor,
- components,
- pCollectionIdsToConsumingPTransforms,
- pCollectionConsumerRegistry,
- processedPTransformIds,
- startFunctionRegistry,
- finishFunctionRegistry,
- resetFunctions::add,
- tearDownFunctions::add,
- (apiServiceDescriptor, dataEndpoint) -> {
- if (!bundleProcessor
- .getInboundEndpointApiServiceDescriptors()
- .contains(apiServiceDescriptor)) {
-
bundleProcessor.getInboundEndpointApiServiceDescriptors().add(apiServiceDescriptor);
- }
- bundleProcessor.getInboundDataEndpoints().add(dataEndpoint);
- },
- (timerEndpoint) -> {
- if (!bundleDescriptor.hasTimerApiServiceDescriptor()) {
- throw new IllegalStateException(
- String.format(
- "Timers are unsupported because the "
- + "ProcessBundleRequest %s does not provide a timer
ApiServiceDescriptor.",
- bundleId));
- }
- bundleProcessor.getTimerEndpoints().add(timerEndpoint);
- },
- bundleProgressReporterAndRegistrar::register,
- splitListener,
- bundleFinalizer,
- bundleProcessor.getChannelRoots(),
- bundleProcessor.getOutboundAggregators(),
- bundleProcessor.getRunnerCapabilities());
}
Review Comment:

There is significant code duplication between the `try` and `catch` blocks.
This can be refactored to improve maintainability by first determining the list
of transform IDs to process (either from the topological sort or from the
descriptor map on failure), and then iterating over them in a single loop.
```java
// Use cached topological order when available. Fall back to descriptor order on error.
Iterable<String> transformIds;
try {
transformIds = topologicalOrderCache.get(bundleId);
} catch (Exception e) {
LOG.warn(
"Topological ordering failed for descriptor {}. Falling back to descriptor order. Cause: {}",
bundleId,
e.toString());
// Fallback: previous descriptor-order iteration.
transformIds = bundleDescriptor.getTransformsMap().keySet();
}
for (String transformId : transformIds) {
PTransform pTransform = bundleDescriptor.getTransformsMap().get(transformId);
if (pTransform == null) {
continue; // defensive
}
if (!DATA_INPUT_URN.equals(pTransform.getSpec().getUrn())
&& !DATA_OUTPUT_URN.equals(pTransform.getSpec().getUrn())
&& !JAVA_SOURCE_URN.equals(pTransform.getSpec().getUrn())
&& !PTransformTranslation.READ_TRANSFORM_URN.equals(pTransform.getSpec().getUrn())) {
continue;
}
addRunnerAndConsumersForPTransformRecursively(
beamFnStateClient,
beamFnDataClient,
transformId,
pTransform,
bundleProcessor::getInstructionId,
bundleProcessor::getCacheTokens,
bundleProcessor::getBundleCache,
bundleDescriptor,
components,
pCollectionIdsToConsumingPTransforms,
pCollectionConsumerRegistry,
processedPTransformIds,
startFunctionRegistry,
finishFunctionRegistry,
resetFunctions::add,
tearDownFunctions::add,
(apiServiceDescriptor, dataEndpoint) -> {
if (!bundleProcessor
.getInboundEndpointApiServiceDescriptors()
.contains(apiServiceDescriptor)) {
bundleProcessor.getInboundEndpointApiServiceDescriptors().add(apiServiceDescriptor);
}
bundleProcessor.getInboundDataEndpoints().add(dataEndpoint);
},
(timerEndpoint) -> {
if (!bundleDescriptor.hasTimerApiServiceDescriptor()) {
throw new IllegalStateException(
String.format(
"Timers are unsupported because the "
+ "ProcessBundleRequest %s does not provide a timer ApiServiceDescriptor.",
bundleId));
}
bundleProcessor.getTimerEndpoints().add(timerEndpoint);
},
bundleProgressReporterAndRegistrar::register,
splitListener,
bundleFinalizer,
bundleProcessor.getChannelRoots(),
bundleProcessor.getOutboundAggregators(),
bundleProcessor.getRunnerCapabilities());
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org