luoyuxia commented on code in PR #2265:
URL: https://github.com/apache/fluss/pull/2265#discussion_r2674903726


##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java:
##########
@@ -186,6 +186,9 @@ void testLogTableCompaction() throws Exception {
                             t1, t1Bucket, ++i, true, 
Collections.singletonList(row(1, "v1"))));
             checkFileStatusInIcebergTable(t1, 3, false);
 
+            // Ensure tiering job has fully processed the previous writes
+            assertReplicaStatus(t1Bucket, i);

Review Comment:
   Thanks @rionmonster for diving into it and sorry for late reply for i have a 
busy week. I download the log for the fail ci in 
https://github.com/apache/fluss/actions/runs/20522062759/artifacts/4970768099
   And I found the following logs:
   ```
   --------- first tiering
   20:09:10,300 [SourceCoordinator-Source: TieringSource] INFO  
org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator [] - 
Generate Tiering splits for table fluss.log_table.
   20:09:10,310 [SourceCoordinator-Source: TieringSource] INFO  
org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator [] - 
**Generate Tiering 1** splits for table fluss.log_table with cost 10ms.
   20:09:10,310 [Source: TieringSource (1/2)#0] INFO  
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
split(s) to reader: [TieringLogSplit{tablePath=fluss.log_table, 
tableBucket=TableBucket{tableId=2, bucket=0}, 
   
   ---- second  tiering
   org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator [] 
- Generate Tiering splits for table fluss.log_table.
   20:09:11,303 [SourceCoordinator-Source: TieringSource] INFO  
org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator [] - Last 
committed lake table snapshot info 
is:LakeSnapshot{snapshotId=3828565410144101634, 
tableBucketsOffset={TableBucket{tableId=2, bucket=0}=1}}
   20:09:11,307 [SourceCoordinator-Source: TieringSource] INFO  
org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator [] - 
Generate Tiering 1 splits for table fluss.log_table with cost 7ms.
   20:09:11,307 [Source: TieringSource (1/2)#0] INFO  
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
split(s) to reader: [TieringLogSplit{tablePath=fluss.log_table, 
tableBucket=TableBucket{tableId=2, bucket=0}, partitionName='null', 
startingOffset=1, stoppingOffset=2, numberOfSplits=1}]
   20:09:11,307 [Source Data Fetcher for Source: TieringSource (1/2)#0] INFO  
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Starting split fetcher 1
   20:09:11,307 [Source Data Fetcher for Source: TieringSource (1/2)#0] INFO  
org.apache.fluss.flink.tiering.source.TieringSplitReader     [] - add split 
tiering-log-split-2-0
   20:09:11,310 [Source Data Fetcher for Source: TieringSource (1/2)#0] INFO  
org.apache.fluss.flink.tiering.source.TieringSplitReader     [] - Start to tier 
table fluss.log_table with table id 2.
   20:09:11,311 [Source Data Fetcher for Source: TieringSource (1/2)#0] INFO  
org.apache.fluss.flink.tiering.source.TieringSplitReader     [] - Subscribe to 
read log for split tiering-log-split-2-0 from starting offset 1 to end offset 2.
   
   ---- third  tiering
   
   org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator [] 
- Generate Tiering splits for table fluss.log_table.
   20:09:12,303 [SourceCoordinator-Source: TieringSource] INFO  
org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator [] - Last 
committed lake table snapshot info 
is:LakeSnapshot{snapshotId=7991116837264656570, 
tableBucketsOffset={TableBucket{tableId=2, bucket=0}=2}}
   20:09:12,307 [SourceCoordinator-Source: TieringSource] INFO  
org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator [] - 
Generate Tiering 1 splits for table fluss.log_table with cost 7ms.
   20:09:12,308 [Source: TieringSource (1/2)#0] INFO  
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
split(s) to reader: [TieringLogSplit{tablePath=fluss.log_table, 
tableBucket=TableBucket{tableId=2, bucket=0}, partitionName='null', 
startingOffset=2, stoppingOffset=3, numberOfSplits=1}]
   20:09:12,308 [Source Data Fetcher for Source: TieringSource (1/2)#0] INFO  
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Starting split fetcher 2
   20:09:12,308 [Source Data Fetcher for Source: TieringSource (1/2)#0] INFO  
org.apache.fluss.flink.tiering.source.TieringSplitReader     [] - add split 
tiering-log-split-2-0
   20:09:12,311 [Source Data Fetcher for Source: TieringSource (1/2)#0] INFO  
org.apache.fluss.flink.tiering.source.TieringSplitReader     [] - Start to tier 
table fluss.log_table with table id 2.
   20:09:12,311 [Source Data Fetcher for Source: TieringSource (1/2)#0] INFO  
org.apache.fluss.flink.tiering.source.TieringSplitReader     [] - Subscribe to 
read log for split tiering-log-split-2-0 from starting offset 2 to end offset 3.
   
   
   --- then no tiering again, that's where the problem happen
   
   20:09:14,300 [SourceCoordinator-Source: TieringSource-worker-thread-1] INFO  
org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator [] - 
No available Tiering table found, will poll later.
   20:09:14,800 [SourceCoordinator-Source: TieringSource-worker-thread-1] INFO  
org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator [] - 
No available Tiering table found, will poll later.
   ....
   
   20:10:19,800 [SourceCoordinator-Source: TieringSource-worker-thread-1] INFO  
org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator [] - 
No available Tiering table found, will poll later.
   
   **no any tiering happen**
   ```
   
   I think the keypoint is that why after the third tiering which tiering up to 
offset 3(3 records), no tiering happen. 
   It's expected to happen another tiering since we write the 4th record in 
   ```
    flussRows.addAll(
                       writeIcebergTableRecords(
                               t1, t1Bucket, ++i, true, 
Collections.singletonList(row(1, "v1"))));
   ```
   
   It'll be better that we figure out the reason. Since you can reproduce it in 
your local, I think you can print more logs  in `LakeTableTieringManager` to 
find the it doesn't assign the table to tiering service anymore. 
   
   I'm afarid that there is some bug in `LakeTableTieringManager` or the logic 
of assign the table to tier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to