gemini-code-assist[bot] commented on code in PR #38951:
URL: https://github.com/apache/beam/pull/38951#discussion_r3405850465
##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java:
##########
@@ -90,11 +98,52 @@ public class CalciteQueryPlanner implements QueryPlanner {
private final Planner planner;
private final JdbcConnection connection;
+ private final FrameworkConfig config;
+
+ // Cannot be final because of wacky initialization logic
+ private RelOptCluster relOptCluster;
+ private CalciteCatalogReader catalogReader;
+ private RelDataTypeFactory typeFactory;
+ private RelOptPlanner calcitePlanner;
Review Comment:

These fields are assigned but never read anywhere in the class. Storing
references to them is also unsafe: they are captured from a temporary planner
inside `Frameworks.withPlanner(...)`, which is closed as soon as the block
finishes executing, so any later use would reference closed or disposed
Calcite components. Remove them entirely.
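
A minimal sketch of the hazard, assuming (as this comment states) that the objects handed to the callback are released when the `withPlanner`-style call returns. `ScopedContext`, `withScopedContext`, and `ScopedCaptureDemo` are hypothetical stand-ins, not Calcite APIs; the sketch only shows that a reference escaping a scoped callback outlives the resource behind it.

```java
import java.util.function.Function;

/** Hypothetical component that is only valid inside a scoped callback. */
final class ScopedContext implements AutoCloseable {
  private boolean closed = false;

  String describe() {
    if (closed) {
      throw new IllegalStateException("context already closed");
    }
    return "cluster";
  }

  @Override
  public void close() {
    closed = true;
  }
}

final class ScopedCaptureDemo {
  /** Hypothetical analogue of a withPlanner-style helper: the context is closed on return. */
  static <R> R withScopedContext(Function<ScopedContext, R> action) {
    try (ScopedContext ctx = new ScopedContext()) {
      return action.apply(ctx);
    }
  }

  // Field captured from inside the callback, mirroring relOptCluster, catalogReader, etc.
  private ScopedContext captured;

  void init() {
    withScopedContext(
        ctx -> {
          this.captured = ctx; // the reference escapes the callback's scope
          return null;
        });
  }

  String useLater() {
    // Fails: the context was closed when withScopedContext returned inside init().
    return captured.describe();
  }
}
```

Inside the callback the context works; the captured field fails on first use afterwards. Since nothing in `CalciteQueryPlanner` reads these fields, deleting them is simpler than trying to keep the scoped objects alive.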
##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java:
##########
@@ -203,12 +298,59 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa
relNode,
new ParameterBinder(root.rel.getCluster().getRexBuilder(),
queryParameters));
}
- LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(root.rel));
+ return convertToBeamRel(relNode, queryParameters);
+ } catch (RelConversionException | CannotPlanException e) {
+ throw new SqlConversionException(
+ String.format("Unable to convert query %s", sqlStatement), e);
+ } catch (SqlParseException | ValidationException e) {
+ throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e);
+ } finally {
+ planner.close();
+ }
+ }
+
+ private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
+ RelNode newRel = rel.accept(binder);
+ java.util.List<RelNode> newInputs = new java.util.ArrayList<>();
+ for (RelNode input : newRel.getInputs()) {
+ newInputs.add(bindParameters(input, binder));
+ }
+ return newRel.copy(newRel.getTraitSet(), newInputs);
+ }
+
+ @Override
+ public RelNode parseToRel(String sqlStatement, QueryParameters queryParameters)
+ throws ParseException, SqlConversionException {
+ Preconditions.checkArgument(
+ queryParameters.getKind() == Kind.NONE,
+ "Beam SQL Calcite dialect does not yet support query parameters.");
+ try {
+ SqlNode parsed = planner.parse(sqlStatement);
+ TableResolutionUtils.setupCustomTableResolution(connection, parsed);
+ SqlNode validated = planner.validate(parsed);
+ // root of original logical plan
+ RelRoot root = planner.rel(validated);
+ return root.rel;
+ } catch (RelConversionException e) {
+ throw new SqlConversionException(
+ String.format("Unable to convert query %s", sqlStatement), e);
+ } catch (SqlParseException | ValidationException e) {
+ throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e);
+ } finally {
+ planner.close();
+ }
Review Comment:

Calling `planner.close()` in the `finally` block of `parseToRel` is premature,
and this is a critical bug. It closes the Calcite planner immediately after
parsing, invalidating the returned `RelNode`'s cluster and planner state before
the caller can pass it to `convertToBeamRel`, whose physical planning step
still depends on that planner. It also prevents reusing the `BeamSqlEnv` or
planner for subsequent operations. Remove the `finally` block here and leave
the planner open for the physical planning stage.
##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java:
##########
@@ -39,9 +41,29 @@ public interface QueryPlanner {
BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters
queryParameters)
throws ParseException, SqlConversionException;
+ /** It parses and validate the input query, then convert into a {@link BeamRelNode} tree. */
+ BeamRelNode convertToBeamRel(RelNode sqlStatement, QueryParameters queryParameters)
Review Comment:

The parameter name `sqlStatement` is misleading: its type is `RelNode`, which
represents a logical plan (a relational expression), not a raw SQL string.
Rename it to `relNode` or `logicalPlan` so the name matches its type and
purpose.
```suggestion
BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters queryParameters)
```
##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java:
##########
@@ -203,12 +298,59 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa
relNode,
new ParameterBinder(root.rel.getCluster().getRexBuilder(),
queryParameters));
}
- LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(root.rel));
+ return convertToBeamRel(relNode, queryParameters);
Review Comment:

To preserve the query's collation (for example, the ordering required by an
`ORDER BY` clause) and avoid a regression, pass `root.collation` through to a
private, collation-aware `convertToBeamRel` overload.
```suggestion
return convertToBeamRel(relNode, queryParameters, root.collation);
```
##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java:
##########
@@ -90,11 +98,52 @@ public class CalciteQueryPlanner implements QueryPlanner {
private final Planner planner;
private final JdbcConnection connection;
+ private final FrameworkConfig config;
+
+ // Cannot be final because of wacky initialization logic
+ private RelOptCluster relOptCluster;
+ private CalciteCatalogReader catalogReader;
+ private RelDataTypeFactory typeFactory;
+ private RelOptPlanner calcitePlanner;
/** Called by {@link BeamSqlEnv}.instantiatePlanner() reflectively. */
public CalciteQueryPlanner(JdbcConnection connection, Collection<RuleSet> ruleSets) {
this.connection = connection;
- this.planner = Frameworks.getPlanner(defaultConfig(connection, ruleSets));
+ this.config = defaultConfig(connection, ruleSets);
+ this.planner = Frameworks.getPlanner(config);
+
+ Frameworks.withPlanner(
+ (cluster, relOptSchema, rootSchema) -> {
+ // CAPTURE THE COMPONENTS HERE
+ this.relOptCluster = cluster;
+ this.catalogReader = (CalciteCatalogReader) relOptSchema;
+ this.typeFactory = cluster.getTypeFactory();
+ this.calcitePlanner = cluster.getPlanner();
+
+ // ... any other setup from the original lambda ...
+ // e.g., planner.setExecutor(executor);
+
+ return null;
+ },
+ config);
+
+ if (this.relOptCluster == null || this.catalogReader == null) {
+ throw new IllegalStateException("Failed to initialize Calcite components");
+ }
Review Comment:

This `Frameworks.withPlanner` block exists only to initialize the unused and
unsafe fields (`relOptCluster`, `catalogReader`, etc.). Because those fields
are never read, and the planner behind them is closed when the block exits,
remove the entire block together with the null check that follows it; this
also simplifies the constructor.
##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java:
##########
@@ -203,12 +298,59 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa
relNode,
new ParameterBinder(root.rel.getCluster().getRexBuilder(),
queryParameters));
}
- LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(root.rel));
+ return convertToBeamRel(relNode, queryParameters);
+ } catch (RelConversionException | CannotPlanException e) {
+ throw new SqlConversionException(
+ String.format("Unable to convert query %s", sqlStatement), e);
+ } catch (SqlParseException | ValidationException e) {
+ throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e);
+ } finally {
+ planner.close();
+ }
+ }
+
+ private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
+ RelNode newRel = rel.accept(binder);
+ java.util.List<RelNode> newInputs = new java.util.ArrayList<>();
+ for (RelNode input : newRel.getInputs()) {
+ newInputs.add(bindParameters(input, binder));
+ }
+ return newRel.copy(newRel.getTraitSet(), newInputs);
+ }
+
+ @Override
+ public RelNode parseToRel(String sqlStatement, QueryParameters queryParameters)
+ throws ParseException, SqlConversionException {
+ Preconditions.checkArgument(
+ queryParameters.getKind() == Kind.NONE,
+ "Beam SQL Calcite dialect does not yet support query parameters.");
+ try {
+ SqlNode parsed = planner.parse(sqlStatement);
+ TableResolutionUtils.setupCustomTableResolution(connection, parsed);
+ SqlNode validated = planner.validate(parsed);
+ // root of original logical plan
+ RelRoot root = planner.rel(validated);
+ return root.rel;
+ } catch (RelConversionException e) {
+ throw new SqlConversionException(
+ String.format("Unable to convert query %s", sqlStatement), e);
+ } catch (SqlParseException | ValidationException e) {
+ throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e);
+ } finally {
+ planner.close();
+ }
+ }
+
+ @Override
+ public BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters queryParameters) {
+ RelNode beamRelNode;
+ try {
+ LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(relNode));
RelTraitSet desiredTraits =
relNode
.getTraitSet()
.replace(BeamLogicalConvention.INSTANCE)
- .replace(root.collation)
+ // .replace(root.collation)
.simplify();
// beam physical plan
relNode
Review Comment:

Commenting out `.replace(root.collation)` causes a regression: the query's
collation is discarded during physical planning. To fix this while keeping the
public API unchanged, introduce a private helper that accepts an optional
(nullable) `RelCollation` and applies it only when present.
```java
@Override
public BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters queryParameters) {
  return convertToBeamRel(relNode, queryParameters, null);
}

private BeamRelNode convertToBeamRel(
    RelNode relNode,
    QueryParameters queryParameters,
    // RelCollation is imported from the vendored Calcite package
    // (org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelCollation).
    // A type-use @Nullable cannot prefix a fully qualified type name.
    @Nullable RelCollation collation) {
  RelNode beamRelNode;
  try {
    LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(relNode));
    RelTraitSet desiredTraits =
        relNode
            .getTraitSet()
            .replace(BeamLogicalConvention.INSTANCE);
    // Only constrain the ordering when the caller supplied a collation.
    if (collation != null) {
      desiredTraits = desiredTraits.replace(collation);
    }
    desiredTraits = desiredTraits.simplify();
    // beam physical plan
    // ... remainder of the method body as in the PR ...
```
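
The reassignment in this suggestion matters because Calcite's `RelTraitSet` is immutable: `replace` returns a new trait set rather than modifying the receiver. The toy below models that, and the optional-collation overload, without Calcite. `ToyTraitSet` and `CollationDemo` are hypothetical; string values such as `"[0 DESC]"` stand in for a real `RelCollation`.

```java
/** Hypothetical immutable trait set: a convention plus an optional sort order. */
final class ToyTraitSet {
  final String convention;
  final String collation; // null means "no required ordering"

  ToyTraitSet(String convention, String collation) {
    this.convention = convention;
    this.collation = collation;
  }

  ToyTraitSet replaceConvention(String newConvention) {
    return new ToyTraitSet(newConvention, collation); // returns a copy
  }

  ToyTraitSet replaceCollation(String newCollation) {
    return new ToyTraitSet(convention, newCollation); // returns a copy
  }
}

final class CollationDemo {
  /** Entry point without a collation, delegating like the suggested public overload. */
  static ToyTraitSet desiredTraits(ToyTraitSet current) {
    return desiredTraits(current, null);
  }

  /** Applies the collation only when one is supplied (in the PR, root.collation). */
  static ToyTraitSet desiredTraits(ToyTraitSet current, String collation) {
    ToyTraitSet traits = current.replaceConvention("BEAM_LOGICAL");
    if (collation != null) {
      traits = traits.replaceCollation(collation); // must reassign: the set is immutable
    }
    return traits;
  }
}
```

Calling the one-argument overload yields a trait set with no ordering requirement, which is the regression this comment describes: an `ORDER BY` requirement carried by `root.collation` never reaches physical planning.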
--
This is an automated message from the Apache Git Service.