gemini-code-assist[bot] commented on code in PR #38951:
URL: https://github.com/apache/beam/pull/38951#discussion_r3431483242


##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java:
##########
@@ -224,28 +368,36 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa
       RelMetadataQuery.THREAD_PROVIDERS.set(
           JaninoRelMetadataProvider.of(relNode.getCluster().getMetadataProvider()));
       relNode.getCluster().invalidateMetadataQuery();
-      beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, relNode);
-      LOG.info("BEAMPlan>\n{}", BeamSqlRelUtils.explainLazily(beamRelNode));
-    } catch (RelConversionException | CannotPlanException e) {
+
+      if (config.getPrograms().isEmpty()) {
+        throw new SqlConversionException("No planning programs configured in FrameworkConfig.");
+      }
+      Program program = config.getPrograms().get(0);
+      RelNode optimizedNode =
+          program.run(
+              relNode.getCluster().getPlanner(),
+              relNode,
+              desiredTraits,
+              ImmutableList.of(),
+              ImmutableList.of());
+
+      if (!(optimizedNode instanceof BeamRelNode)) {
+        throw new SqlConversionException(
+            String.format(
+                "The optimizer was unable to produce a Beam physical plan. "
+                    + "Expected BeamRelNode, but got: %s",
+                optimizedNode.getClass().getName()));
+      }
+      beamRelNode = (BeamRelNode) optimizedNode;
+    } catch (CannotPlanException e) {
       throw new SqlConversionException(
-          String.format("Unable to convert query %s", sqlStatement), e);
-    } catch (SqlParseException | ValidationException e) {
-      throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e);
+          String.format("Unable to convert relNode to Beam: %s", relNode), e);
     } finally {
       planner.close();
     }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   `RelMetadataQuery.THREAD_PROVIDERS` is a `ThreadLocal` that is set in `convertToBeamRel` but never cleared. In long-running environments or thread pools, a `ThreadLocal` that is never cleared can cause classloader memory leaks, because the metadata provider it holds keeps references to the `RelOptCluster` and other heavy objects. We should remove the thread-local value in the `finally` block.
   
   ```suggestion
       } finally {
         RelMetadataQuery.THREAD_PROVIDERS.remove();
         planner.close();
       }
   ```
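
To illustrate why the `remove()` call matters on pooled threads, here is a minimal, self-contained sketch. It does not use Calcite: `PROVIDER` is a hypothetical stand-in for `RelMetadataQuery.THREAD_PROVIDERS`, and the class and method names are invented for illustration only.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalCleanupSketch {
  // Hypothetical stand-in for RelMetadataQuery.THREAD_PROVIDERS.
  static final ThreadLocal<Object> PROVIDER = new ThreadLocal<>();

  // Sets the thread-local, does some work, and always clears it afterwards.
  static String planWithCleanup(Object provider) {
    PROVIDER.set(provider);
    try {
      return "planned with " + PROVIDER.get();
    } finally {
      PROVIDER.remove();
    }
  }

  // Sets the thread-local and never clears it.
  static String planWithoutCleanup(Object provider) {
    PROVIDER.set(provider);
    return "planned with " + PROVIDER.get();
  }

  // Runs one planning task on a single pooled thread, then checks from a
  // second task on the same thread whether the value is still attached.
  static boolean valueSurvivesOnPooledThread(boolean cleanup) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      pool.submit(() -> cleanup ? planWithCleanup("heavy") : planWithoutCleanup("heavy")).get();
      return pool.submit(() -> PROVIDER.get() != null).get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(valueSurvivesOnPooledThread(false)); // prints "true"
    System.out.println(valueSurvivesOnPooledThread(true));  // prints "false"
  }
}
```

Without the cleanup, the value stays reachable from the worker thread for as long as that thread lives, which is the retention pattern the comment above describes.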



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java:
##########
@@ -203,13 +268,92 @@ public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryPa
                 relNode,
                 new ParameterBinder(root.rel.getCluster().getRexBuilder(), queryParameters));
       }
-      LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(root.rel));
-      RelTraitSet desiredTraits =
-          relNode
-              .getTraitSet()
-              .replace(BeamLogicalConvention.INSTANCE)
-              .replace(root.collation)
-              .simplify();
+      return convertToBeamRel(relNode, root.collation);
+    } catch (RelConversionException | CannotPlanException e) {
+      planner.close();
+      throw new SqlConversionException(
+          String.format("Unable to convert query %s", sqlStatement), e);
+    } catch (SqlParseException | ValidationException e) {
+      planner.close();
+      throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e);
+    }
+  }
+
+  private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
+    RelNode newRel = rel.accept(binder);
+    java.util.List<RelNode> inputs = newRel.getInputs();
+    java.util.List<RelNode> newInputs = new java.util.ArrayList<>(inputs.size());
+    boolean changed = newRel != rel;
+    for (RelNode input : inputs) {
+      RelNode newInput = bindParameters(input, binder);
+      newInputs.add(newInput);
+      if (newInput != input) {
+        changed = true;
+      }
+    }
+    return changed ? newRel.copy(newRel.getTraitSet(), newInputs) : newRel;
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   In `bindParameters`, calling `newRel.copy(...)` when only the row expressions of `newRel` changed (and none of its child inputs did) is redundant: `newRel` already contains the updated expressions and the original inputs. We should perform a `copy` only if the child inputs actually changed (that is, when `inputsChanged` is true). This avoids unnecessary node copies during parameter binding.
   
   ```suggestion
     private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
       RelNode newRel = rel.accept(binder);
       java.util.List<RelNode> inputs = newRel.getInputs();
       java.util.List<RelNode> newInputs = new java.util.ArrayList<>(inputs.size());
       boolean inputsChanged = false;
       for (RelNode input : inputs) {
         RelNode newInput = bindParameters(input, binder);
         newInputs.add(newInput);
         if (newInput != input) {
           inputsChanged = true;
         }
       }
       return inputsChanged ? newRel.copy(newRel.getTraitSet(), newInputs) : newRel;
     }
   ```
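
The copy-only-when-inputs-changed pattern can be tried out on a toy tree, independent of Calcite. In the sketch below, `Node`, `accept`, `copy`, and the `copies` counter are all hypothetical stand-ins (for `RelNode`, `RelNode.accept(RexShuttle)`, and `RelNode.copy`); it assumes, as the comment above does, that `accept` returns a node that already carries the rewritten expression and the original inputs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.UnaryOperator;

class CopyOnChangeSketch {
  // Counts calls to Node.copy so the effect of the optimization is observable.
  static int copies = 0;

  /** Hypothetical immutable tree node standing in for RelNode. */
  static final class Node {
    final String expr;
    final List<Node> inputs;

    Node(String expr, List<Node> inputs) {
      this.expr = expr;
      this.inputs = Collections.unmodifiableList(new ArrayList<>(inputs));
    }

    // Rewrites this node's own expression and keeps its inputs;
    // returns this when the expression is unchanged.
    Node accept(UnaryOperator<String> shuttle) {
      String newExpr = shuttle.apply(expr);
      return newExpr.equals(expr) ? this : new Node(newExpr, inputs);
    }

    // Rebuilds this node on top of new inputs.
    Node copy(List<Node> newInputs) {
      copies++;
      return new Node(expr, newInputs);
    }
  }

  // Recursively rewrites expressions; copies a node only if a child was replaced.
  static Node bind(Node node, UnaryOperator<String> shuttle) {
    Node newNode = node.accept(shuttle);
    List<Node> newInputs = new ArrayList<>(newNode.inputs.size());
    boolean inputsChanged = false;
    for (Node input : newNode.inputs) {
      Node newInput = bind(input, shuttle);
      newInputs.add(newInput);
      if (newInput != input) {
        inputsChanged = true;
      }
    }
    return inputsChanged ? newNode.copy(newInputs) : newNode;
  }

  public static void main(String[] args) {
    Node scan = new Node("scan", Collections.emptyList());
    Node filter = new Node("filter(x > ?0)", Collections.singletonList(scan));
    Node bound = bind(filter, s -> s.replace("?0", "42"));
    System.out.println(bound.expr + ", copies=" + copies);
  }
}
```

Here only the filter's own expression changes, so `bind` returns the node produced by `accept` without calling `copy`. A parent above the filter would still be copied once, because its child was replaced.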



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
