This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new d20f13393 [AMORO-2819]Spark cannot execute the "alter table set 
identifier field" command on tables in Iceberg format in unified catalog (#2894)
d20f13393 is described below

commit d20f13393c1041b4d6fd95a262cf5cee93fd1a9f
Author: Wang Tao <[email protected]>
AuthorDate: Mon Jun 24 11:36:06 2024 +0800

    [AMORO-2819]Spark cannot execute the "alter table set identifier field" 
command on tables in Iceberg format in unified catalog (#2894)
    
    * fix some error when access iceberg table through unified catalog
    
    * fix error for alter table set identifier field
    
    * add some ut
    
    * fix checkstyle
    
    * fix review suggestion
    
    * fix review suggestion
    
    * fix review suggestion
    
    * fix review suggestion
---
 ...edCatalog.java => SparkUnifiedCatalogBase.java} |   6 +-
 ...og.java => SparkUnifiedSessionCatalogBase.java} |  11 +-
 .../amoro/spark/iceberg/IcebergSparkFormat.java    |   6 +-
 .../apache/amoro/spark/test/SparkTestContext.java  |   3 +-
 .../test/unified/UnifiedCatalogTestSuites.java     | 135 ++++++++++++-
 .../apache/amoro/spark/SparkUnifiedCatalog.java    |  68 +++++++
 .../amoro/spark/SparkUnifiedSessionCatalog.java    |  36 ++++
 .../amoro/spark/MixedFormatSparkExtensions.scala   |   4 +-
 .../MixedFormatExtendedDataSourceV2Strategy.scala  | 218 +++++++++++++++++++++
 .../amoro-mixed-format-spark-runtime-3.2/pom.xml   |  66 +++++--
 .../apache/amoro/spark/SparkUnifiedCatalog.java    | 132 +++++++++++++
 .../amoro/spark/SparkUnifiedSessionCatalog.java    | 129 ++++++++++++
 .../amoro/spark/MixedFormatSparkExtensions.scala   |   5 +-
 .../MixedFormatExtendedDataSourceV2Strategy.scala  | 216 ++++++++++++++++++++
 .../amoro-mixed-format-spark-runtime-3.3/pom.xml   | 111 +++++------
 15 files changed, 1051 insertions(+), 95 deletions(-)

diff --git 
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java
 
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalogBase.java
similarity index 98%
rename from 
amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java
rename to 
amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalogBase.java
index f5564bce9..e4662d9e9 100644
--- 
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java
+++ 
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalogBase.java
@@ -57,9 +57,9 @@ import java.util.Map;
 import java.util.ServiceLoader;
 
 /** Unified catalog implement for spark engine. */
