This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b352de5b20 [spark] Introduce plan-auto-tag-for-read for Spark read 
(#7897)
b352de5b20 is described below

commit b352de5b20d46e35e35b1f85c7409ea0481a4efc
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed May 20 08:37:04 2026 +0800

    [spark] Introduce plan-auto-tag-for-read for Spark read (#7897)
    
    Introduce `'scan.plan-auto-tag-for-read.time-retained'` option:
    
    When set, a temporary tag will be auto-created during batch scan
    planning to protect the read snapshot from expiration. The value
    specifies the tag's TTL. Should be longer than the longest expected
    batch read duration.
---
 .../generated/catalog_configuration.html           |  60 ++++----
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  14 ++
 .../paimon/table/source/DataTableBatchScan.java    |  39 ++++-
 .../apache/paimon/table/source/InnerTableScan.java |   5 +
 .../org/apache/paimon/tag/BatchReadTagCreator.java | 105 +++++++++++++
 .../apache/paimon/tag/BatchReadTagCreatorTest.java | 139 ++++++++++++++++++
 .../org/apache/paimon/spark/PaimonBaseScan.scala   |  24 ++-
 .../spark/read/BatchReadTagCleanupListener.scala   |  93 ++++++++++++
 .../read/BatchReadTagConcurrentExpireTest.scala    | 163 +++++++++++++++++++++
 .../spark/read/BatchReadTagProtectionTest.scala    | 118 +++++++++++++++
 11 files changed, 723 insertions(+), 43 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html 
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index 6cafa6ccdf..d885fee705 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -104,6 +104,36 @@ under the License.
             <td>Boolean</td>
             <td>Whether to support format tables, format table corresponds to 
a regular csv, parquet or orc table, allowing read and write operations. 
However, during these processes, it does not connect to the metastore; hence, 
newly added partitions will not be reflected in the metastore and need to be 
manually added as separate partition operations.</td>
         </tr>
+        <tr>
+            <td><h5>local-cache.block-size</h5></td>
+            <td style="word-wrap: break-word;">1 mb</td>
+            <td>MemorySize</td>
+            <td>Block size for local cache.</td>
+        </tr>
+        <tr>
+            <td><h5>local-cache.dir</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Directory for local block cache on disk. If not configured, 
memory cache is used instead.</td>
+        </tr>
+        <tr>
+            <td><h5>local-cache.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable local block cache for file reads. If 
local-cache.dir is configured, disk cache is used; otherwise memory cache is 
used.</td>
+        </tr>
+        <tr>
+            <td><h5>local-cache.max-size</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>MemorySize</td>
+            <td>Maximum total size of the local block cache. Unlimited by 
default.</td>
+        </tr>
+        <tr>
+            <td><h5>local-cache.whitelist</h5></td>
+            <td style="word-wrap: break-word;">"meta,global-index"</td>
+            <td>String</td>
+            <td>Comma-separated list of file types to cache. Supported values: 
meta, global-index, bucket-index, data, file-index.</td>
+        </tr>
         <tr>
             <td><h5>lock-acquire-timeout</h5></td>
             <td style="word-wrap: break-word;">8 min</td>
@@ -146,36 +176,6 @@ under the License.
             <td>Boolean</td>
             <td>Sync all table properties to the catalog metastore (e.g. Hive 
metastore, JDBC catalog store)</td>
         </tr>
-        <tr>
-            <td><h5>local-cache.block-size</h5></td>
-            <td style="word-wrap: break-word;">1 mb</td>
-            <td>MemorySize</td>
-            <td>Block size for local cache.</td>
-        </tr>
-        <tr>
-            <td><h5>local-cache.dir</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>String</td>
-            <td>Directory for local block cache on disk. If not configured, 
memory cache is used instead.</td>
-        </tr>
-        <tr>
-            <td><h5>local-cache.enabled</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether to enable local block cache for file reads. If 
local-cache.dir is configured, disk cache is used; otherwise memory cache is 
used.</td>
-        </tr>
-        <tr>
-            <td><h5>local-cache.max-size</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>MemorySize</td>
-            <td>Maximum total size of the local block cache. Unlimited by 
default.</td>
-        </tr>
-        <tr>
-            <td><h5>local-cache.whitelist</h5></td>
-            <td style="word-wrap: break-word;">"meta,global-index"</td>
-            <td>String</td>
-            <td>Comma-separated list of file types to cache. Supported values: 
meta, global-index, bucket-index, data, file-index.</td>
-        </tr>
         <tr>
             <td><h5>table.type</h5></td>
             <td style="word-wrap: break-word;">managed</td>
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 7496cbfc15..0b82912983 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1259,6 +1259,12 @@ This config option does not affect the default 
filesystem metastore.</td>
             <td><p>Enum</p></td>
             <td>Specify the scanning behavior of the source.<br /><br 
/>Possible values:<ul><li>"default": Determines actual startup mode according 
to other table properties. If "scan.timestamp-millis" is set the actual startup 
mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is 
set the actual startup mode will be "from-snapshot". Otherwise the actual 
startup mode will be "latest-full".</li><li>"latest-full": For streaming 
sources, produces the latest snapshot  [...]
         </tr>
+        <tr>
+            <td><h5>scan.plan-auto-tag-for-read.time-retained</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>When set, a temporary tag will be auto-created during batch 
scan planning to protect the read snapshot from expiration. The value specifies 
the tag's TTL. Should be longer than the longest expected batch read 
duration.</td>
+        </tr>
         <tr>
             <td><h5>scan.plan-sort-partition</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 176d1e9d4d..d5a1bb7fb0 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1767,6 +1767,15 @@ public class CoreOptions implements Serializable {
                     .noDefaultValue()
                     .withDescription("Use customized name when creating tags 
in Batch mode.");
 
+    public static final ConfigOption<Duration> 
SCAN_PLAN_AUTO_TAG_FOR_READ_TIME_RETAINED =
+            key("scan.plan-auto-tag-for-read.time-retained")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "When set, a temporary tag will be auto-created 
during batch scan planning "
+                                    + "to protect the read snapshot from 
expiration. The value specifies the tag's TTL. "
+                                    + "Should be longer than the longest 
expected batch read duration.");
+
     public static final ConfigOption<Duration> SNAPSHOT_WATERMARK_IDLE_TIMEOUT 
=
             key("snapshot.watermark-idle-timeout")
                     .durationType()
@@ -3550,6 +3559,11 @@ public class CoreOptions implements Serializable {
         return options.get(TAG_BATCH_CUSTOMIZED_NAME);
     }
 
+    /**
+     * Returns the value of {@link #SCAN_PLAN_AUTO_TAG_FOR_READ_TIME_RETAINED}.
+     *
+     * @return the configured time-retained duration, or {@code null} when the
+     *     option is not set (it is declared without a default value)
+     */
+    @Nullable
+    public Duration scanPlanAutoTagTimeRetained() {
+        return options.get(SCAN_PLAN_AUTO_TAG_FOR_READ_TIME_RETAINED);
+    }
+
     public Duration snapshotWatermarkIdleTimeout() {
         return options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 255ff5f672..710f736985 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -29,11 +29,17 @@ import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
+import org.apache.paimon.tag.BatchReadTagCreator;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -53,6 +59,7 @@ public class DataTableBatchScan extends AbstractDataTableScan 
{
     private TopN topN;
 
     private final SchemaManager schemaManager;
+    @Nullable private String readProtectionTagName;
 
     public DataTableBatchScan(
             TableSchema schema,
@@ -103,15 +110,20 @@ public class DataTableBatchScan extends 
AbstractDataTableScan {
 
         if (hasNext) {
             hasNext = false;
+            StartingScanner.Result result;
             Optional<StartingScanner.Result> pushed = applyPushDownLimit();
             if (pushed.isPresent()) {
-                return DataFilePlan.fromResult(pushed.get());
+                result = pushed.get();
+            } else {
+                pushed = applyPushDownTopN();
+                result = pushed.orElseGet(() -> 
startingScanner.scan(snapshotReader));
             }
-            pushed = applyPushDownTopN();
-            if (pushed.isPresent()) {
-                return DataFilePlan.fromResult(pushed.get());
+
+            if (result instanceof ScannedResult) {
+                maybeCreateReadProtectionTag(((ScannedResult) 
result).currentSnapshotId());
             }
-            return 
DataFilePlan.fromResult(startingScanner.scan(snapshotReader));
+
+            return DataFilePlan.fromResult(result);
         } else {
             throw new EndOfScanException();
         }
@@ -209,4 +221,21 @@ public class DataTableBatchScan extends 
AbstractDataTableScan {
         snapshotReader.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
         return this;
     }
+
+    /**
+     * Returns the name of the read-protection tag recorded by
+     * {@code maybeCreateReadProtectionTag}.
+     *
+     * @return the tag name, or {@code null} when no tag has been recorded:
+     *     planning has not produced a {@code ScannedResult}, the time-retained
+     *     option is not set, or tag creation failed
+     */
+    @Override
+    @Nullable
+    public String readProtectionTagName() {
+        return readProtectionTagName;
+    }
+
+    /**
+     * Creates a temporary read-protection tag on the given snapshot when the
+     * time-retained option is configured, and records the resulting tag name
+     * (which is {@code null} if creation failed). Does nothing when the
+     * option is not set.
+     *
+     * @param snapshotId id of the snapshot that the plan reads
+     */
+    private void maybeCreateReadProtectionTag(long snapshotId) {
+        Duration ttl = options().scanPlanAutoTagTimeRetained();
+        if (ttl != null) {
+            SnapshotManager snapshots = snapshotReader.snapshotManager();
+            TagManager tags =
+                    new TagManager(
+                            snapshots.fileIO(),
+                            snapshots.tablePath(),
+                            snapshots.branch());
+            readProtectionTagName =
+                    new BatchReadTagCreator(tags, snapshots, ttl)
+                            .createReadTag(snapshotId);
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index c09809feb9..38278d4620 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -106,4 +106,9 @@ public interface InnerTableScan extends TableScan {
         // do nothing, should implement this if need
         return this;
     }
+
+    /**
+     * Name of the temporary tag this scan created to protect the snapshot it
+     * reads from expiration.
+     *
+     * <p>This default implementation always returns {@code null};
+     * implementations that create such a tag override it.
+     *
+     * @return the tag name, or {@code null} if no tag was created
+     */
+    @Nullable
+    default String readProtectionTagName() {
+        return null;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/tag/BatchReadTagCreator.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/BatchReadTagCreator.java
new file mode 100644
index 0000000000..8637709b31
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/BatchReadTagCreator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.UUID;
+
+/**
+ * Creates and removes the temporary tags that protect a snapshot from
+ * expiration while a batch read is running.
+ *
+ * <p>Both operations are best-effort: failures are logged and never thrown,
+ * so tag bookkeeping cannot fail the read itself.
+ */
+public class BatchReadTagCreator {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(BatchReadTagCreator.class);
+
+    /** Prefix of every tag name generated by this class. */
+    public static final String BATCH_READ_TAG_PREFIX = "batch-read-";
+
+    private final TagManager tagManager;
+    private final SnapshotManager snapshotManager;
+    private final Duration timeRetained;
+
+    /**
+     * @param tagManager creates tags, checks their existence and resolves
+     *     their paths
+     * @param snapshotManager loads the snapshot to tag and supplies the file
+     *     IO used for deletion
+     * @param timeRetained time-retained value handed to created tags; only
+     *     read by {@link #createReadTag(long)}
+     */
+    public BatchReadTagCreator(
+            TagManager tagManager,
+            SnapshotManager snapshotManager,
+            Duration timeRetained) {
+        this.tagManager = tagManager;
+        this.snapshotManager = snapshotManager;
+        this.timeRetained = timeRetained;
+    }
+
+    /**
+     * Creates a tag with a generated name on the given snapshot.
+     *
+     * @param snapshotId id of the snapshot to protect
+     * @return the name of the created tag, or {@code null} when the snapshot
+     *     could not be loaded or the tag could not be created
+     */
+    @Nullable
+    public String createReadTag(long snapshotId) {
+        Snapshot snapshot;
+        try {
+            snapshot = snapshotManager.snapshot(snapshotId);
+        } catch (Exception e) {
+            LOG.warn(
+                    "Failed to get snapshot {} for read protection tag.",
+                    snapshotId,
+                    e);
+            return null;
+        }
+
+        String tagName = generateTagName(snapshotId);
+        try {
+            tagManager.createTag(
+                    snapshot,
+                    tagName,
+                    timeRetained,
+                    Collections.emptyList(),
+                    true);
+            LOG.info(
+                    "Created batch read protection tag '{}' for snapshot {}.",
+                    tagName,
+                    snapshotId);
+            return tagName;
+        } catch (Exception e) {
+            LOG.warn(
+                    "Failed to create batch read protection tag for snapshot {}. "
+                            + "Read will proceed without protection.",
+                    snapshotId,
+                    e);
+            return null;
+        }
+    }
+
+    /**
+     * Removes the metadata file of the given tag if the tag exists. Never
+     * throws; a tag that could not be removed is left to TTL expiration.
+     *
+     * @param tagName name of the tag to remove
+     */
+    public void deleteReadTag(String tagName) {
+        try {
+            if (!tagManager.tagExists(tagName)) {
+                return;
+            }
+            // Directly delete the tag metadata file instead of using
+            // TagManager.deleteTag(), which would also scan and delete
+            // unreferenced data files — too heavyweight for a read-path
+            // cleanup. Any orphan data files left behind will be reclaimed
+            // by OrphanFilesClean.
+            snapshotManager.fileIO().deleteQuietly(tagManager.tagPath(tagName));
+            // deleteQuietly() does not report failure to the caller, so
+            // re-check before claiming success in the log.
+            if (tagManager.tagExists(tagName)) {
+                LOG.warn(
+                        "Failed to delete batch read protection tag '{}'. "
+                                + "It will be cleaned up by TTL expiration.",
+                        tagName);
+            } else {
+                LOG.info("Deleted batch read protection tag '{}'.", tagName);
+            }
+        } catch (Exception e) {
+            LOG.warn(
+                    "Failed to delete batch read protection tag '{}'. "
+                            + "It will be cleaned up by TTL expiration.",
+                    tagName,
+                    e);
+        }
+    }
+
+    /**
+     * Tells whether a tag name carries {@link #BATCH_READ_TAG_PREFIX}.
+     *
+     * @param tagName tag name to test; {@code null} yields {@code false}
+     * @return {@code true} if the name starts with the batch-read prefix
+     */
+    public static boolean isBatchReadTag(String tagName) {
+        return tagName != null && tagName.startsWith(BATCH_READ_TAG_PREFIX);
+    }
+
+    /**
+     * Builds {@code batch-read-<snapshotId>-<suffix>}, where the suffix is
+     * the first 8 characters of a random UUID.
+     */
+    private String generateTagName(long snapshotId) {
+        String uuid = UUID.randomUUID().toString().substring(0, 8);
+        return BATCH_READ_TAG_PREFIX + snapshotId + "-" + uuid;
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/tag/BatchReadTagCreatorTest.java 
b/paimon-core/src/test/java/org/apache/paimon/tag/BatchReadTagCreatorTest.java
new file mode 100644
index 0000000000..4f225e2f07
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/tag/BatchReadTagCreatorTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataTableBatchScan;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link BatchReadTagCreator} and for the read-protection tag that
+ * a batch scan records while planning.
+ */
+public class BatchReadTagCreatorTest extends PrimaryKeyTableTestBase {
+
+    @Override
+    protected Options tableOptions() {
+        Options options = new Options();
+        options.set(CoreOptions.BUCKET, 1);
+        return options;
+    }
+
+    /** Builds a creator on the test table with a one-hour time-retained. */
+    private BatchReadTagCreator newCreator() {
+        return new BatchReadTagCreator(
+                table.tagManager(),
+                table.snapshotManager(),
+                Duration.ofHours(1));
+    }
+
+    /** A created tag has the prefix, exists, and is gone after deletion. */
+    @Test
+    public void testCreateAndDeleteReadTag() throws Exception {
+        writeCommit(GenericRow.of(1, 1, 1));
+
+        TagManager tagManager = table.tagManager();
+        long snapshotId = table.snapshotManager().latestSnapshotId();
+        BatchReadTagCreator creator = newCreator();
+
+        String tagName = creator.createReadTag(snapshotId);
+        assertThat(tagName).isNotNull();
+        assertThat(tagName)
+                .startsWith(BatchReadTagCreator.BATCH_READ_TAG_PREFIX);
+        assertThat(tagManager.tagExists(tagName)).isTrue();
+
+        creator.deleteReadTag(tagName);
+        assertThat(tagManager.tagExists(tagName)).isFalse();
+    }
+
+    /** Only names starting with the batch-read prefix are recognized. */
+    @Test
+    public void testIsBatchReadTag() {
+        assertThat(BatchReadTagCreator.isBatchReadTag("batch-read-1-abc12345"))
+                .isTrue();
+        assertThat(BatchReadTagCreator.isBatchReadTag("batch-read-42-xyz"))
+                .isTrue();
+        assertThat(BatchReadTagCreator.isBatchReadTag("my-tag")).isFalse();
+        assertThat(BatchReadTagCreator.isBatchReadTag("2023-07-18 11"))
+                .isFalse();
+    }
+
+    /** Tagging a missing snapshot returns null instead of throwing. */
+    @Test
+    public void testCreateTagFailsGracefully() {
+        // snapshot 999 does not exist
+        String tagName = newCreator().createReadTag(999L);
+        assertThat(tagName).isNull();
+    }
+
+    /** Deleting an unknown tag neither throws nor creates anything. */
+    @Test
+    public void testDeleteNonExistentTagIsNoOp() throws Exception {
+        writeCommit(GenericRow.of(1, 1, 1));
+
+        TagManager tagManager = table.tagManager();
+        String missing = "batch-read-nonexistent-12345678";
+        assertThat(tagManager.tagExists(missing)).isFalse();
+
+        // should not throw
+        newCreator().deleteReadTag(missing);
+        assertThat(tagManager.tagExists(missing)).isFalse();
+    }
+
+    /** With the option set, planning records an existing tag with the TTL. */
+    @Test
+    public void testScanCreatesProtectionTag() throws Exception {
+        writeCommit(GenericRow.of(1, 1, 1));
+
+        Options options = new Options();
+        options.set(
+                CoreOptions.SCAN_PLAN_AUTO_TAG_FOR_READ_TIME_RETAINED,
+                Duration.ofHours(2));
+        FileStoreTable tableWithOption = table.copy(options.toMap());
+
+        InnerTableScan scan = tableWithOption.newScan();
+        TableScan.Plan plan = scan.plan();
+
+        assertThat(plan.splits()).isNotEmpty();
+        assertThat(scan).isInstanceOf(DataTableBatchScan.class);
+
+        DataTableBatchScan batchScan = (DataTableBatchScan) scan;
+        String tagName = batchScan.readProtectionTagName();
+        assertThat(tagName).isNotNull();
+        assertThat(tagName)
+                .startsWith(BatchReadTagCreator.BATCH_READ_TAG_PREFIX);
+
+        TagManager tagManager = table.tagManager();
+        assertThat(tagManager.tagExists(tagName)).isTrue();
+
+        // verify tag has TTL set
+        Tag tag = tagManager.getOrThrow(tagName);
+        assertThat(tag.getTagTimeRetained()).isEqualTo(Duration.ofHours(2));
+    }
+
+    /** Without the option, planning records no tag. */
+    @Test
+    public void testScanDoesNotCreateTagWhenDisabled() throws Exception {
+        writeCommit(GenericRow.of(1, 1, 1));
+
+        // default: option not set
+        InnerTableScan scan = table.newScan();
+        scan.plan();
+
+        assertThat(scan).isInstanceOf(DataTableBatchScan.class);
+        DataTableBatchScan batchScan = (DataTableBatchScan) scan;
+        assertThat(batchScan.readProtectionTagName()).isNull();
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index 30a1621925..ff1e0bde37 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -20,14 +20,15 @@ package org.apache.paimon.spark
 
 import org.apache.paimon.globalindex.GlobalIndexResult
 import org.apache.paimon.partition.PartitionPredicate
-import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.predicate.PredicateBuilder
 import org.apache.paimon.spark.metric.SparkMetricRegistry
-import org.apache.paimon.spark.read.{BaseScan, PaimonSupportsRuntimeFiltering}
+import org.apache.paimon.spark.read.{BaseScan, BatchReadTagCleanupListener, 
PaimonSupportsRuntimeFiltering}
 import org.apache.paimon.spark.sources.PaimonMicroBatchStream
 import org.apache.paimon.spark.util.OptionUtils
 import org.apache.paimon.table.{DataTable, FileStoreTable, InnerTable}
-import org.apache.paimon.table.source.{InnerTableScan, Split}
+import org.apache.paimon.table.source.{DataTableBatchScan, InnerTableScan, 
Split}
 
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
 import org.apache.spark.sql.connector.read.Batch
@@ -43,15 +44,22 @@ abstract class PaimonBaseScan(table: InnerTable)
   private lazy val paimonMetricsRegistry: SparkMetricRegistry = 
SparkMetricRegistry()
 
   protected def getInputSplits: Array[Split] = {
-    readBuilder
+    val scan = readBuilder
       .newScan()
       .withGlobalIndexResult(evalGlobalIndexSearch())
       .asInstanceOf[InnerTableScan]
       .withMetricRegistry(paimonMetricsRegistry)
-      .plan()
-      .splits()
-      .asScala
-      .toArray
+
+    val plan = scan.plan()
+
+    Option(scan.readProtectionTagName).foreach {
+      name =>
+        BatchReadTagCleanupListener
+          .getOrCreate(SparkSession.active)
+          .registerCleanup(name, table)
+    }
+
+    plan.splits().asScala.toArray
   }
 
   private def evalGlobalIndexSearch(): GlobalIndexResult = {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BatchReadTagCleanupListener.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BatchReadTagCleanupListener.scala
new file mode 100644
index 0000000000..988748c109
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BatchReadTagCleanupListener.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.read
+
+import org.apache.paimon.table.{DataTable, Table}
+import org.apache.paimon.tag.BatchReadTagCreator
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * Query listener that removes batch-read protection tags once a query
+ * finishes, whether it succeeded or failed. Tags that cannot be removed here
+ * are left to expire through their TTL.
+ *
+ * NOTE(review): completion of any observed query drains every pending tag,
+ * including tags registered for queries that may still be running. Confirm
+ * this is acceptable when reads run concurrently.
+ */
+class BatchReadTagCleanupListener
+  extends QueryExecutionListener
+  with Logging {
+
+  /** Tags awaiting deletion, keyed by tag name. */
+  private val pendingCleanups = new ConcurrentHashMap[String, DataTable]()
+
+  /**
+   * Remembers a tag for deletion at the next query completion. Tables that
+   * are not a [[DataTable]] are ignored.
+   */
+  def registerCleanup(tagName: String, table: Table): Unit = {
+    Option(table)
+      .collect { case dt: DataTable => dt }
+      .foreach(dt => pendingCleanups.put(tagName, dt))
+  }
+
+  override def onSuccess(
+      funcName: String,
+      qe: QueryExecution,
+      durationNs: Long): Unit = cleanupAll()
+
+  override def onFailure(
+      funcName: String,
+      qe: QueryExecution,
+      exception: Exception): Unit = cleanupAll()
+
+  /** Drains the pending map, deleting each tag as it is removed. */
+  private def cleanupAll(): Unit = {
+    val pending = pendingCleanups.entrySet().iterator()
+    while (pending.hasNext) {
+      val registration = pending.next()
+      val name = registration.getKey
+      val owner = registration.getValue
+      // Drop the entry first so a failing delete is not retried.
+      pending.remove()
+      deleteTag(name, owner)
+    }
+  }
+
+  /** Deletes one tag; any exception is logged and swallowed. */
+  private def deleteTag(tagName: String, dataTable: DataTable): Unit = {
+    try {
+      new BatchReadTagCreator(
+        dataTable.tagManager(),
+        dataTable.snapshotManager(),
+        dataTable.coreOptions().scanPlanAutoTagTimeRetained()
+      ).deleteReadTag(tagName)
+    } catch {
+      case e: Exception =>
+        logWarning(
+          s"Failed to delete batch read protection tag '$tagName'. " +
+            "It will be cleaned up by TTL expiration.",
+          e)
+    }
+  }
+}
+
+/**
+ * Holds the single [[BatchReadTagCleanupListener]] and makes sure it is
+ * registered on the listener manager of every session that asks for it.
+ */
+object BatchReadTagCleanupListener {
+
+  private val instance = new BatchReadTagCleanupListener()
+
+  // Listener managers that already carry `instance`. Keys are held weakly so
+  // that tracking a manager does not keep a stopped session reachable.
+  private val registeredManagers =
+    java.util.Collections.newSetFromMap(
+      new java.util.WeakHashMap[
+        org.apache.spark.sql.util.ExecutionListenerManager,
+        java.lang.Boolean]())
+
+  /**
+   * Returns the shared listener, registering it on the listener manager of
+   * `spark` the first time that manager is seen.
+   *
+   * Registering only once per JVM would tie cleanup to the first session:
+   * queries run from any other session, or from a session created after the
+   * first one was stopped, would never trigger it.
+   *
+   * @param spark
+   *   session whose queries should trigger tag cleanup
+   * @return
+   *   the listener on which cleanups are registered
+   */
+  def getOrCreate(spark: SparkSession): BatchReadTagCleanupListener = {
+    val manager = spark.listenerManager
+    registeredManagers.synchronized {
+      // add() is true only for a manager that was not tracked yet.
+      if (registeredManagers.add(manager)) {
+        manager.register(instance)
+      }
+    }
+    instance
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagConcurrentExpireTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagConcurrentExpireTest.scala
new file mode 100644
index 0000000000..45d1118d85
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagConcurrentExpireTest.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.read
+
+import org.apache.paimon.options.ExpireConfig
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.source.DataTableBatchScan
+
+import java.time.Duration
+import java.util
+
+import scala.collection.JavaConverters._
+
+/**
+ * Tests that simulate concurrent snapshot expiration during a batch read.
+ *
+ * Uses an append-only table with INSERT OVERWRITE to ensure old data files become unreferenced by
+ * later snapshots. Without protection: data files are deleted during expiration, read fails. With
+ * protection (auto-tag): data files are preserved, read succeeds.
+ */
+class BatchReadTagConcurrentExpireTest extends PaimonSparkTestBase {
+
+  /**
+   * Overwrites table T so that snapshot 1's data files are no longer referenced by the latest
+   * snapshot, then expires every snapshot except the newest one.
+   */
+  private def overwriteAndExpire(): Unit = {
+    // OVERWRITE makes snapshot 1's files unreferenced by the new snapshot
+    spark.sql("INSERT OVERWRITE T VALUES (3, 'v3'), (4, 'v4')")
+    spark.sql("INSERT INTO T VALUES (5, 'v5')")
+    spark.sql("INSERT INTO T VALUES (6, 'v6')")
+
+    // aggressively expire snapshots (keep only 1)
+    loadTable("T")
+      .newExpireSnapshots()
+      .config(
+        ExpireConfig
+          .builder()
+          .snapshotMaxDeletes(Integer.MAX_VALUE)
+          .snapshotRetainMax(1)
+          .snapshotRetainMin(1)
+          .snapshotTimeRetain(Duration.ZERO)
+          .build())
+      .expire()
+  }
+
+  test("Paimon: without protection, read fails after concurrent expiration") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |""".stripMargin)
+
+    // write snapshot 1
+    spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')")
+
+    val table = loadTable("T")
+
+    // Step 1: plan the scan (get splits referencing snapshot 1 data files)
+    val splits = table.newScan().plan().splits()
+    assert(!splits.isEmpty)
+
+    // Step 2 + 3: drop all references to snapshot 1's files and expire the old snapshots
+    overwriteAndExpire()
+
+    // Step 4: try to read from the old splits - data files have been deleted
+    val read = table.newReadBuilder().newRead()
+    val readFailed =
+      try {
+        splits.asScala.foreach {
+          split =>
+            val iter = read.createReader(split).toCloseableIterator
+            // close the iterator even when reading blows up half way through
+            try {
+              while (iter.hasNext) iter.next()
+            } finally {
+              iter.close()
+            }
+        }
+        false
+      } catch {
+        case _: Exception => true
+      }
+
+    assert(readFailed, "Read should fail because data files were deleted by expiration")
+  }
+
+  test("Paimon: with protection, read succeeds after concurrent expiration") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |TBLPROPERTIES (
+                 |  'scan.plan-auto-tag-for-read.time-retained' = '1 h'
+                 |)
+                 |""".stripMargin)
+
+    // write snapshot 1
+    spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')")
+
+    val table = loadTable("T")
+
+    // Step 1: plan the scan - this creates a protection tag automatically
+    val scan = table.newScan()
+    val splits = scan.plan().splits()
+    assert(!splits.isEmpty)
+
+    // Verify protection tag was created
+    val batchScan = scan.asInstanceOf[DataTableBatchScan]
+    val tagName = batchScan.readProtectionTagName
+    assert(tagName != null, "Protection tag should be created during scan planning")
+    assert(table.tagManager().tagExists(tagName))
+
+    try {
+      // Step 2 + 3: drop all references to snapshot 1's files and expire the old snapshots
+      overwriteAndExpire()
+
+      // Step 4: read from old splits - should succeed because tag protects data files
+      val read = table.newReadBuilder().newRead()
+      val results = new util.ArrayList[String]()
+      splits.asScala.foreach {
+        split =>
+          val iter = read.createReader(split).toCloseableIterator
+          try {
+            while (iter.hasNext) {
+              val row = iter.next()
+              results.add(s"${row.getInt(0)},${row.getString(1).toString}")
+            }
+          } finally {
+            iter.close()
+          }
+      }
+
+      assert(results.size() == 2, s"Should read 2 rows but got ${results.size()}")
+      assert(results.contains("1,v1"))
+      assert(results.contains("2,v2"))
+    } finally {
+      // Cleanup: delete the protection tag, also when an assertion above failed
+      val creator = new org.apache.paimon.tag.BatchReadTagCreator(
+        table.tagManager(),
+        table.snapshotManager(),
+        Duration.ofHours(1))
+      creator.deleteReadTag(tagName)
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagProtectionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagProtectionTest.scala
new file mode 100644
index 0000000000..afced7afed
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagProtectionTest.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.read
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.tag.BatchReadTagCreator
+
+import org.apache.spark.sql.Row
+
+/**
+ * Tests for the batch-read protection tag on the Spark read path: the tag is cleaned up after a
+ * query when the option is set, no tag is created when it is not, and a tag keeps its snapshot
+ * readable after later commits.
+ */
+class BatchReadTagProtectionTest extends PaimonSparkTestBase {
+
+  test("Paimon: batch read creates protection tag when enabled") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |TBLPROPERTIES (
+                 |  'primary-key' = 'a',
+                 |  'bucket' = '1',
+                 |  'scan.plan-auto-tag-for-read.time-retained' = '1 h'
+                 |)
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')")
+
+    val table = loadTable("T")
+    val tagManager = table.tagManager()
+
+    // names of all batch-read protection tags currently present on the table
+    def batchReadTags: Array[String] =
+      tagManager.allTagNames().toArray.map(_.toString).filter(BatchReadTagCreator.isBatchReadTag)
+
+    // query triggers scan which creates a protection tag
+    checkAnswer(spark.sql("SELECT * FROM T ORDER BY a"), Row(1, "v1") :: Row(2, "v2") :: Nil)
+
+    // after query completes, listener should have cleaned up the tag; the listener fires
+    // asynchronously, so poll with a deadline instead of relying on one fixed sleep
+    val deadline = System.nanoTime() + java.util.concurrent.TimeUnit.SECONDS.toNanos(10)
+    while (batchReadTags.nonEmpty && System.nanoTime() < deadline) {
+      Thread.sleep(50)
+    }
+
+    val leftover = batchReadTags
+    assert(
+      leftover.isEmpty,
+      s"Protection tags should be cleaned up, but found: ${leftover.mkString(", ")}")
+  }
+
+  test("Paimon: batch read does NOT create tag when disabled") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |TBLPROPERTIES ('primary-key' = 'a', 'bucket' = '1')
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')")
+
+    val table = loadTable("T")
+    val tagManager = table.tagManager()
+
+    checkAnswer(spark.sql("SELECT * FROM T ORDER BY a"), Row(1, "v1") :: Row(2, "v2") :: Nil)
+
+    val batchReadTags = tagManager
+      .allTagNames()
+      .toArray
+      .map(_.toString)
+      .filter(BatchReadTagCreator.isBatchReadTag)
+    assert(batchReadTags.isEmpty, "No protection tags should be created when feature is disabled")
+  }
+
+  test("Paimon: protection tag prevents data file deletion during expiration") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |TBLPROPERTIES (
+                 |  'primary-key' = 'a',
+                 |  'bucket' = '1',
+                 |  'scan.plan-auto-tag-for-read.time-retained' = '1 h',
+                 |  'snapshot.num-retained.min' = '1',
+                 |  'snapshot.num-retained.max' = '1'
+                 |)
+                 |""".stripMargin)
+
+    // create snapshot 1
+    spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')")
+
+    val table = loadTable("T")
+    val tagManager = table.tagManager()
+    val snapshotManager = table.snapshotManager()
+
+    // manually create a protection tag on snapshot 1 (simulating what scan does)
+    val creator =
+      new BatchReadTagCreator(tagManager, snapshotManager, java.time.Duration.ofHours(1))
+    val tagName = creator.createReadTag(1L)
+    assert(tagName != null)
+    assert(tagManager.tagExists(tagName))
+
+    try {
+      // create more snapshots to trigger expiration of snapshot 1
+      spark.sql("INSERT INTO T VALUES (3, 'v3')")
+      spark.sql("INSERT INTO T VALUES (4, 'v4')")
+
+      // snapshot 1 may be expired now, but data files should be protected by the tag
+      // read from the tag should still work
+      checkAnswer(
+        spark.sql(s"SELECT * FROM T VERSION AS OF '$tagName' ORDER BY a"),
+        Row(1, "v1") :: Row(2, "v2") :: Nil)
+    } finally {
+      // cleanup, also when the read above failed
+      creator.deleteReadTag(tagName)
+    }
+  }
+}


Reply via email to