Copilot commented on code in PR #3322:
URL: https://github.com/apache/fluss/pull/3322#discussion_r3248529733
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -558,9 +557,17 @@ private List<PartitionInfo> applyPartitionFilter(List<PartitionInfo> partitionIn
return partitionInfos;
} else {
int originalSize = partitionInfos.size();
+ RowType partitionRowType = PartitionUtils.partitionRowType(tableInfo);
Review Comment:
`partitionRowType(tableInfo)` is recomputed on every invocation of
`applyPartitionFilter`, which is called on each periodic partition discovery
cycle. Since `tableInfo` is effectively immutable for the lifetime of the
enumerator, consider computing the partition row type once (e.g., in the
constructor or lazily cached) instead of on every list-partitions tick. Minor,
but avoids repeated `indexOf` scans and `RowType.project` allocations on the
hot discovery path.
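Here is a minimal sketch of the suggestion. It uses simplified stand-ins: column types are plain strings such as `"INT"` instead of Fluss's `RowType`, and `CachedPartitionProjection` is a hypothetical class, not the actual `FlinkSourceEnumerator`. The projection is derived once in the constructor, and each discovery call only reads the cached result:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for the enumerator; types are plain strings, not Fluss's RowType.
final class CachedPartitionProjection {

    // Derived once: the table schema is fixed for the enumerator's lifetime, so the
    // indexOf scans and the projection are not repeated on every discovery tick.
    private final List<String> partitionTypes;

    CachedPartitionProjection(
            List<String> fieldNames, List<String> fieldTypes, List<String> partitionKeys) {
        List<String> projected = new ArrayList<>();
        for (String key : partitionKeys) {
            projected.add(fieldTypes.get(fieldNames.indexOf(key)));
        }
        this.partitionTypes = List.copyOf(projected);
    }

    List<String> partitionTypes() {
        return partitionTypes;
    }

    // Called on every discovery cycle; it only reads the cached projection.
    List<List<String>> applyPartitionFilter(
            List<List<String>> partitions, Predicate<Object[]> predicate) {
        List<List<String>> matched = new ArrayList<>();
        for (List<String> values : partitions) {
            Object[] row = new Object[values.size()];
            for (int i = 0; i < values.size(); i++) {
                // Simplified typed parsing: only INT is converted, everything else stays a String.
                String v = values.get(i);
                row[i] = "INT".equals(partitionTypes.get(i)) ? Integer.parseInt(v) : v;
            }
            if (predicate.test(row)) {
                matched.add(values);
            }
        }
        return matched;
    }
}
```

In the real enumerator, this would presumably become a `final RowType` field initialised from `PartitionUtils.partitionRowType(tableInfo)` in the constructor.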
##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/SparkPartitionPredicate.scala:
##########
@@ -53,54 +50,17 @@ object SparkPartitionPredicate {
}
def filterPartitions(
+ tableInfo: TableInfo,
partitionInfos: Seq[PartitionInfo],
partitionPredicate: Option[FlussPredicate]): Seq[PartitionInfo] =
partitionPredicate match {
case None => partitionInfos
- case Some(predicate) => partitionInfos.filter(p => predicate.test(toPartitionRow(p)))
+ case Some(predicate) =>
+ val rowType = PartitionUtils.partitionRowType(tableInfo)
+ partitionInfos.filter {
+ p =>
+ predicate.test(
+ PartitionUtils.toPartitionRow(p.getResolvedPartitionSpec.getPartitionValues, rowType))
Review Comment:
Likewise, `PartitionUtils.partitionRowType(tableInfo)` is recomputed on
every call to `filterPartitions` even though `tableInfo` does not change
between calls in a given scan. Consider computing it once at the call site (in
`FlussAppendBatch`/`FlussUpsertBatch`) or caching it, rather than recomputing
per filter invocation.
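If computing the row type at the call site is awkward, a lazily cached value is the other option. Below is a minimal sketch using a hypothetical `Memoized` helper (not a Fluss or Spark API), written in Java for illustration:

```java
import java.util.function.Supplier;

// Hypothetical memoizing supplier: the delegate runs at most once, and later calls
// return the cached value. Not thread-safe; add synchronization if needed.
final class Memoized<T> implements Supplier<T> {
    private Supplier<T> delegate;
    private T value;

    Memoized(Supplier<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T get() {
        if (delegate != null) {
            value = delegate.get();
            delegate = null; // drop the computation once the value is cached
        }
        return value;
    }
}
```

In the Scala code itself, a `lazy val` holding `PartitionUtils.partitionRowType(tableInfo)` in `FlussAppendBatch`/`FlussUpsertBatch` (assuming `tableInfo` is available there as a field) would give the same compute-once behaviour without a helper class.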
##########
fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java:
##########
@@ -188,6 +188,7 @@ public static Object get(InternalRow internalRow, int pos, DataType fieldType) {
return internalRow.getDecimal(
pos, decimalType.getPrecision(), decimalType.getScale());
case BINARY:
+ case BYTES:
Review Comment:
Switching to `getTimestampLtz` for `TIMESTAMP_WITH_LOCAL_TIME_ZONE` (and adding
the BYTES case) is a real correctness fix that becomes reachable now that
partition rows hold typed values rather than stringified BinaryStrings.
However, there is no new test exercising partition pruning over a TIMESTAMP_LTZ
or BYTES/BINARY partition column to lock in this behavior. Consider adding
coverage for at least the timestamp-ltz path so future regressions in
`LeafPredicate.get` are caught.
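One possible shape for such a test is to pick a TIMESTAMP_LTZ case where typed and stringified ordering disagree, so that a regression back to string comparison fails loudly. The sketch below is hypothetical and self-contained. It does not call `LeafPredicate`, and it assumes partition values are ISO-8601 instants, which may not match Fluss's actual partition-string format:

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not Fluss code. Assumes ISO-8601 partition values.
final class TimestampLtzPruningSketch {

    // Typed path: parse each value to an Instant before applying ">= lower".
    static List<String> pruneTyped(List<String> partitionValues, Instant lower) {
        List<String> kept = new ArrayList<>();
        for (String v : partitionValues) {
            if (!Instant.parse(v).isBefore(lower)) {
                kept.add(v);
            }
        }
        return kept;
    }

    // Stringified path, kept only to contrast with the typed one.
    static List<String> pruneAsStrings(List<String> partitionValues, String lower) {
        List<String> kept = new ArrayList<>();
        for (String v : partitionValues) {
            if (v.compareTo(lower) >= 0) {
                kept.add(v);
            }
        }
        return kept;
    }
}
```

With a lower bound of `2024-01-01T00:00:00.2Z`, the typed path keeps only the `...00.5Z` partition. String comparison also keeps `2024-01-01T00:00:00Z`, because `'Z'` sorts after `'.'`.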
##########
fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java:
##########
@@ -345,4 +348,28 @@ public static String convertValueOfType(Object value, DataTypeRoot type) {
}
return stringPartitionKey;
}
+
+ /** Projects {@code tableInfo}'s row type down to its partition key columns, in key order. */
+ public static RowType partitionRowType(TableInfo tableInfo) {
+ RowType schema = tableInfo.getRowType();
+ List<String> fieldNames = schema.getFieldNames();
+ int[] indexes =
+ tableInfo.getPartitionKeys().stream().mapToInt(fieldNames::indexOf).toArray();
+ return schema.project(indexes);
+ }
+
+ /**
+ * Builds a row of typed partition values by parsing each string with {@link
+ * #parseValueOfType(String, DataTypeRoot)} for the column at that ordinal in {@code
+ * partitionRowType}.
+ */
+ public static GenericRow toPartitionRow(
+ List<String> partitionValues, RowType partitionRowType) {
+ GenericRow row = new GenericRow(partitionValues.size());
+ for (int i = 0; i < partitionValues.size(); i++) {
+ DataTypeRoot type = partitionRowType.getTypeAt(i).getTypeRoot();
+ row.setField(i, parseValueOfType(partitionValues.get(i), type));
+ }
+ return row;
+ }
Review Comment:
`PartitionUtils.toPartitionRow` calls `parseValueOfType` for each partition
value, which throws `IllegalArgumentException` for unsupported `DataTypeRoot`s
(e.g. DECIMAL) or malformed values. This now happens during partition discovery
/ pushdown filtering on the user's query path. If a partition value cannot be
parsed (e.g. a stale partition with a value that does not round-trip), the
entire `applyPartitionFilter` (Flink) or `filterPartitions` (Spark) call will
throw and fail the query, where previously stringified comparison would either
succeed or simply mismatch. Consider catching parse failures per partition and
treating them as "does not match" (or at least logging context including the
partition spec) to make the failure mode less abrupt and easier to diagnose.
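Here is a minimal sketch of the per-partition fallback. It uses a hypothetical `LenientPartitionFilter` helper and a caller-supplied parser standing in for `PartitionUtils.toPartitionRow`. It logs through `java.util.logging` only to stay self-contained; real code would use the project's logging framework:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.logging.Logger;

// Hypothetical helper: unparseable partitions are logged and treated as "does not match".
final class LenientPartitionFilter {
    private static final Logger LOG = Logger.getLogger(LenientPartitionFilter.class.getName());

    static <R> List<List<String>> filter(
            List<List<String>> partitions,
            Function<List<String>, R> toRow, // stand-in for a toPartitionRow-style parser; may throw
            Predicate<R> predicate) {
        List<List<String>> matched = new ArrayList<>();
        for (List<String> spec : partitions) {
            R row;
            try {
                row = toRow.apply(spec);
            } catch (IllegalArgumentException e) {
                // Include the partition spec so the failure can be diagnosed from the log.
                LOG.warning("Skipping partition " + spec + ": cannot parse values (" + e.getMessage() + ")");
                continue;
            }
            if (predicate.test(row)) {
                matched.add(spec);
            }
        }
        return matched;
    }
}
```

Catching only `IllegalArgumentException` (which also covers `NumberFormatException`) keeps genuine bugs, such as a `NullPointerException`, visible instead of silently dropping partitions.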
--
This is an automated message from the Apache Git Service.