-public class SparkUnifiedCatalog implements TableCatalog, SupportsNamespaces, 
ProcedureCatalog {
+public class SparkUnifiedCatalogBase implements TableCatalog, 
SupportsNamespaces, ProcedureCatalog {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(SparkUnifiedCatalog.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkUnifiedCatalogBase.class);
   private static final Map<TableFormat, String> defaultTableCatalogImplMap =
       ImmutableMap.of(
           TableFormat.ICEBERG, "org.apache.iceberg.spark.SparkCatalog",
@@ -272,7 +272,7 @@ public class SparkUnifiedCatalog implements TableCatalog, 
SupportsNamespaces, Pr
     return procedureCatalog.loadProcedure(ident);
   }
 
-  private TableCatalog tableCatalog(TableFormat format) {
+  protected TableCatalog tableCatalog(TableFormat format) {
     return tableCatalogs.computeIfAbsent(format, this::initializeTableCatalog);
   }
 
diff --git 
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
 
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalogBase.java
similarity index 85%
rename from 
amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
rename to 
amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalogBase.java
index cf9a7b557..a42db5c42 100644
--- 
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
+++ 
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalogBase.java
@@ -32,14 +32,17 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import java.util.Map;
 import java.util.ServiceLoader;
 
-public class SparkUnifiedSessionCatalog<T extends TableCatalog & 
SupportsNamespaces>
+public abstract class SparkUnifiedSessionCatalogBase<T extends TableCatalog & 
SupportsNamespaces>
     extends SessionCatalogBase<T> implements ProcedureCatalog {
 
-  private final Map<TableFormat, SparkTableFormat> tableFormats = 
Maps.newConcurrentMap();
+  protected final Map<TableFormat, SparkTableFormat> tableFormats = 
Maps.newConcurrentMap();
+
+  protected abstract SparkUnifiedCatalogBase createUnifiedCatalog(
+      String name, CaseInsensitiveStringMap options);
 
   @Override
   protected TableCatalog buildTargetCatalog(String name, 
CaseInsensitiveStringMap options) {
-    SparkUnifiedCatalog sparkUnifiedCatalog = new SparkUnifiedCatalog();
+    SparkUnifiedCatalogBase sparkUnifiedCatalog = createUnifiedCatalog(name, 
options);
     sparkUnifiedCatalog.initialize(name, options);
     ServiceLoader<SparkTableFormat> sparkTableFormats = 
ServiceLoader.load(SparkTableFormat.class);
     for (SparkTableFormat format : sparkTableFormats) {
@@ -80,7 +83,7 @@ public class SparkUnifiedSessionCatalog<T extends 
TableCatalog & SupportsNamespa
 
   @Override
   public Procedure loadProcedure(Identifier ident) throws 
NoSuchProcedureException {
-    SparkUnifiedCatalog catalog = (SparkUnifiedCatalog) getTargetCatalog();
+    SparkUnifiedCatalogBase catalog = (SparkUnifiedCatalogBase) 
getTargetCatalog();
     return catalog.loadProcedure(ident);
   }
 }
diff --git 
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/iceberg/IcebergSparkFormat.java
 
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/iceberg/IcebergSparkFormat.java
index f3513f908..d733f9124 100644
--- 
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/iceberg/IcebergSparkFormat.java
+++ 
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/iceberg/IcebergSparkFormat.java
@@ -31,6 +31,8 @@ import java.util.regex.Pattern;
 public class IcebergSparkFormat implements SparkTableFormat {
   private static final Pattern AT_TIMESTAMP = 
Pattern.compile("at_timestamp_(\\d+)");
   private static final Pattern SNAPSHOT_ID = 
Pattern.compile("snapshot_id_(\\d+)");
+  private static final Pattern BRANCH = Pattern.compile("branch_(.*)");
+  private static final Pattern TAG = Pattern.compile("tag_(.*)");
 
   @Override
   public TableFormat format() {
@@ -41,7 +43,9 @@ public class IcebergSparkFormat implements SparkTableFormat {
   public boolean isSubTableName(String tableName) {
     return MetadataTableType.from(tableName) != null
         || AT_TIMESTAMP.matcher(tableName).matches()
-        || SNAPSHOT_ID.matcher(tableName).matches();
+        || SNAPSHOT_ID.matcher(tableName).matches()
+        || TAG.matcher(tableName).matches()
+        || BRANCH.matcher(tableName).matches();
   }
 
   @Override
diff --git 
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
 
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
index 862f7caf8..1c06a9c8f 100644
--- 
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
+++ 
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
@@ -27,7 +27,6 @@ import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
 import org.apache.amoro.properties.CatalogMetaProperties;
 import org.apache.amoro.shade.guava32.com.google.common.base.Joiner;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
-import org.apache.amoro.spark.SparkUnifiedCatalog;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -50,7 +49,7 @@ public class SparkTestContext {
       "org.apache.amoro.spark.MixedFormatSparkExtensions"
           + 
",org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions";
 
-  public static final String UNIFIED_CATALOG_IMP = 
SparkUnifiedCatalog.class.getName();
+  public static final String UNIFIED_CATALOG_IMP = 
"org.apache.amoro.spark.SparkUnifiedCatalog";
 
   final TemporaryFolder warehouse = new TemporaryFolder();
   public static final String AMS_ALL_FORMAT_CATALOG_NAME = "all_formats";
diff --git 
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/unified/UnifiedCatalogTestSuites.java
 
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/unified/UnifiedCatalogTestSuites.java
index 8ed2b0088..b35684297 100644
--- 
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/unified/UnifiedCatalogTestSuites.java
+++ 
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/unified/UnifiedCatalogTestSuites.java
@@ -25,8 +25,10 @@ import org.apache.amoro.UnifiedCatalogLoader;
 import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
-import org.apache.amoro.spark.SparkUnifiedSessionCatalog;
 import org.apache.amoro.spark.test.SparkTestBase;
+import org.apache.amoro.spark.test.TestIdentifier;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Assertions;
@@ -34,6 +36,7 @@ import org.junit.jupiter.params.provider.Arguments;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class UnifiedCatalogTestSuites extends SparkTestBase {
 
@@ -41,7 +44,7 @@ public class UnifiedCatalogTestSuites extends SparkTestBase {
   protected Map<String, String> sparkSessionConfig() {
     return ImmutableMap.of(
         "spark.sql.catalog.spark_catalog",
-        SparkUnifiedSessionCatalog.class.getName(),
+        "org.apache.amoro.spark.SparkUnifiedSessionCatalog",
         "spark.sql.catalog.spark_catalog.uri",
         CONTEXT.amsCatalogUrl(null));
   }
@@ -64,7 +67,7 @@ public class UnifiedCatalogTestSuites extends SparkTestBase {
         "CREATE TABLE "
             + target()
             + " ( "
-            + "id int, "
+            + "id int not null, "
             + "data string, "
             + "pt string"
             + pkDDL(format)
@@ -94,6 +97,13 @@ public class UnifiedCatalogTestSuites extends SparkTestBase {
     // call procedure
     testCallProcedure(format);
 
+    // alter table test
+    testIcebergAlterTable(format, target(), "id");
+    testIcebergUpdate(format, target());
+    testIcebergDeleteInSubQuery(format, target());
+    testIcebergDropPartitionField(format, target(), "pt");
+    testIcebergMetadata(format, target());
+
     sql("DROP TABLE " + target() + " PURGE");
     Assertions.assertFalse(unifiedCatalog().tableExists(target().database, 
target().table));
   }
@@ -177,4 +187,123 @@ public class UnifiedCatalogTestSuites extends 
SparkTestBase {
       unifiedCatalog.createDatabase(database());
     }
   }
+
+  private void testIcebergAlterTable(
+      TableFormat format, TestIdentifier targetTable, String fieldName) {
+    if (TableFormat.ICEBERG != format) {
+      // only tests for iceberg
+      return;
+    }
+    // set identifier fields
+    String sqlText =
+        String.format("alter table %s set identifier fields  %s", targetTable, 
fieldName);
+    Dataset<Row> rs = sql(sqlText);
+    Assertions.assertTrue(rs.columns().length == 0);
+
+    AmoroTable<?> icebergTable =
+        unifiedCatalog().loadTable(targetTable.database, targetTable.table);
+    Table table = (Table) icebergTable.originalTable();
+    
Assertions.assertTrue(table.schema().identifierFieldNames().contains(fieldName));
+
+    // drop identifier fields/
+    sqlText = String.format("alter table %s DROP IDENTIFIER FIELDS  %s", 
targetTable, fieldName);
+    rs = sql(sqlText);
+    Assertions.assertTrue(rs.columns().length == 0);
+
+    icebergTable = unifiedCatalog().loadTable(targetTable.database, 
targetTable.table);
+    table = (Table) icebergTable.originalTable();
+    
Assertions.assertFalse(table.schema().identifierFieldNames().contains(fieldName));
+  }
+
+  private void testIcebergUpdate(TableFormat format, TestIdentifier 
targetTable) {
+    if (TableFormat.ICEBERG != format) {
+      // only tests for iceberg
+      return;
+    }
+    String afterValue = "update-xxx";
+    // set identifier fields
+    String sqlText = String.format("update %s set data='%s' where id=1", 
targetTable, afterValue);
+    Dataset<Row> rs = sql(sqlText);
+    Assertions.assertTrue(rs.columns().length == 0);
+
+    // drop identifier fields/
+    sqlText = String.format("select * from  %s where id=1", targetTable);
+    rs = sql(sqlText);
+    Assertions.assertTrue(rs.columns().length == 3 && 
rs.head().getString(1).equals(afterValue));
+  }
+
+  private void testIcebergDeleteInSubQuery(TableFormat format, TestIdentifier 
targetTable) {
+    if (TableFormat.ICEBERG != format) {
+      // only tests for iceberg
+      return;
+    }
+    // set identifier fields
+    String sqlText = String.format("select min(id) from %s", targetTable);
+    Dataset<Row> rs = sql(sqlText);
+    Assertions.assertTrue(rs.columns().length == 1);
+    Integer minId = rs.head().getInt(0);
+
+    // set identifier fields
+    sqlText =
+        String.format(
+            "delete from %s where id=(select min(id) from %s);", targetTable, 
targetTable);
+    rs = sql(sqlText);
+    Assertions.assertTrue(rs.columns().length == 0);
+
+    // drop identifier fields/
+    sqlText = String.format("select * from  %s where id=%d", targetTable, 
minId);
+    rs = sql(sqlText);
+    Assertions.assertTrue(rs.count() == 0);
+  }
+
+  private void testIcebergDropPartitionField(
+      TableFormat format, TestIdentifier targetTable, String fieldName) {
+    if (TableFormat.ICEBERG != format) {
+      // only tests for iceberg
+      return;
+    }
+    // drop partition field
+    String sqlText =
+        String.format("alter table %s drop partition field  %s", targetTable, 
fieldName);
+    Dataset<Row> rs = sql(sqlText);
+    Assertions.assertTrue(rs.columns().length == 0);
+
+    AmoroTable<?> icebergTable =
+        unifiedCatalog().loadTable(targetTable.database, targetTable.table);
+    Table table = (Table) icebergTable.originalTable();
+    Assertions.assertTrue(
+        Optional.empty()
+            .equals(
+                table.spec().fields().stream()
+                    .filter(item -> item.name().equals(fieldName))
+                    .findAny()));
+  }
+
+  private void testIcebergMetadata(TableFormat format, TestIdentifier 
targetTable) {
+    if (TableFormat.ICEBERG != format) {
+      // only tests for iceberg
+      return;
+    }
+    // drop partition field
+    String tagKey = "tag-unittest";
+    String sqlText =
+        String.format("alter table %s create tag if not exists `%s`", 
targetTable, tagKey);
+    Dataset<Row> rs = sql(sqlText);
+    Assertions.assertTrue(rs.columns().length == 0);
+
+    AmoroTable<?> icebergTable =
+        unifiedCatalog().loadTable(targetTable.database, targetTable.table);
+    Table table = (Table) icebergTable.originalTable();
+    Map<String, SnapshotRef> refs = table.refs();
+    Assertions.assertTrue(refs != null && refs.containsKey(tagKey));
+
+    sqlText = String.format("alter table %s drop tag `%s`", targetTable, 
tagKey);
+    rs = sql(sqlText);
+    Assertions.assertTrue(rs.columns().length == 0);
+
+    icebergTable = unifiedCatalog().loadTable(targetTable.database, 
targetTable.table);
+    table = (Table) icebergTable.originalTable();
+    refs = table.refs();
+    Assertions.assertTrue(refs != null && !refs.containsKey(tagKey));
+  }
 }
diff --git 
a/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java
 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java
new file mode 100644
index 000000000..6a9e840bd
--- /dev/null
+++ 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark;
+
+import org.apache.amoro.TableFormat;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;
+
+/** @Auth: hzwangtao6 @Time: 2024/5/24 14:27 @Description: */
+public class SparkUnifiedCatalog extends SparkUnifiedCatalogBase
+    implements TableCatalog, SupportsNamespaces, ProcedureCatalog, 
FunctionCatalog {
+  /**
+   * List the functions in a namespace from the catalog.
+   *
+   * <p>If there are no functions in the namespace, implementations should 
return an empty array.
+   *
+   * @param namespace a multi-part namespace
+   * @return an array of Identifiers for functions
+   * @throws NoSuchNamespaceException If the namespace does not exist 
(optional).
+   */
+  @Override
+  public Identifier[] listFunctions(String[] namespace) throws 
NoSuchNamespaceException {
+    TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG);
+    if (tableCatalog instanceof FunctionCatalog) {
+      return ((FunctionCatalog) tableCatalog).listFunctions(namespace);
+    }
+    throw new NoSuchNamespaceException(namespace);
+  }
+
+  /**
+   * Load a function by {@link Identifier identifier} from the catalog.
+   *
+   * @param ident a function identifier
+   * @return an unbound function instance
+   * @throws NoSuchFunctionException If the function doesn't exist
+   */
+  @Override
+  public UnboundFunction loadFunction(Identifier ident) throws 
NoSuchFunctionException {
+
+    TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG);
+    if (tableCatalog instanceof FunctionCatalog) {
+      return ((FunctionCatalog) tableCatalog).loadFunction(ident);
+    }
+    throw new NoSuchFunctionException(ident);
+  }
+}
diff --git 
a/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
new file mode 100644
index 000000000..2fab4b9bf
--- /dev/null
+++ 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark;
+
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/** @Auth: hzwangtao6 @Time: 2024/5/24 14:04 @Description: */
+public class SparkUnifiedSessionCatalog<
+        T extends TableCatalog & SupportsNamespaces & FunctionCatalog>
+    extends SparkUnifiedSessionCatalogBase<T> {
+
+  @Override
+  protected SparkUnifiedCatalogBase createUnifiedCatalog(
+      String name, CaseInsensitiveStringMap options) {
+    return new SparkUnifiedCatalog();
+  }
+}
diff --git 
a/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/scala/org/apache/amoro/spark/MixedFormatSparkExtensions.scala
 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/scala/org/apache/amoro/spark/MixedFormatSparkExtensions.scala
index eb07d0899..f56e88e69 100644
--- 
a/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/scala/org/apache/amoro/spark/MixedFormatSparkExtensions.scala
+++ 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/scala/org/apache/amoro/spark/MixedFormatSparkExtensions.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.SparkSessionExtensions
 import 
org.apache.spark.sql.catalyst.analysis.{AlignedRowLevelIcebergCommandCheck, 
AlignRowLevelCommandAssignments, CheckMergeIntoTableConditions, 
MergeIntoIcebergTableResolutionCheck, ProcedureArgumentCoercion, 
ResolveMergeIntoTableReferences, ResolveProcedures, RewriteDeleteFromTable, 
RewriteMergeIntoTable, RewriteUpdateTable}
 import org.apache.spark.sql.catalyst.optimizer._
 import 
org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser
-import 
org.apache.spark.sql.execution.datasources.v2.{ExtendedDataSourceV2Strategy, 
ExtendedV2Writes, OptimizeMetadataOnlyDeleteFromTable, 
ReplaceRewrittenRowLevelCommand, RowLevelCommandScanRelationPushDown}
+import 
org.apache.spark.sql.execution.datasources.v2.{ExtendedDataSourceV2Strategy, 
ExtendedV2Writes, MixedFormatExtendedDataSourceV2Strategy, 
OptimizeMetadataOnlyDeleteFromTable, ReplaceRewrittenRowLevelCommand, 
RowLevelCommandScanRelationPushDown}
 import 
org.apache.spark.sql.execution.dynamicpruning.RowLevelCommandDynamicPruning
 
 import org.apache.amoro.spark.sql.catalyst.analysis._
@@ -71,7 +71,7 @@ class MixedFormatSparkExtensions extends 
(SparkSessionExtensions => Unit) {
     extensions.injectPreCBORule { _ => ReplaceRewrittenRowLevelCommand }
 
     // planner extensions
-    extensions.injectPlannerStrategy { spark => 
ExtendedDataSourceV2Strategy(spark) }
+    extensions.injectPlannerStrategy { spark => 
MixedFormatExtendedDataSourceV2Strategy(spark) }
     // mixed-format optimizer rules
     extensions.injectPreCBORule(OptimizeWriteRule)
 
diff --git 
a/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MixedFormatExtendedDataSourceV2Strategy.scala
 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MixedFormatExtendedDataSourceV2Strategy.scala
new file mode 100644
index 000000000..d4f52f226
--- /dev/null
+++ 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MixedFormatExtendedDataSourceV2Strategy.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.iceberg.spark.SparkCatalog
+import org.apache.iceberg.spark.SparkSessionCatalog
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
+import org.apache.spark.sql.catalyst.plans.logical.Call
+import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch
+import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag
+import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable
+import org.apache.spark.sql.catalyst.plans.logical.DropBranch
+import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
+import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
+import org.apache.spark.sql.catalyst.plans.logical.DropTag
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows
+import org.apache.spark.sql.catalyst.plans.logical.NoStatsUnaryNode
+import org.apache.spark.sql.catalyst.plans.logical.ReplaceData
+import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
+import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
+import 
org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
+import org.apache.spark.sql.catalyst.plans.logical.WriteDelta
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+import org.apache.amoro.spark.{SparkUnifiedCatalog, SparkUnifiedSessionCatalog}
+
+/**
+ * refer apache iceberg project
+ * spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/
+ * datasources/v2/ExtendedDataSourceV2Strategy.scala
+ *
+ * @param spark
+ */
+case class MixedFormatExtendedDataSourceV2Strategy(spark: SparkSession) 
extends Strategy
+  with PredicateHelper {
+
+  import DataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case c @ Call(procedure, args) =>
+      val input = buildInternalRow(args)
+      CallExec(c.output, procedure, input) :: Nil
+
+    case AddPartitionField(IcebergCatalogAndIdentifier(catalog, ident), 
transform, name) =>
+      AddPartitionFieldExec(catalog, ident, transform, name) :: Nil
+
+    case CreateOrReplaceBranch(
+          IcebergCatalogAndIdentifier(catalog, ident),
+          branch,
+          branchOptions,
+          create,
+          replace,
+          ifNotExists) =>
+      CreateOrReplaceBranchExec(
+        catalog,
+        ident,
+        branch,
+        branchOptions,
+        create,
+        replace,
+        ifNotExists) :: Nil
+
+    case CreateOrReplaceTag(
+          IcebergCatalogAndIdentifier(catalog, ident),
+          tag,
+          tagOptions,
+          create,
+          replace,
+          ifNotExists) =>
+      CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, create, replace, 
ifNotExists) :: Nil
+
+    case DropBranch(IcebergCatalogAndIdentifier(catalog, ident), branch, 
ifExists) =>
+      DropBranchExec(catalog, ident, branch, ifExists) :: Nil
+
+    case DropTag(IcebergCatalogAndIdentifier(catalog, ident), tag, ifExists) =>
+      DropTagExec(catalog, ident, tag, ifExists) :: Nil
+
+    case DropPartitionField(IcebergCatalogAndIdentifier(catalog, ident), 
transform) =>
+      DropPartitionFieldExec(catalog, ident, transform) :: Nil
+
+    case ReplacePartitionField(
+          IcebergCatalogAndIdentifier(catalog, ident),
+          transformFrom,
+          transformTo,
+          name) =>
+      ReplacePartitionFieldExec(catalog, ident, transformFrom, transformTo, 
name) :: Nil
+
+    case SetIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident), 
fields) =>
+      SetIdentifierFieldsExec(catalog, ident, fields) :: Nil
+
+    case DropIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident), 
fields) =>
+      DropIdentifierFieldsExec(catalog, ident, fields) :: Nil
+
+    case SetWriteDistributionAndOrdering(
+          IcebergCatalogAndIdentifier(catalog, ident),
+          distributionMode,
+          ordering) =>
+      SetWriteDistributionAndOrderingExec(catalog, ident, distributionMode, 
ordering) :: Nil
+
+    case ReplaceData(_: DataSourceV2Relation, query, r: DataSourceV2Relation, 
Some(write)) =>
+      // refresh the cache using the original relation
+      ReplaceDataExec(planLater(query), refreshCache(r), write) :: Nil
+
+    case WriteDelta(_: DataSourceV2Relation, query, r: DataSourceV2Relation, 
projs, Some(write)) =>
+      // refresh the cache using the original relation
+      WriteDeltaExec(planLater(query), refreshCache(r), projs, write) :: Nil
+
+    case MergeRows(
+          isSourceRowPresent,
+          isTargetRowPresent,
+          matchedConditions,
+          matchedOutputs,
+          notMatchedConditions,
+          notMatchedOutputs,
+          targetOutput,
+          rowIdAttrs,
+          performCardinalityCheck,
+          emitNotMatchedTargetRows,
+          output,
+          child) =>
+      MergeRowsExec(
+        isSourceRowPresent,
+        isTargetRowPresent,
+        matchedConditions,
+        matchedOutputs,
+        notMatchedConditions,
+        notMatchedOutputs,
+        targetOutput,
+        rowIdAttrs,
+        performCardinalityCheck,
+        emitNotMatchedTargetRows,
+        output,
+        planLater(child)) :: Nil
+
+    case DeleteFromIcebergTable(DataSourceV2ScanRelation(r, _, output), 
condition, None) =>
+      // the optimizer has already checked that this delete can be handled 
using a metadata operation
+      val deleteCond = condition.getOrElse(Literal.TrueLiteral)
+      val predicates = splitConjunctivePredicates(deleteCond)
+      val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates, 
output)
+      val filters = normalizedPredicates.flatMap { pred =>
+        val filter = DataSourceStrategy.translateFilter(pred, 
supportNestedPredicatePushdown = true)
+        if (filter.isEmpty) {
+          throw 
QueryCompilationErrors.cannotTranslateExpressionToSourceFilterError(pred)
+        }
+        filter
+      }.toArray
+      DeleteFromTableExec(r.table.asDeletable, filters, refreshCache(r)) :: Nil
+
+    case NoStatsUnaryNode(child) =>
+      planLater(child) :: Nil
+
+    case _ => Nil
+  }
+
+  private def buildInternalRow(exprs: Seq[Expression]): InternalRow = {
+    val values = new Array[Any](exprs.size)
+    for (index <- exprs.indices) {
+      values(index) = exprs(index).eval()
+    }
+    new GenericInternalRow(values)
+  }
+
+  private def refreshCache(r: DataSourceV2Relation)(): Unit = {
+    spark.sharedState.cacheManager.recacheByPlan(spark, r)
+  }
+
+  /**
+   * Extractor resolving a multi-part identifier to (catalog, identifier); matches Iceberg catalogs as well as Amoro unified and unified-session catalogs.
+   */
+  private object IcebergCatalogAndIdentifier {
+    def unapply(identifier: Seq[String]): Option[(TableCatalog, Identifier)] = 
{
+      val catalogAndIdentifier = Spark3Util.catalogAndIdentifier(spark, 
identifier.asJava)
+      catalogAndIdentifier.catalog match {
+        case icebergCatalog: SparkCatalog =>
+          Some((icebergCatalog, catalogAndIdentifier.identifier))
+        case icebergCatalog: SparkSessionCatalog[_] =>
+          Some((icebergCatalog, catalogAndIdentifier.identifier))
+        case icebergCatalog: SparkUnifiedCatalog =>
+          Some((icebergCatalog, catalogAndIdentifier.identifier))
+        case icebergCatalog: SparkUnifiedSessionCatalog[_] =>
+          Some((icebergCatalog, catalogAndIdentifier.identifier))
+        case _ =>
+          None
+      }
+    }
+  }
+}
diff --git 
a/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-runtime-3.2/pom.xml
 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-runtime-3.2/pom.xml
index 484be250e..b1dbaf402 100644
--- 
a/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-runtime-3.2/pom.xml
+++ 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-runtime-3.2/pom.xml
@@ -263,6 +263,9 @@
                                         <include>
                                             
org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy*
                                         </include>
+                                        <include>
+                                            
org.apache.spark.sql.execution.datasources.v2.MixedFormatExtendedDataSourceV2Strategy*
+                                        </include>
                                         <include>
                                             
org.apache.spark.sql.execution.datasources.v2.MergeIntoExec*
                                         </include>
@@ -278,6 +281,18 @@
                                         <include>
                                             
org.apache.spark.sql.execution.datasources.v2.SetWriteDistributionAndOrderingExec*
                                         </include>
+                                        <include>
+                                            
org.apache.spark.sql.execution.datasources.v2.OptimizeMetadataOnlyDeleteFromTable*
+                                        </include>
+                                        <include>
+                                            
org.apache.spark.sql.execution.datasources.v2.RowLevelCommandScanRelationPushDown*
+                                        </include>
+                                        <include>
+                                            
org.apache.spark.sql.execution.datasources.v2.ExtendedV2Writes*
+                                        </include>
+                                        <include>
+                                            
org.apache.spark.sql.execution.datasources.v2.ReplaceRewrittenRowLevelCommand*
+                                        </include>
                                     </includes>
                                 </relocation>
                                 <relocation>
@@ -312,6 +327,30 @@
                                         <include>
                                             
org.apache.spark.sql.catalyst.analysis.RowLevelOperationsPredicateCheck*
                                         </include>
+                                        <include>
+                                            
org.apache.spark.sql.catalyst.analysis.ResolveMergeIntoTableReferences*
+                                        </include>
+                                        <include>
+                                            
org.apache.spark.sql.catalyst.analysis.CheckMergeIntoTableConditions*
+                                        </include>
+                                        <include>
+                                            
org.apache.spark.sql.catalyst.analysis.AlignRowLevelCommandAssignments*
+                                        </include>
+                                        <include>
+                                            
org.apache.spark.sql.catalyst.analysis.RewriteDeleteFromTable*
+                                        </include>
+                                        <include>
+                                            
org.apache.spark.sql.catalyst.analysis.RewriteUpdateTable*
+                                        </include>
+                                        <include>
+                                            
org.apache.spark.sql.catalyst.analysis.RewriteMergeIntoTable*
+                                        </include>
+                                        <include>
+                                            
org.apache.spark.sql.catalyst.analysis.MergeIntoIcebergTableResolutionCheck*
+                                        </include>
+                                        <include>
+                                            
org.apache.spark.sql.catalyst.analysis.AlignedRowLevelIcebergCommandCheck*
+                                        </include>
                                     </includes>
                                 </relocation>
                                 <relocation>
@@ -335,19 +374,10 @@
                                     </shadedPattern>
                                     <includes>
                                         <include>
-                                            
org.apache.spark.sql.catalyst.optimizer.OptimizeConditionsInRowLevelOperations*
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.optimizer.PullupCorrelatedPredicatesInRowLevelOperations*
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.optimizer.RewriteDelete*
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.optimizer.RewriteMergeInto*
+                                            
org.apache.spark.sql.catalyst.optimizer.ExtendedSimplifyConditionalsInPredicate*
                                         </include>
                                         <include>
-                                            
org.apache.spark.sql.catalyst.optimizer.RewriteUpdate*
+                                            
org.apache.spark.sql.catalyst.optimizer.ExtendedReplaceNullWithFalseInPredicate*
                                         </include>
                                     </includes>
                                 </relocation>
@@ -364,6 +394,9 @@
                                         <include>
                                             
org.apache.spark.sql.catalyst.parser.extensions.UpperCaseCharStream*
                                         </include>
+                                        <include>
+                                            
org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser*
+                                        </include>
                                     </includes>
                                 </relocation>
                                 <relocation>
@@ -439,6 +472,17 @@
                                     
<shadedPattern>org.apache.amoro.shade.org.apache.hc
                                     </shadedPattern>
                                 </relocation>
+                                <relocation>
+                                    
<pattern>org.apache.spark.sql.execution.dynamicpruning</pattern>
+                                    <shadedPattern>
+                                        
org.apache.amoro.shade.org.apache.spark.sql.execution.dynamicpruning
+                                    </shadedPattern>
+                                    <includes>
+                                        <include>
+                                            
org.apache.spark.sql.execution.dynamicpruning.RowLevelCommandDynamicPruning*
+                                        </include>
+                                    </includes>
+                                </relocation>
                             </relocations>
                             
<finalName>${project.artifactId}-${project.version}</finalName>
                         </configuration>
diff --git 
a/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java
 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java
new file mode 100644
index 000000000..44c813300
--- /dev/null
+++ 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark;
+
+import org.apache.amoro.TableFormat;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;
+
+/** Spark 3.3 unified catalog adding function-catalog support and time-travel table loading. */
+public class SparkUnifiedCatalog extends SparkUnifiedCatalogBase
+    implements TableCatalog, SupportsNamespaces, ProcedureCatalog, 
FunctionCatalog {
+
+  /**
+   * List the functions in a namespace from the catalog.
+   *
+   * <p>If there are no functions in the namespace, implementations should 
return an empty array.
+   *
+   * @param namespace a multi-part namespace
+   * @return an array of Identifiers for functions
+   * @throws NoSuchNamespaceException If the namespace does not exist 
(optional).
+   */
+  @Override
+  public Identifier[] listFunctions(String[] namespace) throws 
NoSuchNamespaceException {
+    TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG);
+    if (tableCatalog instanceof FunctionCatalog) {
+      return ((FunctionCatalog) tableCatalog).listFunctions(namespace);
+    }
+    throw new NoSuchNamespaceException(namespace);
+  }
+
+  /**
+   * Load a function by {@link Identifier identifier} from the catalog.
+   *
+   * @param ident a function identifier
+   * @return an unbound function instance
+   * @throws NoSuchFunctionException If the function doesn't exist
+   */
+  @Override
+  public UnboundFunction loadFunction(Identifier ident) throws 
NoSuchFunctionException {
+
+    TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG);
+    if (tableCatalog instanceof FunctionCatalog) {
+      return ((FunctionCatalog) tableCatalog).loadFunction(ident);
+    }
+    throw new NoSuchFunctionException(ident);
+  }
+
+  /**
+   * Drop a namespace from the catalog with cascade mode, recursively dropping 
all objects within
+   * the namespace if cascade is true.
+   *
+   * <p>If the catalog implementation does not support this operation, it may 
throw {@link
+   * UnsupportedOperationException}.
+   *
+   * @param namespace a multi-part namespace
+   * @param cascade When true, deletes all objects under the namespace
+   * @return true if the namespace was dropped
+   * @throws NoSuchNamespaceException If the namespace does not exist 
(optional)
+   * @throws NonEmptyNamespaceException If the namespace is non-empty and 
cascade is false
+   * @throws UnsupportedOperationException If drop is not a supported operation
+   */
+  @Override
+  public boolean dropNamespace(String[] namespace, boolean cascade)
+      throws NoSuchNamespaceException, NonEmptyNamespaceException {
+    return false;
+  }
+
+  /**
+   * Load table metadata of a specific version by {@link Identifier 
identifier} from the catalog.
+   *
+   * <p>If the catalog supports views and contains a view for the identifier 
and not a table, this
+   * must throw {@link NoSuchTableException}.
+   *
+   * @param ident a table identifier
+   * @param version version of the table
+   * @return the table's metadata
+   * @throws NoSuchTableException If the table doesn't exist or is a view
+   */
+  @Override
+  public Table loadTable(Identifier ident, String version) throws 
NoSuchTableException {
+    TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG);
+    if (tableCatalog == null) {
+      throw new UnsupportedOperationException("Doesn't support iceberg table 
catalog");
+    }
+    return tableCatalog.loadTable(ident, version);
+  }
+
+  /**
+   * Load table metadata at a specific time by {@link Identifier identifier} 
from the catalog.
+   *
+   * <p>If the catalog supports views and contains a view for the identifier 
and not a table, this
+   * must throw {@link NoSuchTableException}.
+   *
+   * @param ident a table identifier
+   * @param timestamp timestamp of the table, which is microseconds since 
1970-01-01 00:00:00 UTC
+   * @return the table's metadata
+   * @throws NoSuchTableException If the table doesn't exist or is a view
+   */
+  @Override
+  public Table loadTable(Identifier ident, long timestamp) throws 
NoSuchTableException {
+    TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG);
+    if (tableCatalog == null) {
+      throw new UnsupportedOperationException("Only support iceberg format 
now!");
+    }
+    return tableCatalog.loadTable(ident, timestamp);
+  }
+}
diff --git 
a/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
new file mode 100644
index 000000000..96213a63f
--- /dev/null
+++ 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark;
+
+import org.apache.amoro.TableFormat;
+import org.apache.iceberg.spark.functions.SparkFunctions;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * The TableCatalog interface in Spark 3.3 differs from Spark 3.2, so this class is defined
+ * separately: (1) it supports the time-travel SQL syntax; (2) it implements FunctionCatalog.
+ */
+public class SparkUnifiedSessionCatalog<
+        T extends TableCatalog & SupportsNamespaces & FunctionCatalog>
+    extends SparkUnifiedSessionCatalogBase<T> {
+
+  @Override
+  protected SparkUnifiedCatalogBase createUnifiedCatalog(
+      String name, CaseInsensitiveStringMap options) {
+    return new SparkUnifiedCatalog();
+  }
+
+  @Override
+  public Table loadTable(Identifier ident, String version) throws 
NoSuchTableException {
+    try {
+      TableCatalog catalog = getTargetCatalog();
+      SparkUnifiedCatalogBase unifiedCatalog = (SparkUnifiedCatalogBase) 
catalog;
+      return unifiedCatalog.tableCatalog(TableFormat.ICEBERG).loadTable(ident, 
version);
+    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      return getSessionCatalog().loadTable(ident, version);
+    }
+  }
+
+  @Override
+  public Table loadTable(Identifier ident, long timestamp) throws 
NoSuchTableException {
+    try {
+      TableCatalog catalog = getTargetCatalog();
+      SparkUnifiedCatalogBase unifiedCatalog = (SparkUnifiedCatalogBase) 
catalog;
+      return unifiedCatalog.tableCatalog(TableFormat.ICEBERG).loadTable(ident, 
timestamp);
+    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      return getSessionCatalog().loadTable(ident, timestamp);
+    }
+  }
+
+  /**
+   * List the functions in a namespace from the catalog.
+   *
+   * <p>If there are no functions in the namespace, implementations should 
return an empty array.
+   *
+   * @param namespace a multi-part namespace
+   * @return an array of Identifiers for functions
+   * @throws NoSuchNamespaceException If the namespace does not exist 
(optional).
+   */
+  @Override
+  public Identifier[] listFunctions(String[] namespace) throws 
NoSuchNamespaceException {
+    SparkUnifiedCatalog catalog = (SparkUnifiedCatalog) getTargetCatalog();
+    return catalog.listFunctions(namespace);
+  }
+
+  @Override
+  public UnboundFunction loadFunction(Identifier ident) throws 
NoSuchFunctionException {
+    String[] namespace = ident.namespace();
+    String name = ident.name();
+
+    // Allow for empty namespace, as Spark's storage partitioned joins look up
+    // the corresponding functions to generate transforms for partitioning
+    // with an empty namespace, such as `bucket`.
+    // Otherwise, use `system` namespace.
+    if (namespace.length == 0 || isSystemNamespace(namespace)) {
+      UnboundFunction func = SparkFunctions.load(name);
+      if (func != null) {
+        return func;
+      }
+    }
+
+    throw new NoSuchFunctionException(ident);
+  }
+
+  private static boolean isSystemNamespace(String[] namespace) {
+    return namespace.length == 1 && namespace[0].equalsIgnoreCase("system");
+  }
+
+  /**
+   * Drop a namespace from the catalog with cascade mode, recursively dropping 
all objects within
+   * the namespace if cascade is true.
+   *
+   * <p>If the catalog implementation does not support this operation, it may 
throw {@link
+   * UnsupportedOperationException}.
+   *
+   * @param namespace a multi-part namespace
+   * @param cascade When true, deletes all objects under the namespace
+   * @return true if the namespace was dropped
+   * @throws NoSuchNamespaceException If the namespace does not exist 
(optional)
+   * @throws NonEmptyNamespaceException If the namespace is non-empty and 
cascade is false
+   * @throws UnsupportedOperationException If drop is not a supported operation
+   */
+  @Override
+  public boolean dropNamespace(String[] namespace, boolean cascade)
+      throws NoSuchNamespaceException, NonEmptyNamespaceException {
+    SparkUnifiedCatalog catalog = (SparkUnifiedCatalog) getTargetCatalog();
+    return catalog.dropNamespace(namespace, cascade);
+  }
+}
diff --git 
a/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/scala/org/apache/amoro/spark/MixedFormatSparkExtensions.scala
 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/scala/org/apache/amoro/spark/MixedFormatSparkExtensions.scala
