Copilot commented on code in PR #3271:
URL: https://github.com/apache/fluss/pull/3271#discussion_r3207530607


##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java:
##########
@@ -536,11 +589,11 @@ private String prepareLogTable() throws Exception {
 
         TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
 
-        // prepare table data
+        // prepare table data with NULL values in address column
         try (Table table = conn.getTable(tablePath)) {
             AppendWriter appendWriter = table.newAppend().createWriter();
             for (int i = 1; i <= 5; i++) {
-                Object[] values = new Object[] {i, "address" + i, "name" + i};
+                Object[] values = new Object[] {i, i % 2 == 0 ? null : 
"address" + i, "name" + i};
                 appendWriter.append(row(values));
                 // make sure every bucket has records
                 appendWriter.flush();

Review Comment:
   `prepareLogTable()` now writes NULL values into the `address` column, but 
`testLimitLogTableScan` (which also uses `prepareLogTable()`) still builds its 
expected row set assuming `address2`/`address4` are non-null. This will make 
the `isSubsetOf(expected)` assertion flaky or fail depending on which rows are 
returned by the LIMIT scan. Consider keeping `prepareLogTable()`’s data 
non-null and introducing a dedicated helper for NULL-address data (or update 
the other test expectations accordingly).



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java:
##########
@@ -452,6 +479,32 @@ void testCountPushDownForLogTable(boolean partitionTable) 
throws Exception {
         List<String> expected = 
Collections.singletonList(String.format("+I[%s]", expectedRows));
         assertThat(collected).isEqualTo(expected);
 
+        // test COUNT(column) pushdown
+        query = String.format("SELECT COUNT(id) FROM %s", tableName);
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "aggregates=[grouping=[], 
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]");
+        iterRows = tEnv.executeSql(query).collect();
+        collected = collectRowsWithTimeout(iterRows, 1);
+        assertThat(collected).isEqualTo(expected);
+
+        // test COUNT(column) with NULL values - should NOT push down for 
nullable columns
+        // This will fail because log table doesn't support full scan in batch 
mode
+        assertThatThrownBy(
+                        () ->
+                                tEnv.explainSql(
+                                        String.format("SELECT COUNT(address) 
FROM %s", tableName)))
+                .hasMessageContaining(
+                        "Currently, Fluss only support queries on table with 
datalake enabled or point queries on primary key when it's in batch execution 
mode.");
+        assertThatThrownBy(
+                        () ->
+                                tEnv.explainSql(
+                                        String.format(
+                                                "SELECT COUNT(DISTINCT 
address) FROM %s",
+                                                tableName)))
+                .hasMessageContaining(
+                        "Currently, Fluss only support queries on table with 
datalake enabled or point queries on primary key when it's in batch execution 
mode.");

Review Comment:
   The new “should NOT push down” checks for `COUNT(address)`/`COUNT(DISTINCT 
address)` only assert that planning fails in batch mode (because non-pushdown 
triggers an unsupported full scan). This validates rejection indirectly, but it 
doesn’t validate the actual correctness requirement from #3270 (i.e., that 
`COUNT(nullable_col)` returns the non-null count when scanning is supported). 
Consider adding a variant where scanning is allowed (e.g., a lake-enabled table 
or a streaming execution mode) and asserting the real results (e.g., 
`COUNT(address)` = 3 for the non-partitioned log table with two NULLs).



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java:
##########
@@ -799,19 +798,33 @@ public boolean applyAggregates(
             return false;
         }
 
-        FunctionDefinition functionDefinition = 
aggregateExpressions.get(0).getFunctionDefinition();
-        if (!(functionDefinition
-                        .getClass()
-                        .getCanonicalName()
-                        .equals(
-                                
"org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction")
-                || functionDefinition
-                        .getClass()
-                        .getCanonicalName()
-                        .equals(
-                                
"org.apache.flink.table.planner.functions.aggfunctions.Count1AggFunction"))) {
+        AggregateExpression aggExpr = aggregateExpressions.get(0);
+        String functionName = 
aggExpr.getFunctionDefinition().getClass().getCanonicalName();
+
+        // Verify that the aggregate function is COUNT(*) or COUNT(1)
+        // CountAggFunction: COUNT(*) or COUNT(column)
+        // Count1AggFunction: COUNT(1) with constant argument
+        boolean isCountAgg =
+                
"org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction"
+                        .equals(functionName);
+        boolean isCount1Agg =
+                
"org.apache.flink.table.planner.functions.aggfunctions.Count1AggFunction"
+                        .equals(functionName);
+        if (!isCountAgg && !isCount1Agg) {
             return false;
         }
+
+        // For COUNT(column), reject if column is nullable (cannot handle NULL 
filtering)
+        if (isCountAgg) {
+            List<org.apache.flink.table.expressions.Expression> args = 
aggExpr.getChildren();
+            if (!args.isEmpty() && args.get(0) instanceof ResolvedExpression) {
+                ResolvedExpression arg = (ResolvedExpression) args.get(0);
+                if (arg.getOutputDataType().getLogicalType().isNullable()) {
+                    return false;
+                }
+            }
+        }

Review Comment:
   In `applyAggregates`, when `CountAggFunction` has an argument but that 
argument is not a `ResolvedExpression`, the nullability check is skipped and 
the code still enables row-count pushdown. That can produce wrong results for 
`COUNT(expr)` if the expression can be NULL (or if its nullability cannot be 
determined). Consider making this conservative: if there is a COUNT argument 
and you can’t reliably determine it is non-nullable, return `false` (don’t push 
down).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to