gemini-code-assist[bot] commented on code in PR #38832:
URL: https://github.com/apache/beam/pull/38832#discussion_r3380409874


##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java:
##########
@@ -209,6 +207,21 @@ public PCollection<Row> expand(PCollectionList<Row> 
pinput) {
                   GlobalWindows.class.getSimpleName(), windowingStrategy));
         }
 
+        // When no limit is specified (count == -1), we must sort the entire 
dataset.
+        // To achieve this globally, we key all rows by a single dummy key, 
group them together
+        // using GroupByKey to ensure they are processed together, and then 
sort them in-memory
+        // via SortInMemoryFn. Note: This can be memory-intensive for large 
datasets. It should
+        // only be done as a final step when the remaining data is small
+        if (count == -1) {
+          BeamSqlRowComparator comparator =
+              new BeamSqlRowComparator(fieldIndices, orientation, nullsFirst);
+          return upstream
+              .apply("WithDummyKey", WithKeys.of("DummyKey"))
+              .apply("GroupByKey", GroupByKey.create())
+              .apply("SortInMemory", ParDo.of(new SortInMemoryFn(comparator)))
+              .setRowSchema(CalciteUtils.toSchema(getRowType()));
+        }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Sorting an unbounded `PCollection` globally without a limit is not supported 
and will lead to an infinite accumulation of state or out-of-memory errors at 
runtime. We should explicitly throw an `UnsupportedOperationException` during 
pipeline construction if the input is unbounded.
   
   ```suggestion
           if (count == -1) {
             if (upstream.isBounded() == PCollection.IsBounded.UNBOUNDED) {
               throw new UnsupportedOperationException(
                   "ORDER BY without LIMIT is not supported for UNBOUNDED 
PCollections.");
             }
             BeamSqlRowComparator comparator = 
                 new BeamSqlRowComparator(fieldIndices, orientation, 
nullsFirst);
             return upstream
                 .apply("WithDummyKey", WithKeys.of("DummyKey"))
                 .apply("GroupByKey", GroupByKey.create())
                 .apply("SortInMemory", ParDo.of(new 
SortInMemoryFn(comparator)))
                 .setRowSchema(CalciteUtils.toSchema(getRowType()));
           }
   ```



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java:
##########
@@ -303,6 +316,31 @@ public void processElement(ProcessContext ctx) {
     }
   }
 
