luoyuxia commented on code in PR #2197:
URL: https://github.com/apache/fluss/pull/2197#discussion_r2629685613
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -83,7 +84,8 @@
* <p>The enumerator is responsible for:
*
* <ul>
- * <li>Get the all splits(snapshot split + log split) for a table of Fluss to be read.
+ * <li>Get the all splits(lake split + snapshot split + log split) for a table of Fluss to be
Review Comment:
```suggestion
* <li>Get the all splits(lake split + kv snapshot split + log split) for a table of Fluss to be
```
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -664,9 +677,16 @@ private void handlePartitionsRemoved(Collection<Partition> removedPartitionInfo)
pendingSplitAssignment.forEach(
(reader, splits) ->
splits.removeIf(
- split ->
- removedPartitionsMap.containsKey(
- split.getTableBucket().getPartitionId())));
+ split -> {
+ // Never remove LakeSnapshotSplit, because during union reads,
+ // data from the lake partition must still be read even if the
+ // partition has already expired in Fluss.
+ if (split instanceof LakeSnapshotSplit) {
Review Comment:
What about `LakeSnapshotAndFlussLogSplit`? That kind of split will also be removed.
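One possible way to cover both lake split types is to exempt every split that carries lake data from the partition-removal filter. The sketch below is a minimal, self-contained illustration under stated assumptions: `SourceSplit`, `LogSplit`, the `long partitionId()` accessor (standing in for `getTableBucket().getPartitionId()`), the helper `readsLakeData`, and the `Map<Long, String>` type of `removedPartitionsMap` are hypothetical simplifications, not the real Fluss classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for the Fluss split classes (partitioned tables only).
interface SourceSplit {
    long partitionId();
}

record LogSplit(long partitionId) implements SourceSplit {}

record LakeSnapshotSplit(long partitionId) implements SourceSplit {}

record LakeSnapshotAndFlussLogSplit(long partitionId) implements SourceSplit {}

public class PendingSplitPruning {

    /** Hypothetical helper: true if the split still has data to read from the lake. */
    static boolean readsLakeData(SourceSplit split) {
        return split instanceof LakeSnapshotSplit
                || split instanceof LakeSnapshotAndFlussLogSplit;
    }

    /** Drops pending splits of removed partitions, but keeps every split that reads lake data. */
    static void pruneRemovedPartitions(
            Map<Integer, List<SourceSplit>> pendingSplitAssignment,
            Map<Long, String> removedPartitionsMap) {
        pendingSplitAssignment.forEach(
                (reader, splits) ->
                        splits.removeIf(
                                split ->
                                        !readsLakeData(split)
                                                && removedPartitionsMap.containsKey(
                                                        split.partitionId())));
    }

    public static void main(String[] args) {
        List<SourceSplit> splits = new ArrayList<>();
        splits.add(new LogSplit(1L));
        splits.add(new LakeSnapshotSplit(1L));
        splits.add(new LakeSnapshotAndFlussLogSplit(1L));
        splits.add(new LogSplit(2L));

        Map<Integer, List<SourceSplit>> pending = new HashMap<>();
        pending.put(0, splits);

        // Partition 1 has expired in Fluss: only its pure log split is dropped.
        pruneRemovedPartitions(pending, Map.of(1L, "p1"));
        System.out.println(pending.get(0)); // 3 splits remain
    }
}
```

Whether the Fluss log portion of a `LakeSnapshotAndFlussLogSplit` can still be read once the partition has expired in Fluss is a separate question; this sketch only keeps the split so that its lake snapshot part is not lost.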
##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java:
##########
@@ -763,6 +767,134 @@ void testUnionReadPrimaryKeyTableFailover(boolean isPartitioned) throws Exceptio
jobClient.cancel().get();
}
+ @Test
+ void testUnionReadPartitionsExistInPaimonButExpiredInFluss() throws Exception {
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName = "expired_partition_pkTable";
+ TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+ Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+ // create table & write initial data
+ long tableId =
+ preparePKTableFullType(tablePath, DEFAULT_BUCKET_NUM, true, bucketLogEndOffset);
Review Comment:
I think we can just use `prepareSimplePKTable` to simplify the test instead of creating a table with all data types.
--
This is an automated message from the Apache Git Service.