luoyuxia commented on code in PR #2197:
URL: https://github.com/apache/fluss/pull/2197#discussion_r2629685613


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -83,7 +84,8 @@
  * <p>The enumerator is responsible for:
  *
  * <ul>
- *   <li>Get the all splits(snapshot split + log split) for a table of Fluss to be read.
+ *   <li>Get the all splits(lake split + snapshot split + log split) for a table of Fluss to be

Review Comment:
   ```suggestion
    *   <li>Get the all splits(lake split + kv snapshot split + log split) for a table of Fluss to be
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -664,9 +677,16 @@ private void handlePartitionsRemoved(Collection<Partition> removedPartitionInfo)
         pendingSplitAssignment.forEach(
                 (reader, splits) ->
                         splits.removeIf(
-                                split ->
-                                        removedPartitionsMap.containsKey(
-                                                split.getTableBucket().getPartitionId())));
+                                split -> {
+                                    // Never remove LakeSnapshotSplit, because during union reads,
+                                    // data from the lake partition must still be read even if the
+                                    // partition has already expired in Fluss.
+                                    if (split instanceof LakeSnapshotSplit) {

Review Comment:
   What about `LakeSnapshotAndFlussLogSplit`? Splits of that kind will still be
   removed here, because the check only covers `LakeSnapshotSplit`.
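
To make the concern concrete, here is a minimal sketch of a removal predicate that keeps both kinds of lake split. It is only one possible reading of the comment, not the PR's actual fix. All class names ending in `Stub`, plus `RemovedPartitionSplitFilter`, `shouldRemove`, and `retainAfterPartitionRemoval`, are hypothetical stand-ins so that the example compiles on its own; the real Fluss split classes may expose a different hierarchy or helper methods.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, self-contained sketch; none of these types are the real Fluss classes.
final class RemovedPartitionSplitFilter {

    // Stand-in for a generic source split that belongs to a partition.
    static class SplitStub {
        final Long partitionId;

        SplitStub(Long partitionId) {
            this.partitionId = partitionId;
        }
    }

    // Stand-in for a plain Fluss log split.
    static class LogSplitStub extends SplitStub {
        LogSplitStub(Long partitionId) {
            super(partitionId);
        }
    }

    // Stand-in for LakeSnapshotSplit.
    static class LakeSnapshotSplitStub extends SplitStub {
        LakeSnapshotSplitStub(Long partitionId) {
            super(partitionId);
        }
    }

    // Stand-in for LakeSnapshotAndFlussLogSplit.
    static class LakeSnapshotAndFlussLogSplitStub extends SplitStub {
        LakeSnapshotAndFlussLogSplitStub(Long partitionId) {
            super(partitionId);
        }
    }

    // A split is dropped only if it is not a lake split of either kind
    // and its partition is in the removed set.
    static boolean shouldRemove(SplitStub split, Set<Long> removedPartitionIds) {
        if (split instanceof LakeSnapshotSplitStub
                || split instanceof LakeSnapshotAndFlussLogSplitStub) {
            return false;
        }
        return removedPartitionIds.contains(split.partitionId);
    }

    // Returns a copy of the pending splits with the removable ones filtered out.
    static List<SplitStub> retainAfterPartitionRemoval(
            List<SplitStub> pending, Set<Long> removedPartitionIds) {
        List<SplitStub> kept = new ArrayList<>(pending);
        kept.removeIf(split -> shouldRemove(split, removedPartitionIds));
        return kept;
    }

    public static void main(String[] args) {
        Set<Long> removed = new HashSet<>(Arrays.asList(1L));
        List<SplitStub> pending =
                Arrays.asList(
                        new LogSplitStub(1L),
                        new LogSplitStub(2L),
                        new LakeSnapshotSplitStub(1L),
                        new LakeSnapshotAndFlussLogSplitStub(1L));
        List<SplitStub> kept = retainAfterPartitionRemoval(pending, removed);
        System.out.println("kept " + kept.size() + " of " + pending.size() + " splits");
    }
}
```

Whether a `LakeSnapshotAndFlussLogSplit` for an expired partition should be kept whole, or only its lake portion read, is a design question the sketch does not answer.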



##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java:
##########
@@ -763,6 +767,134 @@ void testUnionReadPrimaryKeyTableFailover(boolean isPartitioned) throws Exceptio
         jobClient.cancel().get();
     }
 
+    @Test
+    void testUnionReadPartitionsExistInPaimonButExpiredInFluss() throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "expired_partition_pkTable";
+        TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        // create table & write initial data
+        long tableId =
+                preparePKTableFullType(tablePath, DEFAULT_BUCKET_NUM, true, bucketLogEndOffset);

Review Comment:
   I think we can just use `prepareSimplePKTable` to simplify the test, instead
   of a table covering all data types.