+  /**
+   * A {@link DoFn} that sorts all elements in-memory. Expects input grouped 
by a dummy key, sorts
+   * the iterable values, and outputs them.
+   */
+  private static class SortInMemoryFn extends DoFn<KV<String, Iterable<Row>>, 
Row> {
+    private final BeamSqlRowComparator comparator;
+
+    public SortInMemoryFn(BeamSqlRowComparator comparator) {
+      this.comparator = comparator;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext ctx) {
+      Iterable<Row> input = ctx.element().getValue();
+      List<Row> list = new ArrayList<>();
+      for (Row r : input) {
+        list.add(r);
+      }
+      list.sort(comparator);
+      for (Row r : list) {
+        ctx.output(r);
+      }
+    }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Using `ProcessContext` is a legacy style of writing `DoFn`s in Apache Beam. 
It is recommended to use modern parameter injection with `@Element` and 
`OutputReceiver` for better performance and readability.
   
   ```java
       @ProcessElement
       public void processElement(
           @Element KV<String, Iterable<Row>> element,
           OutputReceiver<Row> receiver) {
         Iterable<Row> input = element.getValue();
         if (input == null) {
           return;
         }
         List<Row> list = new ArrayList<>();
         for (Row r : input) {
           list.add(r);
         }
         list.sort(comparator);
         for (Row r : list) {
           receiver.output(r);
         }
       }
   ```



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImpl.java:
##########
@@ -70,17 +70,28 @@ public static Function create(Method method) {
   }
 
   /*
-   * Finds a method in a given class by name.
+   * Finds a method in a given class by name. In case of overloaded methods 
with the same name,
+   * this prioritizes the overload with the maximum number of parameters. This 
ensures Calcite
+   * can resolve optional/default trailing parameters correctly when binding 
UDF overloads.
+   *
    * @param clazz class to search method in
    * @param name name of the method to find
-   * @return the first method with matching name or null when no method found
+   * @return the matching method with the highest parameter count or null when 
no method found
    */
   static @Nullable Method findMethod(Class<?> clazz, String name) {
+    Method bestMethod = null;
     for (Method method : clazz.getMethods()) {
       if (method.getName().equals(name) && !method.isBridge()) {

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   To prevent picking up compiler-generated synthetic methods (such as those 
created for nested classes or covariant return types), it is highly recommended 
to exclude synthetic methods by checking `!method.isSynthetic()`.
   
   ```suggestion
         if (method.getName().equals(name) && !method.isBridge() && 
!method.isSynthetic()) {
   ```



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java:
##########
@@ -170,6 +170,25 @@ public static boolean isStringType(FieldType fieldType) {
           FieldType.DATETIME, SqlTypeName.TIMESTAMP,
           FieldType.STRING, SqlTypeName.VARCHAR);
 
+  private static final Map<Class<?>, SqlTypeName> JAVA_TO_SQL_TYPE_MAPPING =
+      ImmutableMap.<Class<?>, SqlTypeName>builder()
+          .put(String.class, SqlTypeName.VARCHAR)
+          .put(Integer.class, SqlTypeName.INTEGER)
+          .put(int.class, SqlTypeName.INTEGER)
+          .put(Long.class, SqlTypeName.BIGINT)
+          .put(long.class, SqlTypeName.BIGINT)
+          .put(Double.class, SqlTypeName.DOUBLE)
+          .put(double.class, SqlTypeName.DOUBLE)
+          .put(Float.class, SqlTypeName.FLOAT)
+          .put(float.class, SqlTypeName.FLOAT)
+          .put(Short.class, SqlTypeName.SMALLINT)
+          .put(short.class, SqlTypeName.SMALLINT)
+          .put(Byte.class, SqlTypeName.TINYINT)
+          .put(byte.class, SqlTypeName.TINYINT)
+          .put(Boolean.class, SqlTypeName.BOOLEAN)
+          .put(boolean.class, SqlTypeName.BOOLEAN)
+          .build();

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Standard SQL queries often use `BigDecimal` for `DECIMAL` / `NUMERIC` types. 
Adding `BigDecimal.class` to the mapping ensures that UDFs returning or 
accepting `BigDecimal` are correctly mapped to Calcite's `SqlTypeName.DECIMAL` 
instead of falling back to a generic Java type.
   
   ```suggestion
     private static final Map<Class<?>, SqlTypeName> JAVA_TO_SQL_TYPE_MAPPING =
         ImmutableMap.<Class<?>, SqlTypeName>builder()
             .put(String.class, SqlTypeName.VARCHAR)
             .put(Integer.class, SqlTypeName.INTEGER)
             .put(int.class, SqlTypeName.INTEGER)
             .put(Long.class, SqlTypeName.BIGINT)
             .put(long.class, SqlTypeName.BIGINT)
             .put(Double.class, SqlTypeName.DOUBLE)
             .put(double.class, SqlTypeName.DOUBLE)
             .put(Float.class, SqlTypeName.FLOAT)
             .put(float.class, SqlTypeName.FLOAT)
             .put(Short.class, SqlTypeName.SMALLINT)
             .put(short.class, SqlTypeName.SMALLINT)
             .put(Byte.class, SqlTypeName.TINYINT)
             .put(byte.class, SqlTypeName.TINYINT)
             .put(Boolean.class, SqlTypeName.BOOLEAN)
             .put(boolean.class, SqlTypeName.BOOLEAN)
             .put(java.math.BigDecimal.class, SqlTypeName.DECIMAL)
             .build();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to