gemini-code-assist[bot] commented on code in PR #38832:
URL: https://github.com/apache/beam/pull/38832#discussion_r3364173404
##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java:
##########
@@ -209,6 +207,20 @@ public PCollection<Row> expand(PCollectionList<Row> pinput) {
GlobalWindows.class.getSimpleName(), windowingStrategy));
}
+ // When no limit is specified (count == -1), we must sort the entire dataset.
+ // To achieve this globally, we key all rows by a single dummy key, group them together
+ // using GroupByKey to ensure they are processed together, and then sort them in-memory
+ // via SortInMemoryFn. Note: This can be memory-intensive for large datasets.
+ if (count == -1) {
+ BeamSqlRowComparator comparator =
+ new BeamSqlRowComparator(fieldIndices, orientation, nullsFirst);
+ return upstream
+ .apply("WithDummyKey", WithKeys.of("DummyKey"))
+ .apply("GroupByKey", GroupByKey.create())
+ .apply("SortInMemory", ParDo.of(new SortInMemoryFn(comparator)))
+ .setRowSchema(CalciteUtils.toSchema(getRowType()));
+ }
Review Comment:

Grouping the entire dataset under a single dummy key (`"DummyKey"`) to
perform a global sort is a severe scalability bottleneck. It routes all data to
a single worker, which eliminates parallelism, and it is likely to cause an
`OutOfMemoryError` (OOM) on large datasets because `SortInMemoryFn` loads every
row into an in-memory `ArrayList`.
The code comment notes that this is memory-intensive, but the limitation should
also be stated prominently in the documentation. Consider throwing an exception
or logging a warning when the input is known to be large (though input size is
often not known statically). At a minimum, please ensure the documentation
clearly states that `ORDER BY` without `LIMIT` is only suitable for datasets
that fit comfortably in a single worker's memory.
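One way to act on the "throw an exception" suggestion is to cap the number of
rows buffered before sorting. The following is a minimal sketch in plain Java,
not Beam code: `BoundedInMemorySort`, `sortWithLimit`, and `maxRows` are
hypothetical names, and an actual `SortInMemoryFn` would have to apply a
similar check inside its `DoFn`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper: buffers rows for an in-memory sort but refuses to grow past a cap. */
final class BoundedInMemorySort {

  /**
   * Copies {@code rows} into a list and sorts it. Throws as soon as one more row
   * than {@code maxRows} would have to be buffered. The cap is an assumed,
   * user-chosen value; it is not something the PR defines.
   */
  static <T> List<T> sortWithLimit(
      Iterable<T> rows, Comparator<? super T> comparator, int maxRows) {
    List<T> buffer = new ArrayList<>();
    for (T row : rows) {
      if (buffer.size() >= maxRows) {
        throw new IllegalStateException(
            "ORDER BY without LIMIT would buffer more than " + maxRows + " rows");
      }
      buffer.add(row);
    }
    buffer.sort(comparator);
    return buffer;
  }
}
```

Failing fast this way turns a late `OutOfMemoryError` into an early, explainable
error, at the cost of picking a threshold that may not suit every worker size.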
##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImpl.java:
##########
@@ -70,17 +70,30 @@ public static Function create(Method method) {
}
/*
- * Finds a method in a given class by name.
+ * Finds a method in a given class by name. In case of overloaded methods with the same name,
+ * this prioritizes the overload with the maximum number of parameters. This ensures Calcite
+ * can resolve optional/default trailing parameters correctly when binding UDF overloads.
+ *
* @param clazz class to search method in
* @param name name of the method to find
- * @return the first method with matching name or null when no method found
+ * @return the matching method with the highest parameter count or null when no method found
*/
static @Nullable Method findMethod(Class<?> clazz, String name) {
+ Method bestMethod = null;
for (Method method : clazz.getMethods()) {
if (method.getName().equals(name) && !method.isBridge()) {
- return method;
+ if (bestMethod == null) {
+ bestMethod = method;
+ } else {
+ int cmp =
+ Integer.compare(
+ method.getParameterTypes().length, bestMethod.getParameterTypes().length);
+ if (cmp > 0 || (cmp == 0 && method.toString().compareTo(bestMethod.toString()) < 0)) {
+ bestMethod = method;
+ }
+ }
}
}
- return null;
+ return bestMethod;
}
Review Comment:

Prioritizing the overload with the maximum number of parameters works for
resolving optional/default trailing parameters, but it makes every overload
with fewer parameters unreachable, including overloads whose parameter types
are unrelated (e.g., `eval(String)` vs `eval(Integer, Integer)`). If a UDF
class defines multiple overloads with different types, only the one with the
highest parameter count will be registered. Consider whether `UdfImpl` should
support registering multiple methods, or at least document this limitation
clearly.
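The shadowing can be reproduced with plain reflection. The sketch below copies
the selection rule from the diff into a standalone class and adds a
hypothetical `findMethods` variant that keeps every overload. `OverloadDemo`,
`SampleUdf`, and `findMethods` are illustrative names, not part of `UdfImpl`.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

final class OverloadDemo {

  /** Illustrative UDF-like class with two unrelated overloads. */
  public static final class SampleUdf {
    public static String eval(String s) {
      return s;
    }

    public static Integer eval(Integer a, Integer b) {
      return a + b;
    }
  }

  /** Same rule as the diff: highest parameter count wins, ties broken by toString(). */
  static Method findMethod(Class<?> clazz, String name) {
    Method best = null;
    for (Method m : clazz.getMethods()) {
      if (!m.getName().equals(name) || m.isBridge()) {
        continue;
      }
      if (best == null) {
        best = m;
        continue;
      }
      int cmp = Integer.compare(m.getParameterCount(), best.getParameterCount());
      if (cmp > 0 || (cmp == 0 && m.toString().compareTo(best.toString()) < 0)) {
        best = m;
      }
    }
    return best;
  }

  /** Hypothetical alternative: return every non-bridge overload in a deterministic order. */
  static List<Method> findMethods(Class<?> clazz, String name) {
    List<Method> result = new ArrayList<>();
    for (Method m : clazz.getMethods()) {
      if (m.getName().equals(name) && !m.isBridge()) {
        result.add(m);
      }
    }
    // Order by parameter count, then by signature text, so results are stable across JVMs.
    result.sort(
        (a, b) -> {
          int cmp = Integer.compare(a.getParameterCount(), b.getParameterCount());
          return cmp != 0 ? cmp : a.toString().compareTo(b.toString());
        });
    return result;
  }
}
```

With `SampleUdf`, `findMethod` selects only the two-argument `eval`, so
`eval(String)` is never seen by the caller, while `findMethods` returns both.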
##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java:
##########
@@ -396,6 +398,47 @@ public static RelDataType sqlTypeWithAutoCast(RelDataTypeFactory typeFactory, Ty
+ ". This is currently unsupported, use List instead "
+ "of Array.");
}
+ if (type instanceof Class) {
+ Class<?> clazz = (Class<?>) type;
+ SqlTypeName sqlTypeName = null;
+ switch (clazz.getName()) {
+ case "java.lang.String":
+ sqlTypeName = SqlTypeName.VARCHAR;
+ break;
+ case "java.lang.Integer":
+ case "int":
+ sqlTypeName = SqlTypeName.INTEGER;
+ break;
+ case "java.lang.Long":
+ case "long":
+ sqlTypeName = SqlTypeName.BIGINT;
+ break;
+ case "java.lang.Double":
+ case "double":
+ sqlTypeName = SqlTypeName.DOUBLE;
+ break;
+ case "java.lang.Float":
+ case "float":
+ sqlTypeName = SqlTypeName.FLOAT;
+ break;
+ case "java.lang.Short":
+ case "short":
+ sqlTypeName = SqlTypeName.SMALLINT;
+ break;
+ case "java.lang.Byte":
+ case "byte":
+ sqlTypeName = SqlTypeName.TINYINT;
+ break;
+ case "java.lang.Boolean":
+ case "boolean":
+ sqlTypeName = SqlTypeName.BOOLEAN;
+ break;
+ }
+ if (sqlTypeName != null) {
+ return typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(sqlTypeName), !clazz.isPrimitive());
+ }
+ }
Review Comment:

Switching on `clazz.getName()` is less idiomatic in Java than comparing `Class`
references directly, and it relies on string hashing and equality checks where
a reference comparison would do. Direct class comparison (e.g.,
`clazz == String.class`) is cleaner, and the compiler verifies every class
referenced, so a typo in a type name cannot go unnoticed.
```java
if (type instanceof Class) {
  Class<?> clazz = (Class<?>) type;
  SqlTypeName sqlTypeName = null;
  if (clazz == String.class) {
    sqlTypeName = SqlTypeName.VARCHAR;
  } else if (clazz == Integer.class || clazz == int.class) {
    sqlTypeName = SqlTypeName.INTEGER;
  } else if (clazz == Long.class || clazz == long.class) {
    sqlTypeName = SqlTypeName.BIGINT;
  } else if (clazz == Double.class || clazz == double.class) {
    sqlTypeName = SqlTypeName.DOUBLE;
  } else if (clazz == Float.class || clazz == float.class) {
    sqlTypeName = SqlTypeName.FLOAT;
  } else if (clazz == Short.class || clazz == short.class) {
    sqlTypeName = SqlTypeName.SMALLINT;
  } else if (clazz == Byte.class || clazz == byte.class) {
    sqlTypeName = SqlTypeName.TINYINT;
  } else if (clazz == Boolean.class || clazz == boolean.class) {
    sqlTypeName = SqlTypeName.BOOLEAN;
  }
  if (sqlTypeName != null) {
    return typeFactory.createTypeWithNullability(
        typeFactory.createSqlType(sqlTypeName), !clazz.isPrimitive());
  }
}
```
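If the chain of comparisons grows, a lookup table keyed by `Class` is another
option. This is a minimal sketch under stated assumptions: `SketchTypeName` is
a stand-in enum used in place of Calcite's `SqlTypeName` so the example runs
without dependencies, and `ClassTypeLookup`, `lookup`, and `isNullable` are
hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class ClassTypeLookup {

  /** Stand-in for Calcite's SqlTypeName; lists only the constants used in the diff. */
  enum SketchTypeName { VARCHAR, INTEGER, BIGINT, DOUBLE, FLOAT, SMALLINT, TINYINT, BOOLEAN }

  private static final Map<Class<?>, SketchTypeName> TYPES = new HashMap<>();

  static {
    TYPES.put(String.class, SketchTypeName.VARCHAR);
    TYPES.put(Integer.class, SketchTypeName.INTEGER);
    TYPES.put(int.class, SketchTypeName.INTEGER);
    TYPES.put(Long.class, SketchTypeName.BIGINT);
    TYPES.put(long.class, SketchTypeName.BIGINT);
    TYPES.put(Double.class, SketchTypeName.DOUBLE);
    TYPES.put(double.class, SketchTypeName.DOUBLE);
    TYPES.put(Float.class, SketchTypeName.FLOAT);
    TYPES.put(float.class, SketchTypeName.FLOAT);
    TYPES.put(Short.class, SketchTypeName.SMALLINT);
    TYPES.put(short.class, SketchTypeName.SMALLINT);
    TYPES.put(Byte.class, SketchTypeName.TINYINT);
    TYPES.put(byte.class, SketchTypeName.TINYINT);
    TYPES.put(Boolean.class, SketchTypeName.BOOLEAN);
    TYPES.put(boolean.class, SketchTypeName.BOOLEAN);
  }

  /** Returns the mapped type name, or empty when the class has no direct mapping. */
  static Optional<SketchTypeName> lookup(Class<?> clazz) {
    return Optional.ofNullable(TYPES.get(clazz));
  }

  /** Mirrors the diff's nullability rule: boxed types are nullable, primitives are not. */
  static boolean isNullable(Class<?> clazz) {
    return !clazz.isPrimitive();
  }
}
```

The table keeps the mapping in one place and makes it straightforward to add
types, though for eight entries the `if`/`else` chain above is equally readable.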