This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 3815e9eba [paimon] Fix union read paimon issue (#2170)
3815e9eba is described below

commit 3815e9ebac85fa99ff64cd7d18e40c4a54363cd7
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Dec 17 17:37:24 2025 +0800

    [paimon] Fix union read paimon issue (#2170)
---
 .../fluss/flink/lake/LakeSplitReaderGenerator.java | 12 ++++-
 .../reader/LakeSnapshotAndLogSplitScanner.java     | 33 +++++++------
 .../fluss/flink/lake/reader/SortMergeReader.java   | 30 ++++++++++--
 .../source/enumerator/FlinkSourceEnumerator.java   | 14 ++++--
 .../flink/source/state/SourceEnumeratorState.java  |  2 +
 .../lake/paimon/source/PaimonRecordReader.java     | 20 ++++----
 .../paimon/source/PaimonSortedRecordReader.java    |  8 ++--
 .../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 56 ++++++++++++++++++++--
 .../source/PaimonSortedRecordReaderTest.java       | 11 ++++-
 .../testutils/FlinkPaimonTieringTestBase.java      | 39 +++++++--------
 10 files changed, 162 insertions(+), 63 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
index 39b4e08d4..d48dbb794 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
@@ -55,8 +55,16 @@ public class LakeSplitReaderGenerator {
         if (split instanceof LakeSnapshotSplit) {
             boundedSplits.add(split);
         } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
-            // lake split not finished, add to it
-            if (!((LakeSnapshotAndFlussLogSplit) split).isLakeSplitFinished()) 
{
+            LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit =
+                    (LakeSnapshotAndFlussLogSplit) split;
+            boolean isStreaming = ((LakeSnapshotAndFlussLogSplit) 
split).isStreaming();
+            // if streaming and the lake split is not finished, add it
+            if (isStreaming) {
+                if (!lakeSnapshotAndFlussLogSplit.isLakeSplitFinished()) {
+                    boundedSplits.add(split);
+                }
+            } else {
+                // otherwise, in batch mode, always add it
                 boundedSplits.add(split);
             }
         } else {
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java
index e04256911..9d3ce7379 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java
@@ -42,7 +42,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -62,7 +61,6 @@ public class LakeSnapshotAndLogSplitScanner implements 
BatchScanner {
     // the indexes of primary key in emitted row by paimon and fluss
     private int[] keyIndexesInRow;
     @Nullable private int[] adjustProjectedFields;
-    private final int[] newProjectedFields;
 
     // the sorted logs in memory, mapping from key -> value
     private Map<InternalRow, KeyValueRow> logRows;
@@ -81,7 +79,7 @@ public class LakeSnapshotAndLogSplitScanner implements 
BatchScanner {
         this.pkIndexes = 
table.getTableInfo().getSchema().getPrimaryKeyIndexes();
         this.lakeSnapshotSplitAndFlussLogSplit = lakeSnapshotAndFlussLogSplit;
         this.lakeSource = lakeSource;
-        this.newProjectedFields = getNeedProjectFields(table, projectedFields);
+        int[] newProjectedFields = getNeedProjectFields(table, 
projectedFields);
 
         this.logScanner = 
table.newScan().project(newProjectedFields).createLogScanner();
         this.lakeSource.withProject(
@@ -109,7 +107,9 @@ public class LakeSnapshotAndLogSplitScanner implements 
BatchScanner {
                                                 "StoppingOffset is null for 
split: "
                                                         + 
lakeSnapshotAndFlussLogSplit));
 
-        this.logScanFinished = 
lakeSnapshotAndFlussLogSplit.getStartingOffset() >= stoppingOffset;
+        this.logScanFinished =
+                lakeSnapshotAndFlussLogSplit.getStartingOffset() >= 
stoppingOffset
+                        || stoppingOffset <= 0;
     }
 
     private int[] getNeedProjectFields(Table flussTable, @Nullable int[] 
projectedFields) {
@@ -192,23 +192,26 @@ public class LakeSnapshotAndLogSplitScanner implements 
BatchScanner {
             return currentSortMergeReader.readBatch();
         } else {
             if (lakeRecordIterators.isEmpty()) {
+                List<RecordReader> recordReaders = new ArrayList<>();
                 if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null
                         || 
lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) {
-                    lakeRecordIterators = Collections.emptyList();
-                    logRows = new LinkedHashMap<>();
+                    // pass null split to get rowComparator
+                    recordReaders.add(lakeSource.createRecordReader(() -> 
null));
                 } else {
                     for (LakeSplit lakeSplit : 
lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) {
-                        RecordReader reader = lakeSource.createRecordReader(() 
-> lakeSplit);
-                        if (reader instanceof SortedRecordReader) {
-                            rowComparator = ((SortedRecordReader) 
reader).order();
-                        } else {
-                            throw new UnsupportedOperationException(
-                                    "lake records must instance of sorted 
view.");
-                        }
-                        lakeRecordIterators.add(reader.read());
+                        recordReaders.add(lakeSource.createRecordReader(() -> 
lakeSplit));
+                    }
+                }
+                for (RecordReader reader : recordReaders) {
+                    if (reader instanceof SortedRecordReader) {
+                        rowComparator = ((SortedRecordReader) reader).order();
+                    } else {
+                        throw new UnsupportedOperationException(
+                                "lake records must instance of sorted view.");
                     }
-                    logRows = new TreeMap<>(rowComparator);
+                    lakeRecordIterators.add(reader.read());
                 }
+                logRows = new TreeMap<>(rowComparator);
             }
             pollLogRecords(timeout);
             return CloseableIterator.wrap(Collections.emptyIterator());
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SortMergeReader.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SortMergeReader.java
index 6e490108f..cf9d22925 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SortMergeReader.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SortMergeReader.java
@@ -183,7 +183,10 @@ class SortMergeReader {
             // we should emit the log record first; and still need to iterate the changelog to find
             // the first change log greater than the snapshot record
             List<InternalRow> emitRows = new ArrayList<>();
-            emitRows.add(logKeyValueRow.valueRow());
+            // only emit the log record if it's not a delete operation
+            if (!logKeyValueRow.isDelete()) {
+                emitRows.add(logKeyValueRow.valueRow());
+            }
             boolean shouldEmitSnapshotRecord = true;
             while (changeLogIterator.hasNext()) {
                 // get the next log record
@@ -203,7 +206,9 @@ class SortMergeReader {
                 } else if (compareResult > 0) {
                     // snapshot record > the log record
                     // the log record should be emitted
-                    emitRows.add(logKeyValueRow.valueRow());
+                    if (!logKeyValueRow.isDelete()) {
+                        emitRows.add(logKeyValueRow.valueRow());
+                    }
                 } else {
                     // log record == snapshot record
                     // the log record should be emitted if is not delete, but 
the snapshot record
@@ -282,12 +287,14 @@ class SortMergeReader {
 
     private static class ChangeLogIteratorWrapper implements 
CloseableIterator<InternalRow> {
         private CloseableIterator<KeyValueRow> changeLogRecordIterator;
+        private KeyValueRow nextReturnRow;
 
         public ChangeLogIteratorWrapper() {}
 
         public ChangeLogIteratorWrapper replace(
                 CloseableIterator<KeyValueRow> changeLogRecordIterator) {
             this.changeLogRecordIterator = changeLogRecordIterator;
+            this.nextReturnRow = null;
             return this;
         }
 
@@ -300,12 +307,27 @@ class SortMergeReader {
 
         @Override
         public boolean hasNext() {
-            return changeLogRecordIterator != null && 
changeLogRecordIterator.hasNext();
+            if (nextReturnRow != null) {
+                return true;
+            }
+            while (changeLogRecordIterator != null && 
changeLogRecordIterator.hasNext()) {
+                KeyValueRow row = changeLogRecordIterator.next();
+                if (!row.isDelete()) {
+                    nextReturnRow = row;
+                    return true;
+                }
+            }
+            return false;
         }
 
         @Override
         public InternalRow next() {
-            return changeLogRecordIterator.next().valueRow();
+            if (nextReturnRow == null) {
+                throw new NoSuchElementException();
+            }
+            KeyValueRow row = nextReturnRow;
+            nextReturnRow = null; // Clear cache after consuming
+            return row.valueRow();
         }
     }
 
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index 86815a244..a569fb144 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -628,8 +628,16 @@ public class FlinkSourceEnumerator
                             stoppingOffsetsInitializer,
                             tableInfo.getNumBuckets(),
                             this::listPartitions);
-            pendingHybridLakeFlussSplits = 
lakeSplitGenerator.generateHybridLakeFlussSplits();
-            return pendingHybridLakeFlussSplits;
+            List<SourceSplitBase> generatedSplits =
+                    lakeSplitGenerator.generateHybridLakeFlussSplits();
+            if (generatedSplits == null) {
+                // no hybrid lake splits, set the pending splits to an empty list
+                pendingHybridLakeFlussSplits = Collections.emptyList();
+                return null;
+            } else {
+                pendingHybridLakeFlussSplits = generatedSplits;
+                return generatedSplits;
+            }
         } catch (Exception e) {
             throw new FlinkRuntimeException("Failed to generate hybrid lake 
fluss splits", e);
         }
@@ -680,6 +688,7 @@ public class FlinkSourceEnumerator
                         t);
             }
         }
+        doHandleSplitsAdd(splits);
         if (isPartitioned) {
             if (!streaming || scanPartitionDiscoveryIntervalMs <= 0) {
                 // if not streaming or partition discovery is disabled
@@ -691,7 +700,6 @@ public class FlinkSourceEnumerator
             // so, noMoreNewPartitionSplits should be set to true
             noMoreNewSplits = true;
         }
-        doHandleSplitsAdd(splits);
     }
 
     private void doHandleSplitsAdd(List<SourceSplitBase> splits) {
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
index d804d069b..7a677df20 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
@@ -88,6 +88,8 @@ public class SourceEnumeratorState {
                 + assignedBuckets
                 + ", assignedPartitions="
                 + assignedPartitions
+                + ", remainingHybridLakeFlussSplits="
+                + remainingHybridLakeFlussSplits
                 + '}';
     }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonRecordReader.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonRecordReader.java
index 9d0630a58..3af7467cf 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonRecordReader.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonRecordReader.java
@@ -48,12 +48,11 @@ public class PaimonRecordReader implements RecordReader {
 
     protected PaimonRowAsFlussRecordIterator iterator;
     protected @Nullable int[][] project;
-    protected @Nullable Predicate predicate;
     protected RowType paimonRowType;
 
     public PaimonRecordReader(
             FileStoreTable fileStoreTable,
-            PaimonSplit split,
+            @Nullable PaimonSplit split,
             @Nullable int[][] project,
             @Nullable Predicate predicate)
             throws IOException {
@@ -69,12 +68,17 @@ public class PaimonRecordReader implements RecordReader {
 
         TableRead tableRead = readBuilder.newRead().executeFilter();
         paimonRowType = readBuilder.readType();
-
-        org.apache.paimon.reader.RecordReader<InternalRow> recordReader =
-                tableRead.createReader(split.dataSplit());
-        iterator =
-                new PaimonRecordReader.PaimonRowAsFlussRecordIterator(
-                        recordReader.toCloseableIterator(), paimonRowType);
+        if (split == null) {
+            iterator =
+                    new PaimonRecordReader.PaimonRowAsFlussRecordIterator(
+                            org.apache.paimon.utils.CloseableIterator.empty(), 
paimonRowType);
+        } else {
+            org.apache.paimon.reader.RecordReader<InternalRow> recordReader =
+                    tableRead.createReader(split.dataSplit());
+            iterator =
+                    new PaimonRecordReader.PaimonRowAsFlussRecordIterator(
+                            recordReader.toCloseableIterator(), paimonRowType);
+        }
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
index 467c4435b..1940b2055 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
@@ -38,16 +38,16 @@ public class PaimonSortedRecordReader extends 
PaimonRecordReader implements Sort
 
     public PaimonSortedRecordReader(
             FileStoreTable fileStoreTable,
-            PaimonSplit split,
+            // a temporary fix: allow a null split so that only the order comparator is obtained
+            @Nullable PaimonSplit split,
             @Nullable int[][] project,
             @Nullable Predicate predicate)
             throws IOException {
         super(fileStoreTable, split, project, predicate);
         RowType pkKeyType =
                 new RowType(
-                        
PrimaryKeyTableUtils.PrimaryKeyFieldsExtractor.EXTRACTOR.keyFields(
-                                fileStoreTable.schema()));
-
+                        PrimaryKeyTableUtils.addKeyNamePrefix(
+                                fileStoreTable.schema().primaryKeysFields()));
         this.comparator =
                 toFlussRowComparator(paimonRowType, new 
KeyComparatorSupplier(pkKeyType).get());
     }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index a7dce3836..5baa4868f 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -23,7 +23,9 @@ import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
@@ -39,6 +41,7 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -89,7 +92,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase {
         waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
 
         // check the status of replica after synced
-        assertReplicaStatus(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned, 
bucketLogEndOffset);
+        assertReplicaStatus(bucketLogEndOffset);
 
         // will read paimon snapshot, won't merge log since it's empty
         List<String> resultEmptyLog =
@@ -380,6 +383,51 @@ class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase {
         
assertThat(projectRows2.toString()).isEqualTo(sortedRows(expectedProjectRows2).toString());
     }
 
+    @Test
+    void testUnionReadWhenSomeBucketNotTiered() throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "pk_table_union_read_some_bucket_not_tiered";
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        int bucketNum = 3;
+        // create table & write initial data
+        long tableId = createSimplePkTable(t1, bucketNum, false, true);
+
+        writeRows(
+                t1,
+                Arrays.asList(
+                        GenericRow.of(
+                                1, BinaryString.fromString("v11"), 
BinaryString.fromString("v12")),
+                        GenericRow.of(
+                                2, BinaryString.fromString("v21"), 
BinaryString.fromString("v22"))),
+                false);
+
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        bucketLogEndOffset.put(new TableBucket(tableId, 1), 1L);
+        bucketLogEndOffset.put(new TableBucket(tableId, 2), 1L);
+
+        // wait until records have been synced
+        waitUntilBucketsSynced(bucketLogEndOffset.keySet());
+
+        // check the status of replica after synced
+        assertReplicaStatus(bucketLogEndOffset);
+
+        jobClient.cancel().get();
+        writeRows(
+                t1,
+                Arrays.asList(
+                        GenericRow.of(
+                                0, BinaryString.fromString("v01"), 
BinaryString.fromString("v02")),
+                        GenericRow.of(
+                                3, BinaryString.fromString("v31"), 
BinaryString.fromString("v32"))),
+                false);
+
+        List<String> result = toSortedRows(batchTEnv.executeSql("select * from 
" + tableName));
+        assertThat(result.toString())
+                .isEqualTo("[+I[0, v01, v02], +I[1, v11, v12], +I[2, v21, 
v22], +I[3, v31, v32]]");
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {false, true})
     void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
@@ -398,7 +446,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase {
         waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
 
         // check the status of replica after synced
-        assertReplicaStatus(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned, 
bucketLogEndOffset);
+        assertReplicaStatus(bucketLogEndOffset);
 
         // will read paimon snapshot, should only +I since no change log
         List<Row> expectedRows = new ArrayList<>();
@@ -628,7 +676,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase {
                         bucketLogEndOffset);
 
         // check the status of replica after synced
-        assertReplicaStatus(table1, tableId, DEFAULT_BUCKET_NUM, 
isPartitioned, bucketLogEndOffset);
+        assertReplicaStatus(bucketLogEndOffset);
 
         // create result table
         createSimplePkTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned, 
false);
@@ -895,7 +943,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase {
         if (isPartitioned) {
             tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, 
true);
             tableBuilder.partitionedBy(partitionKeys);
-            schemaBuilder.primaryKey(primaryKey, partitionKeys);
+            schemaBuilder.primaryKey(partitionKeys, primaryKey);
             tableBuilder.property(
                     ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, 
AutoPartitionTimeUnit.YEAR);
         } else {
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
index 49749bf4e..21a6333fd 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
@@ -27,6 +27,7 @@ import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.LogRecord;
 import org.apache.fluss.row.Decimal;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.ProjectedRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
 import org.apache.fluss.utils.CloseableIterator;
@@ -70,6 +71,9 @@ class PaimonSortedRecordReaderTest extends 
PaimonSourceTestBase {
 
         LakeSource<PaimonSplit> lakeSource = 
lakeStorage.createLakeSource(tablePath);
         Table table = getTable(tablePath);
+
+        int[] pkIndex = table.rowType().getFieldIndices(table.primaryKeys());
+        ProjectedRow projectedPkRow = ProjectedRow.from(pkIndex);
         Snapshot snapshot = table.latestSnapshot().get();
         List<PaimonSplit> paimonSplits = 
lakeSource.createPlanner(snapshot::id).plan();
 
@@ -77,10 +81,15 @@ class PaimonSortedRecordReaderTest extends 
PaimonSourceTestBase {
             RecordReader recordReader = lakeSource.createRecordReader(() -> 
paimonSplit);
             
assertThat(recordReader).isInstanceOf(PaimonSortedRecordReader.class);
             CloseableIterator<LogRecord> iterator = recordReader.read();
+
             assertThat(
                             isSorted(
                                     TransformingCloseableIterator.transform(
-                                            iterator, LogRecord::getRow),
+                                            iterator,
+                                            record -> {
+                                                
projectedPkRow.replaceRow(record.getRow());
+                                                return projectedPkRow;
+                                            }),
                                     ((SortedRecordReader) 
recordReader).order()))
                     .isTrue();
             iterator.close();
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 2580370d8..5b86e6a8f 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -61,10 +61,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 import static 
org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
 import static org.apache.fluss.testutils.DataTestUtils.row;
@@ -386,26 +388,11 @@ public abstract class FlinkPaimonTieringTestBase {
         return Identifier.create(tablePath.getDatabaseName(), 
tablePath.getTableName());
     }
 
-    protected void assertReplicaStatus(
-            TablePath tablePath,
-            long tableId,
-            int bucketCount,
-            boolean isPartitioned,
-            Map<TableBucket, Long> expectedLogEndOffset) {
-        if (isPartitioned) {
-            Map<Long, String> partitionById =
-                    
waitUntilPartitions(getFlussClusterExtension().getZooKeeperClient(), tablePath);
-            for (Long partitionId : partitionById.keySet()) {
-                for (int i = 0; i < bucketCount; i++) {
-                    TableBucket tableBucket = new TableBucket(tableId, 
partitionId, i);
-                    assertReplicaStatus(tableBucket, 
expectedLogEndOffset.get(tableBucket));
-                }
-            }
-        } else {
-            for (int i = 0; i < bucketCount; i++) {
-                TableBucket tableBucket = new TableBucket(tableId, i);
-                assertReplicaStatus(tableBucket, 
expectedLogEndOffset.get(tableBucket));
-            }
+    protected void assertReplicaStatus(Map<TableBucket, Long> 
expectedLogEndOffset) {
+        for (Map.Entry<TableBucket, Long> expectedLogEndOffsetEntry :
+                expectedLogEndOffset.entrySet()) {
+            assertReplicaStatus(
+                    expectedLogEndOffsetEntry.getKey(), 
expectedLogEndOffsetEntry.getValue());
         }
     }
 
@@ -423,20 +410,28 @@ public abstract class FlinkPaimonTieringTestBase {
 
     protected void waitUntilBucketSynced(
             TablePath tablePath, long tableId, int bucketCount, boolean 
isPartition) {
+        Set<TableBucket> tableBuckets = new HashSet<>();
         if (isPartition) {
             Map<Long, String> partitionById = waitUntilPartitions(tablePath);
             for (Long partitionId : partitionById.keySet()) {
                 for (int i = 0; i < bucketCount; i++) {
                     TableBucket tableBucket = new TableBucket(tableId, 
partitionId, i);
-                    waitUntilBucketSynced(tableBucket);
+                    tableBuckets.add(tableBucket);
                 }
             }
         } else {
             for (int i = 0; i < bucketCount; i++) {
                 TableBucket tableBucket = new TableBucket(tableId, i);
-                waitUntilBucketSynced(tableBucket);
+                tableBuckets.add(tableBucket);
             }
         }
+        waitUntilBucketsSynced(tableBuckets);
+    }
+
+    protected void waitUntilBucketsSynced(Set<TableBucket> tableBuckets) {
+        for (TableBucket tableBucket : tableBuckets) {
+            waitUntilBucketSynced(tableBucket);
+        }
     }
 
     protected void waitUntilBucketSynced(TableBucket tb) {

Reply via email to