This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b352de5b20 [spark] Introduce plan-auto-tag-for-read for Spark read
(#7897)
b352de5b20 is described below
commit b352de5b20d46e35e35b1f85c7409ea0481a4efc
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed May 20 08:37:04 2026 +0800
[spark] Introduce plan-auto-tag-for-read for Spark read (#7897)
Introduce `'scan.plan-auto-tag-for-read.time-retained'` option:
When set, a temporary tag will be auto-created during batch scan
planning to protect the read snapshot from expiration. The value
specifies the tag's TTL. Should be longer than the longest expected
batch read duration.
---
.../generated/catalog_configuration.html | 60 ++++----
.../shortcodes/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 14 ++
.../paimon/table/source/DataTableBatchScan.java | 39 ++++-
.../apache/paimon/table/source/InnerTableScan.java | 5 +
.../org/apache/paimon/tag/BatchReadTagCreator.java | 105 +++++++++++++
.../apache/paimon/tag/BatchReadTagCreatorTest.java | 139 ++++++++++++++++++
.../org/apache/paimon/spark/PaimonBaseScan.scala | 24 ++-
.../spark/read/BatchReadTagCleanupListener.scala | 93 ++++++++++++
.../read/BatchReadTagConcurrentExpireTest.scala | 163 +++++++++++++++++++++
.../spark/read/BatchReadTagProtectionTest.scala | 118 +++++++++++++++
11 files changed, 723 insertions(+), 43 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index 6cafa6ccdf..d885fee705 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -104,6 +104,36 @@ under the License.
<td>Boolean</td>
<td>Whether to support format tables, format table corresponds to
a regular csv, parquet or orc table, allowing read and write operations.
However, during these processes, it does not connect to the metastore; hence,
newly added partitions will not be reflected in the metastore and need to be
manually added as separate partition operations.</td>
</tr>
+ <tr>
+ <td><h5>local-cache.block-size</h5></td>
+ <td style="word-wrap: break-word;">1 mb</td>
+ <td>MemorySize</td>
+ <td>Block size for local cache.</td>
+ </tr>
+ <tr>
+ <td><h5>local-cache.dir</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Directory for local block cache on disk. If not configured,
memory cache is used instead.</td>
+ </tr>
+ <tr>
+ <td><h5>local-cache.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to enable local block cache for file reads. If
local-cache.dir is configured, disk cache is used; otherwise memory cache is
used.</td>
+ </tr>
+ <tr>
+ <td><h5>local-cache.max-size</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>MemorySize</td>
+ <td>Maximum total size of the local block cache. Unlimited by
default.</td>
+ </tr>
+ <tr>
+ <td><h5>local-cache.whitelist</h5></td>
+ <td style="word-wrap: break-word;">"meta,global-index"</td>
+ <td>String</td>
+ <td>Comma-separated list of file types to cache. Supported values:
meta, global-index, bucket-index, data, file-index.</td>
+ </tr>
<tr>
<td><h5>lock-acquire-timeout</h5></td>
<td style="word-wrap: break-word;">8 min</td>
@@ -146,36 +176,6 @@ under the License.
<td>Boolean</td>
<td>Sync all table properties to the catalog metastore (e.g. Hive
metastore, JDBC catalog store)</td>
</tr>
- <tr>
- <td><h5>local-cache.block-size</h5></td>
- <td style="word-wrap: break-word;">1 mb</td>
- <td>MemorySize</td>
- <td>Block size for local cache.</td>
- </tr>
- <tr>
- <td><h5>local-cache.dir</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
- <td>String</td>
- <td>Directory for local block cache on disk. If not configured,
memory cache is used instead.</td>
- </tr>
- <tr>
- <td><h5>local-cache.enabled</h5></td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
- <td>Whether to enable local block cache for file reads. If
local-cache.dir is configured, disk cache is used; otherwise memory cache is
used.</td>
- </tr>
- <tr>
- <td><h5>local-cache.max-size</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
- <td>MemorySize</td>
- <td>Maximum total size of the local block cache. Unlimited by
default.</td>
- </tr>
- <tr>
- <td><h5>local-cache.whitelist</h5></td>
- <td style="word-wrap: break-word;">"meta,global-index"</td>
- <td>String</td>
- <td>Comma-separated list of file types to cache. Supported values:
meta, global-index, bucket-index, data, file-index.</td>
- </tr>
<tr>
<td><h5>table.type</h5></td>
<td style="word-wrap: break-word;">managed</td>
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 7496cbfc15..0b82912983 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1259,6 +1259,12 @@ This config option does not affect the default
filesystem metastore.</td>
<td><p>Enum</p></td>
<td>Specify the scanning behavior of the source.<br /><br
/>Possible values:<ul><li>"default": Determines actual startup mode according
to other table properties. If "scan.timestamp-millis" is set the actual startup
mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is
set the actual startup mode will be "from-snapshot". Otherwise the actual
startup mode will be "latest-full".</li><li>"latest-full": For streaming
sources, produces the latest snapshot [...]
</tr>
+ <tr>
+ <td><h5>scan.plan-auto-tag-for-read.time-retained</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>When set, a temporary tag will be auto-created during batch
scan planning to protect the read snapshot from expiration. The value specifies
the tag's TTL. Should be longer than the longest expected batch read
duration.</td>
+ </tr>
<tr>
<td><h5>scan.plan-sort-partition</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 176d1e9d4d..d5a1bb7fb0 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1767,6 +1767,15 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("Use customized name when creating tags
in Batch mode.");
+ public static final ConfigOption<Duration>
SCAN_PLAN_AUTO_TAG_FOR_READ_TIME_RETAINED =
+ key("scan.plan-auto-tag-for-read.time-retained")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "When set, a temporary tag will be auto-created
during batch scan planning "
+ + "to protect the read snapshot from
expiration. The value specifies the tag's TTL. "
+ + "Should be longer than the longest
expected batch read duration.");
+
public static final ConfigOption<Duration> SNAPSHOT_WATERMARK_IDLE_TIMEOUT
=
key("snapshot.watermark-idle-timeout")
.durationType()
@@ -3550,6 +3559,11 @@ public class CoreOptions implements Serializable {
return options.get(TAG_BATCH_CUSTOMIZED_NAME);
}
+ @Nullable
+ public Duration scanPlanAutoTagTimeRetained() {
+ return options.get(SCAN_PLAN_AUTO_TAG_FOR_READ_TIME_RETAINED);
+ }
+
public Duration snapshotWatermarkIdleTimeout() {
return options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 255ff5f672..710f736985 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -29,11 +29,17 @@ import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
+import org.apache.paimon.tag.BatchReadTagCreator;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -53,6 +59,7 @@ public class DataTableBatchScan extends AbstractDataTableScan
{
private TopN topN;
private final SchemaManager schemaManager;
+ @Nullable private String readProtectionTagName;
public DataTableBatchScan(
TableSchema schema,
@@ -103,15 +110,20 @@ public class DataTableBatchScan extends
AbstractDataTableScan {
if (hasNext) {
hasNext = false;
+ StartingScanner.Result result;
Optional<StartingScanner.Result> pushed = applyPushDownLimit();
if (pushed.isPresent()) {
- return DataFilePlan.fromResult(pushed.get());
+ result = pushed.get();
+ } else {
+ pushed = applyPushDownTopN();
+ result = pushed.orElseGet(() ->
startingScanner.scan(snapshotReader));
}
- pushed = applyPushDownTopN();
- if (pushed.isPresent()) {
- return DataFilePlan.fromResult(pushed.get());
+
+ if (result instanceof ScannedResult) {
+ maybeCreateReadProtectionTag(((ScannedResult)
result).currentSnapshotId());
}
- return
DataFilePlan.fromResult(startingScanner.scan(snapshotReader));
+
+ return DataFilePlan.fromResult(result);
} else {
throw new EndOfScanException();
}
@@ -209,4 +221,21 @@ public class DataTableBatchScan extends
AbstractDataTableScan {
snapshotReader.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
return this;
}
+
+ @Override
+ @Nullable
+ public String readProtectionTagName() {
+ return readProtectionTagName;
+ }
+
+ private void maybeCreateReadProtectionTag(long snapshotId) {
+ Duration timeRetained = options().scanPlanAutoTagTimeRetained();
+ if (timeRetained == null) {
+ return;
+ }
+ SnapshotManager sm = snapshotReader.snapshotManager();
+ TagManager tagMgr = new TagManager(sm.fileIO(), sm.tablePath(),
sm.branch());
+ BatchReadTagCreator creator = new BatchReadTagCreator(tagMgr, sm,
timeRetained);
+ this.readProtectionTagName = creator.createReadTag(snapshotId);
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index c09809feb9..38278d4620 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -106,4 +106,9 @@ public interface InnerTableScan extends TableScan {
// do nothing, should implement this if need
return this;
}
+
+ @Nullable
+ default String readProtectionTagName() {
+ return null;
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/tag/BatchReadTagCreator.java
b/paimon-core/src/main/java/org/apache/paimon/tag/BatchReadTagCreator.java
new file mode 100644
index 0000000000..8637709b31
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/BatchReadTagCreator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.UUID;
+
+/**
+ * Creates and deletes temporary tags that protect snapshots from expiration during batch reads.
+ *
+ * <p>Both operations are best-effort: failures are logged and swallowed so that a problem with
+ * the protection tag never fails the read itself.
+ */
+public class BatchReadTagCreator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BatchReadTagCreator.class);
+
+    /** Prefix of every tag name generated by this class. */
+    public static final String BATCH_READ_TAG_PREFIX = "batch-read-";
+
+    private final TagManager tagManager;
+    private final SnapshotManager snapshotManager;
+    private final Duration timeRetained;
+
+    /**
+     * @param tagManager creates tags, checks their existence and resolves their paths
+     * @param snapshotManager loads the snapshot to tag and provides the FileIO used for deletion
+     * @param timeRetained retention handed to {@code TagManager.createTag} for new tags
+     */
+    public BatchReadTagCreator(
+            TagManager tagManager, SnapshotManager snapshotManager, Duration timeRetained) {
+        this.tagManager = tagManager;
+        this.snapshotManager = snapshotManager;
+        this.timeRetained = timeRetained;
+    }
+
+    /**
+     * Tags the given snapshot under a generated name of the form
+     * {@code batch-read-{snapshotId}-{8 hex chars}}.
+     *
+     * @param snapshotId id of the snapshot to protect
+     * @return the created tag name, or {@code null} if the snapshot could not be loaded or the
+     *     tag could not be created
+     */
+    @Nullable
+    public String createReadTag(long snapshotId) {
+        Snapshot snapshot;
+        try {
+            snapshot = snapshotManager.snapshot(snapshotId);
+        } catch (Exception e) {
+            LOG.warn("Failed to get snapshot {} for read protection tag.", snapshotId, e);
+            return null;
+        }
+
+        String tagName = generateTagName(snapshotId);
+        try {
+            tagManager.createTag(snapshot, tagName, timeRetained, Collections.emptyList(), true);
+            LOG.info(
+                    "Created batch read protection tag '{}' for snapshot {}.", tagName, snapshotId);
+            return tagName;
+        } catch (Exception e) {
+            LOG.warn(
+                    "Failed to create batch read protection tag for snapshot {}. "
+                            + "Read will proceed without protection.",
+                    snapshotId,
+                    e);
+            return null;
+        }
+    }
+
+    /**
+     * Removes the metadata file of the given tag if the tag exists. Never throws: any failure is
+     * logged and the tag is left to its TTL.
+     *
+     * @param tagName name of the tag to remove
+     */
+    public void deleteReadTag(String tagName) {
+        try {
+            if (!tagManager.tagExists(tagName)) {
+                return;
+            }
+            // Directly delete the tag metadata file instead of using TagManager.deleteTag(),
+            // which would also scan and delete unreferenced data files — too heavyweight for a
+            // read-path cleanup. Any orphan data files left behind will be reclaimed by
+            // OrphanFilesClean.
+            //
+            // delete() is used rather than deleteQuietly() so that the outcome is known here:
+            // an I/O error reaches the catch block below and a refused deletion is not logged
+            // as a success.
+            if (snapshotManager.fileIO().delete(tagManager.tagPath(tagName), false)) {
+                LOG.info("Deleted batch read protection tag '{}'.", tagName);
+            } else {
+                LOG.warn(
+                        "Batch read protection tag '{}' was not deleted. "
+                                + "It will be cleaned up by TTL expiration.",
+                        tagName);
+            }
+        } catch (Exception e) {
+            LOG.warn(
+                    "Failed to delete batch read protection tag '{}'. "
+                            + "It will be cleaned up by TTL expiration.",
+                    tagName,
+                    e);
+        }
+    }
+
+    /**
+     * @param tagName tag name to test; may be {@code null}
+     * @return {@code true} if the name starts with {@link #BATCH_READ_TAG_PREFIX}; {@code false}
+     *     otherwise, including for {@code null}
+     */
+    public static boolean isBatchReadTag(String tagName) {
+        return tagName != null && tagName.startsWith(BATCH_READ_TAG_PREFIX);
+    }
+
+    /** Builds a tag name from the prefix, the snapshot id and the first 8 chars of a UUID. */
+    private String generateTagName(long snapshotId) {
+        String uuid = UUID.randomUUID().toString().substring(0, 8);
+        return BATCH_READ_TAG_PREFIX + snapshotId + "-" + uuid;
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/tag/BatchReadTagCreatorTest.java
b/paimon-core/src/test/java/org/apache/paimon/tag/BatchReadTagCreatorTest.java
new file mode 100644
index 0000000000..4f225e2f07
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/tag/BatchReadTagCreatorTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataTableBatchScan;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link BatchReadTagCreator} and for the scan-planning hook that uses it. */
+public class BatchReadTagCreatorTest extends PrimaryKeyTableTestBase {
+
+    @Override
+    protected Options tableOptions() {
+        Options conf = new Options();
+        conf.set(CoreOptions.BUCKET, 1);
+        return conf;
+    }
+
+    /** A created tag is visible through the tag manager and gone again after deletion. */
+    @Test
+    public void testCreateAndDeleteReadTag() throws Exception {
+        writeCommit(GenericRow.of(1, 1, 1));
+
+        SnapshotManager snapshots = table.snapshotManager();
+        TagManager tags = table.tagManager();
+        long latestId = snapshots.latestSnapshotId();
+        BatchReadTagCreator creator = new BatchReadTagCreator(tags, snapshots, Duration.ofHours(1));
+
+        String created = creator.createReadTag(latestId);
+
+        assertThat(created).isNotNull().startsWith(BatchReadTagCreator.BATCH_READ_TAG_PREFIX);
+        assertThat(tags.tagExists(created)).isTrue();
+
+        creator.deleteReadTag(created);
+
+        assertThat(tags.tagExists(created)).isFalse();
+    }
+
+    /** Only names carrying the batch-read prefix are recognised. */
+    @Test
+    public void testIsBatchReadTag() {
+        assertThat(BatchReadTagCreator.isBatchReadTag("batch-read-1-abc12345")).isTrue();
+        assertThat(BatchReadTagCreator.isBatchReadTag("batch-read-42-xyz")).isTrue();
+
+        assertThat(BatchReadTagCreator.isBatchReadTag("my-tag")).isFalse();
+        assertThat(BatchReadTagCreator.isBatchReadTag("2023-07-18 11")).isFalse();
+    }
+
+    /** Tagging an unknown snapshot yields {@code null} instead of an exception. */
+    @Test
+    public void testCreateTagFailsGracefully() {
+        SnapshotManager snapshots = table.snapshotManager();
+        TagManager tags = table.tagManager();
+        BatchReadTagCreator creator = new BatchReadTagCreator(tags, snapshots, Duration.ofHours(1));
+
+        // snapshot 999 does not exist
+        assertThat(creator.createReadTag(999L)).isNull();
+    }
+
+    /** Deleting a tag that was never created must not throw. */
+    @Test
+    public void testDeleteNonExistentTagIsNoOp() throws Exception {
+        writeCommit(GenericRow.of(1, 1, 1));
+
+        SnapshotManager snapshots = table.snapshotManager();
+        TagManager tags = table.tagManager();
+        BatchReadTagCreator creator = new BatchReadTagCreator(tags, snapshots, Duration.ofHours(1));
+
+        // should not throw
+        creator.deleteReadTag("batch-read-nonexistent-12345678");
+    }
+
+    /** With the option set, planning a scan leaves a tag whose retention equals the option. */
+    @Test
+    public void testScanCreatesProtectionTag() throws Exception {
+        writeCommit(GenericRow.of(1, 1, 1));
+
+        Options dynamic = new Options();
+        dynamic.set(CoreOptions.SCAN_PLAN_AUTO_TAG_FOR_READ_TIME_RETAINED, Duration.ofHours(2));
+        FileStoreTable protectedTable = table.copy(dynamic.toMap());
+
+        InnerTableScan scan = protectedTable.newScan();
+        TableScan.Plan plan = scan.plan();
+
+        assertThat(plan.splits()).isNotEmpty();
+        assertThat(scan).isInstanceOf(DataTableBatchScan.class);
+
+        String created = ((DataTableBatchScan) scan).readProtectionTagName();
+        assertThat(created).isNotNull().startsWith(BatchReadTagCreator.BATCH_READ_TAG_PREFIX);
+
+        TagManager tags = table.tagManager();
+        assertThat(tags.tagExists(created)).isTrue();
+
+        // verify tag has TTL set
+        Tag stored = tags.getOrThrow(created);
+        assertThat(stored.getTagTimeRetained()).isEqualTo(Duration.ofHours(2));
+    }
+
+    /** Without the option, planning a scan records no protection tag. */
+    @Test
+    public void testScanDoesNotCreateTagWhenDisabled() throws Exception {
+        writeCommit(GenericRow.of(1, 1, 1));
+
+        // default: option not set
+        InnerTableScan scan = table.newScan();
+        scan.plan();
+
+        assertThat(scan).isInstanceOf(DataTableBatchScan.class);
+        assertThat(((DataTableBatchScan) scan).readProtectionTagName()).isNull();
+    }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index 30a1621925..ff1e0bde37 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -20,14 +20,15 @@ package org.apache.paimon.spark
import org.apache.paimon.globalindex.GlobalIndexResult
import org.apache.paimon.partition.PartitionPredicate
-import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.predicate.PredicateBuilder
import org.apache.paimon.spark.metric.SparkMetricRegistry
-import org.apache.paimon.spark.read.{BaseScan, PaimonSupportsRuntimeFiltering}
+import org.apache.paimon.spark.read.{BaseScan, BatchReadTagCleanupListener,
PaimonSupportsRuntimeFiltering}
import org.apache.paimon.spark.sources.PaimonMicroBatchStream
import org.apache.paimon.spark.util.OptionUtils
import org.apache.paimon.table.{DataTable, FileStoreTable, InnerTable}
-import org.apache.paimon.table.source.{InnerTableScan, Split}
+import org.apache.paimon.table.source.{DataTableBatchScan, InnerTableScan,
Split}
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
import org.apache.spark.sql.connector.read.Batch
@@ -43,15 +44,22 @@ abstract class PaimonBaseScan(table: InnerTable)
private lazy val paimonMetricsRegistry: SparkMetricRegistry =
SparkMetricRegistry()
protected def getInputSplits: Array[Split] = {
- readBuilder
+ val scan = readBuilder
.newScan()
.withGlobalIndexResult(evalGlobalIndexSearch())
.asInstanceOf[InnerTableScan]
.withMetricRegistry(paimonMetricsRegistry)
- .plan()
- .splits()
- .asScala
- .toArray
+
+ val plan = scan.plan()
+
+ Option(scan.readProtectionTagName).foreach {
+ name =>
+ BatchReadTagCleanupListener
+ .getOrCreate(SparkSession.active)
+ .registerCleanup(name, table)
+ }
+
+ plan.splits().asScala.toArray
}
private def evalGlobalIndexSearch(): GlobalIndexResult = {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BatchReadTagCleanupListener.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BatchReadTagCleanupListener.scala
new file mode 100644
index 0000000000..988748c109
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BatchReadTagCleanupListener.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.read
+
+import org.apache.paimon.table.{DataTable, Table}
+import org.apache.paimon.tag.BatchReadTagCreator
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * [[QueryExecutionListener]] that deletes batch-read protection tags once a query finishes,
+ * whether it succeeded or failed. A tag whose deletion fails is left to its TTL.
+ *
+ * NOTE(review): both callbacks drain every pending tag, not only the tags registered by the
+ * query that just finished. With overlapping queries this looks like it can remove a tag that
+ * another in-flight read still depends on -- confirm whether that is acceptable.
+ */
+class BatchReadTagCleanupListener extends QueryExecutionListener with Logging {
+
+  // tag name -> table the tag was created on
+  private val pendingCleanups = new ConcurrentHashMap[String, DataTable]()
+
+  /** Remembers `tagName` for later deletion; a table that is not a [[DataTable]] is ignored. */
+  def registerCleanup(tagName: String, table: Table): Unit = {
+    Option(table)
+      .collect { case dataTable: DataTable => dataTable }
+      .foreach(pendingCleanups.put(tagName, _))
+  }
+
+  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
+    cleanupAll()
+
+  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
+    cleanupAll()
+
+  /** Removes every pending entry from the map and attempts to delete its tag. */
+  private def cleanupAll(): Unit = {
+    val pending = pendingCleanups.entrySet().iterator()
+    while (pending.hasNext) {
+      val registration = pending.next()
+      val tagName = registration.getKey
+      val dataTable = registration.getValue
+      // The entry is dropped before the delete is attempted, so a failed delete is not retried
+      // by later callbacks.
+      pending.remove()
+      deleteTag(tagName, dataTable)
+    }
+  }
+
+  /** Deletes one tag through a [[BatchReadTagCreator]]; failures are logged, never thrown. */
+  private def deleteTag(tagName: String, dataTable: DataTable): Unit = {
+    try {
+      new BatchReadTagCreator(
+        dataTable.tagManager(),
+        dataTable.snapshotManager(),
+        dataTable.coreOptions().scanPlanAutoTagTimeRetained()).deleteReadTag(tagName)
+    } catch {
+      case e: Exception =>
+        logWarning(
+          s"Failed to delete batch read protection tag '$tagName'. " +
+            "It will be cleaned up by TTL expiration.",
+          e)
+    }
+  }
+}
+
+object BatchReadTagCleanupListener {
+
+  // One listener per session: a QueryExecutionListener only receives callbacks for the session
+  // whose listener manager it was registered with, so a single process-wide instance would
+  // never fire for reads issued from any other session. Weak keys keep this map from pinning
+  // sessions that are no longer referenced elsewhere.
+  private val listeners =
+    new java.util.WeakHashMap[SparkSession, BatchReadTagCleanupListener]()
+
+  /**
+   * Returns the cleanup listener bound to `spark`, creating it and registering it with that
+   * session's listener manager on first use.
+   *
+   * @param spark
+   *   the session whose query completions should trigger tag cleanup
+   * @return
+   *   the listener for `spark`; repeated calls with the same session return the same instance
+   */
+  def getOrCreate(spark: SparkSession): BatchReadTagCleanupListener = listeners.synchronized {
+    Option(listeners.get(spark)).getOrElse {
+      val created = new BatchReadTagCleanupListener()
+      spark.listenerManager.register(created)
+      listeners.put(spark, created)
+      created
+    }
+  }
+}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagConcurrentExpireTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagConcurrentExpireTest.scala
new file mode 100644
index 0000000000..45d1118d85
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagConcurrentExpireTest.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.read
+
+import org.apache.paimon.options.ExpireConfig
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.source.DataTableBatchScan
+
+import java.time.Duration
+import java.util
+
+import scala.collection.JavaConverters._
+
+/**
+ * Simulates snapshot expiration running between scan planning and the actual read.
+ *
+ * An append-only table is overwritten with INSERT OVERWRITE so that the data files of the
+ * planned snapshot are no longer referenced by later snapshots. Without the auto-tag those files
+ * are removed by expiration and the read fails; with the auto-tag they are kept and the read
+ * succeeds.
+ */
+class BatchReadTagConcurrentExpireTest extends PaimonSparkTestBase {
+
+  /** Overwrites table T and appends twice, leaving snapshot 1's files unreferenced. */
+  private def supersedeSnapshotOne(): Unit = {
+    spark.sql("INSERT OVERWRITE T VALUES (3, 'v3'), (4, 'v4')")
+    spark.sql("INSERT INTO T VALUES (5, 'v5')")
+    spark.sql("INSERT INTO T VALUES (6, 'v6')")
+  }
+
+  /** Reloads table T and expires snapshots aggressively, retaining a single one. */
+  private def expireAllButLatestSnapshot(): Unit = {
+    val expireConfig = ExpireConfig
+      .builder()
+      .snapshotMaxDeletes(Integer.MAX_VALUE)
+      .snapshotRetainMax(1)
+      .snapshotRetainMin(1)
+      .snapshotTimeRetain(Duration.ZERO)
+      .build()
+    loadTable("T").newExpireSnapshots().config(expireConfig).expire()
+  }
+
+  test("Paimon: without protection, read fails after concurrent expiration") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |""".stripMargin)
+
+    // snapshot 1
+    spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')")
+
+    val table = loadTable("T")
+
+    // Plan first: the resulting splits reference snapshot 1's data files.
+    val splits = table.newScan().plan().splits()
+    assert(!splits.isEmpty)
+
+    supersedeSnapshotOne()
+    expireAllButLatestSnapshot()
+
+    // Reading the stale splits is expected to fail because their files are gone.
+    val read = table.newReadBuilder().newRead()
+    val readFailed =
+      try {
+        splits.asScala
+          .map(split => read.createReader(split))
+          .foreach {
+            reader =>
+              val rows = reader.toCloseableIterator
+              while (rows.hasNext) rows.next()
+              rows.close()
+          }
+        false
+      } catch {
+        case _: Exception => true
+      }
+
+    assert(readFailed, "Read should fail because data files were deleted by expiration")
+  }
+
+  test("Paimon: with protection, read succeeds after concurrent expiration") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |TBLPROPERTIES (
+                 |  'scan.plan-auto-tag-for-read.time-retained' = '1 h'
+                 |)
+                 |""".stripMargin)
+
+    // snapshot 1
+    spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')")
+
+    val table = loadTable("T")
+
+    // Planning is what creates the protection tag.
+    val scan = table.newScan()
+    val splits = scan.plan().splits()
+    assert(!splits.isEmpty)
+
+    val tagName = scan.asInstanceOf[DataTableBatchScan].readProtectionTagName
+    assert(tagName != null, "Protection tag should be created during scan planning")
+    assert(table.tagManager().tagExists(tagName))
+
+    supersedeSnapshotOne()
+    expireAllButLatestSnapshot()
+
+    // The stale splits must still be readable because the tag keeps their files alive.
+    val read = table.newReadBuilder().newRead()
+    val results = new util.ArrayList[String]()
+    for (split <- splits.asScala) {
+      val rows = read.createReader(split).toCloseableIterator
+      while (rows.hasNext) {
+        val row = rows.next()
+        results.add(s"${row.getInt(0)},${row.getString(1).toString}")
+      }
+      rows.close()
+    }
+
+    assert(results.size() == 2, s"Should read 2 rows but got ${results.size()}")
+    assert(results.contains("1,v1"))
+    assert(results.contains("2,v2"))
+
+    // Remove the protection tag created by planning.
+    val snapshotManager = table.snapshotManager()
+    new org.apache.paimon.tag.BatchReadTagCreator(
+      table.tagManager(),
+      snapshotManager,
+      Duration.ofHours(1)).deleteReadTag(tagName)
+  }
+}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagProtectionTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagProtectionTest.scala
new file mode 100644
index 0000000000..afced7afed
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagProtectionTest.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.read
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.tag.BatchReadTagCreator
+
+import org.apache.spark.sql.Row
+
+/**
+ * End-to-end checks of the batch-read protection tag through Spark SQL: the tag is cleaned up
+ * after a query when the option is set, none is created when the option is absent, and a tag
+ * keeps a snapshot readable after newer snapshots have been committed.
+ */
+class BatchReadTagProtectionTest extends PaimonSparkTestBase {
+
+  /** Names of the batch-read protection tags currently present on the given table. */
+  private def batchReadTagsOf(tableName: String): Array[String] =
+    loadTable(tableName)
+      .tagManager()
+      .allTagNames()
+      .toArray
+      .map(_.toString)
+      .filter(BatchReadTagCreator.isBatchReadTag)
+
+  /**
+   * Polls until the table has no batch-read tags or the timeout elapses, and returns the tags
+   * seen by the last poll. Query listeners fire asynchronously, so a fixed sleep is either too
+   * short on a slow machine or needlessly long on a fast one.
+   */
+  private def awaitNoBatchReadTags(tableName: String, timeoutMs: Long = 10000L): Array[String] = {
+    val deadline = System.nanoTime() + timeoutMs * 1000000L
+    var remaining = batchReadTagsOf(tableName)
+    while (remaining.nonEmpty && System.nanoTime() < deadline) {
+      Thread.sleep(50)
+      remaining = batchReadTagsOf(tableName)
+    }
+    remaining
+  }
+
+  test("Paimon: batch read creates protection tag when enabled") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |TBLPROPERTIES (
+                 |  'primary-key' = 'a',
+                 |  'bucket' = '1',
+                 |  'scan.plan-auto-tag-for-read.time-retained' = '1 h'
+                 |)
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')")
+
+    // query triggers scan which creates a protection tag
+    checkAnswer(spark.sql("SELECT * FROM T ORDER BY a"), Row(1, "v1") :: Row(2, "v2") :: Nil)
+
+    // after query completes, the listener should clean up the tag; wait for it with a bound
+    val batchReadTags = awaitNoBatchReadTags("T")
+    assert(
+      batchReadTags.isEmpty,
+      s"Protection tags should be cleaned up, but found: ${batchReadTags.mkString(", ")}")
+  }
+
+  test("Paimon: batch read does NOT create tag when disabled") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |TBLPROPERTIES ('primary-key' = 'a', 'bucket' = '1')
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')")
+
+    checkAnswer(spark.sql("SELECT * FROM T ORDER BY a"), Row(1, "v1") :: Row(2, "v2") :: Nil)
+
+    val batchReadTags = batchReadTagsOf("T")
+    assert(batchReadTags.isEmpty, "No protection tags should be created when feature is disabled")
+  }
+
+  test("Paimon: protection tag prevents data file deletion during expiration") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |TBLPROPERTIES (
+                 |  'primary-key' = 'a',
+                 |  'bucket' = '1',
+                 |  'scan.plan-auto-tag-for-read.time-retained' = '1 h',
+                 |  'snapshot.num-retained.min' = '1',
+                 |  'snapshot.num-retained.max' = '1'
+                 |)
+                 |""".stripMargin)
+
+    // create snapshot 1
+    spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')")
+
+    val table = loadTable("T")
+    val tagManager = table.tagManager()
+    val snapshotManager = table.snapshotManager()
+
+    // manually create a protection tag on snapshot 1 (simulating what scan does)
+    val creator =
+      new BatchReadTagCreator(tagManager, snapshotManager, java.time.Duration.ofHours(1))
+    val tagName = creator.createReadTag(1L)
+    assert(tagName != null)
+
+    try {
+      assert(tagManager.tagExists(tagName))
+
+      // create more snapshots to trigger expiration of snapshot 1
+      spark.sql("INSERT INTO T VALUES (3, 'v3')")
+      spark.sql("INSERT INTO T VALUES (4, 'v4')")
+
+      // snapshot 1 may be expired now, but data files should be protected by the tag
+      // read from the tag should still work
+      checkAnswer(
+        spark.sql(s"SELECT * FROM T VERSION AS OF '$tagName' ORDER BY a"),
+        Row(1, "v1") :: Row(2, "v2") :: Nil)
+    } finally {
+      // cleanup runs even when an assertion above fails, so the tag is not left behind
+      creator.deleteReadTag(tagName)
+    }
+  }
+}