index f105d1874..8164587d6 100644
--- 
a/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/scala/org/apache/amoro/spark/MixedFormatSparkExtensions.scala
+++ 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/scala/org/apache/amoro/spark/MixedFormatSparkExtensions.scala
@@ -22,7 +22,8 @@ import org.apache.spark.sql.SparkSessionExtensions
 import 
org.apache.spark.sql.catalyst.analysis.{AlignedRowLevelIcebergCommandCheck, 
AlignRowLevelCommandAssignments, CheckMergeIntoTableConditions, 
MergeIntoIcebergTableResolutionCheck, ProcedureArgumentCoercion, 
ResolveMergeIntoTableReferences, ResolveProcedures, 
RewriteDeleteFromIcebergTable, RewriteMergeIntoTable, RewriteUpdateTable}
 import org.apache.spark.sql.catalyst.optimizer._
 import 
org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser
-import 
org.apache.spark.sql.execution.datasources.v2.{ExtendedDataSourceV2Strategy, 
ExtendedV2Writes, OptimizeMetadataOnlyDeleteFromIcebergTable, 
ReplaceRewrittenRowLevelCommand, RowLevelCommandScanRelationPushDown}
+import org.apache.spark.sql.execution.datasources.v2.{ExtendedV2Writes, 
OptimizeMetadataOnlyDeleteFromIcebergTable, ReplaceRewrittenRowLevelCommand, 
RowLevelCommandScanRelationPushDown}
+import 
org.apache.spark.sql.execution.datasources.v2.{MixedFormatExtendedDataSourceV2Strategy,
 ReplaceRewrittenRowLevelCommand}
 import 
