Copilot commented on code in PR #3322:
URL: https://github.com/apache/fluss/pull/3322#discussion_r3248529733


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -558,9 +557,17 @@ private List<PartitionInfo> 
applyPartitionFilter(List<PartitionInfo> partitionIn
             return partitionInfos;
         } else {
             int originalSize = partitionInfos.size();
+            RowType partitionRowType = 
PartitionUtils.partitionRowType(tableInfo);

Review Comment:
   `partitionRowType(tableInfo)` is recomputed on every invocation of 
`applyPartitionFilter`, which is called on each periodic partition discovery 
cycle. Since `tableInfo` is effectively immutable for the lifetime of the 
enumerator, consider computing the partition row type once (e.g., in the 
constructor or lazily cached) instead of on every list-partitions tick. Minor, 
but avoids repeated `indexOf` scans and `RowType.project` allocations on the 
hot discovery path.



##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/SparkPartitionPredicate.scala:
##########
@@ -53,54 +50,17 @@ object SparkPartitionPredicate {
   }
 
   def filterPartitions(
+      tableInfo: TableInfo,
       partitionInfos: Seq[PartitionInfo],
       partitionPredicate: Option[FlussPredicate]): Seq[PartitionInfo] =
     partitionPredicate match {
       case None => partitionInfos
-      case Some(predicate) => partitionInfos.filter(p => 
predicate.test(toPartitionRow(p)))
+      case Some(predicate) =>
+        val rowType = PartitionUtils.partitionRowType(tableInfo)
+        partitionInfos.filter {
+          p =>
+            predicate.test(
+              
PartitionUtils.toPartitionRow(p.getResolvedPartitionSpec.getPartitionValues, 
rowType))

Review Comment:
   Likewise, `PartitionUtils.partitionRowType(tableInfo)` is recomputed on 
every call to `filterPartitions` even though `tableInfo` does not change 
between calls in a given scan. Consider computing it once at the call site (in 
`FlussAppendBatch`/`FlussUpsertBatch`) or caching it, rather than recomputing 
per filter invocation.



##########
fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java:
##########
@@ -188,6 +188,7 @@ public static Object get(InternalRow internalRow, int pos, 
DataType fieldType) {
                 return internalRow.getDecimal(
                         pos, decimalType.getPrecision(), 
decimalType.getScale());
             case BINARY:
+            case BYTES:

Review Comment:
   The fix to use `getTimestampLtz` for `TIMESTAMP_WITH_LOCAL_TIME_ZONE` (and 
the new BYTES case) is a real correctness fix that becomes reachable now that 
partition rows hold typed values rather than stringified BinaryStrings. 
However, there is no new test exercising partition pruning over a TIMESTAMP_LTZ 
or BYTES/BINARY partition column to lock in this behavior. Consider adding 
coverage for at least the timestamp-ltz path so future regressions in 
`LeafPredicate.get` are caught.



##########
fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java:
##########
@@ -345,4 +348,28 @@ public static String convertValueOfType(Object value, 
DataTypeRoot type) {
         }
         return stringPartitionKey;
     }
+
+    /** Projects {@code tableInfo}'s row type down to its partition key 
columns, in key order. */
+    public static RowType partitionRowType(TableInfo tableInfo) {
+        RowType schema = tableInfo.getRowType();
+        List<String> fieldNames = schema.getFieldNames();
+        int[] indexes =
+                
tableInfo.getPartitionKeys().stream().mapToInt(fieldNames::indexOf).toArray();
+        return schema.project(indexes);
+    }
+
+    /**
+     * Builds a row of typed partition values by parsing each string with 
{@link
+     * #parseValueOfType(String, DataTypeRoot)} for the column at that ordinal 
in {@code
+     * partitionRowType}.
+     */
+    public static GenericRow toPartitionRow(
+            List<String> partitionValues, RowType partitionRowType) {
+        GenericRow row = new GenericRow(partitionValues.size());
+        for (int i = 0; i < partitionValues.size(); i++) {
+            DataTypeRoot type = partitionRowType.getTypeAt(i).getTypeRoot();
+            row.setField(i, parseValueOfType(partitionValues.get(i), type));
+        }
+        return row;
+    }

Review Comment:
   `PartitionUtils.toPartitionRow` calls `parseValueOfType` for each partition 
value, which throws `IllegalArgumentException` for unsupported `DataTypeRoot`s 
(e.g. DECIMAL) or malformed values. This now happens during partition discovery 
/ pushdown filtering on the user's query path. If a partition value cannot be 
parsed (e.g. a stale partition with a value that does not round-trip), the 
entire `applyPartitionFilter` (Flink) or `filterPartitions` (Spark) call will 
throw and fail the query, where previously stringified comparison would either 
succeed or simply mismatch. Consider catching parse failures per partition and 
treating them as "does not match" (or at least logging context including the 
partition spec) to make the failure mode less abrupt and easier to diagnose.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to