This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c1ae4cf684 [core] Add merge-on-read option for DV table batch L0 
visibility (#7948)
c1ae4cf684 is described below

commit c1ae4cf684caca2da26b1ec646335c479a8efe9b
Author: Junrui Lee <[email protected]>
AuthorDate: Mon May 25 19:01:30 2026 +0800

    [core] Add merge-on-read option for DV table batch L0 visibility (#7948)
    
    In some production shared-dataset deployments, compaction is separated
    from writers and runs as dedicated jobs. For primary-key tables with
    deletion vectors (DV) enabled, batch scans skip level-0 files by default,
    so newly written, uncompacted data is not visible until compaction
    finishes. As a result, readers depend on compaction progress to see the
    latest data.
    
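    `visibility-callback.enabled` does not fit this requirement because it
    also relies on compaction: commits are returned only after compaction
    makes the data visible through the optimized read path. In this
    scenario, users want to read the latest committed data without waiting
    for compaction.
    
    This commit adds the `deletion-vectors.merge-on-read` option (default
    `false`). When it is enabled on a table with deletion vectors, batch
    scans no longer skip level-0 files. Uncompacted data is instead merged
    on read, which makes the latest committed data visible at the cost of
    read performance. Streaming scans and changelog behavior are unchanged.
    Schema validation rejects the option unless `deletion-vectors.enabled`
    is true.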
---
 docs/generated/core_configuration.html             |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  20 ++-
 .../org/apache/paimon/schema/SchemaValidation.java |   4 +
 .../apache/paimon/schema/SchemaValidationTest.java |  45 +++++++
 .../table/DeletionVectorsMergeOnReadTest.java      | 145 +++++++++++++++++++++
 .../apache/paimon/flink/DeletionVectorITCase.java  |  30 +++++
 .../paimon/spark/sql/VisibilityCallbackTest.scala  |   4 +-
 7 files changed, 252 insertions(+), 2 deletions(-)

diff --git a/docs/generated/core_configuration.html 
b/docs/generated/core_configuration.html
index 5d608dba9e..4813692455 100644
--- a/docs/generated/core_configuration.html
+++ b/docs/generated/core_configuration.html
@@ -512,6 +512,12 @@
             <td>Boolean</td>
             <td>Whether to enable deletion vectors mode. In this mode, index 
files containing deletion vectors are generated when data is written, which 
marks the data for deletion. During read operations, by applying these index 
files, merging can be avoided.</td>
         </tr>
+        <tr>
+            <td><h5>deletion-vectors.merge-on-read</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>When deletion vectors are enabled, uncompacted files are not 
visible by default. Set this to true to enable merge-on-read, which makes 
uncompacted data visible at the cost of read performance. This option only 
affects batch scan visibility of DV level-0 files, it does not change streaming 
scan or changelog behavior.</td>
+        </tr>
         <tr>
             <td><h5>deletion-vectors.modifiable</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 9de20d19cd..8ef13bef20 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1879,6 +1879,17 @@ public class CoreOptions implements Serializable {
                     .defaultValue(MemorySize.ofMebiBytes(2))
                     .withDescription("The target size of deletion vector index 
file.");
 
+    public static final ConfigOption<Boolean> DELETION_VECTORS_MERGE_ON_READ =
+            key("deletion-vectors.merge-on-read")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "When deletion vectors are enabled, uncompacted 
files are not visible by default. "
+                                    + "Set this to true to enable 
merge-on-read, which makes uncompacted data "
+                                    + "visible at the cost of read 
performance. "
+                                    + "This option only affects batch scan 
visibility of DV level-0 files, "
+                                    + "it does not change streaming scan or 
changelog behavior.");
+
     public static final ConfigOption<Boolean> DELETION_VECTOR_BITMAP64 =
             key("deletion-vectors.bitmap64")
                     .booleanType()
@@ -3648,8 +3659,15 @@ public class CoreOptions implements Serializable {
         return options.get(FORCE_LOOKUP);
     }
 
+    public boolean deletionVectorsMergeOnRead() {
+        return options.get(DELETION_VECTORS_MERGE_ON_READ);
+    }
+
     public boolean batchScanSkipLevel0() {
-        return deletionVectorsEnabled() || mergeEngine() == FIRST_ROW;
+        if (deletionVectorsEnabled()) {
+            return !deletionVectorsMergeOnRead();
+        }
+        return mergeEngine() == FIRST_ROW;
     }
 
     public MemorySize dvIndexFileTargetSize() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index b703caaf9b..4d84e21fc1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -281,6 +281,10 @@ public class SchemaValidation {
 
         if (options.deletionVectorsEnabled()) {
             validateForDeletionVectors(options);
+        } else {
+            checkArgument(
+                    !options.deletionVectorsMergeOnRead(),
+                    "deletion-vectors.merge-on-read requires 
deletion-vectors.enabled to be true.");
         }
 
         // vector field names must point to vector type
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java 
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
index 89796948f2..c3a79d91fd 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
@@ -508,4 +508,49 @@ class SchemaValidationTest {
         validateTableSchema(
                 new TableSchema(1, fields, 10, emptyList(), 
singletonList("k"), options, ""));
     }
+
+    @Test
+    public void testMergeOnReadCoexistsWithVisibilityCallback() {
+        Map<String, String> options = new HashMap<>();
+        options.put("deletion-vectors.enabled", "true");
+        options.put("deletion-vectors.merge-on-read", "true");
+        options.put("visibility-callback.enabled", "true");
+        assertThatCode(() -> 
validateTableSchemaExec(options)).doesNotThrowAnyException();
+    }
+
+    @Test
+    public void 
testMergeOnReadCoexistsWithVisibilityCallbackAndPostponeBucket() {
+        List<DataField> fields =
+                Arrays.asList(
+                        new DataField(0, "f0", DataTypes.INT()),
+                        new DataField(1, "f1", DataTypes.INT()),
+                        new DataField(2, "f2", DataTypes.INT()),
+                        new DataField(3, "f3", DataTypes.STRING()));
+        Map<String, String> options = new HashMap<>();
+        options.put("deletion-vectors.enabled", "true");
+        options.put("deletion-vectors.merge-on-read", "true");
+        options.put("visibility-callback.enabled", "true");
+        options.put(BUCKET.key(), String.valueOf(-2));
+        assertThatCode(
+                        () ->
+                                validateTableSchema(
+                                        new TableSchema(
+                                                1,
+                                                fields,
+                                                10,
+                                                singletonList("f0"),
+                                                singletonList("f1"),
+                                                options,
+                                                "")))
+                .doesNotThrowAnyException();
+    }
+
+    @Test
+    public void testMergeOnReadRequiresDvEnabled() {
+        Map<String, String> options = new HashMap<>();
+        options.put("deletion-vectors.merge-on-read", "true");
+        assertThatThrownBy(() -> validateTableSchemaExec(options))
+                .hasMessageContaining(
+                        "deletion-vectors.merge-on-read requires 
deletion-vectors.enabled to be true");
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/DeletionVectorsMergeOnReadTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/DeletionVectorsMergeOnReadTest.java
new file mode 100644
index 0000000000..e3b99523a2
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/DeletionVectorsMergeOnReadTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CoreOptions#DELETION_VECTORS_MERGE_ON_READ}. */
+public class DeletionVectorsMergeOnReadTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private Catalog catalog;
+    private final Identifier identifier = Identifier.create("my_db", "T");
+
+    @BeforeEach
+    public void before() throws Exception {
+        Options options = new Options();
+        options.set("warehouse", new Path(tempDir.toString() + 
"/warehouse").toUri().toString());
+        catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
+        catalog.createDatabase("my_db", true);
+    }
+
+    private FileStoreTable createTable(boolean mergeOnRead) throws Exception {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("k", DataTypes.INT());
+        schemaBuilder.column("v", DataTypes.INT());
+        schemaBuilder.primaryKey("k");
+        schemaBuilder.option("bucket", "1");
+        schemaBuilder.option("deletion-vectors.enabled", "true");
+        schemaBuilder.option("write-only", "true");
+        if (mergeOnRead) {
+            schemaBuilder.option("deletion-vectors.merge-on-read", "true");
+        }
+        catalog.createTable(identifier, schemaBuilder.build(), true);
+        return (FileStoreTable) catalog.getTable(identifier);
+    }
+
+    private void writeCommit(FileStoreTable table, GenericRow... rows) throws 
Exception {
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        BatchTableWrite write = writeBuilder.newWrite();
+        for (GenericRow row : rows) {
+            write.write(row);
+        }
+        writeBuilder.newCommit().commit(write.prepareCommit());
+        write.close();
+    }
+
+    private List<GenericRow> query(FileStoreTable table) throws Exception {
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableScan.Plan plan = readBuilder.newScan().plan();
+        List<GenericRow> result = new ArrayList<>();
+        readBuilder
+                .newRead()
+                .createReader(plan)
+                .forEachRemaining(row -> 
result.add(GenericRow.of(row.getInt(0), row.getInt(1))));
+        return result;
+    }
+
+    @Test
+    public void testDefaultSkipsLevel0() throws Exception {
+        FileStoreTable table = createTable(false);
+
+        writeCommit(table, GenericRow.of(1, 10), GenericRow.of(2, 20));
+        writeCommit(table, GenericRow.of(1, 11), GenericRow.of(3, 30));
+
+        // write-only mode, no compaction, all files at level 0
+        // default DV mode skips level 0 in batch scans, and no compaction has
+        // run to move data above level 0, so no data should be visible
+        List<GenericRow> result = query(table);
+        assertThat(result).isEmpty();
+    }
+
+    @Test
+    public void testMergeOnReadReadsLevel0() throws Exception {
+        FileStoreTable table = createTable(true);
+
+        writeCommit(table, GenericRow.of(1, 10), GenericRow.of(2, 20));
+        writeCommit(table, GenericRow.of(1, 11), GenericRow.of(3, 30));
+
+        // merge-on-read enabled, level 0 data is visible via MOR
+        List<GenericRow> result = query(table);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 11), GenericRow.of(2, 20), 
GenericRow.of(3, 30));
+    }
+
+    @Test
+    public void testMergeOnReadWithQueryHint() throws Exception {
+        FileStoreTable table = createTable(false);
+
+        writeCommit(table, GenericRow.of(1, 10), GenericRow.of(2, 20));
+        writeCommit(table, GenericRow.of(1, 11), GenericRow.of(3, 30));
+
+        // default: no data visible (L0 skipped)
+        assertThat(query(table)).isEmpty();
+
+        // override with dynamic option to enable merge-on-read
+        table =
+                table.copy(
+                        java.util.Collections.singletonMap(
+                                "deletion-vectors.merge-on-read", "true"));
+        List<GenericRow> result = query(table);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 11), GenericRow.of(2, 20), 
GenericRow.of(3, 30));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
index 9f24ee548d..f42f83a28e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
@@ -206,6 +206,36 @@ public class DeletionVectorITCase extends 
CatalogITCaseBase {
                         Row.of(1, "111111111"), Row.of(2, "2_1"), Row.of(3, 
"3_1"), Row.of(4, "4"));
     }
 
+    @ParameterizedTest
+    @MethodSource("parameters1")
+    public void testBatchReadDVTableWithMergeOnRead(String changelogProducer, 
boolean dvBitmap64) {
+        sql(
+                String.format(
+                        "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name 
STRING) "
+                                + "WITH ('deletion-vectors.enabled' = 'true', 
'changelog-producer' = '%s', "
+                                + "'deletion-vectors.bitmap64' = '%s', 
'write-only' = 'true', 'bucket' = '1')",
+                        changelogProducer, dvBitmap64));
+
+        sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4, 
'4')");
+
+        sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')");
+
+        sql("INSERT INTO T VALUES (2, '2_2'), (4, '4_1')");
+
+        // write-only with fixed bucket, all files at level 0, not visible 
without merge-on-read
+        assertThat(batchSql("SELECT * FROM T")).isEmpty();
+
+        // with merge-on-read enabled, level 0 data becomes visible via MOR
+        assertThat(
+                        batchSql(
+                                "SELECT * FROM T /*+ 
OPTIONS('deletion-vectors.merge-on-read'='true') */"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, "111111111"),
+                        Row.of(2, "2_2"),
+                        Row.of(3, "3_1"),
+                        Row.of(4, "4_1"));
+    }
+
     @ParameterizedTest
     @MethodSource("parameters1")
     public void testDVTableWithAggregationMergeEngine(String 
changelogProducer, boolean dvBitmap64)
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VisibilityCallbackTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VisibilityCallbackTest.scala
index dc636e3c99..7c49262a03 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VisibilityCallbackTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VisibilityCallbackTest.scala
@@ -29,17 +29,19 @@ import scala.concurrent.duration.DurationInt
 
 class VisibilityCallbackTest extends PaimonSparkTestBase {
 
-  Seq((true, false), (false, true)).foreach {
+  Seq((true, false), (false, true), (true, true)).foreach {
     case (dv, postpone) =>
       test(s"Visibility callback with deletion-vectors $dv and postpone-bucket 
$postpone") {
         withTable("T") {
           val bucket = if (postpone) -2 else 1
+          val mergeOnRead = if (dv) "true" else "false"
           sql(s"""
                  |CREATE TABLE T (id INT, name STRING)
                  |TBLPROPERTIES (
                  | 'bucket' = '$bucket',
                  | 'primary-key' = 'id',
                  | 'deletion-vectors.enabled' = '$dv',
+                 | 'deletion-vectors.merge-on-read' = '$mergeOnRead',
                  | 'write-only' = 'true'
                  |)
                  |""".stripMargin)

Reply via email to