org.apache.spark.sql.execution.dynamicpruning.RowLevelCommandDynamicPruning
 
 import org.apache.amoro.spark.sql.catalyst.analysis
@@ -77,7 +78,7 @@ class MixedFormatSparkExtensions extends 
(SparkSessionExtensions => Unit) {
     extensions.injectPreCBORule { _ => ReplaceRewrittenRowLevelCommand }
 
     // planner extensions
-    extensions.injectPlannerStrategy { spark => 
ExtendedDataSourceV2Strategy(spark) }
+    extensions.injectPlannerStrategy { spark => 
MixedFormatExtendedDataSourceV2Strategy(spark) }
     // mixed-format optimizer rules
     extensions.injectPreCBORule(OptimizeWriteRule)
 
diff --git 
a/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MixedFormatExtendedDataSourceV2Strategy.scala
 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MixedFormatExtendedDataSourceV2Strategy.scala
new file mode 100644
index 000000000..58763db67
--- /dev/null
+++ 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MixedFormatExtendedDataSourceV2Strategy.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.iceberg.spark.SparkCatalog
+import org.apache.iceberg.spark.SparkSessionCatalog
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
+import org.apache.spark.sql.catalyst.plans.logical.Call
+import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch
+import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag
+import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable
+import org.apache.spark.sql.catalyst.plans.logical.DropBranch
+import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
+import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
+import org.apache.spark.sql.catalyst.plans.logical.DropTag
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows
+import org.apache.spark.sql.catalyst.plans.logical.NoStatsUnaryNode
+import org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData
+import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
+import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
+import 
org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
+import org.apache.spark.sql.catalyst.plans.logical.WriteDelta
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+import org.apache.amoro.UnifiedCatalog
+import org.apache.amoro.spark.{SparkUnifiedCatalog, SparkUnifiedSessionCatalog}
+
+/**
+ * refer apache iceberg project
+ * spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/
+ * datasources/v2/ExtendedDataSourceV2Strategy.scala
+ * @param spark
+ */
+case class MixedFormatExtendedDataSourceV2Strategy(spark: SparkSession) 
extends Strategy
+  with PredicateHelper {
+
+  import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case c @ Call(procedure, args) =>
+      val input = buildInternalRow(args)
+      CallExec(c.output, procedure, input) :: Nil
+
+    case AddPartitionField(IcebergCatalogAndIdentifier(catalog, ident), 
transform, name) =>
+      AddPartitionFieldExec(catalog, ident, transform, name) :: Nil
+
+    case CreateOrReplaceBranch(
+          IcebergCatalogAndIdentifier(catalog, ident),
+          branch,
+          branchOptions,
+          create,
+          replace,
+          ifNotExists) =>
+      CreateOrReplaceBranchExec(
+        catalog,
+        ident,
+        branch,
+        branchOptions,
+        create,
+        replace,
+        ifNotExists) :: Nil
+
+    case CreateOrReplaceTag(
+          IcebergCatalogAndIdentifier(catalog, ident),
+          tag,
+          tagOptions,
+          create,
+          replace,
+          ifNotExists) =>
+      CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, create, replace, 
ifNotExists) :: Nil
+
+    case DropBranch(IcebergCatalogAndIdentifier(catalog, ident), branch, 
ifExists) =>
+      DropBranchExec(catalog, ident, branch, ifExists) :: Nil
+
+    case DropTag(IcebergCatalogAndIdentifier(catalog, ident), tag, ifExists) =>
+      DropTagExec(catalog, ident, tag, ifExists) :: Nil
+
+    case DropPartitionField(IcebergCatalogAndIdentifier(catalog, ident), 
transform) =>
+      DropPartitionFieldExec(catalog, ident, transform) :: Nil
+
+    case ReplacePartitionField(
+          IcebergCatalogAndIdentifier(catalog, ident),
+          transformFrom,
+          transformTo,
+          name) =>
+      ReplacePartitionFieldExec(catalog, ident, transformFrom, transformTo, 
name) :: Nil
+
+    case SetIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident), 
fields) =>
+      SetIdentifierFieldsExec(catalog, ident, fields) :: Nil
+
+    case DropIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident), 
fields) =>
+      DropIdentifierFieldsExec(catalog, ident, fields) :: Nil
+
+    case SetWriteDistributionAndOrdering(
+          IcebergCatalogAndIdentifier(catalog, ident),
+          distributionMode,
+          ordering) =>
+      SetWriteDistributionAndOrderingExec(catalog, ident, distributionMode, 
ordering) :: Nil
+
+    case ReplaceIcebergData(_: DataSourceV2Relation, query, r: 
DataSourceV2Relation, Some(write)) =>
+      // refresh the cache using the original relation
+      ReplaceDataExec(planLater(query), refreshCache(r), write) :: Nil
+
+    case WriteDelta(_: DataSourceV2Relation, query, r: DataSourceV2Relation, 
projs, Some(write)) =>
+      // refresh the cache using the original relation
+      WriteDeltaExec(planLater(query), refreshCache(r), projs, write) :: Nil
+
+    case MergeRows(
+          isSourceRowPresent,
+          isTargetRowPresent,
+          matchedConditions,
+          matchedOutputs,
+          notMatchedConditions,
+          notMatchedOutputs,
+          targetOutput,
+          performCardinalityCheck,
+          emitNotMatchedTargetRows,
+          output,
+          child) =>
+      MergeRowsExec(
+        isSourceRowPresent,
+        isTargetRowPresent,
+        matchedConditions,
+        matchedOutputs,
+        notMatchedConditions,
+        notMatchedOutputs,
+        targetOutput,
+        performCardinalityCheck,
+        emitNotMatchedTargetRows,
+        output,
+        planLater(child)) :: Nil
+
+    case DeleteFromIcebergTable(DataSourceV2ScanRelation(r, _, output, _), 
condition, None) =>
+      // the optimizer has already checked that this delete can be handled 
using a metadata operation
+      val deleteCond = condition.getOrElse(Literal.TrueLiteral)
+      val predicates = splitConjunctivePredicates(deleteCond)
+      val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates, 
output)
+      val filters = normalizedPredicates.flatMap { pred =>
+        val filter = DataSourceStrategy.translateFilter(pred, 
supportNestedPredicatePushdown = true)
+        if (filter.isEmpty) {
+          throw 
QueryCompilationErrors.cannotTranslateExpressionToSourceFilterError(pred)
+        }
+        filter
+      }.toArray
+      DeleteFromTableExec(r.table.asDeletable, filters, refreshCache(r)) :: Nil
+
+    case NoStatsUnaryNode(child) =>
+      planLater(child) :: Nil
+
+    case _ => Nil
+  }
+
+  /**
+   * Evaluates each expression and packs the results into a single
+   * GenericInternalRow. `eval()` is called with no input row, so the
+   * expressions are expected to be foldable/literal — TODO confirm callers
+   * only pass constant expressions here.
+   */
+  private def buildInternalRow(exprs: Seq[Expression]): InternalRow = {
+    val values = new Array[Any](exprs.size)
+    for (index <- exprs.indices) {
+      values(index) = exprs(index).eval()
+    }
+    new GenericInternalRow(values)
+  }
+
+  /**
+   * Curried helper: `refreshCache(r)` yields a zero-argument callback that
+   * re-caches every cached plan built over relation `r`. It is handed to exec
+   * nodes (ReplaceDataExec, WriteDeltaExec, DeleteFromTableExec above) so the
+   * cache is refreshed against the original relation after the write runs.
+   */
+  private def refreshCache(r: DataSourceV2Relation)(): Unit = {
+    spark.sharedState.cacheManager.recacheByPlan(spark, r)
+  }
+
+  /**
+   * Extractor that resolves a multi-part table identifier to a
+   * (TableCatalog, Identifier) pair, matching only catalogs able to serve
+   * Iceberg tables: Iceberg's own SparkCatalog / SparkSessionCatalog plus
+   * Amoro's SparkUnifiedCatalog / SparkUnifiedSessionCatalog. The unified
+   * variants are the addition that lets these commands work through a
+   * unified (session) catalog.
+   */
+  private object IcebergCatalogAndIdentifier {
+    def unapply(identifier: Seq[String]): Option[(TableCatalog, Identifier)] = 
{
+      // Delegate name resolution (current catalog / namespace handling) to
+      // Iceberg's Spark3Util helper.
+      val catalogAndIdentifier = Spark3Util.catalogAndIdentifier(spark, 
identifier.asJava)
+      catalogAndIdentifier.catalog match {
+        case icebergCatalog: SparkCatalog =>
+          Some((icebergCatalog, catalogAndIdentifier.identifier))
+        case icebergCatalog: SparkSessionCatalog[_] =>
+          Some((icebergCatalog, catalogAndIdentifier.identifier))
+        case icebergCatalog: SparkUnifiedCatalog =>
+          Some((icebergCatalog, catalogAndIdentifier.identifier))
+        case icebergCatalog: SparkUnifiedSessionCatalog[_] =>
+          Some((icebergCatalog, catalogAndIdentifier.identifier))
+        case _ =>
+          // Any other catalog implementation cannot serve Iceberg tables here.
+          None
+      }
+    }
+  }
+}
diff --git 
a/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-runtime-3.3/pom.xml
 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-runtime-3.3/pom.xml
