This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 12179b049e [core] Fix streaming read bug in postpone bucket tables when changelog producer is none (#5783)
12179b049e is described below

commit 12179b049e452e46515c8dae5d21d0d67850ad3a
Author: tsreaper <[email protected]>
AuthorDate: Fri Jun 20 16:40:25 2025 +0800

    [core] Fix streaming read bug in postpone bucket tables when changelog producer is none (#5783)
---
 .../java/org/apache/paimon/KeyValueFileStore.java  | 33 ++++++-------
 .../paimon/table/source/DataTableBatchScan.java    |  6 +++
 .../paimon/table/source/DataTableStreamScan.java   |  7 +++
 .../table/source/snapshot/SnapshotReader.java      |  2 +
 .../table/source/snapshot/SnapshotReaderImpl.java  |  6 +++
 .../apache/paimon/table/system/AuditLogTable.java  |  6 +++
 .../apache/paimon/flink/action/CompactAction.java  |  1 +
 .../apache/paimon/flink/action/RescaleAction.java  |  1 +
 .../paimon/flink/lookup/LookupDataTableScan.java   |  6 +++
 .../postpone/PostponeBucketCompactSplitSource.java |  4 --
 .../paimon/flink/PostponeBucketTableITCase.java    | 54 ++++++++++++++++++++++
 11 files changed, 102 insertions(+), 24 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index be4416af9b..c988ba0a73 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -236,26 +236,19 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
                     return Optional.empty();
                 };
 
-        KeyValueFileStoreScan scan =
-                new KeyValueFileStoreScan(
-                        newManifestsReader(scanType == ScanType.FOR_WRITE),
-                        bucketSelectConverter,
-                        snapshotManager(),
-                        schemaManager,
-                        schema,
-                        keyValueFieldsExtractor,
-                        manifestFileFactory(scanType == ScanType.FOR_WRITE),
-                        options.scanManifestParallelism(),
-                        options.deletionVectorsEnabled(),
-                        options.mergeEngine(),
-                        options.changelogProducer(),
-                        options.fileIndexReadEnabled() && options.deletionVectorsEnabled());
-
-        if (options.bucket() == BucketMode.POSTPONE_BUCKET && scanType == ScanType.FOR_READ) {
-            scan.onlyReadRealBuckets();
-        }
-
-        return scan;
+        return new KeyValueFileStoreScan(
+                newManifestsReader(scanType == ScanType.FOR_WRITE),
+                bucketSelectConverter,
+                snapshotManager(),
+                schemaManager,
+                schema,
+                keyValueFieldsExtractor,
+                manifestFileFactory(scanType == ScanType.FOR_WRITE),
+                options.scanManifestParallelism(),
+                options.deletionVectorsEnabled(),
+                options.mergeEngine(),
+                options.changelogProducer(),
+                options.fileIndexReadEnabled() && options.deletionVectorsEnabled());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 1f6766f1b1..a1b94f59a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -23,6 +23,7 @@ import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
@@ -46,11 +47,16 @@ public class DataTableBatchScan extends AbstractDataTableScan {
             SnapshotReader snapshotReader,
             TableQueryAuth queryAuth) {
         super(schema, options, snapshotReader, queryAuth);
+
         this.hasNext = true;
         this.defaultValueAssigner = DefaultValueAssigner.create(schema);
+
         if (!schema.primaryKeys().isEmpty() && options.batchScanSkipLevel0()) {
             snapshotReader.withLevelFilter(level -> level > 0).enableValueFilter();
         }
+        if (options.bucket() == BucketMode.POSTPONE_BUCKET) {
+            snapshotReader.onlyReadRealBuckets();
+        }
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index acf5f1e388..2f42674296 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -27,6 +27,7 @@ import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.BoundedChecker;
 import org.apache.paimon.table.source.snapshot.ChangelogFollowUpScanner;
@@ -86,6 +87,7 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
             TableQueryAuth queryAuth,
             boolean hasPk) {
         super(schema, options, snapshotReader, queryAuth);
+
         this.options = options;
         this.scanMode = options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
         this.snapshotManager = snapshotManager;
@@ -95,6 +97,11 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
                 new NextSnapshotFetcher(
                         snapshotManager, changelogManager, options.changelogLifecycleDecoupled());
         this.hasPk = hasPk;
+
+        if (options.bucket() == BucketMode.POSTPONE_BUCKET
+                && options.changelogProducer() != CoreOptions.ChangelogProducer.NONE) {
+            snapshotReader.onlyReadRealBuckets();
+        }
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index a8504f3db3..12600b7ddc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -90,6 +90,8 @@ public interface SnapshotReader {
 
     SnapshotReader withBucket(int bucket);
 
+    SnapshotReader onlyReadRealBuckets();
+
     SnapshotReader withBucketFilter(Filter<Integer> bucketFilter);
 
     SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 93002027e3..d6658be267 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -275,6 +275,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
         return this;
     }
 
+    @Override
+    public SnapshotReader onlyReadRealBuckets() {
+        scan.onlyReadRealBuckets();
+        return this;
+    }
+
     @Override
     public SnapshotReader withBucketFilter(Filter<Integer> bucketFilter) {
         scan.withBucketFilter(bucketFilter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 5df38af3f2..3325c7ce11 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -369,6 +369,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
             return this;
         }
 
+        @Override
+        public SnapshotReader onlyReadRealBuckets() {
+            wrapped.onlyReadRealBuckets();
+            return this;
+        }
+
         @Override
         public SnapshotReader withBucketFilter(Filter<Integer> bucketFilter) {
             wrapped.withBucketFilter(bucketFilter);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index f55244fd9c..93cfc4b0d3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -270,6 +270,7 @@ public class CompactAction extends TableActionBase {
             Iterator<ManifestEntry> it =
                     table.newSnapshotReader()
                             
.withPartitionFilter(Collections.singletonList(partition))
+                            .onlyReadRealBuckets()
                             .readFileIterator();
             if (it.hasNext()) {
                 bucketNum = it.next().totalBuckets();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
index 292b68bdbc..7e83abc3df 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
@@ -147,6 +147,7 @@ public class RescaleAction extends TableActionBase {
                         .newSnapshotReader()
                         .withSnapshot(snapshot)
                         .withPartitionFilter(partition)
+                        .onlyReadRealBuckets()
                         .readFileIterator();
         Preconditions.checkArgument(
                 it.hasNext(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
index dbe92f116a..c1fd45f757 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.lookup;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.DataTableStreamScan;
 import org.apache.paimon.table.source.TableQueryAuth;
 import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
@@ -70,9 +71,14 @@ public class LookupDataTableScan extends DataTableStreamScan {
                 supportStreamingReadOverwrite,
                 queryAuth,
                 hasPk);
+
         this.startupMode = options.startupMode();
         this.lookupScanMode = lookupScanMode;
         dropStats();
+
+        if (options.bucket() == BucketMode.POSTPONE_BUCKET) {
+            snapshotReader.onlyReadRealBuckets();
+        }
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
index 833eff8120..ab53f8f824 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
@@ -47,8 +47,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -67,8 +65,6 @@ import java.util.regex.Pattern;
 public class PostponeBucketCompactSplitSource extends AbstractNonCoordinatedSource<Split> {
 
     private static final long serialVersionUID = 1L;
-    private static final Logger LOG =
-            LoggerFactory.getLogger(PostponeBucketCompactSplitSource.class);
 
     private final FileStoreTable table;
     private final Map<String, String> partitionSpec;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index f66c2f0998..d2572fb737 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -672,6 +672,60 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
                 .containsExactlyInAnyOrder("+I[1, 2025-06-11T16:35:45.123456789]", "+I[2, null]");
     }
 
+    @Timeout(TIMEOUT)
+    @Test
+    public void testNoneChangelogProducer() throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment sEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        .parallelism(1)
+                        .checkpointIntervalMs(500)
+                        .build();
+        String createCatalog =
+                "CREATE CATALOG mycat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")";
+        sEnv.executeSql(createCatalog);
+        sEnv.executeSql("USE CATALOG mycat");
+        sEnv.executeSql(
+                "CREATE TABLE T (\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ") WITH (\n"
+                        + "  'bucket' = '-2',\n"
+                        + "  'changelog-producer' = 'none',\n"
+                        + "  'scan.remove-normalize' = 'true',\n"
+                        + "  'continuous.discovery-interval' = '1ms'\n"
+                        + ")");
+
+        TableEnvironment bEnv =
+                tableEnvironmentBuilder()
+                        .batchMode()
+                        .parallelism(1)
+                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                        .build();
+        bEnv.executeSql(createCatalog);
+        bEnv.executeSql("USE CATALOG mycat");
+        bEnv.executeSql("INSERT INTO T VALUES (1, 10), (2, 20), (1, 100)").await();
+        bEnv.executeSql("INSERT INTO T VALUES (1, 101), (3, 31)").await();
+        bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
+        assertThat(collect(bEnv.executeSql("SELECT * FROM T")))
+                .containsExactlyInAnyOrder("+I[1, 101]", "+I[2, 20]", "+I[3, 31]");
+        TableResult streamingSelect =
+                sEnv.executeSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id' = '1') */");
+        JobClient client = streamingSelect.getJobClient().get();
+        CloseableIterator<Row> it = streamingSelect.collect();
+        assertThat(collect(client, it, 5))
+                .containsExactlyInAnyOrder(
+                        "+I[1, 10]", "+I[2, 20]", "+I[1, 100]", "+I[1, 101]", "+I[3, 31]");
+    }
+
     private List<String> collect(TableResult result) throws Exception {
         List<String> ret = new ArrayList<>();
         try (CloseableIterator<Row> it = result.collect()) {

Reply via email to