This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 12179b049e [core] Fix streaming read bug in postpone bucket tables when changelog producer is none (#5783)
12179b049e is described below
commit 12179b049e452e46515c8dae5d21d0d67850ad3a
Author: tsreaper <[email protected]>
AuthorDate: Fri Jun 20 16:40:25 2025 +0800
[core] Fix streaming read bug in postpone bucket tables when changelog producer is none (#5783)
---
.../java/org/apache/paimon/KeyValueFileStore.java | 33 ++++++-------
.../paimon/table/source/DataTableBatchScan.java | 6 +++
.../paimon/table/source/DataTableStreamScan.java | 7 +++
.../table/source/snapshot/SnapshotReader.java | 2 +
.../table/source/snapshot/SnapshotReaderImpl.java | 6 +++
.../apache/paimon/table/system/AuditLogTable.java | 6 +++
.../apache/paimon/flink/action/CompactAction.java | 1 +
.../apache/paimon/flink/action/RescaleAction.java | 1 +
.../paimon/flink/lookup/LookupDataTableScan.java | 6 +++
.../postpone/PostponeBucketCompactSplitSource.java | 4 --
.../paimon/flink/PostponeBucketTableITCase.java | 54 ++++++++++++++++++++++
11 files changed, 102 insertions(+), 24 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index be4416af9b..c988ba0a73 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -236,26 +236,19 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
return Optional.empty();
};
- KeyValueFileStoreScan scan =
- new KeyValueFileStoreScan(
- newManifestsReader(scanType == ScanType.FOR_WRITE),
- bucketSelectConverter,
- snapshotManager(),
- schemaManager,
- schema,
- keyValueFieldsExtractor,
- manifestFileFactory(scanType == ScanType.FOR_WRITE),
- options.scanManifestParallelism(),
- options.deletionVectorsEnabled(),
- options.mergeEngine(),
- options.changelogProducer(),
- options.fileIndexReadEnabled() &&
options.deletionVectorsEnabled());
-
- if (options.bucket() == BucketMode.POSTPONE_BUCKET && scanType ==
ScanType.FOR_READ) {
- scan.onlyReadRealBuckets();
- }
-
- return scan;
+ return new KeyValueFileStoreScan(
+ newManifestsReader(scanType == ScanType.FOR_WRITE),
+ bucketSelectConverter,
+ snapshotManager(),
+ schemaManager,
+ schema,
+ keyValueFieldsExtractor,
+ manifestFileFactory(scanType == ScanType.FOR_WRITE),
+ options.scanManifestParallelism(),
+ options.deletionVectorsEnabled(),
+ options.mergeEngine(),
+ options.changelogProducer(),
+ options.fileIndexReadEnabled() &&
options.deletionVectorsEnabled());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 1f6766f1b1..a1b94f59a5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -23,6 +23,7 @@ import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
@@ -46,11 +47,16 @@ public class DataTableBatchScan extends
AbstractDataTableScan {
SnapshotReader snapshotReader,
TableQueryAuth queryAuth) {
super(schema, options, snapshotReader, queryAuth);
+
this.hasNext = true;
this.defaultValueAssigner = DefaultValueAssigner.create(schema);
+
if (!schema.primaryKeys().isEmpty() && options.batchScanSkipLevel0()) {
snapshotReader.withLevelFilter(level -> level >
0).enableValueFilter();
}
+ if (options.bucket() == BucketMode.POSTPONE_BUCKET) {
+ snapshotReader.onlyReadRealBuckets();
+ }
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index acf5f1e388..2f42674296 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -27,6 +27,7 @@ import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
import org.apache.paimon.table.source.snapshot.BoundedChecker;
import org.apache.paimon.table.source.snapshot.ChangelogFollowUpScanner;
@@ -86,6 +87,7 @@ public class DataTableStreamScan extends
AbstractDataTableScan implements Stream
TableQueryAuth queryAuth,
boolean hasPk) {
super(schema, options, snapshotReader, queryAuth);
+
this.options = options;
this.scanMode =
options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
this.snapshotManager = snapshotManager;
@@ -95,6 +97,11 @@ public class DataTableStreamScan extends
AbstractDataTableScan implements Stream
new NextSnapshotFetcher(
snapshotManager, changelogManager,
options.changelogLifecycleDecoupled());
this.hasPk = hasPk;
+
+ if (options.bucket() == BucketMode.POSTPONE_BUCKET
+ && options.changelogProducer() !=
CoreOptions.ChangelogProducer.NONE) {
+ snapshotReader.onlyReadRealBuckets();
+ }
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index a8504f3db3..12600b7ddc 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -90,6 +90,8 @@ public interface SnapshotReader {
SnapshotReader withBucket(int bucket);
+ SnapshotReader onlyReadRealBuckets();
+
SnapshotReader withBucketFilter(Filter<Integer> bucketFilter);
SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 93002027e3..d6658be267 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -275,6 +275,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
return this;
}
+ @Override
+ public SnapshotReader onlyReadRealBuckets() {
+ scan.onlyReadRealBuckets();
+ return this;
+ }
+
@Override
public SnapshotReader withBucketFilter(Filter<Integer> bucketFilter) {
scan.withBucketFilter(bucketFilter);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 5df38af3f2..3325c7ce11 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -369,6 +369,12 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return this;
}
+ @Override
+ public SnapshotReader onlyReadRealBuckets() {
+ wrapped.onlyReadRealBuckets();
+ return this;
+ }
+
@Override
public SnapshotReader withBucketFilter(Filter<Integer> bucketFilter) {
wrapped.withBucketFilter(bucketFilter);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index f55244fd9c..93cfc4b0d3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -270,6 +270,7 @@ public class CompactAction extends TableActionBase {
Iterator<ManifestEntry> it =
table.newSnapshotReader()
.withPartitionFilter(Collections.singletonList(partition))
+ .onlyReadRealBuckets()
.readFileIterator();
if (it.hasNext()) {
bucketNum = it.next().totalBuckets();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
index 292b68bdbc..7e83abc3df 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
@@ -147,6 +147,7 @@ public class RescaleAction extends TableActionBase {
.newSnapshotReader()
.withSnapshot(snapshot)
.withPartitionFilter(partition)
+ .onlyReadRealBuckets()
.readFileIterator();
Preconditions.checkArgument(
it.hasNext(),
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
index dbe92f116a..c1fd45f757 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.lookup;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataTableStreamScan;
import org.apache.paimon.table.source.TableQueryAuth;
import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
@@ -70,9 +71,14 @@ public class LookupDataTableScan extends DataTableStreamScan
{
supportStreamingReadOverwrite,
queryAuth,
hasPk);
+
this.startupMode = options.startupMode();
this.lookupScanMode = lookupScanMode;
dropStats();
+
+ if (options.bucket() == BucketMode.POSTPONE_BUCKET) {
+ snapshotReader.onlyReadRealBuckets();
+ }
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
index 833eff8120..ab53f8f824 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
@@ -47,8 +47,6 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -67,8 +65,6 @@ import java.util.regex.Pattern;
public class PostponeBucketCompactSplitSource extends
AbstractNonCoordinatedSource<Split> {
private static final long serialVersionUID = 1L;
- private static final Logger LOG =
- LoggerFactory.getLogger(PostponeBucketCompactSplitSource.class);
private final FileStoreTable table;
private final Map<String, String> partitionSpec;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index f66c2f0998..d2572fb737 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -672,6 +672,60 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
.containsExactlyInAnyOrder("+I[1,
2025-06-11T16:35:45.123456789]", "+I[2, null]");
}
+ @Timeout(TIMEOUT)
+ @Test
+ public void testNoneChangelogProducer() throws Exception {
+ String warehouse = getTempDirPath();
+ TableEnvironment sEnv =
+ tableEnvironmentBuilder()
+ .streamingMode()
+ .parallelism(1)
+ .checkpointIntervalMs(500)
+ .build();
+ String createCatalog =
+ "CREATE CATALOG mycat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")";
+ sEnv.executeSql(createCatalog);
+ sEnv.executeSql("USE CATALOG mycat");
+ sEnv.executeSql(
+ "CREATE TABLE T (\n"
+ + " k INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (k) NOT ENFORCED\n"
+ + ") WITH (\n"
+ + " 'bucket' = '-2',\n"
+ + " 'changelog-producer' = 'none',\n"
+ + " 'scan.remove-normalize' = 'true',\n"
+ + " 'continuous.discovery-interval' = '1ms'\n"
+ + ")");
+
+ TableEnvironment bEnv =
+ tableEnvironmentBuilder()
+ .batchMode()
+ .parallelism(1)
+ .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+ .build();
+ bEnv.executeSql(createCatalog);
+ bEnv.executeSql("USE CATALOG mycat");
+ bEnv.executeSql("INSERT INTO T VALUES (1, 10), (2, 20), (1,
100)").await();
+ bEnv.executeSql("INSERT INTO T VALUES (1, 101), (3, 31)").await();
+ bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
+ assertThat(collect(bEnv.executeSql("SELECT * FROM T")))
+ .containsExactlyInAnyOrder("+I[1, 101]", "+I[2, 20]", "+I[3,
31]");
+ TableResult streamingSelect =
+ sEnv.executeSql("SELECT * FROM T /*+
OPTIONS('scan.snapshot-id' = '1') */");
+ JobClient client = streamingSelect.getJobClient().get();
+ CloseableIterator<Row> it = streamingSelect.collect();
+ assertThat(collect(client, it, 5))
+ .containsExactlyInAnyOrder(
+ "+I[1, 10]", "+I[2, 20]", "+I[1, 100]", "+I[1, 101]",
"+I[3, 31]");
+ }
+
private List<String> collect(TableResult result) throws Exception {
List<String> ret = new ArrayList<>();
try (CloseableIterator<Row> it = result.collect()) {