index d473df156..8adc59126 100644
--- 
a/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-runtime-3.3/pom.xml
+++ 
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-runtime-3.3/pom.xml
@@ -231,6 +231,9 @@
                                         <exclude>
                                             
org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameterImpl
                                         </exclude>
+                                        <exclude>
+                                            
org.apache.spark.sql.connector.iceberg.write.*
+                                        </exclude>
                                     </excludes>
                                 </relocation>
 
@@ -268,6 +271,9 @@
                                         <include>
                                             
org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy*
                                         </include>
+                                        <include>
+                                            
org.apache.spark.sql.execution.datasources.v2.MixedFormatExtendedDataSourceV2Strategy*
+                                        </include>
                                         <include>
                                             
org.apache.spark.sql.execution.datasources.v2.MergeIntoExec*
                                         </include>
@@ -283,6 +289,27 @@
                                         <include>
                                             
org.apache.spark.sql.execution.datasources.v2.SetWriteDistributionAndOrderingExec*
                                         </include>
+                                        <include>
+                                            
org.apache.spark.sql.execution.datasources.v2.MergeRowsExec*
+                                        </include>
+                                        <include>
+                                            
org.apache.spark.sql.execution.datasources.v2.CreateOrReplaceBranchExec*
+                                        </include>
+                                        <include>
+                                            
org.apache.spark.sql.execution.datasources.v2.CreateOrReplaceTagExec*
+                                        </include>
+                                        <include>
+                                            
org.apache.spark.sql.execution.datasources.v2.DropBranchExec*
+                                        </include>
+                                        <include>
+                                            
org.apache.spark.sql.execution.datasources.v2.DropTagExec*
+                                        </include>
+                                        <include>
+                                            
org.apache.spark.sql.execution.datasources.v2.DropIdentifierFieldsExec*
+                                        </include>
+                                        <include>
+                                            
org.apache.spark.sql.execution.datasources.v2.WriteDeltaExec*
+                                        </include>
                                     </includes>
                                 </relocation>
                                 <relocation>
