This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 3815e9eba [paimon] Fix union read paimon issue (#2170)
3815e9eba is described below
commit 3815e9ebac85fa99ff64cd7d18e40c4a54363cd7
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Dec 17 17:37:24 2025 +0800
[paimon] Fix union read paimon issue (#2170)
---
.../fluss/flink/lake/LakeSplitReaderGenerator.java | 12 ++++-
.../reader/LakeSnapshotAndLogSplitScanner.java | 33 +++++++------
.../fluss/flink/lake/reader/SortMergeReader.java | 30 ++++++++++--
.../source/enumerator/FlinkSourceEnumerator.java | 14 ++++--
.../flink/source/state/SourceEnumeratorState.java | 2 +
.../lake/paimon/source/PaimonRecordReader.java | 20 ++++----
.../paimon/source/PaimonSortedRecordReader.java | 8 ++--
.../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 56 ++++++++++++++++++++--
.../source/PaimonSortedRecordReaderTest.java | 11 ++++-
.../testutils/FlinkPaimonTieringTestBase.java | 39 +++++++--------
10 files changed, 162 insertions(+), 63 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
index 39b4e08d4..d48dbb794 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
@@ -55,8 +55,16 @@ public class LakeSplitReaderGenerator {
if (split instanceof LakeSnapshotSplit) {
boundedSplits.add(split);
} else if (split instanceof LakeSnapshotAndFlussLogSplit) {
- // lake split not finished, add to it
- if (!((LakeSnapshotAndFlussLogSplit) split).isLakeSplitFinished())
{
+ LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit =
+ (LakeSnapshotAndFlussLogSplit) split;
+ boolean isStreaming = ((LakeSnapshotAndFlussLogSplit)
split).isStreaming();
+ // if streaming and the lake split is not finished, add it
+ if (isStreaming) {
+ if (!lakeSnapshotAndFlussLogSplit.isLakeSplitFinished()) {
+ boundedSplits.add(split);
+ }
+ } else {
+ // otherwise, in batch mode, always add it
boundedSplits.add(split);
}
} else {
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java
index e04256911..9d3ce7379 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java
@@ -42,7 +42,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -62,7 +61,6 @@ public class LakeSnapshotAndLogSplitScanner implements
BatchScanner {
// the indexes of primary key in emitted row by paimon and fluss
private int[] keyIndexesInRow;
@Nullable private int[] adjustProjectedFields;
- private final int[] newProjectedFields;
// the sorted logs in memory, mapping from key -> value
private Map<InternalRow, KeyValueRow> logRows;
@@ -81,7 +79,7 @@ public class LakeSnapshotAndLogSplitScanner implements
BatchScanner {
this.pkIndexes =
table.getTableInfo().getSchema().getPrimaryKeyIndexes();
this.lakeSnapshotSplitAndFlussLogSplit = lakeSnapshotAndFlussLogSplit;
this.lakeSource = lakeSource;
- this.newProjectedFields = getNeedProjectFields(table, projectedFields);
+ int[] newProjectedFields = getNeedProjectFields(table,
projectedFields);
this.logScanner =
table.newScan().project(newProjectedFields).createLogScanner();
this.lakeSource.withProject(
@@ -109,7 +107,9 @@ public class LakeSnapshotAndLogSplitScanner implements
BatchScanner {
"StoppingOffset is null for
split: "
+
lakeSnapshotAndFlussLogSplit));
- this.logScanFinished =
lakeSnapshotAndFlussLogSplit.getStartingOffset() >= stoppingOffset;
+ this.logScanFinished =
+ lakeSnapshotAndFlussLogSplit.getStartingOffset() >=
stoppingOffset
+ || stoppingOffset <= 0;
}
private int[] getNeedProjectFields(Table flussTable, @Nullable int[]
projectedFields) {
@@ -192,23 +192,26 @@ public class LakeSnapshotAndLogSplitScanner implements
BatchScanner {
return currentSortMergeReader.readBatch();
} else {
if (lakeRecordIterators.isEmpty()) {
+ List<RecordReader> recordReaders = new ArrayList<>();
if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null
||
lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) {
- lakeRecordIterators = Collections.emptyList();
- logRows = new LinkedHashMap<>();
+ // pass null split to get rowComparator
+ recordReaders.add(lakeSource.createRecordReader(() ->
null));
} else {
for (LakeSplit lakeSplit :
lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) {
- RecordReader reader = lakeSource.createRecordReader(()
-> lakeSplit);
- if (reader instanceof SortedRecordReader) {
- rowComparator = ((SortedRecordReader)
reader).order();
- } else {
- throw new UnsupportedOperationException(
- "lake records must instance of sorted
view.");
- }
- lakeRecordIterators.add(reader.read());
+ recordReaders.add(lakeSource.createRecordReader(() ->
lakeSplit));
+ }
+ }
+ for (RecordReader reader : recordReaders) {
+ if (reader instanceof SortedRecordReader) {
+ rowComparator = ((SortedRecordReader) reader).order();
+ } else {
+ throw new UnsupportedOperationException(
+ "lake records must instance of sorted view.");
}
- logRows = new TreeMap<>(rowComparator);
+ lakeRecordIterators.add(reader.read());
}
+ logRows = new TreeMap<>(rowComparator);
}
pollLogRecords(timeout);
return CloseableIterator.wrap(Collections.emptyIterator());
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SortMergeReader.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SortMergeReader.java
index 6e490108f..cf9d22925 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SortMergeReader.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SortMergeReader.java
@@ -183,7 +183,10 @@ class SortMergeReader {
// we should emit the log record firsts; and still need to
iterator changelog to find
// the first change log greater than the snapshot record
List<InternalRow> emitRows = new ArrayList<>();
- emitRows.add(logKeyValueRow.valueRow());
+ // only emit the log record if it's not a delete operation
+ if (!logKeyValueRow.isDelete()) {
+ emitRows.add(logKeyValueRow.valueRow());
+ }
boolean shouldEmitSnapshotRecord = true;
while (changeLogIterator.hasNext()) {
// get the next log record
@@ -203,7 +206,9 @@ class SortMergeReader {
} else if (compareResult > 0) {
// snapshot record > the log record
// the log record should be emitted
- emitRows.add(logKeyValueRow.valueRow());
+ if (!logKeyValueRow.isDelete()) {
+ emitRows.add(logKeyValueRow.valueRow());
+ }
} else {
// log record == snapshot record
// the log record should be emitted if is not delete, but
the snapshot record
@@ -282,12 +287,14 @@ class SortMergeReader {
private static class ChangeLogIteratorWrapper implements
CloseableIterator<InternalRow> {
private CloseableIterator<KeyValueRow> changeLogRecordIterator;
+ private KeyValueRow nextReturnRow;
public ChangeLogIteratorWrapper() {}
public ChangeLogIteratorWrapper replace(
CloseableIterator<KeyValueRow> changeLogRecordIterator) {
this.changeLogRecordIterator = changeLogRecordIterator;
+ this.nextReturnRow = null;
return this;
}
@@ -300,12 +307,27 @@ class SortMergeReader {
@Override
public boolean hasNext() {
- return changeLogRecordIterator != null &&
changeLogRecordIterator.hasNext();
+ if (nextReturnRow != null) {
+ return true;
+ }
+ while (changeLogRecordIterator != null &&
changeLogRecordIterator.hasNext()) {
+ KeyValueRow row = changeLogRecordIterator.next();
+ if (!row.isDelete()) {
+ nextReturnRow = row;
+ return true;
+ }
+ }
+ return false;
}
@Override
public InternalRow next() {
- return changeLogRecordIterator.next().valueRow();
+ if (nextReturnRow == null) {
+ throw new NoSuchElementException();
+ }
+ KeyValueRow row = nextReturnRow;
+ nextReturnRow = null; // Clear cache after consuming
+ return row.valueRow();
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index 86815a244..a569fb144 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -628,8 +628,16 @@ public class FlinkSourceEnumerator
stoppingOffsetsInitializer,
tableInfo.getNumBuckets(),
this::listPartitions);
- pendingHybridLakeFlussSplits =
lakeSplitGenerator.generateHybridLakeFlussSplits();
- return pendingHybridLakeFlussSplits;
+ List<SourceSplitBase> generatedSplits =
+ lakeSplitGenerator.generateHybridLakeFlussSplits();
+ if (generatedSplits == null) {
+ // no hybrid lake splits, set the pending splits to empty list
+ pendingHybridLakeFlussSplits = Collections.emptyList();
+ return null;
+ } else {
+ pendingHybridLakeFlussSplits = generatedSplits;
+ return generatedSplits;
+ }
} catch (Exception e) {
throw new FlinkRuntimeException("Failed to generate hybrid lake
fluss splits", e);
}
@@ -680,6 +688,7 @@ public class FlinkSourceEnumerator
t);
}
}
+ doHandleSplitsAdd(splits);
if (isPartitioned) {
if (!streaming || scanPartitionDiscoveryIntervalMs <= 0) {
// if not streaming or partition discovery is disabled
@@ -691,7 +700,6 @@ public class FlinkSourceEnumerator
// so, noMoreNewPartitionSplits should be set to true
noMoreNewSplits = true;
}
- doHandleSplitsAdd(splits);
}
private void doHandleSplitsAdd(List<SourceSplitBase> splits) {
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
index d804d069b..7a677df20 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
@@ -88,6 +88,8 @@ public class SourceEnumeratorState {
+ assignedBuckets
+ ", assignedPartitions="
+ assignedPartitions
+ + ", remainingHybridLakeFlussSplits="
+ + remainingHybridLakeFlussSplits
+ '}';
}
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonRecordReader.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonRecordReader.java
index 9d0630a58..3af7467cf 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonRecordReader.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonRecordReader.java
@@ -48,12 +48,11 @@ public class PaimonRecordReader implements RecordReader {
protected PaimonRowAsFlussRecordIterator iterator;
protected @Nullable int[][] project;
- protected @Nullable Predicate predicate;
protected RowType paimonRowType;
public PaimonRecordReader(
FileStoreTable fileStoreTable,
- PaimonSplit split,
+ @Nullable PaimonSplit split,
@Nullable int[][] project,
@Nullable Predicate predicate)
throws IOException {
@@ -69,12 +68,17 @@ public class PaimonRecordReader implements RecordReader {
TableRead tableRead = readBuilder.newRead().executeFilter();
paimonRowType = readBuilder.readType();
-
- org.apache.paimon.reader.RecordReader<InternalRow> recordReader =
- tableRead.createReader(split.dataSplit());
- iterator =
- new PaimonRecordReader.PaimonRowAsFlussRecordIterator(
- recordReader.toCloseableIterator(), paimonRowType);
+ if (split == null) {
+ iterator =
+ new PaimonRecordReader.PaimonRowAsFlussRecordIterator(
+ org.apache.paimon.utils.CloseableIterator.empty(),
paimonRowType);
+ } else {
+ org.apache.paimon.reader.RecordReader<InternalRow> recordReader =
+ tableRead.createReader(split.dataSplit());
+ iterator =
+ new PaimonRecordReader.PaimonRowAsFlussRecordIterator(
+ recordReader.toCloseableIterator(), paimonRowType);
+ }
}
@Override
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
index 467c4435b..1940b2055 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
@@ -38,16 +38,16 @@ public class PaimonSortedRecordReader extends
PaimonRecordReader implements Sort
public PaimonSortedRecordReader(
FileStoreTable fileStoreTable,
- PaimonSplit split,
+ // a temporary fix to pass null split to get the order comparator
+ @Nullable PaimonSplit split,
@Nullable int[][] project,
@Nullable Predicate predicate)
throws IOException {
super(fileStoreTable, split, project, predicate);
RowType pkKeyType =
new RowType(
-
PrimaryKeyTableUtils.PrimaryKeyFieldsExtractor.EXTRACTOR.keyFields(
- fileStoreTable.schema()));
-
+ PrimaryKeyTableUtils.addKeyNamePrefix(
+ fileStoreTable.schema().primaryKeysFields()));
this.comparator =
toFlussRowComparator(paimonRowType, new
KeyComparatorSupplier(pkKeyType).get());
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index a7dce3836..5baa4868f 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -23,7 +23,9 @@ import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
@@ -39,6 +41,7 @@ import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -89,7 +92,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
// check the status of replica after synced
- assertReplicaStatus(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned,
bucketLogEndOffset);
+ assertReplicaStatus(bucketLogEndOffset);
// will read paimon snapshot, won't merge log since it's empty
List<String> resultEmptyLog =
@@ -380,6 +383,51 @@ class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
assertThat(projectRows2.toString()).isEqualTo(sortedRows(expectedProjectRows2).toString());
}
+ @Test
+ void testUnionReadWhenSomeBucketNotTiered() throws Exception {
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName = "pk_table_union_read_some_bucket_not_tiered";
+ TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+ int bucketNum = 3;
+ // create table & write initial data
+ long tableId = createSimplePkTable(t1, bucketNum, false, true);
+
+ writeRows(
+ t1,
+ Arrays.asList(
+ GenericRow.of(
+ 1, BinaryString.fromString("v11"),
BinaryString.fromString("v12")),
+ GenericRow.of(
+ 2, BinaryString.fromString("v21"),
BinaryString.fromString("v22"))),
+ false);
+
+ Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+ bucketLogEndOffset.put(new TableBucket(tableId, 1), 1L);
+ bucketLogEndOffset.put(new TableBucket(tableId, 2), 1L);
+
+ // wait until records have been synced
+ waitUntilBucketsSynced(bucketLogEndOffset.keySet());
+
+ // check the status of replica after synced
+ assertReplicaStatus(bucketLogEndOffset);
+
+ jobClient.cancel().get();
+ writeRows(
+ t1,
+ Arrays.asList(
+ GenericRow.of(
+ 0, BinaryString.fromString("v01"),
BinaryString.fromString("v02")),
+ GenericRow.of(
+ 3, BinaryString.fromString("v31"),
BinaryString.fromString("v32"))),
+ false);
+
+ List<String> result = toSortedRows(batchTEnv.executeSql("select * from
" + tableName));
+ assertThat(result.toString())
+ .isEqualTo("[+I[0, v01, v02], +I[1, v11, v12], +I[2, v21,
v22], +I[3, v31, v32]]");
+ }
+
@ParameterizedTest
@ValueSource(booleans = {false, true})
void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
@@ -398,7 +446,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
// check the status of replica after synced
- assertReplicaStatus(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned,
bucketLogEndOffset);
+ assertReplicaStatus(bucketLogEndOffset);
// will read paimon snapshot, should only +I since no change log
List<Row> expectedRows = new ArrayList<>();
@@ -628,7 +676,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
bucketLogEndOffset);
// check the status of replica after synced
- assertReplicaStatus(table1, tableId, DEFAULT_BUCKET_NUM,
isPartitioned, bucketLogEndOffset);
+ assertReplicaStatus(bucketLogEndOffset);
// create result table
createSimplePkTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned,
false);
@@ -895,7 +943,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
if (isPartitioned) {
tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED,
true);
tableBuilder.partitionedBy(partitionKeys);
- schemaBuilder.primaryKey(primaryKey, partitionKeys);
+ schemaBuilder.primaryKey(partitionKeys, primaryKey);
tableBuilder.property(
ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT,
AutoPartitionTimeUnit.YEAR);
} else {
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
index 49749bf4e..21a6333fd 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
@@ -27,6 +27,7 @@ import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.LogRecord;
import org.apache.fluss.row.Decimal;
import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.ProjectedRow;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
import org.apache.fluss.utils.CloseableIterator;
@@ -70,6 +71,9 @@ class PaimonSortedRecordReaderTest extends
PaimonSourceTestBase {
LakeSource<PaimonSplit> lakeSource =
lakeStorage.createLakeSource(tablePath);
Table table = getTable(tablePath);
+
+ int[] pkIndex = table.rowType().getFieldIndices(table.primaryKeys());
+ ProjectedRow projectedPkRow = ProjectedRow.from(pkIndex);
Snapshot snapshot = table.latestSnapshot().get();
List<PaimonSplit> paimonSplits =
lakeSource.createPlanner(snapshot::id).plan();
@@ -77,10 +81,15 @@ class PaimonSortedRecordReaderTest extends
PaimonSourceTestBase {
RecordReader recordReader = lakeSource.createRecordReader(() ->
paimonSplit);
assertThat(recordReader).isInstanceOf(PaimonSortedRecordReader.class);
CloseableIterator<LogRecord> iterator = recordReader.read();
+
assertThat(
isSorted(
TransformingCloseableIterator.transform(
- iterator, LogRecord::getRow),
+ iterator,
+ record -> {
+
projectedPkRow.replaceRow(record.getRow());
+ return projectedPkRow;
+ }),
((SortedRecordReader)
recordReader).order()))
.isTrue();
iterator.close();
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 2580370d8..5b86e6a8f 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -61,10 +61,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import static
org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
import static org.apache.fluss.testutils.DataTestUtils.row;
@@ -386,26 +388,11 @@ public abstract class FlinkPaimonTieringTestBase {
return Identifier.create(tablePath.getDatabaseName(),
tablePath.getTableName());
}
- protected void assertReplicaStatus(
- TablePath tablePath,
- long tableId,
- int bucketCount,
- boolean isPartitioned,
- Map<TableBucket, Long> expectedLogEndOffset) {
- if (isPartitioned) {
- Map<Long, String> partitionById =
-
waitUntilPartitions(getFlussClusterExtension().getZooKeeperClient(), tablePath);
- for (Long partitionId : partitionById.keySet()) {
- for (int i = 0; i < bucketCount; i++) {
- TableBucket tableBucket = new TableBucket(tableId,
partitionId, i);
- assertReplicaStatus(tableBucket,
expectedLogEndOffset.get(tableBucket));
- }
- }
- } else {
- for (int i = 0; i < bucketCount; i++) {
- TableBucket tableBucket = new TableBucket(tableId, i);
- assertReplicaStatus(tableBucket,
expectedLogEndOffset.get(tableBucket));
- }
+ protected void assertReplicaStatus(Map<TableBucket, Long>
expectedLogEndOffset) {
+ for (Map.Entry<TableBucket, Long> expectedLogEndOffsetEntry :
+ expectedLogEndOffset.entrySet()) {
+ assertReplicaStatus(
+ expectedLogEndOffsetEntry.getKey(),
expectedLogEndOffsetEntry.getValue());
}
}
@@ -423,20 +410,28 @@ public abstract class FlinkPaimonTieringTestBase {
protected void waitUntilBucketSynced(
TablePath tablePath, long tableId, int bucketCount, boolean
isPartition) {
+ Set<TableBucket> tableBuckets = new HashSet<>();
if (isPartition) {
Map<Long, String> partitionById = waitUntilPartitions(tablePath);
for (Long partitionId : partitionById.keySet()) {
for (int i = 0; i < bucketCount; i++) {
TableBucket tableBucket = new TableBucket(tableId,
partitionId, i);
- waitUntilBucketSynced(tableBucket);
+ tableBuckets.add(tableBucket);
}
}
} else {
for (int i = 0; i < bucketCount; i++) {
TableBucket tableBucket = new TableBucket(tableId, i);
- waitUntilBucketSynced(tableBucket);
+ tableBuckets.add(tableBucket);
}
}
+ waitUntilBucketsSynced(tableBuckets);
+ }
+
+ protected void waitUntilBucketsSynced(Set<TableBucket> tableBuckets) {
+ for (TableBucket tableBucket : tableBuckets) {
+ waitUntilBucketSynced(tableBucket);
+ }
}
protected void waitUntilBucketSynced(TableBucket tb) {