gemini-code-assist[bot] commented on code in PR #38951:
URL: https://github.com/apache/beam/pull/38951#discussion_r3431483242
##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java:
##########
@@ -224,28 +368,36 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa
RelMetadataQuery.THREAD_PROVIDERS.set(
JaninoRelMetadataProvider.of(relNode.getCluster().getMetadataProvider()));
relNode.getCluster().invalidateMetadataQuery();
- beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, relNode);
- LOG.info("BEAMPlan>\n{}", BeamSqlRelUtils.explainLazily(beamRelNode));
- } catch (RelConversionException | CannotPlanException e) {
+
+ if (config.getPrograms().isEmpty()) {
+ throw new SqlConversionException("No planning programs configured in FrameworkConfig.");
+ }
+ Program program = config.getPrograms().get(0);
+ RelNode optimizedNode =
+ program.run(
+ relNode.getCluster().getPlanner(),
+ relNode,
+ desiredTraits,
+ ImmutableList.of(),
+ ImmutableList.of());
+
+ if (!(optimizedNode instanceof BeamRelNode)) {
+ throw new SqlConversionException(
+ String.format(
+ "The optimizer was unable to produce a Beam physical plan. "
+ + "Expected BeamRelNode, but got: %s",
+ optimizedNode.getClass().getName()));
+ }
+ beamRelNode = (BeamRelNode) optimizedNode;
+ } catch (CannotPlanException e) {
throw new SqlConversionException(
- String.format("Unable to convert query %s", sqlStatement), e);
- } catch (SqlParseException | ValidationException e) {
- throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e);
+ String.format("Unable to convert relNode to Beam: %s", relNode), e);
} finally {
planner.close();
}
Review Comment:

`RelMetadataQuery.THREAD_PROVIDERS` is a `ThreadLocal` that `convertToBeamRel`
sets before planning but never clears. On long-lived or pooled threads, a stale
`ThreadLocal` value can cause classloader memory leaks, because the metadata
provider holds references to the `RelOptCluster` and other heavy objects.
Remove the thread-local value in the `finally` block.
```suggestion
} finally {
  RelMetadataQuery.THREAD_PROVIDERS.remove();
  planner.close();
}
```
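
As a standalone illustration of the set-then-remove pattern, here is a minimal sketch that does not use Calcite. `ThreadLocalCleanupSketch`, `PROVIDER`, and `withProvider` are hypothetical names; a plain `ThreadLocal<Object>` stands in for `THREAD_PROVIDERS`. Whether later code on the same thread still expects the value after `convertToBeamRel` returns is something to check in the PR itself.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a per-thread provider slot; not Calcite code. */
final class ThreadLocalCleanupSketch {
  // Assumption: a plain ThreadLocal models the slot that the planner populates.
  static final ThreadLocal<Object> PROVIDER = new ThreadLocal<>();

  /** Sets the slot for the duration of body and always clears it afterwards. */
  static <T> T withProvider(Object provider, Supplier<T> body) {
    PROVIDER.set(provider);
    try {
      return body.get();
    } finally {
      // remove() drops this thread's entry, so a pooled thread does not keep
      // the provider (and everything it references) reachable after the call.
      PROVIDER.remove();
    }
  }

  public static void main(String[] args) {
    Object seen = withProvider("provider", PROVIDER::get);
    System.out.println("inside: " + seen);
    System.out.println("after: " + PROVIDER.get());
  }
}
```

Because the cleanup sits in `finally`, the slot is cleared on both the normal and the exceptional path, which matches where the suggestion places the `remove()` call.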
##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java:
##########
@@ -203,13 +268,92 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa
relNode,
new ParameterBinder(root.rel.getCluster().getRexBuilder(), queryParameters));
}
- LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(root.rel));
- RelTraitSet desiredTraits =
- relNode
- .getTraitSet()
- .replace(BeamLogicalConvention.INSTANCE)
- .replace(root.collation)
- .simplify();
+ return convertToBeamRel(relNode, root.collation);
+ } catch (RelConversionException | CannotPlanException e) {
+ planner.close();
+ throw new SqlConversionException(
+ String.format("Unable to convert query %s", sqlStatement), e);
+ } catch (SqlParseException | ValidationException e) {
+ planner.close();
+ throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e);
+ }
+ }
+
+ private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
+ RelNode newRel = rel.accept(binder);
+ java.util.List<RelNode> inputs = newRel.getInputs();
+ java.util.List<RelNode> newInputs = new java.util.ArrayList<>(inputs.size());
+ boolean changed = newRel != rel;
+ for (RelNode input : inputs) {
+ RelNode newInput = bindParameters(input, binder);
+ newInputs.add(newInput);
+ if (newInput != input) {
+ changed = true;
+ }
+ }
+ return changed ? newRel.copy(newRel.getTraitSet(), newInputs) : newRel;
+ }
Review Comment:

In `bindParameters`, `newRel.copy(...)` is redundant when only the row
expressions of `newRel` changed and none of its child inputs did: `newRel`
already contains the updated expressions and the original inputs. Copy only
when a child input actually changed, tracked by an `inputsChanged` flag as in
the suggestion below. This avoids unnecessary node copies during parameter
binding.
```suggestion
  private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
    RelNode newRel = rel.accept(binder);
    java.util.List<RelNode> inputs = newRel.getInputs();
    java.util.List<RelNode> newInputs = new java.util.ArrayList<>(inputs.size());
    boolean inputsChanged = false;
    for (RelNode input : inputs) {
      RelNode newInput = bindParameters(input, binder);
      newInputs.add(newInput);
      if (newInput != input) {
        inputsChanged = true;
      }
    }
    return inputsChanged ? newRel.copy(newRel.getTraitSet(), newInputs) : newRel;
  }
```
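
To make the copy-only-when-inputs-change behavior concrete, here is a minimal sketch on a toy tree, assuming an immutable node type. `BindParametersSketch`, `Node`, `bindExpr`, and the `copies` counter are hypothetical and only imitate the shape of `RelNode.accept` and `RelNode.copy`; they are not Calcite APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class BindParametersSketch {

  /** Hypothetical immutable tree node standing in for RelNode; not a Calcite type. */
  static final class Node {
    static int copies = 0; // counts copy() calls, for illustration only

    final String expr;
    final List<Node> inputs;

    Node(String expr, List<Node> inputs) {
      this.expr = expr;
      this.inputs = Collections.unmodifiableList(new ArrayList<>(inputs));
    }

    /** Models rel.accept(shuttle): same node if nothing to bind, else a new node with the same inputs. */
    Node bindExpr() {
      String bound = expr.replace("?", "42");
      return bound.equals(expr) ? this : new Node(bound, inputs);
    }

    /** Models rel.copy(traits, inputs): same expression, new inputs. */
    Node copy(List<Node> newInputs) {
      copies++;
      return new Node(expr, newInputs);
    }
  }

  /** Rewrites expressions bottom-up, copying a node only if one of its inputs changed. */
  static Node bind(Node node) {
    Node newNode = node.bindExpr();
    List<Node> newInputs = new ArrayList<>(newNode.inputs.size());
    boolean inputsChanged = false;
    for (Node input : newNode.inputs) {
      Node newInput = bind(input);
      newInputs.add(newInput);
      if (newInput != input) {
        inputsChanged = true;
      }
    }
    // An expression-only change is already reflected in newNode, so no copy is needed.
    return inputsChanged ? newNode.copy(newInputs) : newNode;
  }

  public static void main(String[] args) {
    Node leaf = new Node("scan", Collections.emptyList());
    Node root = new Node("x = ?", Collections.singletonList(leaf));
    Node bound = bind(root);
    System.out.println(bound.expr + ", copies=" + Node.copies);
  }
}
```

In this sketch, binding a root whose own expression changes but whose leaf does not performs zero `copy` calls, while a changed leaf under an unchanged root triggers exactly one.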