Copilot commented on code in PR #3271:
URL: https://github.com/apache/fluss/pull/3271#discussion_r3207530607
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java:
##########
@@ -536,11 +589,11 @@ private String prepareLogTable() throws Exception {
TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
- // prepare table data
+ // prepare table data with NULL values in address column
try (Table table = conn.getTable(tablePath)) {
AppendWriter appendWriter = table.newAppend().createWriter();
for (int i = 1; i <= 5; i++) {
- Object[] values = new Object[] {i, "address" + i, "name" + i};
+ Object[] values = new Object[] {i, i % 2 == 0 ? null : "address" + i, "name" + i};
appendWriter.append(row(values));
// make sure every bucket has records
appendWriter.flush();
Review Comment:
`prepareLogTable()` now writes NULL values into the `address` column, but
`testLimitLogTableScan` (which also uses `prepareLogTable()`) still builds its
expected row set assuming `address2`/`address4` are non-null. This will make
the `isSubsetOf(expected)` assertion flaky or fail depending on which rows are
returned by the LIMIT scan. Consider keeping `prepareLogTable()`’s data
non-null and introducing a dedicated helper for NULL-address data (or update
the other test expectations accordingly).
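
A minimal sketch of the "dedicated helper" option, assuming the row layout `{id, address, name}` from the diff above. `LogTableRows` and `buildRows` are hypothetical names, not part of the PR; the real test would still pass each row through `row(values)` and `appendWriter.append(...)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not from the PR): one row builder for both table variants.
final class LogTableRows {
    private LogTableRows() {}

    // Builds rows {id, address, name} for ids 1..count.
    // Even ids get a NULL address only when nullAddressOnEvenIds is true.
    static List<Object[]> buildRows(int count, boolean nullAddressOnEvenIds) {
        List<Object[]> rows = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            String address = (nullAddressOnEvenIds && i % 2 == 0) ? null : "address" + i;
            rows.add(new Object[] {i, address, "name" + i});
        }
        return rows;
    }
}
```

With this split, `prepareLogTable()` could keep calling the non-null variant, so the expectations in `testLimitLogTableScan` stay valid, while a new NULL-address helper feeds only the COUNT tests.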
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java:
##########
@@ -452,6 +479,32 @@ void testCountPushDownForLogTable(boolean partitionTable) throws Exception {
List<String> expected =
Collections.singletonList(String.format("+I[%s]", expectedRows));
assertThat(collected).isEqualTo(expected);
+ // test COUNT(column) pushdown
+ query = String.format("SELECT COUNT(id) FROM %s", tableName);
+ assertThat(tEnv.explainSql(query))
+ .contains(
+ "aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]");
+ iterRows = tEnv.executeSql(query).collect();
+ collected = collectRowsWithTimeout(iterRows, 1);
+ assertThat(collected).isEqualTo(expected);
+
+ // test COUNT(column) with NULL values - should NOT push down for nullable columns
+ // This will fail because log table doesn't support full scan in batch mode
+ assertThatThrownBy(
+ () ->
+ tEnv.explainSql(
+ String.format("SELECT COUNT(address) FROM %s", tableName)))
+ .hasMessageContaining(
+ "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode.");
+ assertThatThrownBy(
+ () ->
+ tEnv.explainSql(
+ String.format(
+ "SELECT COUNT(DISTINCT address) FROM %s",
+ tableName)))
+ .hasMessageContaining(
+ "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode.");
Review Comment:
The new “should NOT push down” checks for `COUNT(address)`/`COUNT(DISTINCT
address)` only assert that planning fails in batch mode (because non-pushdown
triggers an unsupported full scan). This validates rejection indirectly, but it
doesn’t validate the actual correctness requirement from #3270 (i.e., that
`COUNT(nullable_col)` returns the non-null count when scanning is supported).
Consider adding a variant where scanning is allowed (e.g., a lake-enabled table
or a streaming execution mode) and asserting the real results (e.g.,
`COUNT(address)` = 3 for the non-partitioned log table with two NULLs).
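
To make the expected numbers concrete, here is a small plain-Java model of the SQL semantics in question. It is only an illustration over an in-memory list, not Flink or Fluss code; `CountSemantics` and its methods are hypothetical names, and the data mirrors the five rows written by `prepareLogTable()` with NULL addresses on even ids.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Illustrative model of SQL COUNT semantics; not Flink or Fluss code.
final class CountSemantics {
    private CountSemantics() {}

    // COUNT(*): every row counts, NULL or not.
    static long countStar(List<String> column) {
        return column.size();
    }

    // COUNT(col): rows where col IS NULL are skipped.
    static long countColumn(List<String> column) {
        return column.stream().filter(Objects::nonNull).count();
    }

    // COUNT(DISTINCT col): NULLs skipped, duplicates collapsed.
    static long countDistinct(List<String> column) {
        return column.stream().filter(Objects::nonNull).distinct().count();
    }

    // Address column for ids 1..5, NULL on even ids (as in the diff above).
    static List<String> sampleAddresses() {
        return Arrays.asList("address1", null, "address3", null, "address5");
    }
}
```

On this data `countStar` yields 5 while `countColumn` yields 3, which is the gap a row-count pushdown would hide. A results-based test would assert the second value.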
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java:
##########
@@ -799,19 +798,33 @@ public boolean applyAggregates(
return false;
}
- FunctionDefinition functionDefinition = aggregateExpressions.get(0).getFunctionDefinition();
- if (!(functionDefinition
- .getClass()
- .getCanonicalName()
- .equals(
- "org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction")
- || functionDefinition
- .getClass()
- .getCanonicalName()
- .equals(
- "org.apache.flink.table.planner.functions.aggfunctions.Count1AggFunction"))) {
+ AggregateExpression aggExpr = aggregateExpressions.get(0);
+ String functionName = aggExpr.getFunctionDefinition().getClass().getCanonicalName();
+
+ // Verify that the aggregate function is COUNT(*) or COUNT(1)
+ // CountAggFunction: COUNT(*) or COUNT(column)
+ // Count1AggFunction: COUNT(1) with constant argument
+ boolean isCountAgg =
+ "org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction"
+ .equals(functionName);
+ boolean isCount1Agg =
+ "org.apache.flink.table.planner.functions.aggfunctions.Count1AggFunction"
+ .equals(functionName);
+ if (!isCountAgg && !isCount1Agg) {
return false;
}
+
+ // For COUNT(column), reject if column is nullable (cannot handle NULL filtering)
+ if (isCountAgg) {
+ List<org.apache.flink.table.expressions.Expression> args = aggExpr.getChildren();
+ if (!args.isEmpty() && args.get(0) instanceof ResolvedExpression) {
+ ResolvedExpression arg = (ResolvedExpression) args.get(0);
+ if (arg.getOutputDataType().getLogicalType().isNullable()) {
+ return false;
+ }
+ }
+ }
Review Comment:
In `applyAggregates`, when `CountAggFunction` has an argument but that
argument is not a `ResolvedExpression`, the nullability check is skipped and
the code still enables row-count pushdown. That can produce wrong results for
`COUNT(expr)` if the expression can be NULL (or if its nullability cannot be
determined). Consider making this conservative: if there is a COUNT argument
and you can’t reliably determine it is non-nullable, return `false` (don’t push
down).
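
The conservative control flow could look roughly like the sketch below. It deliberately avoids Flink types so it stands alone: `TypedExpression` is a hypothetical stand-in for an argument whose nullability can be read (the role `ResolvedExpression` plays in the diff), and the sketch assumes, as the PR code implies, that an empty argument list corresponds to `COUNT(*)`.

```java
import java.util.List;

// Sketch of a conservative pushdown guard; stand-in types, not Flink's API.
final class CountPushDownGuard {
    private CountPushDownGuard() {}

    // Hypothetical stand-in for an expression with known output nullability.
    interface TypedExpression {
        boolean isNullable();
    }

    // Returns true only when COUNT over these arguments equals a plain row count.
    static boolean canPushDownAsRowCount(List<?> countArgs) {
        if (countArgs.isEmpty()) {
            return true; // assumed to be COUNT(*): no argument to inspect
        }
        Object arg = countArgs.get(0);
        if (!(arg instanceof TypedExpression)) {
            return false; // nullability unknown: do not push down
        }
        return !((TypedExpression) arg).isNullable();
    }
}
```

The difference from the code under review is the middle branch: an argument of unknown type now rejects pushdown instead of silently falling through to it.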