@@ -354,6 +381,12 @@
                                         <include>
                                             
org.apache.spark.sql.catalyst.optimizer.RewriteUpdate*
                                         </include>
+                                        <include>
+                                            
org.apache.spark.sql.catalyst.optimizer.ExtendedSimplifyConditionalsInPredicate*
+                                        </include>
+                                        <include>
+                                            
org.apache.spark.sql.catalyst.optimizer.ExtendedReplaceNullWithFalseInPredicate*
+                                        </include>
                                     </includes>
                                 </relocation>
                                 <relocation>
@@ -371,73 +404,6 @@
                                         </include>
                                     </includes>
                                 </relocation>
-                                <relocation>
-                                    
<pattern>org.apache.spark.sql.catalyst.plans.logical</pattern>
-                                    <shadedPattern>
-                                        
org.apache.amoro.shade.org.apache.spark.sql.catalyst.plans.logical
-                                    </shadedPattern>
-                                    <includes>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.plans.logical.AddPartitionField*
-                                        </include>
-                                        
<include>org.apache.spark.sql.catalyst.plans.logical.Call*
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.plans.logical.DynamicFileFilter*
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.plans.logical.MergeIntoContext
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.plans.logical.MergeIntoIcebergTable
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.plans.logical.MergeRows
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.plans.logical.NamedArgument*
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.plans.logical.PositionalArgument*
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.plans.logical.NoStatsUnaryNode
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData*
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.plans.logical.UnresolvedMergeIntoIcebergTable
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.plans.logical.RowLevelCommand
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.plans.logical.WriteDelta
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField*
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields*
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.plans.logical.UpdateIcebergTable
-                                        </include>
-                                        <include>
-                                            
org.apache.spark.sql.catalyst.plans.logical.V2WriteCommandLike
-                                        </include>
-                                    </includes>
-                                </relocation>
                                 <relocation>
                                     
<pattern>org.apache.spark.sql.catalyst.utils</pattern>
                                     <shadedPattern>
@@ -462,6 +428,17 @@
                                     
<shadedPattern>org.apache.amoro.shade.org.apache.hc
                                     </shadedPattern>
                                 </relocation>
+                                <relocation>
+                                    
<pattern>org.apache.spark.sql.execution.dynamicpruning</pattern>
+                                    <shadedPattern>
+                                        
org.apache.amoro.shade.org.apache.spark.sql.execution.dynamicpruning
+                                    </shadedPattern>
+                                    <includes>
+                                        <include>
+                                            
org.apache.spark.sql.execution.dynamicpruning.RowLevelCommandDynamicPruning*
+                                        </include>
+                                    </includes>
+                                </relocation>
                             </relocations>
                             
<finalName>${project.artifactId}-${project.version}</finalName>
                         </configuration>

Reply via email to