This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c1ae4cf684 [core] Add merge-on-read option for DV table batch L0
visibility (#7948)
c1ae4cf684 is described below
commit c1ae4cf684caca2da26b1ec646335c479a8efe9b
Author: Junrui Lee <[email protected]>
AuthorDate: Mon May 25 19:01:30 2026 +0800
[core] Add merge-on-read option for DV table batch L0 visibility (#7948)
In some production shared-dataset deployments, compaction is separated
from writers and runs as dedicated jobs. For deletion-vector (DV)
primary-key tables, batch scans skip level-0 files by default, so newly
written, uncompacted data is not visible until compaction finishes. As a
result, readers depend on compaction progress to see the latest data.
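`visibility-callback.enabled` does not solve this problem because it
also relies on compaction: commits are returned only after compaction
has made the data visible through the optimized read path. In this
scenario, users want to read the latest committed data without waiting
for compaction.
This commit therefore adds the table option
`deletion-vectors.merge-on-read` (default `false`). When it is set to
`true` on a DV table, batch scans no longer skip level-0 files, and
uncompacted data is merged on read at the cost of read performance.
Streaming scans and changelog behavior are unchanged. Schema validation
rejects the option unless `deletion-vectors.enabled` is `true`. The option
can also be supplied as a dynamic option, e.g. a Flink `OPTIONS` query
hint.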
---
docs/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 20 ++-
.../org/apache/paimon/schema/SchemaValidation.java | 4 +
.../apache/paimon/schema/SchemaValidationTest.java | 45 +++++++
.../table/DeletionVectorsMergeOnReadTest.java | 145 +++++++++++++++++++++
.../apache/paimon/flink/DeletionVectorITCase.java | 30 +++++
.../paimon/spark/sql/VisibilityCallbackTest.scala | 4 +-
7 files changed, 252 insertions(+), 2 deletions(-)
diff --git a/docs/generated/core_configuration.html
b/docs/generated/core_configuration.html
index 5d608dba9e..4813692455 100644
--- a/docs/generated/core_configuration.html
+++ b/docs/generated/core_configuration.html
@@ -512,6 +512,12 @@
<td>Boolean</td>
<td>Whether to enable deletion vectors mode. In this mode, index
files containing deletion vectors are generated when data is written, which
marks the data for deletion. During read operations, by applying these index
files, merging can be avoided.</td>
</tr>
+ <tr>
+ <td><h5>deletion-vectors.merge-on-read</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>When deletion vectors are enabled, uncompacted files are not
visible by default. Set this to true to enable merge-on-read, which makes
uncompacted data visible at the cost of read performance. This option only
affects batch scan visibility of DV level-0 files, it does not change streaming
scan or changelog behavior.</td>
+ </tr>
<tr>
<td><h5>deletion-vectors.modifiable</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 9de20d19cd..8ef13bef20 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1879,6 +1879,17 @@ public class CoreOptions implements Serializable {
.defaultValue(MemorySize.ofMebiBytes(2))
.withDescription("The target size of deletion vector index
file.");
+ public static final ConfigOption<Boolean> DELETION_VECTORS_MERGE_ON_READ =
+ key("deletion-vectors.merge-on-read")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "When deletion vectors are enabled, uncompacted
files are not visible by default. "
+ + "Set this to true to enable
merge-on-read, which makes uncompacted data "
+ + "visible at the cost of read
performance. "
+ + "This option only affects batch scan
visibility of DV level-0 files, "
+ + "it does not change streaming scan or
changelog behavior.");
+
public static final ConfigOption<Boolean> DELETION_VECTOR_BITMAP64 =
key("deletion-vectors.bitmap64")
.booleanType()
@@ -3648,8 +3659,15 @@ public class CoreOptions implements Serializable {
return options.get(FORCE_LOOKUP);
}
+ public boolean deletionVectorsMergeOnRead() {
+ return options.get(DELETION_VECTORS_MERGE_ON_READ);
+ }
+
public boolean batchScanSkipLevel0() {
- return deletionVectorsEnabled() || mergeEngine() == FIRST_ROW;
+ if (deletionVectorsEnabled()) {
+ return !deletionVectorsMergeOnRead();
+ }
+ return mergeEngine() == FIRST_ROW;
}
public MemorySize dvIndexFileTargetSize() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index b703caaf9b..4d84e21fc1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -281,6 +281,10 @@ public class SchemaValidation {
if (options.deletionVectorsEnabled()) {
validateForDeletionVectors(options);
+ } else {
+ checkArgument(
+ !options.deletionVectorsMergeOnRead(),
+ "deletion-vectors.merge-on-read requires
deletion-vectors.enabled to be true.");
}
// vector field names must point to vector type
diff --git
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
index 89796948f2..c3a79d91fd 100644
---
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
@@ -508,4 +508,49 @@ class SchemaValidationTest {
validateTableSchema(
new TableSchema(1, fields, 10, emptyList(),
singletonList("k"), options, ""));
}
+
+ @Test
+ public void testMergeOnReadCoexistsWithVisibilityCallback() {
+ Map<String, String> options = new HashMap<>();
+ options.put("deletion-vectors.enabled", "true");
+ options.put("deletion-vectors.merge-on-read", "true");
+ options.put("visibility-callback.enabled", "true");
+ assertThatCode(() ->
validateTableSchemaExec(options)).doesNotThrowAnyException();
+ }
+
+ @Test
+ public void
testMergeOnReadCoexistsWithVisibilityCallbackAndPostponeBucket() {
+ List<DataField> fields =
+ Arrays.asList(
+ new DataField(0, "f0", DataTypes.INT()),
+ new DataField(1, "f1", DataTypes.INT()),
+ new DataField(2, "f2", DataTypes.INT()),
+ new DataField(3, "f3", DataTypes.STRING()));
+ Map<String, String> options = new HashMap<>();
+ options.put("deletion-vectors.enabled", "true");
+ options.put("deletion-vectors.merge-on-read", "true");
+ options.put("visibility-callback.enabled", "true");
+ options.put(BUCKET.key(), String.valueOf(-2));
+ assertThatCode(
+ () ->
+ validateTableSchema(
+ new TableSchema(
+ 1,
+ fields,
+ 10,
+ singletonList("f0"),
+ singletonList("f1"),
+ options,
+ "")))
+ .doesNotThrowAnyException();
+ }
+
+ @Test
+ public void testMergeOnReadRequiresDvEnabled() {
+ Map<String, String> options = new HashMap<>();
+ options.put("deletion-vectors.merge-on-read", "true");
+ assertThatThrownBy(() -> validateTableSchemaExec(options))
+ .hasMessageContaining(
+ "deletion-vectors.merge-on-read requires
deletion-vectors.enabled to be true");
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/DeletionVectorsMergeOnReadTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/DeletionVectorsMergeOnReadTest.java
new file mode 100644
index 0000000000..e3b99523a2
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/DeletionVectorsMergeOnReadTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CoreOptions#DELETION_VECTORS_MERGE_ON_READ}. */
+public class DeletionVectorsMergeOnReadTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private Catalog catalog;
+ private final Identifier identifier = Identifier.create("my_db", "T");
+
+ @BeforeEach
+ public void before() throws Exception {
+ Options options = new Options();
+ options.set("warehouse", new Path(tempDir.toString() +
"/warehouse").toUri().toString());
+ catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
+ catalog.createDatabase("my_db", true);
+ }
+
+ private FileStoreTable createTable(boolean mergeOnRead) throws Exception {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("k", DataTypes.INT());
+ schemaBuilder.column("v", DataTypes.INT());
+ schemaBuilder.primaryKey("k");
+ schemaBuilder.option("bucket", "1");
+ schemaBuilder.option("deletion-vectors.enabled", "true");
+ schemaBuilder.option("write-only", "true");
+ if (mergeOnRead) {
+ schemaBuilder.option("deletion-vectors.merge-on-read", "true");
+ }
+ catalog.createTable(identifier, schemaBuilder.build(), true);
+ return (FileStoreTable) catalog.getTable(identifier);
+ }
+
+ private void writeCommit(FileStoreTable table, GenericRow... rows) throws
Exception {
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ BatchTableWrite write = writeBuilder.newWrite();
+ for (GenericRow row : rows) {
+ write.write(row);
+ }
+ writeBuilder.newCommit().commit(write.prepareCommit());
+ write.close();
+ }
+
+ private List<GenericRow> query(FileStoreTable table) throws Exception {
+ ReadBuilder readBuilder = table.newReadBuilder();
+ TableScan.Plan plan = readBuilder.newScan().plan();
+ List<GenericRow> result = new ArrayList<>();
+ readBuilder
+ .newRead()
+ .createReader(plan)
+ .forEachRemaining(row ->
result.add(GenericRow.of(row.getInt(0), row.getInt(1))));
+ return result;
+ }
+
+ @Test
+ public void testDefaultSkipsLevel0() throws Exception {
+ FileStoreTable table = createTable(false);
+
+ writeCommit(table, GenericRow.of(1, 10), GenericRow.of(2, 20));
+ writeCommit(table, GenericRow.of(1, 11), GenericRow.of(3, 30));
+
+ // write-only mode, no compaction, all files at level 0
+ // default DV mode skips level 0 — only the first commit's compacted
data is visible
+ // since no compaction has run, no data should be visible from level > 0
+ List<GenericRow> result = query(table);
+ assertThat(result).isEmpty();
+ }
+
+ @Test
+ public void testMergeOnReadReadsLevel0() throws Exception {
+ FileStoreTable table = createTable(true);
+
+ writeCommit(table, GenericRow.of(1, 10), GenericRow.of(2, 20));
+ writeCommit(table, GenericRow.of(1, 11), GenericRow.of(3, 30));
+
+ // merge-on-read enabled, level 0 data is visible via MOR
+ List<GenericRow> result = query(table);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 11), GenericRow.of(2, 20),
GenericRow.of(3, 30));
+ }
+
+ @Test
+ public void testMergeOnReadWithQueryHint() throws Exception {
+ FileStoreTable table = createTable(false);
+
+ writeCommit(table, GenericRow.of(1, 10), GenericRow.of(2, 20));
+ writeCommit(table, GenericRow.of(1, 11), GenericRow.of(3, 30));
+
+ // default: no data visible (L0 skipped)
+ assertThat(query(table)).isEmpty();
+
+ // override with dynamic option to enable merge-on-read
+ table =
+ table.copy(
+ java.util.Collections.singletonMap(
+ "deletion-vectors.merge-on-read", "true"));
+ List<GenericRow> result = query(table);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 11), GenericRow.of(2, 20),
GenericRow.of(3, 30));
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
index 9f24ee548d..f42f83a28e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
@@ -206,6 +206,36 @@ public class DeletionVectorITCase extends
CatalogITCaseBase {
Row.of(1, "111111111"), Row.of(2, "2_1"), Row.of(3,
"3_1"), Row.of(4, "4"));
}
+ @ParameterizedTest
+ @MethodSource("parameters1")
+ public void testBatchReadDVTableWithMergeOnRead(String changelogProducer,
boolean dvBitmap64) {
+ sql(
+ String.format(
+ "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name
STRING) "
+ + "WITH ('deletion-vectors.enabled' = 'true',
'changelog-producer' = '%s', "
+ + "'deletion-vectors.bitmap64' = '%s',
'write-only' = 'true', 'bucket' = '1')",
+ changelogProducer, dvBitmap64));
+
+ sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4,
'4')");
+
+ sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')");
+
+ sql("INSERT INTO T VALUES (2, '2_2'), (4, '4_1')");
+
+ // write-only with fixed bucket, all files at level 0, not visible
without merge-on-read
+ assertThat(batchSql("SELECT * FROM T")).isEmpty();
+
+ // with merge-on-read enabled, level 0 data becomes visible via MOR
+ assertThat(
+ batchSql(
+ "SELECT * FROM T /*+
OPTIONS('deletion-vectors.merge-on-read'='true') */"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, "111111111"),
+ Row.of(2, "2_2"),
+ Row.of(3, "3_1"),
+ Row.of(4, "4_1"));
+ }
+
@ParameterizedTest
@MethodSource("parameters1")
public void testDVTableWithAggregationMergeEngine(String
changelogProducer, boolean dvBitmap64)
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VisibilityCallbackTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VisibilityCallbackTest.scala
index dc636e3c99..7c49262a03 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VisibilityCallbackTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VisibilityCallbackTest.scala
@@ -29,17 +29,19 @@ import scala.concurrent.duration.DurationInt
class VisibilityCallbackTest extends PaimonSparkTestBase {
- Seq((true, false), (false, true)).foreach {
+ Seq((true, false), (false, true), (true, true)).foreach {
case (dv, postpone) =>
test(s"Visibility callback with deletion-vectors $dv and postpone-bucket
$postpone") {
withTable("T") {
val bucket = if (postpone) -2 else 1
+ val mergeOnRead = if (dv) "true" else "false"
sql(s"""
|CREATE TABLE T (id INT, name STRING)
|TBLPROPERTIES (
| 'bucket' = '$bucket',
| 'primary-key' = 'id',
| 'deletion-vectors.enabled' = '$dv',
+ | 'deletion-vectors.merge-on-read' = '$mergeOnRead',
| 'write-only' = 'true'
|)
|""".stripMargin)