Copilot commented on code in PR #1549:
URL: https://github.com/apache/fluss/pull/1549#discussion_r2281132677
##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java:
##########
@@ -100,7 +109,73 @@ public List<SourceSplitBase> generateHybridLakeSplits()
throws Exception {
.createPlanner(
(LakeSource.PlannerContext)
lakeSnapshotInfo::getSnapshotId)
.plan());
+
+ if (lakeSplits.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ // first, filter lake splits by partition filters
+ if (!partitionFilters.isEmpty()) {
+ List<String> partitionKeys = fileStoreTable.partitionKeys();
+ boolean isSinglePartitionKey = partitionKeys.size() == 1;
+
+ if (isSinglePartitionKey) {
+ // in single key, name == value
+ Set<String> allowedPartitionNames =
+ partitionFilters.stream()
+ .map(fieldEqual -> String.valueOf(fieldEqual.equalValue))
+ .collect(Collectors.toSet());
+ lakeSplits
+ .entrySet()
+ .removeIf(entry -> !allowedPartitionNames.contains(entry.getKey()));
+
+ } else {
+ // multi partition key
+ List<DataField> dataFields = tableInfo.getRowType().getFields();
+ Map<Integer, String> partitionKeyIdxToValueMap = new java.util.HashMap<>();
+
+ // build partition key idx to expected value map
+ for (FieldEqual fieldEqual : partitionFilters) {
+ String fieldName = dataFields.get(fieldEqual.fieldIndex).getName();
+ int partitionKeyIdx = partitionKeys.indexOf(fieldName);
+ if (partitionKeyIdx >= 0) {
+ partitionKeyIdxToValueMap.put(
+ partitionKeyIdx, String.valueOf(fieldEqual.equalValue));
+ }
+ }
+
+ if (!partitionKeyIdxToValueMap.isEmpty()) {
+ lakeSplits
+ .entrySet()
+ .removeIf(
+ entry -> {
+ String partitionName = entry.getKey(); // e.g. "20250815$08"
+ String[] partitionValues =
+ partitionName.split(
+ Pattern.quote(
+ PARTITION_SPEC_SEPARATOR)); // ["20250815", "08"]
+
+ // check if all partition key idx match to expected values
+ for (Map.Entry<Integer, String> mapEntry :
+ partitionKeyIdxToValueMap.entrySet()) {
+ int partitionKeyIdx = mapEntry.getKey();
+ String expectedValue = mapEntry.getValue();
+
+ // idx out of bounds or value not match
+ if (partitionKeyIdx >= partitionValues.length
+ || !expectedValue.equals(
+ partitionValues[partitionKeyIdx])) {
Review Comment:
This combined condition silently drops a partition whenever `partitionKeyIdx`
is past the end of `partitionValues`, so a valid partition could be filtered
out by mistake. An out-of-range index means the partition name does not match
the expected partition key structure, and that should be raised as an error
rather than treated as a non-matching value.
```suggestion
if (partitionKeyIdx >= partitionValues.length) {
    throw new IllegalStateException(
            String.format(
                    "Partition key index %d is out of bounds for partition '%s'. "
                            + "Expected partition keys: %s, actual partition values: %s",
                    partitionKeyIdx,
                    partitionName,
                    partitionKeys,
                    java.util.Arrays.toString(partitionValues)));
}
if (!expectedValue.equals(partitionValues[partitionKeyIdx])) {
```
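To make the fail-fast behavior concrete, here is a minimal, self-contained sketch of the check outside the PR's classes. `PartitionMatchSketch`, `matches`, and the hard-coded `"$"` separator are illustrative assumptions (the separator mirrors the `"20250815$08"` example in the diff); this is not the actual `LakeSplitGenerator` code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class PartitionMatchSketch {
    // Assumption: "$" stands in for PARTITION_SPEC_SEPARATOR, as in "20250815$08".
    private static final String SEPARATOR = "$";

    /**
     * Returns true when every expected value matches the partition name.
     * Throws instead of returning false when the name has fewer values than expected.
     */
    static boolean matches(
            String partitionName,
            List<String> partitionKeys,
            Map<Integer, String> expectedByIdx) {
        String[] partitionValues = partitionName.split(Pattern.quote(SEPARATOR));
        for (Map.Entry<Integer, String> entry : expectedByIdx.entrySet()) {
            int idx = entry.getKey();
            if (idx >= partitionValues.length) {
                // Structural mismatch: surface it rather than silently dropping the partition.
                throw new IllegalStateException(
                        String.format(
                                "Partition key index %d is out of bounds for partition '%s'. "
                                        + "Expected partition keys: %s, actual partition values: %s",
                                idx,
                                partitionName,
                                partitionKeys,
                                Arrays.toString(partitionValues)));
            }
            if (!entry.getValue().equals(partitionValues[idx])) {
                return false; // ordinary non-match: the caller may filter this partition out
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("dt", "hr"); // hypothetical partition keys
        Map<Integer, String> expected = Collections.singletonMap(1, "08");
        System.out.println(matches("20250815$08", keys, expected));
        System.out.println(matches("20250815$09", keys, expected));
    }
}
```

With this shape, a `removeIf` predicate would be the negation of `matches(...)`, and a malformed partition name stops planning with a descriptive message instead of quietly shrinking the split list.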
##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java:
##########
@@ -540,6 +592,18 @@ private int[] getKeyRowProjection() {
return projection;
}
+ @Nullable
+ private Object toPaimonLiteralForPartition(
+ com.alibaba.fluss.types.DataType flussDataType, Object equalValue) {
+ String typeSummary = flussDataType.toString().toUpperCase();
+ if (typeSummary.contains("CHAR") || typeSummary.contains("STRING")) {
Review Comment:
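Checking the type by calling `contains()` on its string form is fragile: it
matches any type whose textual representation happens to include `CHAR` or
`STRING`, not only character types. Consider checking the class directly, for
example `flussDataType instanceof CharType`, `flussDataType instanceof
VarCharType`, or `flussDataType instanceof StringType`, for more robust type
detection.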
```suggestion
// Use instanceof for robust type checking
if (flussDataType instanceof CharType
|| flussDataType instanceof VarCharType
|| flussDataType instanceof StringType) {
```
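The difference between the two checks can be sketched with stand-in types. Everything below (`TypeCheckSketch`, the nested `DataType`, `CharType`, `StringType`, `RowType`, and their `toString()` formats) is hypothetical and only imitates a type hierarchy; it is not the Fluss type API, and the real string forms may differ.

```java
public class TypeCheckSketch {
    // Hypothetical stand-ins for a type hierarchy; NOT the real Fluss classes.
    interface DataType {}

    static final class CharType implements DataType {
        @Override
        public String toString() {
            return "CHAR(10)";
        }
    }

    static final class StringType implements DataType {
        @Override
        public String toString() {
            return "STRING";
        }
    }

    static final class RowType implements DataType {
        private final String fieldName;

        RowType(String fieldName) {
            this.fieldName = fieldName;
        }

        @Override
        public String toString() {
            // Assumed format: the field name is embedded in the type summary.
            return "ROW<`" + fieldName + "` INT>";
        }
    }

    /** String-based check, as in the reviewed code. */
    static boolean isCharacterByName(DataType type) {
        String summary = type.toString().toUpperCase();
        return summary.contains("CHAR") || summary.contains("STRING");
    }

    /** Class-based check, as proposed in the suggestion. */
    static boolean isCharacterByClass(DataType type) {
        return type instanceof CharType || type instanceof StringType;
    }

    public static void main(String[] args) {
        DataType row = new RowType("charge"); // a non-character type with "char" in a field name
        System.out.println(isCharacterByName(row));  // true: substring match on "CHARGE"
        System.out.println(isCharacterByClass(row)); // false: RowType is not a character type
    }
}
```

The substring check misfires as soon as any part of the printed type contains the letters `CHAR`, while the `instanceof` check depends only on the class of the type object.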
##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java:
##########
@@ -100,7 +109,73 @@ public List<SourceSplitBase> generateHybridLakeSplits()
throws Exception {
.createPlanner(
(LakeSource.PlannerContext)
lakeSnapshotInfo::getSnapshotId)
.plan());
+
+ if (lakeSplits.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ // first, filter lake splits by partition filters
+ if (!partitionFilters.isEmpty()) {
+ List<String> partitionKeys = fileStoreTable.partitionKeys();
+ boolean isSinglePartitionKey = partitionKeys.size() == 1;
+
+ if (isSinglePartitionKey) {
+ // in single key, name == value
+ Set<String> allowedPartitionNames =
+ partitionFilters.stream()
+ .map(fieldEqual -> String.valueOf(fieldEqual.equalValue))
+ .collect(Collectors.toSet());
+ lakeSplits
+ .entrySet()
+ .removeIf(entry -> !allowedPartitionNames.contains(entry.getKey()));
+
+ } else {
+ // multi partition key
+ List<DataField> dataFields = tableInfo.getRowType().getFields();
+ Map<Integer, String> partitionKeyIdxToValueMap = new java.util.HashMap<>();
+
+ // build partition key idx to expected value map
+ for (FieldEqual fieldEqual : partitionFilters) {
+ String fieldName = dataFields.get(fieldEqual.fieldIndex).getName();
+ int partitionKeyIdx = partitionKeys.indexOf(fieldName);
+ if (partitionKeyIdx >= 0) {
+ partitionKeyIdxToValueMap.put(
+ partitionKeyIdx, String.valueOf(fieldEqual.equalValue));
+ }
+ }
+
+ if (!partitionKeyIdxToValueMap.isEmpty()) {
+ lakeSplits
+ .entrySet()
+ .removeIf(
+ entry -> {
+ String partitionName = entry.getKey(); // e.g. "20250815$08"
+ String[] partitionValues =
+ partitionName.split(
+ Pattern.quote(
+ PARTITION_SPEC_SEPARATOR)); // ["20250815", "08"]
Review Comment:
`Pattern.quote()` and the `String.split()` call that consumes its result run
for every partition entry inside the `removeIf` lambda, so the separator regex
is rebuilt and recompiled on each iteration. Consider compiling the pattern
once, outside the lambda, and reusing it.
```suggestion
// Precompile the partition separator pattern to avoid repeated regex compilation
Pattern partitionSeparatorPattern =
        Pattern.compile(Pattern.quote(PARTITION_SPEC_SEPARATOR));
lakeSplits
        .entrySet()
        .removeIf(
                entry -> {
                    String partitionName = entry.getKey(); // e.g. "20250815$08"
                    String[] partitionValues =
                            partitionSeparatorPattern.split(partitionName); // ["20250815", "08"]
```
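As a standalone illustration of the precompiled-pattern idea, the sketch below compiles the quoted separator once and reuses it for every name. `PrecompiledSplitSketch`, `splitAll`, and the `"$"` value assigned to `PARTITION_SPEC_SEPARATOR` are assumptions made for the example, not code from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class PrecompiledSplitSketch {
    // Assumption: the separator is "$", matching the "20250815$08" example in the diff.
    private static final String PARTITION_SPEC_SEPARATOR = "$";

    // Compiled once; Pattern.quote makes "$" a literal instead of an end-of-line anchor.
    private static final Pattern SEPARATOR_PATTERN =
            Pattern.compile(Pattern.quote(PARTITION_SPEC_SEPARATOR));

    /** Splits each partition name with the shared, precompiled pattern. */
    static List<String[]> splitAll(List<String> partitionNames) {
        List<String[]> result = new ArrayList<>();
        for (String name : partitionNames) {
            result.add(SEPARATOR_PATTERN.split(name));
        }
        return result;
    }

    public static void main(String[] args) {
        for (String[] values : splitAll(Arrays.asList("20250815$08", "20250816$23"))) {
            System.out.println(Arrays.toString(values));
        }
    }
}
```

`Pattern` instances are immutable and safe to share, so a `static final` field or a local declared before the `removeIf` call would both work; which is preferable depends on how the surrounding class is structured.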