This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 0964d866c [flink] Union read supports reading partition data that exists in the lake but is expired in Fluss (#2197)
0964d866c is described below
commit 0964d866c0330967a8cf2d65d298dcc8b9eda9c2
Author: Liebing <[email protected]>
AuthorDate: Mon Dec 22 10:32:49 2025 +0800
[flink] Union read supports reading partition data that exists in the lake but is expired in Fluss (#2197)
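For context, a minimal, hypothetical sketch of the retention rule this commit introduces (the types below are simplified stand-ins for the real SourceSplitBase, LakeSnapshotSplit, and LakeSnapshotAndFlussLogSplit classes; see the enumerator and split-reader hunks below for the actual logic): a split that still carries unread lake data must survive partition expiry in Fluss, because the data it points to still exists in the lake.

    import java.util.Set;

    /** Hypothetical sketch; the real classes live in fluss-flink-common. */
    final class LakeSplitRetentionSketch {

        /** Simplified stand-in for the split types touched by this commit. */
        interface Split {
            boolean isLakeSplit();          // does the split carry lake data?
            boolean isLakeSplitFinished();  // has the lake portion been fully consumed?
            Long partitionId();
        }

        /** True if the split may be dropped once its Fluss partition expires. */
        static boolean removableOnExpiry(Split split, Set<Long> expiredPartitionIds) {
            // Keep lake-backed splits: their data still exists in the lake even
            // though the partition has already expired in Fluss.
            if (split.isLakeSplit() && !split.isLakeSplitFinished()) {
                return false;
            }
            return expiredPartitionIds.contains(split.partitionId());
        }
    }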
---
.../fluss/lake/source/TestingLakeSource.java | 142 +++++++++++
.../apache/fluss/lake/source/TestingLakeSplit.java | 37 ++-
.../apache/fluss/lake/source/TestingPlanner.java | 51 ++++
.../source/enumerator/FlinkSourceEnumerator.java | 52 +++-
.../flink/source/event/PartitionsRemovedEvent.java | 15 ++
.../source/reader/FlinkSourceSplitReader.java | 18 +-
.../fluss/flink/lake/LakeSplitSerializerTest.java | 32 +--
.../enumerator/FlinkSourceEnumeratorTest.java | 279 +++++++++++++++++++++
.../flink/source/reader/FlinkSourceReaderTest.java | 10 +-
.../state/SourceEnumeratorStateSerializerTest.java | 41 ++-
.../fluss/lake/paimon/source/PaimonSplit.java | 10 +
.../paimon/flink/FlinkUnionReadLogTableITCase.java | 91 +++++++
.../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 107 ++++++++
13 files changed, 817 insertions(+), 68 deletions(-)
diff --git a/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingLakeSource.java b/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingLakeSource.java
new file mode 100644
index 000000000..d2fa4269b
--- /dev/null
+++ b/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingLakeSource.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.source;
+
+import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.utils.CloseableIterator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** A testing implementation of {@link LakeSource}. */
+public class TestingLakeSource implements LakeSource<LakeSplit> {
+
+ // bucket num of source table
+ private final int bucketNum;
+
+    // partition infos of the partitions that contain lake splits
+ private final List<PartitionInfo> partitionInfos;
+
+ public TestingLakeSource() {
+ this.bucketNum = 0;
+ this.partitionInfos = null;
+ }
+
+    public TestingLakeSource(int bucketNum, List<PartitionInfo> partitionInfos) {
+ this.bucketNum = bucketNum;
+ this.partitionInfos = partitionInfos;
+ }
+
+ @Override
+ public void withProject(int[][] project) {}
+
+ @Override
+ public void withLimit(int limit) {}
+
+ @Override
+ public FilterPushDownResult withFilters(List<Predicate> predicates) {
+ return null;
+ }
+
+ @Override
+    public Planner<LakeSplit> createPlanner(PlannerContext context) throws IOException {
+ return new TestingPlanner(bucketNum, partitionInfos);
+ }
+
+ @Override
+    public RecordReader createRecordReader(ReaderContext<LakeSplit> context) throws IOException {
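+        // Return a reader over an empty iterator; these tests only exercise split planning.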
+ return CloseableIterator::emptyIterator;
+ }
+
+ @Override
+ public SimpleVersionedSerializer<LakeSplit> getSplitSerializer() {
+ return new SimpleVersionedSerializer<LakeSplit>() {
+
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
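+        // Wire format of a serialized TestingLakeSplit: bucket (int), then the partition value
+        // list: its size (int, -1 when the list itself is null), then per element a non-null
+        // flag (boolean) followed by the value (UTF string) when present.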
+ @Override
+ public byte[] serialize(LakeSplit split) throws IOException {
+ if (split instanceof TestingLakeSplit) {
+ TestingLakeSplit testingSplit = (TestingLakeSplit) split;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (DataOutputStream dos = new DataOutputStream(baos)) {
+ // Serialize bucket
+ dos.writeInt(testingSplit.bucket());
+
+ // Serialize partition list
+ List<String> partition = testingSplit.partition();
+ if (partition == null) {
+ dos.writeInt(-1);
+ } else {
+ dos.writeInt(partition.size());
+ for (String part : partition) {
+                            // Write a boolean flag to indicate if the string is null
+ dos.writeBoolean(part != null);
+ if (part != null) {
+ dos.writeUTF(part);
+ }
+ }
+ }
+ }
+ return baos.toByteArray();
+ }
+ throw new IOException("Unsupported split type: " +
split.getClass().getName());
+ }
+
+ @Override
+        public LakeSplit deserialize(int version, byte[] serialized) throws IOException {
+ if (version != 0) {
+ throw new IOException("Unsupported version: " + version);
+ }
+
+            try (DataInputStream dis =
+                    new DataInputStream(new ByteArrayInputStream(serialized))) {
+ // Deserialize bucket
+ int bucket = dis.readInt();
+
+ // Deserialize partition list
+ int partitionSize = dis.readInt();
+ List<String> partition;
+ if (partitionSize < 0) {
+ partition = null;
+ } else {
+ partition = new ArrayList<>(partitionSize);
+ for (int i = 0; i < partitionSize; i++) {
+                        // Read boolean flag to determine if the string is null
+ boolean isNotNull = dis.readBoolean();
+ String part = isNotNull ? dis.readUTF() : null;
+ partition.add(part);
+ }
+ }
+
+ return new TestingLakeSplit(bucket, partition);
+ }
+ }
+ };
+ }
+}
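As a usage note, a quick round trip through the serializer above (a hypothetical snippet, not part of the patch; it assumes the new test classes are on the classpath):

    import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
    import org.apache.fluss.lake.source.LakeSplit;
    import org.apache.fluss.lake.source.TestingLakeSource;
    import org.apache.fluss.lake.source.TestingLakeSplit;

    import java.util.Arrays;

    final class SplitSerializerRoundTrip {
        public static void main(String[] args) throws Exception {
            SimpleVersionedSerializer<LakeSplit> serializer =
                    new TestingLakeSource().getSplitSerializer();
            // Null partition values are legal: the format stores a per-element non-null flag.
            LakeSplit split = new TestingLakeSplit(1, Arrays.asList("2025-08-18", null));
            byte[] bytes = serializer.serialize(split);
            LakeSplit restored = serializer.deserialize(serializer.getVersion(), bytes);
            System.out.println(restored); // bucket and partition values survive the round trip
        }
    }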
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/PartitionsRemovedEvent.java b/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingLakeSplit.java
similarity index 52%
copy from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/PartitionsRemovedEvent.java
copy to fluss-common/src/test/java/org/apache/fluss/lake/source/TestingLakeSplit.java
index e21c4a155..dae4ee2e7 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/PartitionsRemovedEvent.java
+++ b/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingLakeSplit.java
@@ -15,34 +15,33 @@
* limitations under the License.
*/
-package org.apache.fluss.flink.source.event;
+package org.apache.fluss.lake.source;
-import org.apache.flink.api.connector.source.SourceEvent;
+import java.util.List;
-import java.util.Map;
+/** A testing implementation of {@link LakeSplit}. */
+public class TestingLakeSplit implements LakeSplit {
-/**
- * A source event to represent partitions is removed to send from enumerator to reader.
- *
- * <p>It contains the partition bucket of the removed partitions that has been assigned to the
- * reader.
- */
-public class PartitionsRemovedEvent implements SourceEvent {
+ private final int bucket;
+ private final List<String> partition;
- private static final long serialVersionUID = 1L;
-
- private final Map<Long, String> removedPartitions;
+ public TestingLakeSplit(int bucket, List<String> partition) {
+ this.bucket = bucket;
+ this.partition = partition;
+ }
- public PartitionsRemovedEvent(Map<Long, String> removedPartitions) {
- this.removedPartitions = removedPartitions;
+ @Override
+ public String toString() {
+ return "TestingLakeSplit{" + "bucket=" + bucket + ", partition=" +
partition + '}';
}
- public Map<Long, String> getRemovedPartitions() {
- return removedPartitions;
+ @Override
+ public int bucket() {
+ return bucket;
}
@Override
- public String toString() {
- return "PartitionsRemovedEvent{" + "removedPartitions=" +
removedPartitions + '}';
+ public List<String> partition() {
+ return partition;
}
}
diff --git a/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingPlanner.java b/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingPlanner.java
new file mode 100644
index 000000000..2ae27221e
--- /dev/null
+++ b/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingPlanner.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.source;
+
+import org.apache.fluss.metadata.PartitionInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** A testing implementation of {@link Planner}. */
+public class TestingPlanner implements Planner<LakeSplit> {
+
+ private final int bucketNum;
+ private final List<PartitionInfo> partitionInfos;
+
+ public TestingPlanner(int bucketNum, List<PartitionInfo> partitionInfos) {
+ this.bucketNum = bucketNum;
+ this.partitionInfos = partitionInfos;
+ }
+
+ @Override
+ public List<LakeSplit> plan() throws IOException {
+ List<LakeSplit> splits = new ArrayList<>();
+
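+        // Emit one TestingLakeSplit per (partition, bucket) pair, mirroring lake bucketing.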
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ for (int i = 0; i < bucketNum; i++) {
+ splits.add(
+ new TestingLakeSplit(
+                                i,
+                                partitionInfo.getResolvedPartitionSpec().getPartitionValues()));
+ }
+ }
+
+ return splits;
+ }
+}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index 55a9c871c..f8db0aff1 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -24,6 +24,8 @@ import org.apache.fluss.client.metadata.KvSnapshots;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.lake.LakeSplitGenerator;
+import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
+import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
 import org.apache.fluss.flink.source.enumerator.initializer.BucketOffsetsRetrieverImpl;
 import org.apache.fluss.flink.source.enumerator.initializer.NoStoppingOffsetsInitializer;
import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
@@ -83,7 +85,8 @@ import static org.apache.fluss.utils.Preconditions.checkState;
* <p>The enumerator is responsible for:
*
* <ul>
- *   <li>Get the all splits(snapshot split + log split) for a table of Fluss to be read.
+ *   <li>Get all the splits (lake split + kv snapshot split + log split) for a table of Fluss to
+ *       be read.
 *   <li>Assign the splits to readers with the guarantee that the splits belong to the same bucket
 *       will be assigned to same reader.
* </ul>
@@ -110,10 +113,15 @@ public class FlinkSourceEnumerator
*
     * <p>It's mainly used to help enumerator to broadcast the partition removed event to the
* readers when partitions is dropped.
+ *
+     * <p>If an assigned partition exists only in the lake and has already expired in Fluss, it will
+     * remain here indefinitely and will not be removed. However, considering that only a small
+     * number of such lake-only partitions might exist during the initial startup, and they consume
+     * minimal memory, this issue is being ignored for now.
*/
private final Map<Long, String> assignedPartitions;
- /** buckets that have been assigned to readers. */
+ /** Buckets that have been assigned to readers. */
private final Set<TableBucket> assignedTableBuckets;
@Nullable private List<SourceSplitBase> pendingHybridLakeFlussSplits;
@@ -222,12 +230,12 @@ public class FlinkSourceEnumerator
this.context = checkNotNull(context);
this.pendingSplitAssignment = new HashMap<>();
this.assignedTableBuckets = new HashSet<>(assignedTableBuckets);
- this.startingOffsetsInitializer = startingOffsetsInitializer;
this.assignedPartitions = new HashMap<>(assignedPartitions);
this.pendingHybridLakeFlussSplits =
pendingHybridLakeFlussSplits == null
? null
: new LinkedList<>(pendingHybridLakeFlussSplits);
+ this.startingOffsetsInitializer = startingOffsetsInitializer;
         this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
this.streaming = streaming;
this.partitionFilters = partitionFilters;
@@ -258,6 +266,10 @@ public class FlinkSourceEnumerator
// we'll need to consider lake splits
                 List<SourceSplitBase> hybridLakeFlussSplits = generateHybridLakeFlussSplits();
if (hybridLakeFlussSplits != null) {
+ LOG.info(
+ "Generated {} hybrid lake splits for table
{}.",
+ hybridLakeFlussSplits.size(),
+ tablePath);
// handle hybrid lake fluss splits firstly
handleSplitsAdd(hybridLakeFlussSplits, null);
}
@@ -554,7 +566,8 @@ public class FlinkSourceEnumerator
// hybrid snapshot log split;
OptionalLong logOffset = snapshots.getLogOffset(bucketId);
checkState(
- logOffset.isPresent(), "Log offset should be present
if snapshot id is.");
+ logOffset.isPresent(),
+ "Log offset should be present if snapshot id is
present.");
splits.add(
new HybridSnapshotLogSplit(
                             tb, partitionName, snapshotId.getAsLong(), logOffset.getAsLong()));
@@ -616,6 +629,7 @@ public class FlinkSourceEnumerator
// should be restored from checkpoint, shouldn't
// list splits again
if (pendingHybridLakeFlussSplits != null) {
+ LOG.info("Still have pending lake fluss splits, shouldn't list
splits again.");
return pendingHybridLakeFlussSplits;
}
try {
@@ -664,9 +678,28 @@ public class FlinkSourceEnumerator
pendingSplitAssignment.forEach(
(reader, splits) ->
splits.removeIf(
- split ->
- removedPartitionsMap.containsKey(
-                                        split.getTableBucket().getPartitionId())));
+                        split -> {
+                            // Never remove LakeSnapshotSplit, because during union reads,
+                            // data from the lake must still be read even if the partition
+                            // has already expired in Fluss.
+                            if (split instanceof LakeSnapshotSplit) {
+                                return false;
+                            }
+
+                            // Similar to LakeSnapshotSplit, if it contains any lake split,
+                            // never remove it; otherwise, it can be removed when the Fluss
+                            // partition expires.
+                            if (split instanceof LakeSnapshotAndFlussLogSplit) {
+                                LakeSnapshotAndFlussLogSplit hybridSplit =
+                                        (LakeSnapshotAndFlussLogSplit) split;
+                                if (!hybridSplit.isLakeSplitFinished()) {
+                                    return false;
+                                }
+                            }
+
+                            return removedPartitionsMap.containsKey(
+                                    split.getTableBucket().getPartitionId());
+                        }));
// send partition removed event to all readers
         PartitionsRemovedEvent event = new PartitionsRemovedEvent(removedPartitionsMap);
@@ -863,6 +896,11 @@ public class FlinkSourceEnumerator
return assignedPartitions;
}
+ @VisibleForTesting
+ Map<Integer, List<SourceSplitBase>> getPendingSplitAssignment() {
+ return pendingSplitAssignment;
+ }
+
@Override
public void addSplitsBack(List<SourceSplitBase> splits, int subtaskId) {
LOG.debug("Flink Source Enumerator adds splits back: {}", splits);
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/PartitionsRemovedEvent.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/PartitionsRemovedEvent.java
index e21c4a155..4c703e60c 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/PartitionsRemovedEvent.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/PartitionsRemovedEvent.java
@@ -20,6 +20,7 @@ package org.apache.fluss.flink.source.event;
import org.apache.flink.api.connector.source.SourceEvent;
import java.util.Map;
+import java.util.Objects;
/**
 * A source event to represent partitions is removed to send from enumerator to reader.
@@ -41,6 +42,20 @@ public class PartitionsRemovedEvent implements SourceEvent {
return removedPartitions;
}
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PartitionsRemovedEvent that = (PartitionsRemovedEvent) o;
+ return Objects.equals(removedPartitions, that.removedPartitions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(removedPartitions);
+ }
+
@Override
public String toString() {
return "PartitionsRemovedEvent{" + "removedPartitions=" +
removedPartitions + '}';
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
index ce50fe406..9a2ab7aeb 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
@@ -102,7 +102,7 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
private final Table table;
private final FlinkMetricRegistry flinkMetricRegistry;
- @Nullable private LakeSource<LakeSplit> lakeSource;
+ @Nullable private final LakeSource<LakeSplit> lakeSource;
// table id, will be null when haven't received any split
private Long tableId;
@@ -131,7 +131,6 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
this.boundedSplits = new ArrayDeque<>();
this.subscribedBuckets = new HashMap<>();
this.projectedFields = projectedFields;
- if (projectedFields == null) {}
this.flinkSourceReaderMetrics = flinkSourceReaderMetrics;
sanityCheck(table.getTableInfo().getRowType(), projectedFields);
@@ -313,9 +312,12 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
}
     public Set<TableBucket> removePartitions(Map<Long, String> removedPartitions) {
-        // First, if the current active bounded split belongs to a removed partition,
-        // finish it so it will not be restored.
- if (currentBoundedSplit != null) {
+        // First, if the current active bounded split belongs to a removed partition and is not a
+        // lake split, finish it so it will not be restored.
+        // Splits containing lake splits cannot be terminated even if their corresponding partition
+        // has expired in Fluss; otherwise, union reads will fail to correctly read partitions that
+        // exist in the lake but have already expired in Fluss.
+        if (currentBoundedSplit != null && !currentBoundedSplit.isLakeSplit()) {
TableBucket currentBucket = currentBoundedSplit.getTableBucket();
             if (removedPartitions.containsKey(currentBucket.getPartitionId())) {
try {
@@ -341,7 +343,11 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
while (snapshotSplitIterator.hasNext()) {
SourceSplitBase split = snapshotSplitIterator.next();
TableBucket tableBucket = split.getTableBucket();
- if (removedPartitions.containsKey(tableBucket.getPartitionId())) {
+            // Splits containing lake splits cannot be terminated even if their corresponding
+            // partition has expired in Fluss; otherwise, union reads will fail to correctly read
+            // partitions that exist in the lake but have already expired in Fluss.
+            if (removedPartitions.containsKey(tableBucket.getPartitionId())
+                    && !split.isLakeSplit()) {
removedSplits.add(split.splitId());
snapshotSplitIterator.remove();
unsubscribedTableBuckets.add(tableBucket);
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
index cbad27ef3..ddc3e9f1f 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
@@ -23,6 +23,7 @@ import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
import org.apache.fluss.flink.source.split.SourceSplitBase;
import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
import org.apache.fluss.lake.source.LakeSplit;
+import org.apache.fluss.lake.source.TestingLakeSplit;
import org.apache.fluss.metadata.TableBucket;
import org.apache.flink.core.memory.DataInputDeserializer;
@@ -31,7 +32,6 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Collections;
-import java.util.List;
 import static org.apache.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
 import static org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit.LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
@@ -47,7 +47,7 @@ class LakeSplitSerializerTest {
private static final int STOPPING_OFFSET = 1024;
private static final LakeSplit LAKE_SPLIT =
- new TestLakeSplit(0, Collections.singletonList("2025-08-18"));
+ new TestingLakeSplit(0, Collections.singletonList("2025-08-18"));
private final SimpleVersionedSerializer<LakeSplit> sourceSplitSerializer =
new TestSimpleVersionedSerializer();
@@ -203,7 +203,7 @@ class LakeSplitSerializerTest {
if (version < V2) {
return LAKE_SPLIT;
}
-            return new TestLakeSplit(0, Collections.singletonList("2025-08-19"));
+            return new TestingLakeSplit(0, Collections.singletonList("2025-08-19"));
}
@Override
@@ -211,30 +211,4 @@ class LakeSplitSerializerTest {
return V2;
}
}
-
- private static class TestLakeSplit implements LakeSplit {
-
- private final int bucket;
- private final List<String> partition;
-
- public TestLakeSplit(int bucket, List<String> partition) {
- this.bucket = bucket;
- this.partition = partition;
- }
-
- @Override
- public String toString() {
- return "TestLakeSplit";
- }
-
- @Override
- public int bucket() {
- return bucket;
- }
-
- @Override
- public List<String> partition() {
- return partition;
- }
- }
}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
index 20196bc81..6dfce8e4d 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
@@ -22,6 +22,8 @@ import org.apache.fluss.client.table.writer.UpsertWriter;
import org.apache.fluss.client.write.HashBucketAssigner;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.FlinkConnectorOptions;
+import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
+import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
import org.apache.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent;
import org.apache.fluss.flink.source.event.PartitionsRemovedEvent;
@@ -30,6 +32,12 @@ import org.apache.fluss.flink.source.split.LogSplit;
import org.apache.fluss.flink.source.split.SnapshotSplit;
import org.apache.fluss.flink.source.split.SourceSplitBase;
import org.apache.fluss.flink.utils.FlinkTestBase;
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.LakeSplit;
+import org.apache.fluss.lake.source.TestingLakeSource;
+import org.apache.fluss.lake.source.TestingLakeSplit;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.ResolvedPartitionSpec;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
@@ -37,6 +45,9 @@ import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.encode.CompactedKeyEncoder;
import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.lake.LakeTableHelper;
+import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
+import org.apache.fluss.shaded.guava32.com.google.common.collect.ImmutableMap;
import org.apache.fluss.types.DataTypes;
import org.apache.flink.api.connector.source.ReaderInfo;
@@ -45,9 +56,11 @@ import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -57,7 +70,9 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import java.util.stream.Collectors;
 import static org.apache.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
import static org.apache.fluss.testutils.DataTestUtils.row;
@@ -561,6 +576,189 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testPartitionsExpiredInFlussButExistInLake(
+            boolean isPrimaryKeyTable, @TempDir Path tempDir) throws Throwable {
+ int numSubtasks = 3;
+ TableDescriptor tableDescriptor =
+ isPrimaryKeyTable
+ ? DEFAULT_AUTO_PARTITIONED_PK_TABLE_DESCRIPTOR
+ : DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR;
+ long tableId = createTable(DEFAULT_TABLE_PATH, tableDescriptor);
+
+        ZooKeeperClient zooKeeperClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+ Map<Long, String> partitionNameByIds =
+ waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH);
+ assertThat(partitionNameByIds.size()).isEqualTo(2);
+ // Assume some data in the first partition has been tiered to the lake
+        Long hybridPartitionId = partitionNameByIds.keySet().stream().sorted().findFirst().get();
+ String hybridPartitionName = partitionNameByIds.get(hybridPartitionId);
+
+        // Mock partitions that have already expired in Fluss but still exist in the lake.
+        // Use a dummy partition id since these partitions are expired in Fluss
+ Map<Long, String> expiredPartitions = new HashMap<>();
+ expiredPartitions.put(-1L, "expiredPartition1");
+ expiredPartitions.put(-2L, "expiredPartition2");
+
+        // Mock a lake snapshot covering the expired partitions and the first partition that
+        // still exists in Fluss
+ long lakeEndOffset = 50L;
+ LakeTableSnapshot lakeTableSnapshot =
+ new LakeTableSnapshot(
+ 0,
+ ImmutableMap.of(
+ new TableBucket(tableId, -1L, 0), 100L,
+ new TableBucket(tableId, -1L, 1), 100L,
+ new TableBucket(tableId, -1L, 2), 100L,
+ new TableBucket(tableId, -2L, 0), 100L,
+ new TableBucket(tableId, -2L, 1), 100L,
+ new TableBucket(tableId, -2L, 2), 100L,
+                                new TableBucket(tableId, hybridPartitionId, 0), lakeEndOffset,
+                                new TableBucket(tableId, hybridPartitionId, 1), lakeEndOffset,
+                                new TableBucket(tableId, hybridPartitionId, 2), lakeEndOffset));
+        LakeTableHelper lakeTableHelper = new LakeTableHelper(zooKeeperClient, tempDir.toString());
+        lakeTableHelper.upsertLakeTable(tableId, DEFAULT_TABLE_PATH, lakeTableSnapshot);
+
+ // Create PartitionInfo for lake partitions
+ List<PartitionInfo> lakePartitionInfos = new ArrayList<>();
+        for (Map.Entry<Long, String> partition : expiredPartitions.entrySet()) {
+ Long partitionId = partition.getKey();
+ String partitionName = partition.getValue();
+ ResolvedPartitionSpec partitionSpec =
+ ResolvedPartitionSpec.fromPartitionName(
+                            Collections.singletonList(isPrimaryKeyTable ? "date" : "name"),
+ partitionName);
+            lakePartitionInfos.add(new PartitionInfo(partitionId, partitionSpec));
+ }
+ ResolvedPartitionSpec partitionSpec =
+ ResolvedPartitionSpec.fromPartitionName(
+ Collections.singletonList(isPrimaryKeyTable ? "date" :
"name"),
+ hybridPartitionName);
+        lakePartitionInfos.add(new PartitionInfo(hybridPartitionId, partitionSpec));
+
+ LakeSource<LakeSplit> lakeSource =
+ new TestingLakeSource(DEFAULT_BUCKET_NUM, lakePartitionInfos);
+ try (MockSplitEnumeratorContext<SourceSplitBase> context =
+ new MockSplitEnumeratorContext<>(numSubtasks);
+ MockWorkExecutor workExecutor = new MockWorkExecutor(context);
+ FlinkSourceEnumerator enumerator =
+ new FlinkSourceEnumerator(
+ DEFAULT_TABLE_PATH,
+ flussConf,
+ isPrimaryKeyTable,
+ true,
+ context,
+ Collections.emptySet(),
+ Collections.emptyMap(),
+ null,
+ OffsetsInitializer.full(),
+ DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
+ streaming,
+ null,
+ lakeSource,
+ workExecutor)) {
+ enumerator.start();
+
+            // Remove the hybrid partition to mock expiry after the enumerator starts
+ dropPartitions(
+ zooKeeperClient,
+ DEFAULT_TABLE_PATH,
+ Collections.singleton(hybridPartitionName));
+
+            // Run periodic partition discovery to trigger handlePartitionsRemoved once
+ runPeriodicPartitionDiscovery(workExecutor);
+            // Verify that the pending splits belonging to expired partitions are not removed
+ Map<Integer, List<SourceSplitBase>> pendingSplitAssignment =
+ enumerator.getPendingSplitAssignment();
+            assertThat(
+                            (int)
+                                    pendingSplitAssignment.values().stream()
+                                            .flatMap(List::stream)
+                                            .filter(
+                                                    split ->
+                                                            expiredPartitions.containsKey(
+                                                                    split.getTableBucket()
+                                                                            .getPartitionId()))
+                                            .count())
+                    .isEqualTo(expiredPartitions.size() * DEFAULT_BUCKET_NUM);
+            // Verify that the pending LakeSnapshotSplit(for log) and
+            // LakeSnapshotAndFlussLogSplit(for kv) are not removed for the expired partition
+ List<SourceSplitBase> hybridPendingSplits =
+ pendingSplitAssignment.values().stream()
+ .flatMap(List::stream)
+ .filter(
+ split ->
+ Objects.equals(
+                                                    split.getTableBucket().getPartitionId(),
+ hybridPartitionId))
+ .collect(Collectors.toList());
+ // log table will have 3 LakeSnapshotSplit
+ // kv table will have 3 LakeSnapshotAndFlussLogSplit
+ assertThat(hybridPendingSplits).hasSize(DEFAULT_BUCKET_NUM);
+            // Shouldn't have any PartitionsRemovedEvent, since no readers registered
+ assertThat(context.getSentSourceEvent()).isEmpty();
+
+ // Register the readers
+ for (int i = 0; i < numSubtasks; i++) {
+ registerReader(context, enumerator, i);
+ }
+
+            // All partitions, including expired partitions, should be assigned
+            Map<Long, String> expectedAssignedPartitions = new HashMap<>(partitionNameByIds);
+ lakePartitionInfos.forEach(
+ partitionInfo ->
+ expectedAssignedPartitions.put(
+ partitionInfo.getPartitionId(),
+                                    partitionInfo.getPartitionName()));
+            assertThat(enumerator.getAssignedPartitions()).isEqualTo(expectedAssignedPartitions);
+
+            // Verify that splits for expired partitions are generated and assigned
+            Map<Integer, List<SourceSplitBase>> actualAssignments = getReadersAssignments(context);
+ Map<TableBucket, Integer> lakeSnapshotSplitsSplitIndex =
+ actualAssignments.values().stream()
+ .flatMap(List::stream)
+                            .filter(split -> split instanceof LakeSnapshotSplit)
+ .map(split -> (LakeSnapshotSplit) split)
+ .collect(
+ Collectors.toMap(
+ SourceSplitBase::getTableBucket,
+ LakeSnapshotSplit::getSplitIndex));
+ Map<Integer, List<SourceSplitBase>> expectedAssignments =
+ expectAssignments(
+ enumerator,
+ tableId,
+ isPrimaryKeyTable,
+ partitionNameByIds,
+ lakePartitionInfos,
+ lakeSnapshotSplitsSplitIndex,
+ expiredPartitions);
+ checkAssignmentIgnoreOrder(actualAssignments, expectedAssignments);
+
+            // Run periodic partition discovery to trigger handlePartitionsRemoved again
+ runPeriodicPartitionDiscovery(workExecutor);
+
+ // Verify that PartitionsRemovedEvent is sent
+            Map<Integer, List<SourceEvent>> expectedSentSourceEvent = new HashMap<>();
+ for (int i = 0; i < 3; i++) {
+ Map<Long, String> removedPartitions = new HashMap<>();
+ // Add removed fluss partition
+ removedPartitions.put(hybridPartitionId, hybridPartitionName);
+ // Add lake partitions
+ lakePartitionInfos.forEach(
+ partitionInfo ->
+ removedPartitions.put(
+ partitionInfo.getPartitionId(),
+ partitionInfo.getPartitionName()));
+
+ expectedSentSourceEvent.put(
+ i,
+                        Collections.singletonList(new PartitionsRemovedEvent(removedPartitions)));
+ }
+            Map<Integer, List<SourceEvent>> actualSentSourceEvent = context.getSentSourceEvent();
+            assertThat(actualSentSourceEvent).isEqualTo(expectedSentSourceEvent);
+ }
+ }
+
// ---------------------
private void registerReader(
MockSplitEnumeratorContext<SourceSplitBase> context,
@@ -586,6 +784,87 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
return expectedAssignment;
}
+ private Map<Integer, List<SourceSplitBase>> expectAssignments(
+ FlinkSourceEnumerator enumerator,
+ long tableId,
+ boolean hasPrimaryKey,
+ Map<Long, String> partitionNameIds,
+ List<PartitionInfo> lakePartitionInfos,
+ Map<TableBucket, Integer> lakeSnapshotSplitsSplitIndex,
+ Map<Long, String> expiredPartitions) {
+        Map<Integer, List<SourceSplitBase>> expectedAssignment = new HashMap<>();
+
+        // for partitions that exist in Fluss at startup
+ for (Long partitionId : partitionNameIds.keySet()) {
+ for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
+                TableBucket tableBucket = new TableBucket(tableId, partitionId, i);
+ List<SourceSplitBase> splits = new ArrayList<>();
+ if (hasPrimaryKey) {
+ splits.add(
+ new LakeSnapshotAndFlussLogSplit(
+ tableBucket,
+ partitionNameIds.get(partitionId),
+ null,
+ EARLIEST_OFFSET,
+ Long.MIN_VALUE));
+ } else {
+ if (lakePartitionInfos.stream()
+ .map(PartitionInfo::getPartitionId)
+ .anyMatch(partitionId::equals)) {
+ splits.add(
+ new LakeSnapshotSplit(
+ tableBucket,
+ partitionNameIds.get(partitionId),
+ new TestingLakeSplit(
+ i,
+ Collections.singletonList(
+                                                        partitionNameIds.get(partitionId))),
+                                        lakeSnapshotSplitsSplitIndex.get(tableBucket)));
+ } else {
+ splits.add(
+ new LogSplit(
+ tableBucket,
+ partitionNameIds.get(partitionId),
+ EARLIEST_OFFSET));
+ }
+ }
+ splits.forEach(
+ split -> {
+ int task = enumerator.getSplitOwner(split);
+ expectedAssignment
+                                    .computeIfAbsent(task, k -> new ArrayList<>())
+ .add(split);
+ });
+ }
+ }
+
+        // for partitions that expired in Fluss but still exist in the lake
+ for (PartitionInfo lakePartitionInfo : lakePartitionInfos) {
+ Long partitionId = lakePartitionInfo.getPartitionId();
+ if (!expiredPartitions.containsKey(partitionId)) {
+ continue;
+ }
+
+ String lakePartitionName = lakePartitionInfo.getPartitionName();
+ List<String> partitionValues =
+                    lakePartitionInfo.getResolvedPartitionSpec().getPartitionValues();
+ for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
+ TableBucket tableBucket =
+                        new TableBucket(tableId, lakePartitionInfo.getPartitionId(), i);
+ SourceSplitBase split =
+ new LakeSnapshotSplit(
+ tableBucket,
+ lakePartitionName,
+ new TestingLakeSplit(i, partitionValues),
+ lakeSnapshotSplitsSplitIndex.get(tableBucket));
+ int task = enumerator.getSplitOwner(split);
+                expectedAssignment.computeIfAbsent(task, k -> new ArrayList<>()).add(split);
+ }
+ }
+
+ return expectedAssignment;
+ }
+
private void checkAssignmentIgnoreOrder(
Map<Integer, List<SourceSplitBase>> actualAssignment,
Map<Integer, List<SourceSplitBase>> expectedAssignment) {
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java
index 6634236aa..bf9792251 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java
@@ -26,6 +26,8 @@ import org.apache.fluss.flink.source.event.PartitionsRemovedEvent;
import org.apache.fluss.flink.source.metrics.FlinkSourceReaderMetrics;
import org.apache.fluss.flink.source.split.LogSplit;
import org.apache.fluss.flink.utils.FlinkTestBase;
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.LakeSplit;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
@@ -85,7 +87,8 @@ class FlinkSourceReaderTest extends FlinkTestBase {
clientConf,
tablePath,
tableDescriptor.getSchema().getRowType(),
- readerContext)) {
+ readerContext,
+ null)) {
// first of all, add all splits of all partitions to the reader
Map<Long, Set<TableBucket>> assignedBuckets = new HashMap<>();
@@ -159,7 +162,8 @@ class FlinkSourceReaderTest extends FlinkTestBase {
Configuration flussConf,
TablePath tablePath,
RowType sourceOutputType,
- SourceReaderContext context)
+ SourceReaderContext context,
+ LakeSource<LakeSplit> lakeSource)
throws Exception {
        FutureCompletingBlockingQueue<RecordsWithSplitIds<RecordAndPos>> elementsQueue =
new FutureCompletingBlockingQueue<>();
@@ -181,6 +185,6 @@ class FlinkSourceReaderTest extends FlinkTestBase {
null,
new FlinkSourceReaderMetrics(context.metricGroup()),
recordEmitter,
- null);
+ lakeSource);
}
}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
index 6a413f80d..26b4a024c 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
@@ -17,14 +17,23 @@
package org.apache.fluss.flink.source.state;
+import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
+import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit;
+import org.apache.fluss.flink.source.split.LogSplit;
+import org.apache.fluss.flink.source.split.SourceSplitBase;
+import org.apache.fluss.lake.source.LakeSplit;
+import org.apache.fluss.lake.source.TestingLakeSource;
+import org.apache.fluss.lake.source.TestingLakeSplit;
import org.apache.fluss.metadata.TableBucket;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -38,7 +47,7 @@ class SourceEnumeratorStateSerializerTest {
@Test
void testPendingSplitsCheckpointSerde() throws Exception {
FlussSourceEnumeratorStateSerializer serializer =
- new FlussSourceEnumeratorStateSerializer(null);
+                new FlussSourceEnumeratorStateSerializer(new TestingLakeSource());
Set<TableBucket> assignedBuckets =
                 new HashSet<>(Arrays.asList(new TableBucket(1, 0), new TableBucket(1, 4L, 1)));
@@ -46,13 +55,37 @@ class SourceEnumeratorStateSerializerTest {
assignedPartitions.put(1L, "partition1");
assignedPartitions.put(2L, "partition2");
+ // Create remaining hybrid lake fluss splits with different types
+        List<SourceSplitBase> remainingHybridLakeFlussSplits = new ArrayList<>();
+
+ // Add a LogSplit
+ TableBucket logSplitBucket = new TableBucket(1, 0);
+ LogSplit logSplit = new LogSplit(logSplitBucket, null, 100L);
+ remainingHybridLakeFlussSplits.add(logSplit);
+
+ // Add a HybridSnapshotLogSplit
+ TableBucket hybridSplitBucket = new TableBucket(1, 1);
+ HybridSnapshotLogSplit hybridSplit =
+ new HybridSnapshotLogSplit(hybridSplitBucket, null, 200L, 50L);
+ remainingHybridLakeFlussSplits.add(hybridSplit);
+
+ // Add a LakeSnapshotAndFlussLogSplit
+ TableBucket lakeHybridSplitBucket = new TableBucket(1, 100L, 2);
+ List<LakeSplit> lakeSplits =
+ Collections.singletonList(
+                        new TestingLakeSplit(2, Collections.singletonList("2024-01-01")));
+ LakeSnapshotAndFlussLogSplit lakeHybridSplit =
+ new LakeSnapshotAndFlussLogSplit(
+ lakeHybridSplitBucket, "2024-01-01", lakeSplits, 300L,
Long.MIN_VALUE);
+ remainingHybridLakeFlussSplits.add(lakeHybridSplit);
+
SourceEnumeratorState sourceEnumeratorState =
new SourceEnumeratorState(
-                        assignedBuckets, assignedPartitions, Collections.emptyList());
+                        assignedBuckets, assignedPartitions, remainingHybridLakeFlussSplits);
- // serialize assigned buckets
+ // serialize state with remaining hybrid lake fluss splits
byte[] serialized = serializer.serialize(sourceEnumeratorState);
- // deserialize assigned buckets
+ // deserialize state
SourceEnumeratorState deserializedSourceEnumeratorState =
serializer.deserialize(serializer.getVersion(), serialized);
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java
index aa9da5986..7096c0abe 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java
@@ -72,4 +72,14 @@ public class PaimonSplit implements LakeSplit {
public boolean isBucketUnAware() {
return isBucketUnAware;
}
+
+ @Override
+ public String toString() {
+ return "PaimonSplit{"
+ + "dataSplit="
+ + dataSplit
+ + ", isBucketUnAware="
+ + isBucketUnAware
+ + '}';
+ }
}
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
index cdb88dcbf..6ebffa9c5 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
@@ -17,6 +17,8 @@
package org.apache.fluss.lake.paimon.flink;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.Decimal;
import org.apache.fluss.row.InternalRow;
@@ -31,6 +33,7 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -38,10 +41,12 @@ import org.junit.jupiter.params.provider.ValueSource;
import javax.annotation.Nullable;
import java.io.File;
+import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -50,7 +55,9 @@ import java.util.stream.Collectors;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
 import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.assertj.core.api.Assertions.assertThat;
/** The IT case for Flink union data in lake and fluss for log table. */
@@ -242,6 +249,90 @@ class FlinkUnionReadLogTableITCase extends FlinkUnionReadTestBase {
jobClient.cancel().get();
}
+ @Test
+    void testUnionReadPartitionsExistInPaimonButExpiredInFluss() throws Exception {
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName = "expired_partition_logTable";
+ TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+ List<Row> writtenRows = new ArrayList<>();
+        long tableId = prepareLogTable(tablePath, DEFAULT_BUCKET_NUM, true, writtenRows);
+        // wait until records have been synced
+ waitUntilBucketSynced(tablePath, tableId, DEFAULT_BUCKET_NUM, true);
+
+ // Get all partitions
+ Map<Long, String> partitionNameByIds = waitUntilPartitions(tablePath);
+ assertThat(partitionNameByIds.size()).isGreaterThan(0);
+
+ // Select one partition to drop (expire in Fluss)
+ Long partitionToDropId = partitionNameByIds.keySet().iterator().next();
+ String partitionToDropName = partitionNameByIds.get(partitionToDropId);
+
+ // Filter rows that belong to the partition to be dropped
+ List<Row> rowsInExpiredPartition =
+ writtenRows.stream()
+                        .filter(row -> partitionToDropName.equals(row.getField(15)))
+ .collect(Collectors.toList());
+ assertThat(rowsInExpiredPartition).isNotEmpty();
+
+ // Now drop the partition in Fluss (make it expired)
+ // The partition data still exists in Paimon
+ admin.dropPartition(
+ tablePath,
+ new PartitionSpec(Collections.singletonMap("p",
partitionToDropName)),
+ false)
+ .get();
+
+ // Retry until partition dropped
+ retry(
+ Duration.ofSeconds(60),
+ () -> {
+ List<PartitionInfo> remainingPartitions =
+ admin.listPartitionInfos(tablePath).get();
+ assertThat(remainingPartitions.size()).isEqualTo(1);
+ });
+
+ // Now query the table - it should read data from both:
+ // 1. Remaining partitions from Fluss
+ // 2. Expired partition from Paimon (union read)
+ CloseableIterator<Row> iterator =
+ streamTEnv
+ .executeSql(
+ "select * from "
+ + tableName
+ + " /*+
OPTIONS('scan.partition.discovery.interval'='100ms') */")
+ .collect();
+        List<String> actual = collectRowsWithTimeout(iterator, writtenRows.size(), true);
+ assertThat(actual)
+ .containsExactlyInAnyOrderElementsOf(
+                        writtenRows.stream().map(Row::toString).collect(Collectors.toList()));
+
+ // Test partition filter - query only the expired partition
+ String sqlWithPartitionFilter =
+ "select"
+ + " /*+
OPTIONS('scan.partition.discovery.interval'='100ms') */"
+ + " * FROM "
+ + tableName
+ + " WHERE p = '"
+ + partitionToDropName
+ + "'";
+ iterator = streamTEnv.executeSql(sqlWithPartitionFilter).collect();
+        List<String> filteredActual =
+                collectRowsWithTimeout(iterator, rowsInExpiredPartition.size(), true);
+
+ // Should still be able to read data from expired partition via Paimon
+ assertThat(filteredActual)
+ .as("Should read expired partition data from Paimon when
filtering by partition")
+ .containsExactlyInAnyOrderElementsOf(
+ rowsInExpiredPartition.stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()));
+
+ // cancel the tiering job
+ jobClient.cancel().get();
+ }
+
private long prepareLogTable(
             TablePath tablePath, int bucketNum, boolean isPartitioned, List<Row> flinkRows)
throws Exception {
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index 5baa4868f..7c8c2d698 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -19,6 +19,8 @@ package org.apache.fluss.lake.paimon.flink;
import org.apache.fluss.config.AutoPartitionTimeUnit;
import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
@@ -62,7 +64,9 @@ import java.util.stream.Collectors;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
 import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.assertj.core.api.Assertions.assertThat;
 /** The IT case for Flink union data in lake and fluss for primary key table. */
@@ -763,6 +767,109 @@ class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase {
jobClient.cancel().get();
}
+ @Test
+    void testUnionReadPartitionsExistInPaimonButExpiredInFluss() throws Exception {
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName = "expired_partition_pk_table";
+ TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+ // create table and write data
+ Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+ Function<String, List<InternalRow>> rowGenerator =
+ (partition) ->
+ Arrays.asList(
+ row(3, "string", partition), row(30,
"another_string", partition));
+ long tableId =
+ prepareSimplePKTable(
+                        tablePath, DEFAULT_BUCKET_NUM, true, rowGenerator, bucketLogEndOffset);
+
+        // wait until records have been synced
+ waitUntilBucketSynced(tablePath, tableId, DEFAULT_BUCKET_NUM, true);
+
+ // Get all partitions
+ Map<Long, String> partitionNameByIds = waitUntilPartitions(tablePath);
+ assertThat(partitionNameByIds.size()).isGreaterThan(0);
+
+ // Build expected rows for all partitions
+        // Simple PK table has 3 columns: c1 (INT), c2 (STRING), c3 (STRING), where c3 is the
+        // partition column
+ List<Row> expectedAllRows = new ArrayList<>();
+ for (String partition : partitionNameByIds.values()) {
+ expectedAllRows.add(Row.of(3, "string", partition));
+ expectedAllRows.add(Row.of(30, "another_string", partition));
+ }
+
+ // Select one partition to drop (expire in Fluss)
+ Long partitionToDropId = partitionNameByIds.keySet().iterator().next();
+ String partitionToDropName = partitionNameByIds.get(partitionToDropId);
+
+ // Filter rows that belong to the partition to be dropped
+ // c3 is the partition column (index 2)
+ List<Row> rowsInExpiredPartition =
+ expectedAllRows.stream()
+                        .filter(row -> partitionToDropName.equals(row.getField(2)))
+ .collect(Collectors.toList());
+ assertThat(rowsInExpiredPartition).isNotEmpty();
+
+ // Now drop the partition in Fluss (make it expired)
+ // The partition data still exists in Paimon
+ // c3 is the partition column for simple PK table
+ admin.dropPartition(
+ tablePath,
+ new PartitionSpec(Collections.singletonMap("c3",
partitionToDropName)),
+ false)
+ .get();
+
+ // Retry until partition dropped
+ retry(
+ Duration.ofSeconds(60),
+ () -> {
+ List<PartitionInfo> remainingPartitions =
+ admin.listPartitionInfos(tablePath).get();
+ assertThat(remainingPartitions.size()).isEqualTo(1);
+ });
+
+ // Now query the table - it should read data from both:
+ // 1. Remaining partitions from Fluss
+ // 2. Expired partition from Paimon (union read)
+ CloseableIterator<Row> iterator =
+ streamTEnv
+ .executeSql(
+ "select * from "
+ + tableName
+ + " /*+
OPTIONS('scan.partition.discovery.interval'='100ms') */")
+ .collect();
+        List<String> actual = collectRowsWithTimeout(iterator, expectedAllRows.size(), true);
+ assertThat(actual)
+ .containsExactlyInAnyOrderElementsOf(
+                        expectedAllRows.stream().map(Row::toString).collect(Collectors.toList()));
+
+ // Test partition filter - query only the expired partition
+ // c3 is the partition column for simple PK table
+ String sqlWithPartitionFilter =
+ "select"
+ + " /*+
OPTIONS('scan.partition.discovery.interval'='100ms') */"
+ + " * FROM "
+ + tableName
+ + " WHERE c3 = '"
+ + partitionToDropName
+ + "'";
+ iterator = streamTEnv.executeSql(sqlWithPartitionFilter).collect();
+ List<String> filteredActual =
+                collectRowsWithTimeout(iterator, rowsInExpiredPartition.size(), true);
+
+ // Should still be able to read data from expired partition via Paimon
+ assertThat(filteredActual)
+ .as("Should read expired partition data from Paimon when
filtering by partition")
+ .containsExactlyInAnyOrderElementsOf(
+ rowsInExpiredPartition.stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()));
+
+ // cancel the tiering job
+ jobClient.cancel().get();
+ }
+
private List<Row> sortedRows(List<Row> rows) {
rows.sort(Comparator.comparing(Row::toString));
return rows;