This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ce0e6e118 Spark 4.1: Separate compaction and main operations (#15301)
9ce0e6e118 is described below

commit 9ce0e6e11893d412b4d7e836df7b71247f10ae2f
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Feb 16 17:21:46 2026 -0800

    Spark 4.1: Separate compaction and main operations (#15301)
---
 .../TestMetaColumnProjectionWithStageScan.java     |  10 +-
 .../extensions/TestRewriteDataFilesProcedure.java  |  15 +-
 .../iceberg/spark/SparkCachedTableCatalog.java     | 257 ---------------------
 .../org/apache/iceberg/spark/SparkReadConf.java    |   4 -
 .../org/apache/iceberg/spark/SparkReadOptions.java |   3 -
 .../iceberg/spark/SparkRewriteTableCatalog.java    | 120 ++++++++++
 .../org/apache/iceberg/spark/SparkTableCache.java  |   5 +
 .../org/apache/iceberg/spark/SparkWriteConf.java   |   7 -
 .../apache/iceberg/spark/SparkWriteOptions.java    |   3 -
 .../actions/SparkBinPackFileRewriteRunner.java     |   2 -
 .../actions/SparkRewritePositionDeleteRunner.java  |   2 -
 .../actions/SparkShufflingFileRewriteRunner.java   |   9 +-
 .../iceberg/spark/source/BaseSparkTable.java       | 165 +++++++++++++
 .../apache/iceberg/spark/source/IcebergSource.java |  32 +--
 .../spark/source/SparkPositionDeletesRewrite.java  |   6 +-
 .../source/SparkPositionDeletesRewriteBuilder.java |  21 +-
 .../iceberg/spark/source/SparkRewriteTable.java    |  74 ++++++
 .../spark/source/SparkRewriteWriteBuilder.java     |  91 ++++++++
 .../iceberg/spark/source/SparkStagedScan.java      |  21 +-
 .../spark/source/SparkStagedScanBuilder.java       |   7 +-
 .../apache/iceberg/spark/source/SparkTable.java    |  12 +-
 .../iceberg/spark/source/SparkWriteBuilder.java    |  17 +-
 .../iceberg/spark/TestFileRewriteCoordinator.java  |  71 +++---
 .../iceberg/spark/TestSparkCachedTableCatalog.java | 105 ---------
 .../spark/source/TestPositionDeletesTable.java     | 100 +++-----
 .../iceberg/spark/source/TestSparkStagedScan.java  |  26 +--
 26 files changed, 595 insertions(+), 590 deletions(-)

diff --git 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java
 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java
index b783a006ef..56191b38ef 100644
--- 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java
+++ 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.spark.ScanTaskSetManager;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTableCache;
 import org.apache.iceberg.spark.source.SimpleRecord;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -88,32 +89,31 @@ public class TestMetaColumnProjectionWithStageScan extends 
ExtensionsTestBase {
 
     Table table = Spark3Util.loadIcebergTable(spark, tableName);
     table.refresh();
-    String tableLocation = table.location();
 
     try (CloseableIterable<ScanTask> tasks = table.newBatchScan().planFiles()) 
{
       String fileSetID = UUID.randomUUID().toString();
+      SparkTableCache.get().add(fileSetID, table);
       stageTask(table, fileSetID, tasks);
       Dataset<Row> scanDF2 =
           spark
               .read()
               .format("iceberg")
               .option(SparkReadOptions.FILE_OPEN_COST, "0")
-              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
-              .load(tableLocation);
+              .load(fileSetID);
 
       assertThat(scanDF2.columns()).hasSize(2);
     }
 
     try (CloseableIterable<ScanTask> tasks = table.newBatchScan().planFiles()) 
{
       String fileSetID = UUID.randomUUID().toString();
+      SparkTableCache.get().add(fileSetID, table);
       stageTask(table, fileSetID, tasks);
       Dataset<Row> scanDF =
           spark
               .read()
               .format("iceberg")
               .option(SparkReadOptions.FILE_OPEN_COST, "0")
-              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
-              .load(tableLocation)
+              .load(fileSetID)
               .select("*", "_pos");
 
       List<Row> rows = scanDF.collectAsList();
diff --git 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index c652b011ba..b37422beac 100644
--- 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -830,7 +830,10 @@ public class TestRewriteDataFilesProcedure extends 
ExtensionsTestBase {
     List<Object[]> actualRecords = 
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
     assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
 
-    assertThat(SparkTableCache.get().size()).as("Table cache must be 
empty").isZero();
+    Table table = validationCatalog.loadTable(identifier);
+    assertThat(SparkTableCache.get().tables())
+        .as("Table cache must not contain the test table")
+        .noneMatch(cachedTable -> cachedTable.uuid().equals(table.uuid()));
   }
 
   @TestTemplate
@@ -870,7 +873,10 @@ public class TestRewriteDataFilesProcedure extends 
ExtensionsTestBase {
     List<Object[]> actualRecords = 
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
     assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
 
-    assertThat(SparkTableCache.get().size()).as("Table cache must be 
empty").isZero();
+    Table table = validationCatalog.loadTable(identifier);
+    assertThat(SparkTableCache.get().tables())
+        .as("Table cache must not contain the test table")
+        .noneMatch(cachedTable -> cachedTable.uuid().equals(table.uuid()));
   }
 
   @TestTemplate
@@ -910,7 +916,10 @@ public class TestRewriteDataFilesProcedure extends 
ExtensionsTestBase {
     List<Object[]> actualRecords = 
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
     assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
 
-    assertThat(SparkTableCache.get().size()).as("Table cache must be 
empty").isZero();
+    Table table = validationCatalog.loadTable(identifier);
+    assertThat(SparkTableCache.get().tables())
+        .as("Table cache must not contain the test table")
+        .noneMatch(cachedTable -> cachedTable.uuid().equals(table.uuid()));
   }
 
   @TestTemplate
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
deleted file mode 100644
index 28427f597b..0000000000
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.spark;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Stream;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.base.Splitter;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.spark.source.SparkTable;
-import org.apache.iceberg.util.Pair;
-import org.apache.iceberg.util.SnapshotUtil;
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
-import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
-import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.apache.spark.sql.connector.catalog.TableChange;
-import org.apache.spark.sql.connector.expressions.Transform;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-/** An internal table catalog that is capable of loading tables from a cache. 
*/
-public class SparkCachedTableCatalog implements TableCatalog, 
SupportsFunctions {
-
-  private static final String CLASS_NAME = 
SparkCachedTableCatalog.class.getName();
-  private static final Splitter COMMA = Splitter.on(",");
-  private static final Pattern AT_TIMESTAMP = 
Pattern.compile("at_timestamp_(\\d+)");
-  private static final Pattern SNAPSHOT_ID = 
Pattern.compile("snapshot_id_(\\d+)");
-  private static final Pattern BRANCH = Pattern.compile("branch_(.*)");
-  private static final Pattern TAG = Pattern.compile("tag_(.*)");
-  private static final String REWRITE = "rewrite";
-
-  private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
-
-  private String name = null;
-
-  @Override
-  public Identifier[] listTables(String[] namespace) {
-    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
listing tables");
-  }
-
-  @Override
-  public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
-    return load(ident);
-  }
-
-  @Override
-  public SparkTable loadTable(Identifier ident, String version) throws 
NoSuchTableException {
-    SparkTable table = load(ident);
-    Preconditions.checkArgument(
-        table.snapshotId() == null, "Cannot time travel based on both table 
identifier and AS OF");
-    return table.copyWithSnapshotId(Long.parseLong(version));
-  }
-
-  @Override
-  public SparkTable loadTable(Identifier ident, long timestampMicros) throws 
NoSuchTableException {
-    SparkTable table = load(ident);
-    Preconditions.checkArgument(
-        table.snapshotId() == null, "Cannot time travel based on both table 
identifier and AS OF");
-    // Spark passes microseconds but Iceberg uses milliseconds for snapshots
-    long timestampMillis = TimeUnit.MICROSECONDS.toMillis(timestampMicros);
-    long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table.table(), 
timestampMillis);
-    return table.copyWithSnapshotId(snapshotId);
-  }
-
-  @Override
-  public void invalidateTable(Identifier ident) {
-    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
table invalidation");
-  }
-
-  @Override
-  public SparkTable createTable(
-      Identifier ident, StructType schema, Transform[] partitions, Map<String, 
String> properties)
-      throws TableAlreadyExistsException {
-    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
creating tables");
-  }
-
-  @Override
-  public SparkTable alterTable(Identifier ident, TableChange... changes) {
-    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
altering tables");
-  }
-
-  @Override
-  public boolean dropTable(Identifier ident) {
-    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
dropping tables");
-  }
-
-  @Override
-  public boolean purgeTable(Identifier ident) throws 
UnsupportedOperationException {
-    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
purging tables");
-  }
-
-  @Override
-  public void renameTable(Identifier oldIdent, Identifier newIdent) {
-    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
renaming tables");
-  }
-
-  @Override
-  public void initialize(String catalogName, CaseInsensitiveStringMap options) 
{
-    this.name = catalogName;
-  }
-
-  @Override
-  public String name() {
-    return name;
-  }
-
-  private SparkTable load(Identifier ident) throws NoSuchTableException {
-    Preconditions.checkArgument(
-        ident.namespace().length == 0, CLASS_NAME + " does not support 
namespaces");
-
-    Pair<String, List<String>> parsedIdent = parseIdent(ident);
-    String key = parsedIdent.first();
-    TableLoadOptions options = parseLoadOptions(parsedIdent.second());
-
-    Table table = TABLE_CACHE.get(key);
-
-    if (table == null) {
-      throw new NoSuchTableException(ident);
-    }
-
-    if (options.isTableRewrite()) {
-      return new SparkTable(table, null, false, true);
-    }
-
-    if (options.snapshotId() != null) {
-      return new SparkTable(table, options.snapshotId(), false);
-    } else if (options.asOfTimestamp() != null) {
-      return new SparkTable(
-          table, SnapshotUtil.snapshotIdAsOfTime(table, 
options.asOfTimestamp()), false);
-    } else if (options.branch() != null) {
-      Snapshot branchSnapshot = table.snapshot(options.branch());
-      Preconditions.checkArgument(
-          branchSnapshot != null,
-          "Cannot find snapshot associated with branch name: %s",
-          options.branch());
-      return new SparkTable(table, branchSnapshot.snapshotId(), false);
-    } else if (options.tag() != null) {
-      Snapshot tagSnapshot = table.snapshot(options.tag());
-      Preconditions.checkArgument(
-          tagSnapshot != null, "Cannot find snapshot associated with tag name: 
%s", options.tag());
-      return new SparkTable(table, tagSnapshot.snapshotId(), false);
-    } else {
-      return new SparkTable(table, false);
-    }
-  }
-
-  private static class TableLoadOptions {
-    private Long asOfTimestamp;
-    private Long snapshotId;
-    private String branch;
-    private String tag;
-    private Boolean isTableRewrite;
-
-    Long asOfTimestamp() {
-      return asOfTimestamp;
-    }
-
-    Long snapshotId() {
-      return snapshotId;
-    }
-
-    String branch() {
-      return branch;
-    }
-
-    String tag() {
-      return tag;
-    }
-
-    boolean isTableRewrite() {
-      return Boolean.TRUE.equals(isTableRewrite);
-    }
-  }
-
-  /** Extracts table load options from metadata. */
-  private TableLoadOptions parseLoadOptions(List<String> metadata) {
-    TableLoadOptions opts = new TableLoadOptions();
-    for (String meta : metadata) {
-      Matcher timeBasedMatcher = AT_TIMESTAMP.matcher(meta);
-      if (timeBasedMatcher.matches()) {
-        opts.asOfTimestamp = Long.parseLong(timeBasedMatcher.group(1));
-        continue;
-      }
-
-      Matcher snapshotBasedMatcher = SNAPSHOT_ID.matcher(meta);
-      if (snapshotBasedMatcher.matches()) {
-        opts.snapshotId = Long.parseLong(snapshotBasedMatcher.group(1));
-        continue;
-      }
-
-      Matcher branchBasedMatcher = BRANCH.matcher(meta);
-      if (branchBasedMatcher.matches()) {
-        opts.branch = branchBasedMatcher.group(1);
-        continue;
-      }
-
-      Matcher tagBasedMatcher = TAG.matcher(meta);
-      if (tagBasedMatcher.matches()) {
-        opts.tag = tagBasedMatcher.group(1);
-      }
-
-      if (meta.equalsIgnoreCase(REWRITE)) {
-        opts.isTableRewrite = true;
-      }
-    }
-
-    long numberOptions =
-        Stream.of(opts.snapshotId, opts.asOfTimestamp, opts.branch, opts.tag, 
opts.isTableRewrite)
-            .filter(Objects::nonNull)
-            .count();
-    Preconditions.checkArgument(
-        numberOptions <= 1,
-        "Can specify only one of snapshot-id (%s), as-of-timestamp (%s), 
branch (%s), tag (%s), is-table-rewrite (%s)",
-        opts.snapshotId,
-        opts.asOfTimestamp,
-        opts.branch,
-        opts.tag,
-        opts.isTableRewrite);
-
-    return opts;
-  }
-
-  private Pair<String, List<String>> parseIdent(Identifier ident) {
-    int hashIndex = ident.name().lastIndexOf('#');
-    if (hashIndex != -1 && !ident.name().endsWith("#")) {
-      String key = ident.name().substring(0, hashIndex);
-      List<String> metadata = 
COMMA.splitToList(ident.name().substring(hashIndex + 1));
-      return Pair.of(key, metadata);
-    } else {
-      return Pair.of(ident.name(), ImmutableList.of());
-    }
-  }
-}
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 420c3517ff..238919ace7 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -132,10 +132,6 @@ public class SparkReadConf {
     return 
confParser.stringConf().option(SparkReadOptions.TAG).parseOptional();
   }
 
-  public String scanTaskSetId() {
-    return 
confParser.stringConf().option(SparkReadOptions.SCAN_TASK_SET_ID).parseOptional();
-  }
-
   public boolean streamingSkipDeleteSnapshots() {
     return confParser
         .booleanConf()
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index 17f2bfee69..8071b1db5b 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -62,9 +62,6 @@ public class SparkReadOptions {
   // Overrides the table's read.parquet.vectorization.batch-size
   public static final String VECTORIZATION_BATCH_SIZE = "batch-size";
 
-  // Set ID that is used to fetch scan tasks
-  public static final String SCAN_TASK_SET_ID = "scan-task-set-id";
-
   // skip snapshots of type delete while reading stream out of iceberg table
   public static final String STREAMING_SKIP_DELETE_SNAPSHOTS = 
"streaming-skip-delete-snapshots";
   public static final boolean STREAMING_SKIP_DELETE_SNAPSHOTS_DEFAULT = false;
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkRewriteTableCatalog.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkRewriteTableCatalog.java
new file mode 100644
index 0000000000..a1016beb18
--- /dev/null
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkRewriteTableCatalog.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.source.SparkRewriteTable;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SparkRewriteTableCatalog implements TableCatalog, 
SupportsFunctions {
+
+  private static final String CLASS_NAME = 
SparkRewriteTableCatalog.class.getName();
+  private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
+
+  private String name = null;
+
+  @Override
+  public Identifier[] listTables(String[] namespace) {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
listing tables");
+  }
+
+  @Override
+  public SparkRewriteTable loadTable(Identifier ident) throws 
NoSuchTableException {
+    validateNoNamespace(ident);
+
+    String groupId = ident.name();
+    Table table = TABLE_CACHE.get(groupId);
+
+    if (table == null) {
+      throw new NoSuchTableException(ident);
+    }
+
+    return new SparkRewriteTable(table, groupId);
+  }
+
+  @Override
+  public SparkTable loadTable(Identifier ident, String version) throws 
NoSuchTableException {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
time travel");
+  }
+
+  @Override
+  public SparkTable loadTable(Identifier ident, long timestampMicros) throws 
NoSuchTableException {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
time travel");
+  }
+
+  @Override
+  public void invalidateTable(Identifier ident) {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
table invalidation");
+  }
+
+  @Override
+  public SparkTable createTable(
+      Identifier ident, StructType schema, Transform[] partitions, Map<String, 
String> properties)
+      throws TableAlreadyExistsException {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
creating tables");
+  }
+
+  @Override
+  public SparkTable alterTable(Identifier ident, TableChange... changes) {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
altering tables");
+  }
+
+  @Override
+  public boolean dropTable(Identifier ident) {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
dropping tables");
+  }
+
+  @Override
+  public boolean purgeTable(Identifier ident) throws 
UnsupportedOperationException {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
purging tables");
+  }
+
+  @Override
+  public void renameTable(Identifier oldIdent, Identifier newIdent) {
+    throw new UnsupportedOperationException(CLASS_NAME + " does not support 
renaming tables");
+  }
+
+  @Override
+  public void initialize(String catalogName, CaseInsensitiveStringMap options) 
{
+    this.name = catalogName;
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  private void validateNoNamespace(Identifier ident) {
+    Preconditions.checkArgument(
+        ident.namespace().length == 0,
+        "%s does not support namespaces, but got: %s",
+        CLASS_NAME,
+        String.join(".", ident.namespace()));
+  }
+}
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java
index 6218423db4..83c6303d0f 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark;
 
+import java.util.Collection;
 import java.util.Map;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -51,4 +52,8 @@ public class SparkTableCache {
   public Table remove(String key) {
     return cache.remove(key);
   }
+
+  public Collection<Table> tables() {
+    return cache.values();
+  }
 }
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 96131e0e56..6648d7ea38 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -267,13 +267,6 @@ public class SparkWriteConf {
     return extraSnapshotMetadata;
   }
 
-  public String rewrittenFileSetId() {
-    return confParser
-        .stringConf()
-        .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID)
-        .parseOptional();
-  }
-
   public SparkWriteRequirements writeRequirements() {
     if (ignoreTableDistributionAndOrdering()) {
       LOG.info("Skipping distribution/ordering: disabled per job 
configuration");
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
index 33db70bae5..40816eef2f 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
@@ -50,9 +50,6 @@ public class SparkWriteOptions {
   // Checks if input schema and table schema are same(default: true)
   public static final String CHECK_ORDERING = "check-ordering";
 
-  // File scan task set ID that indicates which files must be replaced
-  public static final String REWRITTEN_FILE_SCAN_TASK_SET_ID = 
"rewritten-file-scan-task-set-id";
-
   public static final String OUTPUT_SPEC_ID = "output-spec-id";
 
   public static final String OVERWRITE_MODE = "overwrite-mode";
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackFileRewriteRunner.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackFileRewriteRunner.java
index 6d2ef585b1..084e21b1bd 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackFileRewriteRunner.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackFileRewriteRunner.java
@@ -45,7 +45,6 @@ class SparkBinPackFileRewriteRunner extends 
SparkDataFileRewriteRunner {
         spark()
             .read()
             .format("iceberg")
-            .option(SparkReadOptions.SCAN_TASK_SET_ID, groupId)
             .option(SparkReadOptions.SPLIT_SIZE, group.inputSplitSize())
             .option(SparkReadOptions.FILE_OPEN_COST, "0")
             .load(groupId);
@@ -54,7 +53,6 @@ class SparkBinPackFileRewriteRunner extends 
SparkDataFileRewriteRunner {
     scanDF
         .write()
         .format("iceberg")
-        .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
         .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, 
group.maxOutputFileSize())
         .option(SparkWriteOptions.DISTRIBUTION_MODE, 
distributionMode(group).modeName())
         .option(SparkWriteOptions.OUTPUT_SPEC_ID, group.outputSpecId())
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkRewritePositionDeleteRunner.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkRewritePositionDeleteRunner.java
index 4bbd228056..4bcf208110 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkRewritePositionDeleteRunner.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkRewritePositionDeleteRunner.java
@@ -105,7 +105,6 @@ class SparkRewritePositionDeleteRunner
         spark()
             .read()
             .format("iceberg")
-            .option(SparkReadOptions.SCAN_TASK_SET_ID, groupId)
             .option(SparkReadOptions.SPLIT_SIZE, group.inputSplitSize())
             .option(SparkReadOptions.FILE_OPEN_COST, "0")
             .load(groupId);
@@ -120,7 +119,6 @@ class SparkRewritePositionDeleteRunner
         .sortWithinPartitions("file_path", "pos")
         .write()
         .format("iceberg")
-        .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
         .option(SparkWriteOptions.TARGET_DELETE_FILE_SIZE_BYTES, 
group.maxOutputFileSize())
         .mode("append")
         .save(groupId);
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
index 569eb252cb..bc1665dc32 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
@@ -31,7 +31,6 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkFunctionCatalog;
-import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SortOrderUtil;
@@ -104,12 +103,7 @@ abstract class SparkShufflingFileRewriteRunner extends 
SparkDataFileRewriteRunne
 
   @Override
   public void doRewrite(String groupId, RewriteFileGroup fileGroup) {
-    Dataset<Row> scanDF =
-        spark()
-            .read()
-            .format("iceberg")
-            .option(SparkReadOptions.SCAN_TASK_SET_ID, groupId)
-            .load(groupId);
+    Dataset<Row> scanDF = spark().read().format("iceberg").load(groupId);
 
     Dataset<Row> sortedDF =
         sortedDF(
@@ -122,7 +116,6 @@ abstract class SparkShufflingFileRewriteRunner extends 
SparkDataFileRewriteRunne
     sortedDF
         .write()
         .format("iceberg")
-        .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
         .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, 
fileGroup.maxOutputFileSize())
         .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
         .option(SparkWriteOptions.OUTPUT_SPEC_ID, fileGroup.outputSpecId())
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkTable.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkTable.java
new file mode 100644
index 0000000000..a5d9293a9f
--- /dev/null
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkTable.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.apache.iceberg.TableProperties.CURRENT_SNAPSHOT_ID;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.MetadataColumn;
+import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+
+abstract class BaseSparkTable
+    implements org.apache.spark.sql.connector.catalog.Table, 
SupportsMetadataColumns {
+
+  private static final String PROVIDER = "provider";
+  private static final String FORMAT = "format";
+  private static final String LOCATION = "location";
+  private static final String SORT_ORDER = "sort-order";
+  private static final String IDENTIFIER_FIELDS = "identifier-fields";
+  private static final Set<String> RESERVED_PROPERTIES =
+      ImmutableSet.of(
+          PROVIDER,
+          FORMAT,
+          CURRENT_SNAPSHOT_ID,
+          LOCATION,
+          FORMAT_VERSION,
+          SORT_ORDER,
+          IDENTIFIER_FIELDS);
+
+  private final Table table;
+  private final Schema schema;
+
+  private SparkSession lazySpark = null;
+  private StructType lazySparkSchema = null;
+
+  protected BaseSparkTable(Table table, Schema schema) {
+    this.table = table;
+    this.schema = schema;
+  }
+
+  protected SparkSession spark() {
+    if (lazySpark == null) {
+      this.lazySpark = SparkSession.active();
+    }
+    return lazySpark;
+  }
+
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public String name() {
+    return table.toString();
+  }
+
+  @Override
+  public StructType schema() {
+    if (lazySparkSchema == null) {
+      this.lazySparkSchema = SparkSchemaUtil.convert(schema);
+    }
+    return lazySparkSchema;
+  }
+
+  @Override
+  public Transform[] partitioning() {
+    return Spark3Util.toTransforms(table.spec());
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    ImmutableMap.Builder<String, String> propsBuilder = ImmutableMap.builder();
+
+    propsBuilder.put(FORMAT, "iceberg/" + fileFormat());
+    propsBuilder.put(PROVIDER, "iceberg");
+    propsBuilder.put(LOCATION, table.location());
+    propsBuilder.put(CURRENT_SNAPSHOT_ID, currentSnapshotId());
+
+    if (table instanceof BaseTable) {
+      TableOperations ops = ((BaseTable) table).operations();
+      propsBuilder.put(FORMAT_VERSION, 
String.valueOf(ops.current().formatVersion()));
+    }
+
+    if (table.sortOrder().isSorted()) {
+      propsBuilder.put(SORT_ORDER, Spark3Util.describe(table.sortOrder()));
+    }
+
+    Set<String> identifierFields = table.schema().identifierFieldNames();
+    if (!identifierFields.isEmpty()) {
+      propsBuilder.put(IDENTIFIER_FIELDS, "[" + String.join(",", 
identifierFields) + "]");
+    }
+
+    table.properties().entrySet().stream()
+        .filter(entry -> !RESERVED_PROPERTIES.contains(entry.getKey()))
+        .forEach(propsBuilder::put);
+
+    return propsBuilder.build();
+  }
+
+  @Override
+  public MetadataColumn[] metadataColumns() {
+    List<SparkMetadataColumn> cols = Lists.newArrayList();
+
+    cols.add(SparkMetadataColumns.SPEC_ID);
+    cols.add(SparkMetadataColumns.partition(table));
+    cols.add(SparkMetadataColumns.FILE_PATH);
+    cols.add(SparkMetadataColumns.ROW_POSITION);
+    cols.add(SparkMetadataColumns.IS_DELETED);
+
+    if (TableUtil.supportsRowLineage(table)) {
+      cols.add(SparkMetadataColumns.ROW_ID);
+      cols.add(SparkMetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER);
+    }
+
+    return cols.toArray(SparkMetadataColumn[]::new);
+  }
+
+  private String fileFormat() {
+    return table.properties().getOrDefault(DEFAULT_FILE_FORMAT, 
DEFAULT_FILE_FORMAT_DEFAULT);
+  }
+
+  private String currentSnapshotId() {
+    Snapshot currentSnapshot = table.currentSnapshot();
+    return currentSnapshot != null ? 
String.valueOf(currentSnapshot.snapshotId()) : "none";
+  }
+
+  @Override
+  public String toString() {
+    return table.toString();
+  }
+}
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index a9df994615..a0462e8f89 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -27,12 +27,11 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.spark.PathIdentifier;
 import org.apache.iceberg.spark.Spark3Util;
-import org.apache.iceberg.spark.SparkCachedTableCatalog;
 import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkRewriteTableCatalog;
 import org.apache.iceberg.spark.SparkSessionCatalog;
 import org.apache.iceberg.spark.SparkTableCache;
-import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -66,15 +65,14 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 public class IcebergSource
     implements DataSourceRegister, SupportsCatalogOptions, 
SessionConfigSupport {
   private static final String DEFAULT_CATALOG_NAME = "default_iceberg";
-  private static final String DEFAULT_CACHE_CATALOG_NAME = 
"default_cache_iceberg";
-  private static final String DEFAULT_CATALOG = "spark.sql.catalog." + 
DEFAULT_CATALOG_NAME;
-  private static final String DEFAULT_CACHE_CATALOG =
-      "spark.sql.catalog." + DEFAULT_CACHE_CATALOG_NAME;
+  private static final String REWRITE_CATALOG_NAME = "default_rewrite_catalog";
+  private static final String CATALOG_PREFIX = "spark.sql.catalog.";
+  private static final String DEFAULT_CATALOG = CATALOG_PREFIX + 
DEFAULT_CATALOG_NAME;
+  private static final String REWRITE_CATALOG = CATALOG_PREFIX + 
REWRITE_CATALOG_NAME;
   private static final String AT_TIMESTAMP = "at_timestamp_";
   private static final String SNAPSHOT_ID = "snapshot_id_";
   private static final String BRANCH_PREFIX = "branch_";
   private static final String TAG_PREFIX = "tag_";
-  private static final String REWRITE_SELECTOR = "rewrite";
   private static final String[] EMPTY_NAMESPACE = new String[0];
 
   private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
@@ -165,21 +163,15 @@ public class IcebergSource
       selector = TAG_PREFIX + tag;
     }
 
-    String groupId =
-        options.getOrDefault(
-            SparkReadOptions.SCAN_TASK_SET_ID,
-            options.get(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID));
-    if (groupId != null) {
-      selector = REWRITE_SELECTOR;
-    }
-
     CatalogManager catalogManager = spark.sessionState().catalogManager();
 
+    // return rewrite catalog with path as group ID if table is staged for 
rewrite
     if (TABLE_CACHE.contains(path)) {
       return new Spark3Util.CatalogAndIdentifier(
-          catalogManager.catalog(DEFAULT_CACHE_CATALOG_NAME),
-          Identifier.of(EMPTY_NAMESPACE, pathWithSelector(path, selector)));
-    } else if (path.contains("/")) {
+          catalogManager.catalog(REWRITE_CATALOG_NAME), 
Identifier.of(EMPTY_NAMESPACE, path));
+    }
+
+    if (path.contains("/")) {
       // contains a path. Return iceberg default catalog and a PathIdentifier
       return new Spark3Util.CatalogAndIdentifier(
           catalogManager.catalog(DEFAULT_CATALOG_NAME),
@@ -258,8 +250,8 @@ public class IcebergSource
       config.forEach((key, value) -> spark.conf().set(DEFAULT_CATALOG + "." + 
key, value));
     }
 
-    if (spark.conf().getOption(DEFAULT_CACHE_CATALOG).isEmpty()) {
-      spark.conf().set(DEFAULT_CACHE_CATALOG, 
SparkCachedTableCatalog.class.getName());
+    if (spark.conf().getOption(REWRITE_CATALOG).isEmpty()) {
+      spark.conf().set(REWRITE_CATALOG, 
SparkRewriteTableCatalog.class.getName());
     }
   }
 }
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
index 0ec7084bfd..7fc535ffa5 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
@@ -70,13 +70,13 @@ public class SparkPositionDeletesRewrite implements Write {
 
   private final JavaSparkContext sparkContext;
   private final Table table;
+  private final String fileSetId;
   private final String queryId;
   private final FileFormat format;
   private final long targetFileSize;
   private final DeleteGranularity deleteGranularity;
   private final Schema writeSchema;
   private final StructType dsSchema;
-  private final String fileSetId;
   private final int specId;
   private final StructLike partition;
   private final Map<String, String> writeProperties;
@@ -86,6 +86,7 @@ public class SparkPositionDeletesRewrite implements Write {
    *
    * @param spark Spark session
    * @param table instance of {@link PositionDeletesTable}
+   * @param fileSetId file set ID
    * @param writeConf Spark write config
    * @param writeInfo Spark write info
    * @param writeSchema Iceberg output schema
@@ -96,6 +97,7 @@ public class SparkPositionDeletesRewrite implements Write {
   SparkPositionDeletesRewrite(
       SparkSession spark,
       Table table,
+      String fileSetId,
       SparkWriteConf writeConf,
       LogicalWriteInfo writeInfo,
       Schema writeSchema,
@@ -104,13 +106,13 @@ public class SparkPositionDeletesRewrite implements Write 
{
       StructLike partition) {
     this.sparkContext = 
JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
+    this.fileSetId = fileSetId;
     this.queryId = writeInfo.queryId();
     this.format = writeConf.deleteFileFormat();
     this.targetFileSize = writeConf.targetDeleteFileSize();
     this.deleteGranularity = writeConf.deleteGranularity();
     this.writeSchema = writeSchema;
     this.dsSchema = dsSchema;
-    this.fileSetId = writeConf.rewrittenFileSetId();
     this.specId = specId;
     this.partition = partition;
     this.writeProperties = writeConf.writeProperties();
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java
index 9fccc05ea2..5e5d268ab9 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java
@@ -50,28 +50,25 @@ public class SparkPositionDeletesRewriteBuilder implements 
WriteBuilder {
 
   private final SparkSession spark;
   private final Table table;
+  private final String fileSetId;
   private final SparkWriteConf writeConf;
-  private final LogicalWriteInfo writeInfo;
+  private final LogicalWriteInfo info;
   private final StructType dsSchema;
   private final Schema writeSchema;
 
   SparkPositionDeletesRewriteBuilder(
-      SparkSession spark, Table table, String branch, LogicalWriteInfo info) {
+      SparkSession spark, Table table, String fileSetId, LogicalWriteInfo 
info) {
     this.spark = spark;
     this.table = table;
-    this.writeConf = new SparkWriteConf(spark, table, branch, info.options());
-    this.writeInfo = info;
+    this.fileSetId = fileSetId;
+    this.writeConf = new SparkWriteConf(spark, table, info.options());
+    this.info = info;
     this.dsSchema = info.schema();
     this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, 
writeConf.caseSensitive());
   }
 
   @Override
   public Write build() {
-    String fileSetId = writeConf.rewrittenFileSetId();
-
-    Preconditions.checkArgument(
-        fileSetId != null, "Can only write to %s via actions", table.name());
-
     // all files of rewrite group have same partition and spec id
     ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
     List<PositionDeletesScanTask> tasks = taskSetManager.fetchTasks(table, 
fileSetId);
@@ -82,10 +79,10 @@ public class SparkPositionDeletesRewriteBuilder implements 
WriteBuilder {
     StructLike partition = partition(fileSetId, tasks);
 
     return new SparkPositionDeletesRewrite(
-        spark, table, writeConf, writeInfo, writeSchema, dsSchema, specId, 
partition);
+        spark, table, fileSetId, writeConf, info, writeSchema, dsSchema, 
specId, partition);
   }
 
-  private int specId(String fileSetId, List<PositionDeletesScanTask> tasks) {
+  private static int specId(String fileSetId, List<PositionDeletesScanTask> 
tasks) {
     Set<Integer> specIds = tasks.stream().map(t -> 
t.spec().specId()).collect(Collectors.toSet());
     Preconditions.checkArgument(
         specIds.size() == 1,
@@ -95,7 +92,7 @@ public class SparkPositionDeletesRewriteBuilder implements 
WriteBuilder {
     return tasks.get(0).spec().specId();
   }
 
-  private StructLike partition(String fileSetId, List<PositionDeletesScanTask> 
tasks) {
+  private static StructLike partition(String fileSetId, 
List<PositionDeletesScanTask> tasks) {
     StructLikeSet partitions = 
StructLikeSet.create(tasks.get(0).spec().partitionType());
     tasks.stream().map(ContentScanTask::partition).forEach(partitions::add);
     Preconditions.checkArgument(
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteTable.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteTable.java
new file mode 100644
index 0000000000..73d5b34f1c
--- /dev/null
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteTable.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Set;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.SupportsWrite;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SparkRewriteTable extends BaseSparkTable implements SupportsRead, 
SupportsWrite {
+
+  private static final Set<TableCapability> CAPABILITIES =
+      ImmutableSet.of(TableCapability.BATCH_READ, TableCapability.BATCH_WRITE);
+
+  private final String groupId;
+
+  public SparkRewriteTable(Table table, String groupId) {
+    super(table, rewriteSchema(table));
+    this.groupId = groupId;
+  }
+
+  @Override
+  public Set<TableCapability> capabilities() {
+    return CAPABILITIES;
+  }
+
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    return new SparkStagedScanBuilder(spark(), table(), groupId, options);
+  }
+
+  @Override
+  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+    if (table() instanceof PositionDeletesTable) {
+      return new SparkPositionDeletesRewriteBuilder(spark(), table(), groupId, 
info);
+    } else {
+      return new SparkRewriteWriteBuilder(spark(), table(), 
rewriteSchema(table()), groupId, info);
+    }
+  }
+
+  private static Schema rewriteSchema(Table table) {
+    if (TableUtil.supportsRowLineage(table)) {
+      return MetadataColumns.schemaWithRowLineage(table.schema());
+    } else {
+      return table.schema();
+    }
+  }
+}
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteWriteBuilder.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteWriteBuilder.java
new file mode 100644
index 0000000000..714ac8e485
--- /dev/null
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteWriteBuilder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
+
+class SparkRewriteWriteBuilder implements WriteBuilder {
+
+  private final SparkSession spark;
+  private final Table table;
+  private final Schema schema;
+  private final String groupId;
+  private final SparkWriteConf writeConf;
+  private final LogicalWriteInfo info;
+  private final boolean caseSensitive;
+  private final boolean checkNullability;
+  private final boolean checkOrdering;
+
+  SparkRewriteWriteBuilder(
+      SparkSession spark, Table table, Schema schema, String groupId, 
LogicalWriteInfo info) {
+    this.spark = spark;
+    this.table = table;
+    this.schema = schema;
+    this.groupId = groupId;
+    this.writeConf = new SparkWriteConf(spark, table, info.options());
+    this.info = info;
+    this.caseSensitive = writeConf.caseSensitive();
+    this.checkNullability = writeConf.checkNullability();
+    this.checkOrdering = writeConf.checkOrdering();
+  }
+
+  @Override
+  public Write build() {
+    Schema writeSchema = validateWriteSchema();
+    SparkUtil.validatePartitionTransforms(table.spec());
+    String appId = spark.sparkContext().applicationId();
+    return new SparkWrite(
+        spark,
+        table,
+        writeConf,
+        info,
+        appId,
+        writeSchema,
+        info.schema(),
+        writeConf.writeRequirements()) {
+
+      @Override
+      public BatchWrite toBatch() {
+        return asRewrite(groupId);
+      }
+
+      @Override
+      public StreamingWrite toStreaming() {
+        throw new UnsupportedOperationException("Streaming writes are not 
supported for rewrites");
+      }
+    };
+  }
+
+  private Schema validateWriteSchema() {
+    Schema writeSchema = SparkSchemaUtil.convert(schema, info.schema(), 
caseSensitive);
+    TypeUtil.validateWriteSchema(schema, writeSchema, checkNullability, 
checkOrdering);
+    return writeSchema;
+  }
+}
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
index 394c922736..435c2cbd15 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.source;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import org.apache.iceberg.ScanTask;
@@ -28,8 +29,10 @@ import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.spark.ScanTaskSetManager;
 import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.read.Statistics;
 
 class SparkStagedScan extends SparkScan {
 
@@ -40,14 +43,26 @@ class SparkStagedScan extends SparkScan {
 
   private List<ScanTaskGroup<ScanTask>> taskGroups = null; // lazy cache of 
tasks
 
-  SparkStagedScan(SparkSession spark, Table table, Schema expectedSchema, 
SparkReadConf readConf) {
-    super(spark, table, readConf, expectedSchema, ImmutableList.of(), null);
-    this.taskSetId = readConf.scanTaskSetId();
+  SparkStagedScan(
+      SparkSession spark,
+      Table table,
+      Schema projection,
+      String taskSetId,
+      SparkReadConf readConf) {
+    super(spark, table, readConf, projection, ImmutableList.of(), null);
+    this.taskSetId = taskSetId;
     this.splitSize = readConf.splitSize();
     this.splitLookback = readConf.splitLookback();
     this.openFileCost = readConf.splitOpenFileCost();
   }
 
+  @Override
+  public Statistics estimateStatistics() {
+    long rowsCount = 
taskGroups().stream().mapToLong(ScanTaskGroup::estimatedRowsCount).sum();
+    long sizeInBytes = SparkSchemaUtil.estimateSize(readSchema(), rowsCount);
+    return new Stats(sizeInBytes, rowsCount, Collections.emptyMap());
+  }
+
   @Override
   protected List<ScanTaskGroup<ScanTask>> taskGroups() {
     if (taskGroups == null) {
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
index c5c86c3ebf..7164c53a3d 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
@@ -41,21 +41,24 @@ class SparkStagedScanBuilder implements ScanBuilder, 
SupportsPushDownRequiredCol
 
   private final SparkSession spark;
   private final Table table;
+  private final String taskSetId;
   private final SparkReadConf readConf;
   private final List<String> metaColumns = Lists.newArrayList();
 
   private Schema schema;
 
-  SparkStagedScanBuilder(SparkSession spark, Table table, 
CaseInsensitiveStringMap options) {
+  SparkStagedScanBuilder(
+      SparkSession spark, Table table, String taskSetId, 
CaseInsensitiveStringMap options) {
     this.spark = spark;
     this.table = table;
+    this.taskSetId = taskSetId;
     this.readConf = new SparkReadConf(spark, table, options);
     this.schema = table.schema();
   }
 
   @Override
   public Scan build() {
-    return new SparkStagedScan(spark, table, schemaWithMetadataColumns(), 
readConf);
+    return new SparkStagedScan(spark, table, schemaWithMetadataColumns(), 
taskSetId, readConf);
   }
 
   @Override
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 8261d03178..335d0f72fd 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -32,7 +32,6 @@ import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.PositionDeletesTable;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
@@ -289,10 +288,6 @@ public class SparkTable
 
   @Override
   public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
-    if (options.containsKey(SparkReadOptions.SCAN_TASK_SET_ID)) {
-      return new SparkStagedScanBuilder(sparkSession(), icebergTable, options);
-    }
-
     if (refreshEagerly) {
       icebergTable.refresh();
     }
@@ -307,12 +302,7 @@ public class SparkTable
   public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
     Preconditions.checkArgument(
         snapshotId == null, "Cannot write to table at a specific snapshot: 
%s", snapshotId);
-
-    if (icebergTable instanceof PositionDeletesTable) {
-      return new SparkPositionDeletesRewriteBuilder(sparkSession(), 
icebergTable, branch, info);
-    } else {
-      return new SparkWriteBuilder(sparkSession(), icebergTable, branch, info);
-    }
+    return new SparkWriteBuilder(sparkSession(), icebergTable, branch, info);
   }
 
   @Override
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
index 89af7740d9..182e56a861 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
@@ -54,7 +54,6 @@ class SparkWriteBuilder implements WriteBuilder, 
SupportsDynamicOverwrite, Suppo
   private final LogicalWriteInfo writeInfo;
   private final StructType dsSchema;
   private final String overwriteMode;
-  private final String rewrittenFileSetId;
   private boolean overwriteDynamic = false;
   private boolean overwriteByFilter = false;
   private Expression overwriteExpr = null;
@@ -70,15 +69,12 @@ class SparkWriteBuilder implements WriteBuilder, 
SupportsDynamicOverwrite, Suppo
     this.writeInfo = info;
     this.dsSchema = info.schema();
     this.overwriteMode = writeConf.overwriteMode();
-    this.rewrittenFileSetId = writeConf.rewrittenFileSetId();
   }
 
   public WriteBuilder overwriteFiles(Scan scan, Command command, 
IsolationLevel isolationLevel) {
     Preconditions.checkState(!overwriteByFilter, "Cannot overwrite individual 
files and by filter");
     Preconditions.checkState(
         !overwriteDynamic, "Cannot overwrite individual files and 
dynamically");
-    Preconditions.checkState(
-        rewrittenFileSetId == null, "Cannot overwrite individual files and 
rewrite");
 
     this.overwriteFiles = true;
     this.copyOnWriteScan = (SparkCopyOnWriteScan) scan;
@@ -92,8 +88,6 @@ class SparkWriteBuilder implements WriteBuilder, 
SupportsDynamicOverwrite, Suppo
     Preconditions.checkState(
         !overwriteByFilter, "Cannot overwrite dynamically and by filter: %s", 
overwriteExpr);
     Preconditions.checkState(!overwriteFiles, "Cannot overwrite individual 
files and dynamically");
-    Preconditions.checkState(
-        rewrittenFileSetId == null, "Cannot overwrite dynamically and 
rewrite");
 
     this.overwriteDynamic = true;
     return this;
@@ -103,7 +97,6 @@ class SparkWriteBuilder implements WriteBuilder, 
SupportsDynamicOverwrite, Suppo
   public WriteBuilder overwrite(Filter[] filters) {
     Preconditions.checkState(
         !overwriteFiles, "Cannot overwrite individual files and using 
filters");
-    Preconditions.checkState(rewrittenFileSetId == null, "Cannot overwrite and 
rewrite");
 
     this.overwriteExpr = SparkFilters.convert(filters);
     if (overwriteExpr == Expressions.alwaysTrue() && 
"dynamic".equals(overwriteMode)) {
@@ -123,9 +116,7 @@ class SparkWriteBuilder implements WriteBuilder, 
SupportsDynamicOverwrite, Suppo
     // operation or if it's a compaction.
     // In any other case, only null row IDs and sequence numbers would be 
produced which
     // means the row lineage columns can be excluded from the output files
-    boolean writeRequiresRowLineage =
-        TableUtil.supportsRowLineage(table)
-            && (overwriteFiles || writeConf.rewrittenFileSetId() != null);
+    boolean writeRequiresRowLineage = TableUtil.supportsRowLineage(table) && 
overwriteFiles;
     boolean writeAlreadyIncludesLineage =
         dsSchema.exists(field -> 
field.name().equals(MetadataColumns.ROW_ID.name()));
     StructType sparkWriteSchema = dsSchema;
@@ -156,9 +147,7 @@ class SparkWriteBuilder implements WriteBuilder, 
SupportsDynamicOverwrite, Suppo
 
       @Override
       public BatchWrite toBatch() {
-        if (rewrittenFileSetId != null) {
-          return asRewrite(rewrittenFileSetId);
-        } else if (overwriteByFilter) {
+        if (overwriteByFilter) {
           return asOverwriteByFilter(overwriteExpr);
         } else if (overwriteDynamic) {
           return asDynamicOverwrite();
@@ -177,8 +166,6 @@ class SparkWriteBuilder implements WriteBuilder, 
SupportsDynamicOverwrite, Suppo
             !overwriteByFilter || overwriteExpr == Expressions.alwaysTrue(),
             "Unsupported streaming operation: overwrite by filter: %s",
             overwriteExpr);
-        Preconditions.checkState(
-            rewrittenFileSetId == null, "Unsupported streaming operation: 
rewrite");
 
         if (overwriteByFilter) {
           return asStreamingOverwrite();
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
index 085eedf45d..664c201915 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
@@ -71,34 +71,31 @@ public class TestFileRewriteCoordinator extends 
CatalogTestBase {
     long avgFileSize = fileSizes.stream().mapToLong(i -> i).sum() / 
fileSizes.size();
 
     try (CloseableIterable<FileScanTask> fileScanTasks = 
table.newScan().planFiles()) {
-      String fileSetID = UUID.randomUUID().toString();
+      String groupId = UUID.randomUUID().toString();
 
       ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
-      taskSetManager.stageTasks(table, fileSetID, 
Lists.newArrayList(fileScanTasks));
+      taskSetManager.stageTasks(table, groupId, 
Lists.newArrayList(fileScanTasks));
+      SparkTableCache.get().add(groupId, table);
 
       // read and pack original 4 files into 2 splits
       Dataset<Row> scanDF =
           spark
               .read()
               .format("iceberg")
-              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
               .option(SparkReadOptions.SPLIT_SIZE, Long.toString(avgFileSize * 
2))
               .option(SparkReadOptions.FILE_OPEN_COST, "0")
-              .load(tableName);
+              .load(groupId);
 
       // write the packed data into new files where each split becomes a new 
file
-      scanDF
-          .writeTo(tableName)
-          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
-          .append();
+      scanDF.write().format("iceberg").mode("append").save(groupId);
 
       // commit the rewrite
       FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
       Set<DataFile> rewrittenFiles =
-          taskSetManager.fetchTasks(table, fileSetID).stream()
+          taskSetManager.fetchTasks(table, groupId).stream()
               .map(t -> t.asFileScanTask().file())
               .collect(Collectors.toCollection(DataFileSet::create));
-      Set<DataFile> addedFiles = rewriteCoordinator.fetchNewFiles(table, 
fileSetID);
+      Set<DataFile> addedFiles = rewriteCoordinator.fetchNewFiles(table, 
groupId);
       table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
     }
 
@@ -127,20 +124,20 @@ public class TestFileRewriteCoordinator extends 
CatalogTestBase {
     assertThat(table.snapshots()).as("Should produce 4 snapshots").hasSize(4);
 
     try (CloseableIterable<FileScanTask> fileScanTasks = 
table.newScan().planFiles()) {
-      String fileSetID = UUID.randomUUID().toString();
+      String groupId = UUID.randomUUID().toString();
 
       ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
-      taskSetManager.stageTasks(table, fileSetID, 
Lists.newArrayList(fileScanTasks));
+      taskSetManager.stageTasks(table, groupId, 
Lists.newArrayList(fileScanTasks));
+      SparkTableCache.get().add(groupId, table);
 
       // read original 4 files as 4 splits
       Dataset<Row> scanDF =
           spark
               .read()
               .format("iceberg")
-              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
               .option(SparkReadOptions.SPLIT_SIZE, "134217728")
               .option(SparkReadOptions.FILE_OPEN_COST, "134217728")
-              .load(tableName);
+              .load(groupId);
 
       // make sure we disable AQE and set the number of shuffle partitions as 
the target num files
       ImmutableMap<String, String> sqlConf =
@@ -151,25 +148,17 @@ public class TestFileRewriteCoordinator extends 
CatalogTestBase {
       withSQLConf(
           sqlConf,
           () -> {
-            try {
-              // write new files with sorted records
-              scanDF
-                  .sort("id")
-                  .writeTo(tableName)
-                  .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, 
fileSetID)
-                  .append();
-            } catch (NoSuchTableException e) {
-              throw new RuntimeException("Could not replace files", e);
-            }
+            // write new files with sorted records
+            
scanDF.sort("id").write().format("iceberg").mode("append").save(groupId);
           });
 
       // commit the rewrite
       FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
       Set<DataFile> rewrittenFiles =
-          taskSetManager.fetchTasks(table, fileSetID).stream()
+          taskSetManager.fetchTasks(table, groupId).stream()
               .map(t -> t.asFileScanTask().file())
               .collect(Collectors.toCollection(DataFileSet::create));
-      Set<DataFile> addedFiles = rewriteCoordinator.fetchNewFiles(table, 
fileSetID);
+      Set<DataFile> addedFiles = rewriteCoordinator.fetchNewFiles(table, 
groupId);
       table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
     }
 
@@ -196,14 +185,15 @@ public class TestFileRewriteCoordinator extends 
CatalogTestBase {
 
     Table table = validationCatalog.loadTable(tableIdent);
 
-    String firstFileSetID = UUID.randomUUID().toString();
+    String firstGroupId = UUID.randomUUID().toString();
     long firstFileSetSnapshotId = table.currentSnapshot().snapshotId();
 
     ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
 
     try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
       // stage first 2 files for compaction
-      taskSetManager.stageTasks(table, firstFileSetID, 
Lists.newArrayList(tasks));
+      taskSetManager.stageTasks(table, firstGroupId, 
Lists.newArrayList(tasks));
+      SparkTableCache.get().add(firstGroupId, table);
     }
 
     // add two more files
@@ -212,43 +202,40 @@ public class TestFileRewriteCoordinator extends 
CatalogTestBase {
 
     table.refresh();
 
-    String secondFileSetID = UUID.randomUUID().toString();
+    String secondGroupId = UUID.randomUUID().toString();
 
     try (CloseableIterable<FileScanTask> tasks =
         table.newScan().appendsAfter(firstFileSetSnapshotId).planFiles()) {
       // stage 2 more files for compaction
-      taskSetManager.stageTasks(table, secondFileSetID, 
Lists.newArrayList(tasks));
+      taskSetManager.stageTasks(table, secondGroupId, 
Lists.newArrayList(tasks));
+      SparkTableCache.get().add(secondGroupId, table);
     }
 
-    ImmutableSet<String> fileSetIDs = ImmutableSet.of(firstFileSetID, 
secondFileSetID);
+    ImmutableSet<String> groupIds = ImmutableSet.of(firstGroupId, 
secondGroupId);
 
-    for (String fileSetID : fileSetIDs) {
+    for (String groupId : groupIds) {
       // read and pack 2 files into 1 split
       Dataset<Row> scanDF =
           spark
               .read()
               .format("iceberg")
-              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
               .option(SparkReadOptions.SPLIT_SIZE, Long.MAX_VALUE)
-              .load(tableName);
+              .load(groupId);
 
       // write the combined data as one file
-      scanDF
-          .writeTo(tableName)
-          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
-          .append();
+      scanDF.write().format("iceberg").mode("append").save(groupId);
     }
 
     // commit both rewrites at the same time
     FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
     Set<DataFile> rewrittenFiles =
-        fileSetIDs.stream()
-            .flatMap(fileSetID -> taskSetManager.fetchTasks(table, 
fileSetID).stream())
+        groupIds.stream()
+            .flatMap(groupId -> taskSetManager.fetchTasks(table, 
groupId).stream())
             .map(t -> t.asFileScanTask().file())
             .collect(Collectors.toSet());
     Set<DataFile> addedFiles =
-        fileSetIDs.stream()
-            .flatMap(fileSetID -> rewriteCoordinator.fetchNewFiles(table, 
fileSetID).stream())
+        groupIds.stream()
+            .flatMap(groupId -> rewriteCoordinator.fetchNewFiles(table, 
groupId).stream())
             .collect(Collectors.toCollection(DataFileSet::create));
     table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
 
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java
deleted file mode 100644
index 228bf43b89..0000000000
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.spark;
-
-import org.apache.iceberg.ParameterizedTestExtension;
-import org.apache.iceberg.Parameters;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-@ExtendWith(ParameterizedTestExtension.class)
-public class TestSparkCachedTableCatalog extends TestBaseWithCatalog {
-
-  private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
-
-  @BeforeAll
-  public static void setupCachedTableCatalog() {
-    spark.conf().set("spark.sql.catalog.testcache", 
SparkCachedTableCatalog.class.getName());
-  }
-
-  @AfterAll
-  public static void unsetCachedTableCatalog() {
-    spark.conf().unset("spark.sql.catalog.testcache");
-  }
-
-  @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
-  protected static Object[][] parameters() {
-    return new Object[][] {
-      {
-        SparkCatalogConfig.HIVE.catalogName(),
-        SparkCatalogConfig.HIVE.implementation(),
-        SparkCatalogConfig.HIVE.properties()
-      },
-    };
-  }
-
-  @TestTemplate
-  public void testTimeTravel() {
-    sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg", tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
-
-    table.refresh();
-    Snapshot firstSnapshot = table.currentSnapshot();
-    waitUntilAfter(firstSnapshot.timestampMillis());
-
-    sql("INSERT INTO TABLE %s VALUES (2, 'hr')", tableName);
-
-    table.refresh();
-    Snapshot secondSnapshot = table.currentSnapshot();
-    waitUntilAfter(secondSnapshot.timestampMillis());
-
-    sql("INSERT INTO TABLE %s VALUES (3, 'hr')", tableName);
-
-    table.refresh();
-
-    try {
-      TABLE_CACHE.add("key", table);
-
-      assertEquals(
-          "Should have expected rows in 3rd snapshot",
-          ImmutableList.of(row(1, "hr"), row(2, "hr"), row(3, "hr")),
-          sql("SELECT * FROM testcache.key ORDER BY id"));
-
-      assertEquals(
-          "Should have expected rows in 2nd snapshot",
-          ImmutableList.of(row(1, "hr"), row(2, "hr")),
-          sql(
-              "SELECT * FROM testcache.`key#at_timestamp_%s` ORDER BY id",
-              secondSnapshot.timestampMillis()));
-
-      assertEquals(
-          "Should have expected rows in 1st snapshot",
-          ImmutableList.of(row(1, "hr")),
-          sql(
-              "SELECT * FROM testcache.`key#snapshot_id_%d` ORDER BY id",
-              firstSnapshot.snapshotId()));
-
-    } finally {
-      TABLE_CACHE.remove("key");
-    }
-  }
-}
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index 7892fd65b4..5641c7b2a0 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -71,7 +71,7 @@ import org.apache.iceberg.spark.ScanTaskSetManager;
 import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkStructLike;
-import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.spark.SparkTableCache;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.CharSequenceSet;
@@ -842,25 +842,21 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
 
     Table posDeletesTable =
         MetadataTableUtils.createMetadataTableInstance(tab, 
MetadataTableType.POSITION_DELETES);
-    String posDeletesTableName = catalogName + ".default." + tableName + 
".position_deletes";
     for (String partValue : ImmutableList.of("a", "b")) {
       try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", 
partValue)) {
         String fileSetID = UUID.randomUUID().toString();
+        SparkTableCache.get().add(fileSetID, posDeletesTable);
         stageTask(tab, fileSetID, tasks);
 
         Dataset<Row> scanDF =
             spark
                 .read()
                 .format("iceberg")
-                .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
                 .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
-                .load(posDeletesTableName);
+                .load(fileSetID);
 
         assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1);
-        scanDF
-            .writeTo(posDeletesTableName)
-            .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, 
fileSetID)
-            .append();
+        scanDF.write().format("iceberg").mode("append").save(fileSetID);
 
         commit(tab, posDeletesTable, fileSetID, 1);
       }
@@ -911,23 +907,19 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
 
     Table posDeletesTable =
         MetadataTableUtils.createMetadataTableInstance(tab, 
MetadataTableType.POSITION_DELETES);
-    String posDeletesTableName = catalogName + ".default." + tableName + 
".position_deletes";
     try (CloseableIterable<ScanTask> tasks = 
posDeletesTable.newBatchScan().planFiles()) {
       String fileSetID = UUID.randomUUID().toString();
+      SparkTableCache.get().add(fileSetID, posDeletesTable);
       stageTask(tab, fileSetID, tasks);
 
       Dataset<Row> scanDF =
           spark
               .read()
               .format("iceberg")
-              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
               .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
-              .load(posDeletesTableName);
+              .load(fileSetID);
       assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1);
-      scanDF
-          .writeTo(posDeletesTableName)
-          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
-          .append();
+      scanDF.write().format("iceberg").mode("append").save(fileSetID);
 
       commit(tab, posDeletesTable, fileSetID, 1);
     }
@@ -986,23 +978,15 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
     // rewrite delete files
     Table posDeletesTable =
         MetadataTableUtils.createMetadataTableInstance(tab, 
MetadataTableType.POSITION_DELETES);
-    String posDeletesTableName = catalogName + ".default." + tableName + 
".position_deletes";
     for (String partValue : ImmutableList.of("a", "b")) {
       try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", 
partValue)) {
         String fileSetID = UUID.randomUUID().toString();
+        SparkTableCache.get().add(fileSetID, posDeletesTable);
         stageTask(tab, fileSetID, tasks);
 
-        Dataset<Row> scanDF =
-            spark
-                .read()
-                .format("iceberg")
-                .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
-                .load(posDeletesTableName);
+        Dataset<Row> scanDF = spark.read().format("iceberg").load(fileSetID);
         assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1);
-        scanDF
-            .writeTo(posDeletesTableName)
-            .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, 
fileSetID)
-            .append();
+        scanDF.write().format("iceberg").mode("append").save(fileSetID);
 
         commit(tab, posDeletesTable, fileSetID, 1);
       }
@@ -1067,26 +1051,22 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
 
     Table posDeletesTable =
         MetadataTableUtils.createMetadataTableInstance(tab, 
MetadataTableType.POSITION_DELETES);
-    String posDeletesTableName = catalogName + ".default." + tableName + 
".position_deletes";
 
     // Read/write back unpartitioned data
     try (CloseableIterable<ScanTask> tasks =
         
posDeletesTable.newBatchScan().filter(Expressions.isNull("partition.data")).planFiles())
 {
       String fileSetID = UUID.randomUUID().toString();
+      SparkTableCache.get().add(fileSetID, posDeletesTable);
       stageTask(tab, fileSetID, tasks);
 
       Dataset<Row> scanDF =
           spark
               .read()
               .format("iceberg")
-              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
               .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
-              .load(posDeletesTableName);
+              .load(fileSetID);
       assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1);
-      scanDF
-          .writeTo(posDeletesTableName)
-          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
-          .append();
+      scanDF.write().format("iceberg").mode("append").save(fileSetID);
 
       commit(tab, posDeletesTable, fileSetID, 1);
     }
@@ -1117,20 +1097,17 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
     for (String partValue : ImmutableList.of("a", "b")) {
       try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", 
partValue)) {
         String fileSetID = UUID.randomUUID().toString();
+        SparkTableCache.get().add(fileSetID, posDeletesTable);
         stageTask(tab, fileSetID, tasks);
 
         Dataset<Row> scanDF =
             spark
                 .read()
                 .format("iceberg")
-                .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
                 .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
-                .load(posDeletesTableName);
+                .load(fileSetID);
         assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1);
-        scanDF
-            .writeTo(posDeletesTableName)
-            .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, 
fileSetID)
-            .append();
+        scanDF.write().format("iceberg").mode("append").save(fileSetID);
 
         // commit the rewrite
         commit(tab, posDeletesTable, fileSetID, 1);
@@ -1181,33 +1158,29 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
     Dataset<Row> scanDF;
     String fileSetID = UUID.randomUUID().toString();
     try (CloseableIterable<ScanTask> tasks = 
posDeletesTable.newBatchScan().planFiles()) {
+      SparkTableCache.get().add(fileSetID, posDeletesTable);
       stageTask(tab, fileSetID, tasks);
 
       scanDF =
           spark
               .read()
               .format("iceberg")
-              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
               .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
-              .load(posDeletesTableName);
+              .load(fileSetID);
       assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1);
 
       // Add partition field to render the original un-partitioned dataset 
un-commitable
       tab.updateSpec().addField("data").commit();
     }
 
-    scanDF
-        .writeTo(posDeletesTableName)
-        .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
-        .append();
+    scanDF.write().format("iceberg").mode("append").save(fileSetID);
 
     scanDF =
         spark
             .read()
             .format("iceberg")
-            .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
             .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
-            .load(posDeletesTableName);
+            .load(fileSetID);
     assertThat(Arrays.asList(scanDF.columns()).contains("partition"));
 
     dropTable(tableName);
@@ -1247,26 +1220,22 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
 
     Table posDeletesTable =
         MetadataTableUtils.createMetadataTableInstance(tab, 
MetadataTableType.POSITION_DELETES);
-    String posDeletesTableName = catalogName + ".default." + tableName + 
".position_deletes";
 
     // rewrite files of old schema
     try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", 
"a")) {
       String fileSetID = UUID.randomUUID().toString();
+      SparkTableCache.get().add(fileSetID, posDeletesTable);
       stageTask(tab, fileSetID, tasks);
 
       Dataset<Row> scanDF =
           spark
               .read()
               .format("iceberg")
-              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
               .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
-              .load(posDeletesTableName);
+              .load(fileSetID);
 
       assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1);
-      scanDF
-          .writeTo(posDeletesTableName)
-          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
-          .append();
+      scanDF.write().format("iceberg").mode("append").save(fileSetID);
 
       commit(tab, posDeletesTable, fileSetID, 1);
     }
@@ -1300,21 +1269,18 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
     // rewrite files of new schema
     try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", 
"c")) {
       String fileSetID = UUID.randomUUID().toString();
+      SparkTableCache.get().add(fileSetID, posDeletesTable);
       stageTask(tab, fileSetID, tasks);
 
       Dataset<Row> scanDF =
           spark
               .read()
               .format("iceberg")
-              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
               .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
-              .load(posDeletesTableName);
+              .load(fileSetID);
 
       assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1);
-      scanDF
-          .writeTo(posDeletesTableName)
-          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
-          .append();
+      scanDF.write().format("iceberg").mode("append").save(fileSetID);
 
       commit(tab, posDeletesTable, fileSetID, 1);
     }
@@ -1371,26 +1337,22 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
 
     Table posDeletesTable =
         MetadataTableUtils.createMetadataTableInstance(tab, 
MetadataTableType.POSITION_DELETES);
-    String posDeletesTableName = catalogName + ".default." + tableName + 
".position_deletes";
 
     // rewrite files
     for (String partValue : ImmutableList.of("a", "b", "c", "d")) {
       try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", 
partValue)) {
         String fileSetID = UUID.randomUUID().toString();
+        SparkTableCache.get().add(fileSetID, posDeletesTable);
         stageTask(tab, fileSetID, tasks);
 
         Dataset<Row> scanDF =
             spark
                 .read()
                 .format("iceberg")
-                .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
                 .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
-                .load(posDeletesTableName);
+                .load(fileSetID);
         assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1);
-        scanDF
-            .writeTo(posDeletesTableName)
-            .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, 
fileSetID)
-            .append();
+        scanDF.write().format("iceberg").mode("append").save(fileSetID);
 
         commit(tab, posDeletesTable, fileSetID, 1);
       }
@@ -1453,8 +1415,8 @@ public class TestPositionDeletesTable extends 
CatalogTestBase {
     Dataset<Row> scanDF = 
spark.read().format("iceberg").load(posDeletesTableName);
 
     assertThatThrownBy(() -> scanDF.writeTo(posDeletesTableName).append())
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Can only write to " + posDeletesTableName + " via 
actions");
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("Cannot append to a metadata table");
 
     dropTable(tableName);
   }
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
index e444b7cb1f..de07b7471f 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
@@ -31,6 +31,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.spark.CatalogTestBase;
 import org.apache.iceberg.spark.ScanTaskSetManager;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTableCache;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -60,16 +61,12 @@ public class TestSparkStagedScan extends CatalogTestBase {
 
     try (CloseableIterable<FileScanTask> fileScanTasks = 
table.newScan().planFiles()) {
       ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
-      String setID = UUID.randomUUID().toString();
-      taskSetManager.stageTasks(table, setID, 
ImmutableList.copyOf(fileScanTasks));
+      String groupId = UUID.randomUUID().toString();
+      taskSetManager.stageTasks(table, groupId, 
ImmutableList.copyOf(fileScanTasks));
+      SparkTableCache.get().add(groupId, table);
 
-      // load the staged file set
-      Dataset<Row> scanDF =
-          spark
-              .read()
-              .format("iceberg")
-              .option(SparkReadOptions.SCAN_TASK_SET_ID, setID)
-              .load(tableName);
+      // load the staged file set via the rewrite catalog
+      Dataset<Row> scanDF = spark.read().format("iceberg").load(groupId);
 
       // write the records back essentially duplicating data
       scanDF.writeTo(tableName).append();
@@ -96,18 +93,18 @@ public class TestSparkStagedScan extends CatalogTestBase {
 
     try (CloseableIterable<FileScanTask> fileScanTasks = 
table.newScan().planFiles()) {
       ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
-      String setID = UUID.randomUUID().toString();
+      String groupId = UUID.randomUUID().toString();
       List<FileScanTask> tasks = ImmutableList.copyOf(fileScanTasks);
-      taskSetManager.stageTasks(table, setID, tasks);
+      taskSetManager.stageTasks(table, groupId, tasks);
+      SparkTableCache.get().add(groupId, table);
 
       // load the staged file set and make sure each file is in a separate 
split
       Dataset<Row> scanDF =
           spark
               .read()
               .format("iceberg")
-              .option(SparkReadOptions.SCAN_TASK_SET_ID, setID)
               .option(SparkReadOptions.SPLIT_SIZE, 
tasks.get(0).file().fileSizeInBytes())
-              .load(tableName);
+              .load(groupId);
       assertThat(scanDF.javaRDD().getNumPartitions())
           .as("Num partitions should match")
           .isEqualTo(2);
@@ -117,9 +114,8 @@ public class TestSparkStagedScan extends CatalogTestBase {
           spark
               .read()
               .format("iceberg")
-              .option(SparkReadOptions.SCAN_TASK_SET_ID, setID)
               .option(SparkReadOptions.SPLIT_SIZE, Long.MAX_VALUE)
-              .load(tableName);
+              .load(groupId);
       assertThat(scanDF.javaRDD().getNumPartitions())
           .as("Num partitions should match")
           .isEqualTo(1);


Reply via email to