This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new d20f13393 [AMORO-2819]Spark cannot execute the "alter table set
identifier field" command on tables in Iceberg format in unified catalog (#2894)
d20f13393 is described below
commit d20f13393c1041b4d6fd95a262cf5cee93fd1a9f
Author: Wang Tao <[email protected]>
AuthorDate: Mon Jun 24 11:36:06 2024 +0800
[AMORO-2819]Spark cannot execute the "alter table set identifier field"
command on tables in Iceberg format in unified catalog (#2894)
* fix some error when access iceberg table through unified catalog
* fix error for alter table set identifier field
* add some ut
* fix checkstyle
* fix review suggestion
* fix review suggestion
* fix review suggestion
* fix review suggestion
---
...edCatalog.java => SparkUnifiedCatalogBase.java} | 6 +-
...og.java => SparkUnifiedSessionCatalogBase.java} | 11 +-
.../amoro/spark/iceberg/IcebergSparkFormat.java | 6 +-
.../apache/amoro/spark/test/SparkTestContext.java | 3 +-
.../test/unified/UnifiedCatalogTestSuites.java | 135 ++++++++++++-
.../apache/amoro/spark/SparkUnifiedCatalog.java | 68 +++++++
.../amoro/spark/SparkUnifiedSessionCatalog.java | 36 ++++
.../amoro/spark/MixedFormatSparkExtensions.scala | 4 +-
.../MixedFormatExtendedDataSourceV2Strategy.scala | 218 +++++++++++++++++++++
.../amoro-mixed-format-spark-runtime-3.2/pom.xml | 66 +++++--
.../apache/amoro/spark/SparkUnifiedCatalog.java | 132 +++++++++++++
.../amoro/spark/SparkUnifiedSessionCatalog.java | 129 ++++++++++++
.../amoro/spark/MixedFormatSparkExtensions.scala | 5 +-
.../MixedFormatExtendedDataSourceV2Strategy.scala | 216 ++++++++++++++++++++
.../amoro-mixed-format-spark-runtime-3.3/pom.xml | 111 +++++------
15 files changed, 1051 insertions(+), 95 deletions(-)
diff --git
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalogBase.java
similarity index 98%
rename from
amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java
rename to
amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalogBase.java
index f5564bce9..e4662d9e9 100644
---
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java
+++
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalogBase.java
@@ -57,9 +57,9 @@ import java.util.Map;
import java.util.ServiceLoader;
/** Unified catalog implement for spark engine. */
-public class SparkUnifiedCatalog implements TableCatalog, SupportsNamespaces,
ProcedureCatalog {
+public class SparkUnifiedCatalogBase implements TableCatalog,
SupportsNamespaces, ProcedureCatalog {
- private static final Logger LOG =
LoggerFactory.getLogger(SparkUnifiedCatalog.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkUnifiedCatalogBase.class);
private static final Map<TableFormat, String> defaultTableCatalogImplMap =
ImmutableMap.of(
TableFormat.ICEBERG, "org.apache.iceberg.spark.SparkCatalog",
@@ -272,7 +272,7 @@ public class SparkUnifiedCatalog implements TableCatalog,
SupportsNamespaces, Pr
return procedureCatalog.loadProcedure(ident);
}
- private TableCatalog tableCatalog(TableFormat format) {
+ protected TableCatalog tableCatalog(TableFormat format) {
return tableCatalogs.computeIfAbsent(format, this::initializeTableCatalog);
}
diff --git
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalogBase.java
similarity index 85%
rename from
amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
rename to
amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalogBase.java
index cf9a7b557..a42db5c42 100644
---
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
+++
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalogBase.java
@@ -32,14 +32,17 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import java.util.Map;
import java.util.ServiceLoader;
-public class SparkUnifiedSessionCatalog<T extends TableCatalog &
SupportsNamespaces>
+public abstract class SparkUnifiedSessionCatalogBase<T extends TableCatalog &
SupportsNamespaces>
extends SessionCatalogBase<T> implements ProcedureCatalog {
- private final Map<TableFormat, SparkTableFormat> tableFormats =
Maps.newConcurrentMap();
+ protected final Map<TableFormat, SparkTableFormat> tableFormats =
Maps.newConcurrentMap();
+
+ protected abstract SparkUnifiedCatalogBase createUnifiedCatalog(
+ String name, CaseInsensitiveStringMap options);
@Override
protected TableCatalog buildTargetCatalog(String name,
CaseInsensitiveStringMap options) {
- SparkUnifiedCatalog sparkUnifiedCatalog = new SparkUnifiedCatalog();
+ SparkUnifiedCatalogBase sparkUnifiedCatalog = createUnifiedCatalog(name,
options);
sparkUnifiedCatalog.initialize(name, options);
ServiceLoader<SparkTableFormat> sparkTableFormats =
ServiceLoader.load(SparkTableFormat.class);
for (SparkTableFormat format : sparkTableFormats) {
@@ -80,7 +83,7 @@ public class SparkUnifiedSessionCatalog<T extends
TableCatalog & SupportsNamespa
@Override
public Procedure loadProcedure(Identifier ident) throws
NoSuchProcedureException {
- SparkUnifiedCatalog catalog = (SparkUnifiedCatalog) getTargetCatalog();
+ SparkUnifiedCatalogBase catalog = (SparkUnifiedCatalogBase)
getTargetCatalog();
return catalog.loadProcedure(ident);
}
}
diff --git
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/iceberg/IcebergSparkFormat.java
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/iceberg/IcebergSparkFormat.java
index f3513f908..d733f9124 100644
---
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/iceberg/IcebergSparkFormat.java
+++
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/iceberg/IcebergSparkFormat.java
@@ -31,6 +31,8 @@ import java.util.regex.Pattern;
public class IcebergSparkFormat implements SparkTableFormat {
private static final Pattern AT_TIMESTAMP =
Pattern.compile("at_timestamp_(\\d+)");
private static final Pattern SNAPSHOT_ID =
Pattern.compile("snapshot_id_(\\d+)");
+ private static final Pattern BRANCH = Pattern.compile("branch_(.*)");
+ private static final Pattern TAG = Pattern.compile("tag_(.*)");
@Override
public TableFormat format() {
@@ -41,7 +43,9 @@ public class IcebergSparkFormat implements SparkTableFormat {
public boolean isSubTableName(String tableName) {
return MetadataTableType.from(tableName) != null
|| AT_TIMESTAMP.matcher(tableName).matches()
- || SNAPSHOT_ID.matcher(tableName).matches();
+ || SNAPSHOT_ID.matcher(tableName).matches()
+ || TAG.matcher(tableName).matches()
+ || BRANCH.matcher(tableName).matches();
}
@Override
diff --git
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
index 862f7caf8..1c06a9c8f 100644
---
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
+++
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
@@ -27,7 +27,6 @@ import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.shade.guava32.com.google.common.base.Joiner;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
-import org.apache.amoro.spark.SparkUnifiedCatalog;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -50,7 +49,7 @@ public class SparkTestContext {
"org.apache.amoro.spark.MixedFormatSparkExtensions"
+
",org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions";
- public static final String UNIFIED_CATALOG_IMP =
SparkUnifiedCatalog.class.getName();
+ public static final String UNIFIED_CATALOG_IMP =
"org.apache.amoro.spark.SparkUnifiedCatalog";
final TemporaryFolder warehouse = new TemporaryFolder();
public static final String AMS_ALL_FORMAT_CATALOG_NAME = "all_formats";
diff --git
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/unified/UnifiedCatalogTestSuites.java
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/unified/UnifiedCatalogTestSuites.java
index 8ed2b0088..b35684297 100644
---
a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/unified/UnifiedCatalogTestSuites.java
+++
b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/unified/UnifiedCatalogTestSuites.java
@@ -25,8 +25,10 @@ import org.apache.amoro.UnifiedCatalogLoader;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
-import org.apache.amoro.spark.SparkUnifiedSessionCatalog;
import org.apache.amoro.spark.test.SparkTestBase;
+import org.apache.amoro.spark.test.TestIdentifier;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;
@@ -34,6 +36,7 @@ import org.junit.jupiter.params.provider.Arguments;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
public class UnifiedCatalogTestSuites extends SparkTestBase {
@@ -41,7 +44,7 @@ public class UnifiedCatalogTestSuites extends SparkTestBase {
protected Map<String, String> sparkSessionConfig() {
return ImmutableMap.of(
"spark.sql.catalog.spark_catalog",
- SparkUnifiedSessionCatalog.class.getName(),
+ "org.apache.amoro.spark.SparkUnifiedSessionCatalog",
"spark.sql.catalog.spark_catalog.uri",
CONTEXT.amsCatalogUrl(null));
}
@@ -64,7 +67,7 @@ public class UnifiedCatalogTestSuites extends SparkTestBase {
"CREATE TABLE "
+ target()
+ " ( "
- + "id int, "
+ + "id int not null, "
+ "data string, "
+ "pt string"
+ pkDDL(format)
@@ -94,6 +97,13 @@ public class UnifiedCatalogTestSuites extends SparkTestBase {
// call procedure
testCallProcedure(format);
+ // alter table test
+ testIcebergAlterTable(format, target(), "id");
+ testIcebergUpdate(format, target());
+ testIcebergDeleteInSubQuery(format, target());
+ testIcebergDropPartitionField(format, target(), "pt");
+ testIcebergMetadata(format, target());
+
sql("DROP TABLE " + target() + " PURGE");
Assertions.assertFalse(unifiedCatalog().tableExists(target().database,
target().table));
}
@@ -177,4 +187,123 @@ public class UnifiedCatalogTestSuites extends
SparkTestBase {
unifiedCatalog.createDatabase(database());
}
}
+
+ private void testIcebergAlterTable(
+ TableFormat format, TestIdentifier targetTable, String fieldName) {
+ if (TableFormat.ICEBERG != format) {
+ // only tests for iceberg
+ return;
+ }
+ // set identifier fields
+ String sqlText =
+ String.format("alter table %s set identifier fields %s", targetTable,
fieldName);
+ Dataset<Row> rs = sql(sqlText);
+ Assertions.assertTrue(rs.columns().length == 0);
+
+ AmoroTable<?> icebergTable =
+ unifiedCatalog().loadTable(targetTable.database, targetTable.table);
+ Table table = (Table) icebergTable.originalTable();
+
Assertions.assertTrue(table.schema().identifierFieldNames().contains(fieldName));
+
+ // drop identifier fields/
+ sqlText = String.format("alter table %s DROP IDENTIFIER FIELDS %s",
targetTable, fieldName);
+ rs = sql(sqlText);
+ Assertions.assertTrue(rs.columns().length == 0);
+
+ icebergTable = unifiedCatalog().loadTable(targetTable.database,
targetTable.table);
+ table = (Table) icebergTable.originalTable();
+
Assertions.assertFalse(table.schema().identifierFieldNames().contains(fieldName));
+ }
+
+ private void testIcebergUpdate(TableFormat format, TestIdentifier
targetTable) {
+ if (TableFormat.ICEBERG != format) {
+ // only tests for iceberg
+ return;
+ }
+ String afterValue = "update-xxx";
+ // set identifier fields
+ String sqlText = String.format("update %s set data='%s' where id=1",
targetTable, afterValue);
+ Dataset<Row> rs = sql(sqlText);
+ Assertions.assertTrue(rs.columns().length == 0);
+
+ // drop identifier fields/
+ sqlText = String.format("select * from %s where id=1", targetTable);
+ rs = sql(sqlText);
+ Assertions.assertTrue(rs.columns().length == 3 &&
rs.head().getString(1).equals(afterValue));
+ }
+
+ private void testIcebergDeleteInSubQuery(TableFormat format, TestIdentifier
targetTable) {
+ if (TableFormat.ICEBERG != format) {
+ // only tests for iceberg
+ return;
+ }
+ // set identifier fields
+ String sqlText = String.format("select min(id) from %s", targetTable);
+ Dataset<Row> rs = sql(sqlText);
+ Assertions.assertTrue(rs.columns().length == 1);
+ Integer minId = rs.head().getInt(0);
+
+ // set identifier fields
+ sqlText =
+ String.format(
+ "delete from %s where id=(select min(id) from %s);", targetTable,
targetTable);
+ rs = sql(sqlText);
+ Assertions.assertTrue(rs.columns().length == 0);
+
+ // drop identifier fields/
+ sqlText = String.format("select * from %s where id=%d", targetTable,
minId);
+ rs = sql(sqlText);
+ Assertions.assertTrue(rs.count() == 0);
+ }
+
+ private void testIcebergDropPartitionField(
+ TableFormat format, TestIdentifier targetTable, String fieldName) {
+ if (TableFormat.ICEBERG != format) {
+ // only tests for iceberg
+ return;
+ }
+ // drop partition field
+ String sqlText =
+ String.format("alter table %s drop partition field %s", targetTable,
fieldName);
+ Dataset<Row> rs = sql(sqlText);
+ Assertions.assertTrue(rs.columns().length == 0);
+
+ AmoroTable<?> icebergTable =
+ unifiedCatalog().loadTable(targetTable.database, targetTable.table);
+ Table table = (Table) icebergTable.originalTable();
+ Assertions.assertTrue(
+ Optional.empty()
+ .equals(
+ table.spec().fields().stream()
+ .filter(item -> item.name().equals(fieldName))
+ .findAny()));
+ }
+
+ private void testIcebergMetadata(TableFormat format, TestIdentifier
targetTable) {
+ if (TableFormat.ICEBERG != format) {
+ // only tests for iceberg
+ return;
+ }
+ // drop partition field
+ String tagKey = "tag-unittest";
+ String sqlText =
+ String.format("alter table %s create tag if not exists `%s`",
targetTable, tagKey);
+ Dataset<Row> rs = sql(sqlText);
+ Assertions.assertTrue(rs.columns().length == 0);
+
+ AmoroTable<?> icebergTable =
+ unifiedCatalog().loadTable(targetTable.database, targetTable.table);
+ Table table = (Table) icebergTable.originalTable();
+ Map<String, SnapshotRef> refs = table.refs();
+ Assertions.assertTrue(refs != null && refs.containsKey(tagKey));
+
+ sqlText = String.format("alter table %s drop tag `%s`", targetTable,
tagKey);
+ rs = sql(sqlText);
+ Assertions.assertTrue(rs.columns().length == 0);
+
+ icebergTable = unifiedCatalog().loadTable(targetTable.database,
targetTable.table);
+ table = (Table) icebergTable.originalTable();
+ refs = table.refs();
+ Assertions.assertTrue(refs != null && !refs.containsKey(tagKey));
+ }
}
diff --git
a/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java
new file mode 100644
index 000000000..6a9e840bd
--- /dev/null
+++
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark;
+
+import org.apache.amoro.TableFormat;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;
+
+/** @Auth: hzwangtao6 @Time: 2024/5/24 14:27 @Description: */
+public class SparkUnifiedCatalog extends SparkUnifiedCatalogBase
+ implements TableCatalog, SupportsNamespaces, ProcedureCatalog,
FunctionCatalog {
+ /**
+ * List the functions in a namespace from the catalog.
+ *
+ * <p>If there are no functions in the namespace, implementations should
return an empty array.
+ *
+ * @param namespace a multi-part namespace
+ * @return an array of Identifiers for functions
+ * @throws NoSuchNamespaceException If the namespace does not exist
(optional).
+ */
+ @Override
+ public Identifier[] listFunctions(String[] namespace) throws
NoSuchNamespaceException {
+ TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG);
+ if (tableCatalog instanceof FunctionCatalog) {
+ return ((FunctionCatalog) tableCatalog).listFunctions(namespace);
+ }
+ throw new NoSuchNamespaceException(namespace);
+ }
+
+ /**
+ * Load a function by {@link Identifier identifier} from the catalog.
+ *
+ * @param ident a function identifier
+ * @return an unbound function instance
+ * @throws NoSuchFunctionException If the function doesn't exist
+ */
+ @Override
+ public UnboundFunction loadFunction(Identifier ident) throws
NoSuchFunctionException {
+
+ TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG);
+ if (tableCatalog instanceof FunctionCatalog) {
+ return ((FunctionCatalog) tableCatalog).loadFunction(ident);
+ }
+ throw new NoSuchFunctionException(ident);
+ }
+}
diff --git
a/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
new file mode 100644
index 000000000..2fab4b9bf
--- /dev/null
+++
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark;
+
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/** @Auth: hzwangtao6 @Time: 2024/5/24 14:04 @Description: */
+public class SparkUnifiedSessionCatalog<
+ T extends TableCatalog & SupportsNamespaces & FunctionCatalog>
+ extends SparkUnifiedSessionCatalogBase<T> {
+
+ @Override
+ protected SparkUnifiedCatalogBase createUnifiedCatalog(
+ String name, CaseInsensitiveStringMap options) {
+ return new SparkUnifiedCatalog();
+ }
+}
diff --git
a/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/scala/org/apache/amoro/spark/MixedFormatSparkExtensions.scala
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/scala/org/apache/amoro/spark/MixedFormatSparkExtensions.scala
index eb07d0899..f56e88e69 100644
---
a/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/scala/org/apache/amoro/spark/MixedFormatSparkExtensions.scala
+++
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/scala/org/apache/amoro/spark/MixedFormatSparkExtensions.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.SparkSessionExtensions
import
org.apache.spark.sql.catalyst.analysis.{AlignedRowLevelIcebergCommandCheck,
AlignRowLevelCommandAssignments, CheckMergeIntoTableConditions,
MergeIntoIcebergTableResolutionCheck, ProcedureArgumentCoercion,
ResolveMergeIntoTableReferences, ResolveProcedures, RewriteDeleteFromTable,
RewriteMergeIntoTable, RewriteUpdateTable}
import org.apache.spark.sql.catalyst.optimizer._
import
org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser
-import
org.apache.spark.sql.execution.datasources.v2.{ExtendedDataSourceV2Strategy,
ExtendedV2Writes, OptimizeMetadataOnlyDeleteFromTable,
ReplaceRewrittenRowLevelCommand, RowLevelCommandScanRelationPushDown}
+import
org.apache.spark.sql.execution.datasources.v2.{ExtendedDataSourceV2Strategy,
ExtendedV2Writes, MixedFormatExtendedDataSourceV2Strategy,
OptimizeMetadataOnlyDeleteFromTable, ReplaceRewrittenRowLevelCommand,
RowLevelCommandScanRelationPushDown}
import
org.apache.spark.sql.execution.dynamicpruning.RowLevelCommandDynamicPruning
import org.apache.amoro.spark.sql.catalyst.analysis._
@@ -71,7 +71,7 @@ class MixedFormatSparkExtensions extends
(SparkSessionExtensions => Unit) {
extensions.injectPreCBORule { _ => ReplaceRewrittenRowLevelCommand }
// planner extensions
- extensions.injectPlannerStrategy { spark =>
ExtendedDataSourceV2Strategy(spark) }
+ extensions.injectPlannerStrategy { spark =>
MixedFormatExtendedDataSourceV2Strategy(spark) }
// mixed-format optimizer rules
extensions.injectPreCBORule(OptimizeWriteRule)
diff --git
a/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MixedFormatExtendedDataSourceV2Strategy.scala
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MixedFormatExtendedDataSourceV2Strategy.scala
new file mode 100644
index 000000000..d4f52f226
--- /dev/null
+++
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MixedFormatExtendedDataSourceV2Strategy.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.iceberg.spark.SparkCatalog
+import org.apache.iceberg.spark.SparkSessionCatalog
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
+import org.apache.spark.sql.catalyst.plans.logical.Call
+import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch
+import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag
+import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable
+import org.apache.spark.sql.catalyst.plans.logical.DropBranch
+import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
+import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
+import org.apache.spark.sql.catalyst.plans.logical.DropTag
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows
+import org.apache.spark.sql.catalyst.plans.logical.NoStatsUnaryNode
+import org.apache.spark.sql.catalyst.plans.logical.ReplaceData
+import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
+import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
+import
org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
+import org.apache.spark.sql.catalyst.plans.logical.WriteDelta
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+import org.apache.amoro.spark.{SparkUnifiedCatalog, SparkUnifiedSessionCatalog}
+
+/**
+ * refer apache iceberg project
+ * spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/
+ * datasources/v2/ExtendedDataSourceV2Strategy.scala
+ *
+ * @param spark
+ */
+case class MixedFormatExtendedDataSourceV2Strategy(spark: SparkSession)
extends Strategy
+ with PredicateHelper {
+
+ import DataSourceV2Implicits._
+
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case c @ Call(procedure, args) =>
+ val input = buildInternalRow(args)
+ CallExec(c.output, procedure, input) :: Nil
+
+ case AddPartitionField(IcebergCatalogAndIdentifier(catalog, ident),
transform, name) =>
+ AddPartitionFieldExec(catalog, ident, transform, name) :: Nil
+
+ case CreateOrReplaceBranch(
+ IcebergCatalogAndIdentifier(catalog, ident),
+ branch,
+ branchOptions,
+ create,
+ replace,
+ ifNotExists) =>
+ CreateOrReplaceBranchExec(
+ catalog,
+ ident,
+ branch,
+ branchOptions,
+ create,
+ replace,
+ ifNotExists) :: Nil
+
+ case CreateOrReplaceTag(
+ IcebergCatalogAndIdentifier(catalog, ident),
+ tag,
+ tagOptions,
+ create,
+ replace,
+ ifNotExists) =>
+ CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, create, replace,
ifNotExists) :: Nil
+
+ case DropBranch(IcebergCatalogAndIdentifier(catalog, ident), branch,
ifExists) =>
+ DropBranchExec(catalog, ident, branch, ifExists) :: Nil
+
+ case DropTag(IcebergCatalogAndIdentifier(catalog, ident), tag, ifExists) =>
+ DropTagExec(catalog, ident, tag, ifExists) :: Nil
+
+ case DropPartitionField(IcebergCatalogAndIdentifier(catalog, ident),
transform) =>
+ DropPartitionFieldExec(catalog, ident, transform) :: Nil
+
+ case ReplacePartitionField(
+ IcebergCatalogAndIdentifier(catalog, ident),
+ transformFrom,
+ transformTo,
+ name) =>
+ ReplacePartitionFieldExec(catalog, ident, transformFrom, transformTo,
name) :: Nil
+
+ case SetIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident),
fields) =>
+ SetIdentifierFieldsExec(catalog, ident, fields) :: Nil
+
+ case DropIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident),
fields) =>
+ DropIdentifierFieldsExec(catalog, ident, fields) :: Nil
+
+ case SetWriteDistributionAndOrdering(
+ IcebergCatalogAndIdentifier(catalog, ident),
+ distributionMode,
+ ordering) =>
+ SetWriteDistributionAndOrderingExec(catalog, ident, distributionMode,
ordering) :: Nil
+
+ case ReplaceData(_: DataSourceV2Relation, query, r: DataSourceV2Relation,
Some(write)) =>
+ // refresh the cache using the original relation
+ ReplaceDataExec(planLater(query), refreshCache(r), write) :: Nil
+
+ case WriteDelta(_: DataSourceV2Relation, query, r: DataSourceV2Relation,
projs, Some(write)) =>
+ // refresh the cache using the original relation
+ WriteDeltaExec(planLater(query), refreshCache(r), projs, write) :: Nil
+
+ case MergeRows(
+ isSourceRowPresent,
+ isTargetRowPresent,
+ matchedConditions,
+ matchedOutputs,
+ notMatchedConditions,
+ notMatchedOutputs,
+ targetOutput,
+ rowIdAttrs,
+ performCardinalityCheck,
+ emitNotMatchedTargetRows,
+ output,
+ child) =>
+ MergeRowsExec(
+ isSourceRowPresent,
+ isTargetRowPresent,
+ matchedConditions,
+ matchedOutputs,
+ notMatchedConditions,
+ notMatchedOutputs,
+ targetOutput,
+ rowIdAttrs,
+ performCardinalityCheck,
+ emitNotMatchedTargetRows,
+ output,
+ planLater(child)) :: Nil
+
+ case DeleteFromIcebergTable(DataSourceV2ScanRelation(r, _, output),
condition, None) =>
+ // the optimizer has already checked that this delete can be handled
using a metadata operation
+ val deleteCond = condition.getOrElse(Literal.TrueLiteral)
+ val predicates = splitConjunctivePredicates(deleteCond)
+ val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates,
output)
+ val filters = normalizedPredicates.flatMap { pred =>
+ val filter = DataSourceStrategy.translateFilter(pred,
supportNestedPredicatePushdown = true)
+ if (filter.isEmpty) {
+ throw
QueryCompilationErrors.cannotTranslateExpressionToSourceFilterError(pred)
+ }
+ filter
+ }.toArray
+ DeleteFromTableExec(r.table.asDeletable, filters, refreshCache(r)) :: Nil
+
+ case NoStatsUnaryNode(child) =>
+ planLater(child) :: Nil
+
+ case _ => Nil
+ }
+
+ private def buildInternalRow(exprs: Seq[Expression]): InternalRow = {
+ val values = new Array[Any](exprs.size)
+ for (index <- exprs.indices) {
+ values(index) = exprs(index).eval()
+ }
+ new GenericInternalRow(values)
+ }
+
+ private def refreshCache(r: DataSourceV2Relation)(): Unit = {
+ spark.sharedState.cacheManager.recacheByPlan(spark, r)
+ }
+
+  /**
+   * Resolves an Iceberg-capable catalog and identifier; besides the plain Iceberg
+   * catalogs it also matches SparkUnifiedCatalog and SparkUnifiedSessionCatalog.
+   */
+ private object IcebergCatalogAndIdentifier {
+ def unapply(identifier: Seq[String]): Option[(TableCatalog, Identifier)] =
{
+ val catalogAndIdentifier = Spark3Util.catalogAndIdentifier(spark,
identifier.asJava)
+ catalogAndIdentifier.catalog match {
+ case icebergCatalog: SparkCatalog =>
+ Some((icebergCatalog, catalogAndIdentifier.identifier))
+ case icebergCatalog: SparkSessionCatalog[_] =>
+ Some((icebergCatalog, catalogAndIdentifier.identifier))
+ case icebergCatalog: SparkUnifiedCatalog =>
+ Some((icebergCatalog, catalogAndIdentifier.identifier))
+ case icebergCatalog: SparkUnifiedSessionCatalog[_] =>
+ Some((icebergCatalog, catalogAndIdentifier.identifier))
+ case _ =>
+ None
+ }
+ }
+ }
+}
diff --git
a/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-runtime-3.2/pom.xml
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-runtime-3.2/pom.xml
index 484be250e..b1dbaf402 100644
---
a/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-runtime-3.2/pom.xml
+++
b/amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-runtime-3.2/pom.xml
@@ -263,6 +263,9 @@
<include>
org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy*
</include>
+ <include>
+
org.apache.spark.sql.execution.datasources.v2.MixedFormatExtendedDataSourceV2Strategy*
+ </include>
<include>
org.apache.spark.sql.execution.datasources.v2.MergeIntoExec*
</include>
@@ -278,6 +281,18 @@
<include>
org.apache.spark.sql.execution.datasources.v2.SetWriteDistributionAndOrderingExec*
</include>
+ <include>
+
org.apache.spark.sql.execution.datasources.v2.OptimizeMetadataOnlyDeleteFromTable*
+ </include>
+ <include>
+
org.apache.spark.sql.execution.datasources.v2.RowLevelCommandScanRelationPushDown*
+ </include>
+ <include>
+
org.apache.spark.sql.execution.datasources.v2.ExtendedV2Writes*
+ </include>
+ <include>
+
org.apache.spark.sql.execution.datasources.v2.ReplaceRewrittenRowLevelCommand*
+ </include>
</includes>
</relocation>
<relocation>
@@ -312,6 +327,30 @@
<include>
org.apache.spark.sql.catalyst.analysis.RowLevelOperationsPredicateCheck*
</include>
+ <include>
+
org.apache.spark.sql.catalyst.analysis.ResolveMergeIntoTableReferences*
+ </include>
+ <include>
+
org.apache.spark.sql.catalyst.analysis.CheckMergeIntoTableConditions*
+ </include>
+ <include>
+
org.apache.spark.sql.catalyst.analysis.AlignRowLevelCommandAssignments*
+ </include>
+ <include>
+
org.apache.spark.sql.catalyst.analysis.RewriteDeleteFromTable*
+ </include>
+ <include>
+
org.apache.spark.sql.catalyst.analysis.RewriteUpdateTable*
+ </include>
+ <include>
+
org.apache.spark.sql.catalyst.analysis.RewriteMergeIntoTable*
+ </include>
+ <include>
+
org.apache.spark.sql.catalyst.analysis.MergeIntoIcebergTableResolutionCheck*
+ </include>
+ <include>
+
org.apache.spark.sql.catalyst.analysis.AlignedRowLevelIcebergCommandCheck*
+ </include>
</includes>
</relocation>
<relocation>
@@ -335,19 +374,10 @@
</shadedPattern>
<includes>
<include>
-
org.apache.spark.sql.catalyst.optimizer.OptimizeConditionsInRowLevelOperations*
- </include>
- <include>
-
org.apache.spark.sql.catalyst.optimizer.PullupCorrelatedPredicatesInRowLevelOperations*
- </include>
- <include>
-
org.apache.spark.sql.catalyst.optimizer.RewriteDelete*
- </include>
- <include>
-
org.apache.spark.sql.catalyst.optimizer.RewriteMergeInto*
+
org.apache.spark.sql.catalyst.optimizer.ExtendedSimplifyConditionalsInPredicate*
</include>
<include>
-
org.apache.spark.sql.catalyst.optimizer.RewriteUpdate*
+
org.apache.spark.sql.catalyst.optimizer.ExtendedReplaceNullWithFalseInPredicate*
</include>
</includes>
</relocation>
@@ -364,6 +394,9 @@
<include>
org.apache.spark.sql.catalyst.parser.extensions.UpperCaseCharStream*
</include>
+ <include>
+
org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser*
+ </include>
</includes>
</relocation>
<relocation>
@@ -439,6 +472,17 @@
<shadedPattern>org.apache.amoro.shade.org.apache.hc
</shadedPattern>
</relocation>
+ <relocation>
+
<pattern>org.apache.spark.sql.execution.dynamicpruning</pattern>
+ <shadedPattern>
+
org.apache.amoro.shade.org.apache.spark.sql.execution.dynamicpruning
+ </shadedPattern>
+ <includes>
+ <include>
+
org.apache.spark.sql.execution.dynamicpruning.RowLevelCommandDynamicPruning*
+ </include>
+ </includes>
+ </relocation>
</relocations>
<finalName>${project.artifactId}-${project.version}</finalName>
</configuration>
diff --git
a/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java
new file mode 100644
index 000000000..44c813300
--- /dev/null
+++
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark;
+
+import org.apache.amoro.TableFormat;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;
+
+/** Unified Spark catalog for Spark 3.3, extending the common base with FunctionCatalog and time-travel table loading. */
+public class SparkUnifiedCatalog extends SparkUnifiedCatalogBase
+ implements TableCatalog, SupportsNamespaces, ProcedureCatalog,
FunctionCatalog {
+
+ /**
+ * List the functions in a namespace from the catalog.
+ *
+ * <p>If there are no functions in the namespace, implementations should
return an empty array.
+ *
+ * @param namespace a multi-part namespace
+ * @return an array of Identifiers for functions
+ * @throws NoSuchNamespaceException If the namespace does not exist
(optional).
+ */
+ @Override
+ public Identifier[] listFunctions(String[] namespace) throws
NoSuchNamespaceException {
+ TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG);
+ if (tableCatalog instanceof FunctionCatalog) {
+ return ((FunctionCatalog) tableCatalog).listFunctions(namespace);
+ }
+ throw new NoSuchNamespaceException(namespace);
+ }
+
+ /**
+ * Load a function by {@link Identifier identifier} from the catalog.
+ *
+ * @param ident a function identifier
+ * @return an unbound function instance
+ * @throws NoSuchFunctionException If the function doesn't exist
+ */
+ @Override
+ public UnboundFunction loadFunction(Identifier ident) throws
NoSuchFunctionException {
+
+ TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG);
+ if (tableCatalog instanceof FunctionCatalog) {
+ return ((FunctionCatalog) tableCatalog).loadFunction(ident);
+ }
+ throw new NoSuchFunctionException(ident);
+ }
+
+ /**
+ * Drop a namespace from the catalog with cascade mode, recursively dropping
all objects within
+ * the namespace if cascade is true.
+ *
+ * <p>If the catalog implementation does not support this operation, it may
throw {@link
+ * UnsupportedOperationException}.
+ *
+ * @param namespace a multi-part namespace
+ * @param cascade When true, deletes all objects under the namespace
+ * @return true if the namespace was dropped
+ * @throws NoSuchNamespaceException If the namespace does not exist
(optional)
+ * @throws NonEmptyNamespaceException If the namespace is non-empty and
cascade is false
+ * @throws UnsupportedOperationException If drop is not a supported operation
+ */
+ @Override
+ public boolean dropNamespace(String[] namespace, boolean cascade)
+ throws NoSuchNamespaceException, NonEmptyNamespaceException {
+ return false;
+ }
+
+ /**
+ * Load table metadata of a specific version by {@link Identifier
identifier} from the catalog.
+ *
+ * <p>If the catalog supports views and contains a view for the identifier
and not a table, this
+ * must throw {@link NoSuchTableException}.
+ *
+ * @param ident a table identifier
+ * @param version version of the table
+ * @return the table's metadata
+ * @throws NoSuchTableException If the table doesn't exist or is a view
+ */
+ @Override
+ public Table loadTable(Identifier ident, String version) throws
NoSuchTableException {
+ TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG);
+ if (tableCatalog == null) {
+ throw new UnsupportedOperationException("Doesn't support iceberg table
catalog");
+ }
+ return tableCatalog.loadTable(ident, version);
+ }
+
+ /**
+ * Load table metadata at a specific time by {@link Identifier identifier}
from the catalog.
+ *
+ * <p>If the catalog supports views and contains a view for the identifier
and not a table, this
+ * must throw {@link NoSuchTableException}.
+ *
+ * @param ident a table identifier
+ * @param timestamp timestamp of the table, which is microseconds since
1970-01-01 00:00:00 UTC
+ * @return the table's metadata
+ * @throws NoSuchTableException If the table doesn't exist or is a view
+ */
+ @Override
+ public Table loadTable(Identifier ident, long timestamp) throws
NoSuchTableException {
+ TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG);
+ if (tableCatalog == null) {
+      throw new UnsupportedOperationException("Doesn't support iceberg table
catalog");
+ }
+ return tableCatalog.loadTable(ident, timestamp);
+ }
+}
diff --git
a/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
new file mode 100644
index 000000000..96213a63f
--- /dev/null
+++
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark;
+
+import org.apache.amoro.TableFormat;
+import org.apache.iceberg.spark.functions.SparkFunctions;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * The TableCatalog API in Spark 3.3 differs from Spark 3.2, so this class is defined
+ * separately: 1. it supports the time-travel grammar feature; 2. it supports
+ * FunctionCatalog.
+ */
+public class SparkUnifiedSessionCatalog<
+ T extends TableCatalog & SupportsNamespaces & FunctionCatalog>
+ extends SparkUnifiedSessionCatalogBase<T> {
+
+ @Override
+ protected SparkUnifiedCatalogBase createUnifiedCatalog(
+ String name, CaseInsensitiveStringMap options) {
+ return new SparkUnifiedCatalog();
+ }
+
+ @Override
+ public Table loadTable(Identifier ident, String version) throws
NoSuchTableException {
+ try {
+ TableCatalog catalog = getTargetCatalog();
+ SparkUnifiedCatalogBase unifiedCatalog = (SparkUnifiedCatalogBase)
catalog;
+ return unifiedCatalog.tableCatalog(TableFormat.ICEBERG).loadTable(ident,
version);
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ return getSessionCatalog().loadTable(ident, version);
+ }
+ }
+
+ @Override
+ public Table loadTable(Identifier ident, long timestamp) throws
NoSuchTableException {
+ try {
+ TableCatalog catalog = getTargetCatalog();
+ SparkUnifiedCatalogBase unifiedCatalog = (SparkUnifiedCatalogBase)
catalog;
+ return unifiedCatalog.tableCatalog(TableFormat.ICEBERG).loadTable(ident,
timestamp);
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ return getSessionCatalog().loadTable(ident, timestamp);
+ }
+ }
+
+ /**
+ * List the functions in a namespace from the catalog.
+ *
+ * <p>If there are no functions in the namespace, implementations should
return an empty array.
+ *
+ * @param namespace a multi-part namespace
+ * @return an array of Identifiers for functions
+ * @throws NoSuchNamespaceException If the namespace does not exist
(optional).
+ */
+ @Override
+ public Identifier[] listFunctions(String[] namespace) throws
NoSuchNamespaceException {
+ SparkUnifiedCatalog catalog = (SparkUnifiedCatalog) getTargetCatalog();
+ return catalog.listFunctions(namespace);
+ }
+
+ @Override
+ public UnboundFunction loadFunction(Identifier ident) throws
NoSuchFunctionException {
+ String[] namespace = ident.namespace();
+ String name = ident.name();
+
+ // Allow for empty namespace, as Spark's storage partitioned joins look up
+ // the corresponding functions to generate transforms for partitioning
+ // with an empty namespace, such as `bucket`.
+ // Otherwise, use `system` namespace.
+ if (namespace.length == 0 || isSystemNamespace(namespace)) {
+ UnboundFunction func = SparkFunctions.load(name);
+ if (func != null) {
+ return func;
+ }
+ }
+
+ throw new NoSuchFunctionException(ident);
+ }
+
+ private static boolean isSystemNamespace(String[] namespace) {
+ return namespace.length == 1 && namespace[0].equalsIgnoreCase("system");
+ }
+
+ /**
+ * Drop a namespace from the catalog with cascade mode, recursively dropping
all objects within
+ * the namespace if cascade is true.
+ *
+ * <p>If the catalog implementation does not support this operation, it may
throw {@link
+ * UnsupportedOperationException}.
+ *
+ * @param namespace a multi-part namespace
+ * @param cascade When true, deletes all objects under the namespace
+ * @return true if the namespace was dropped
+ * @throws NoSuchNamespaceException If the namespace does not exist
(optional)
+ * @throws NonEmptyNamespaceException If the namespace is non-empty and
cascade is false
+ * @throws UnsupportedOperationException If drop is not a supported operation
+ */
+ @Override
+ public boolean dropNamespace(String[] namespace, boolean cascade)
+ throws NoSuchNamespaceException, NonEmptyNamespaceException {
+ SparkUnifiedCatalog catalog = (SparkUnifiedCatalog) getTargetCatalog();
+ return catalog.dropNamespace(namespace, cascade);
+ }
+}
diff --git
a/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/scala/org/apache/amoro/spark/MixedFormatSparkExtensions.scala
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/scala/org/apache/amoro/spark/MixedFormatSparkExtensions.scala
index f105d1874..8164587d6 100644
---
a/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/scala/org/apache/amoro/spark/MixedFormatSparkExtensions.scala
+++
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/scala/org/apache/amoro/spark/MixedFormatSparkExtensions.scala
@@ -22,7 +22,8 @@ import org.apache.spark.sql.SparkSessionExtensions
import
org.apache.spark.sql.catalyst.analysis.{AlignedRowLevelIcebergCommandCheck,
AlignRowLevelCommandAssignments, CheckMergeIntoTableConditions,
MergeIntoIcebergTableResolutionCheck, ProcedureArgumentCoercion,
ResolveMergeIntoTableReferences, ResolveProcedures,
RewriteDeleteFromIcebergTable, RewriteMergeIntoTable, RewriteUpdateTable}
import org.apache.spark.sql.catalyst.optimizer._
import
org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser
-import
org.apache.spark.sql.execution.datasources.v2.{ExtendedDataSourceV2Strategy,
ExtendedV2Writes, OptimizeMetadataOnlyDeleteFromIcebergTable,
ReplaceRewrittenRowLevelCommand, RowLevelCommandScanRelationPushDown}
+import org.apache.spark.sql.execution.datasources.v2.{ExtendedV2Writes,
OptimizeMetadataOnlyDeleteFromIcebergTable, ReplaceRewrittenRowLevelCommand,
RowLevelCommandScanRelationPushDown}
+import
org.apache.spark.sql.execution.datasources.v2.MixedFormatExtendedDataSourceV2Strategy
import
org.apache.spark.sql.execution.dynamicpruning.RowLevelCommandDynamicPruning
import org.apache.amoro.spark.sql.catalyst.analysis
@@ -77,7 +78,7 @@ class MixedFormatSparkExtensions extends
(SparkSessionExtensions => Unit) {
extensions.injectPreCBORule { _ => ReplaceRewrittenRowLevelCommand }
// planner extensions
- extensions.injectPlannerStrategy { spark =>
ExtendedDataSourceV2Strategy(spark) }
+ extensions.injectPlannerStrategy { spark =>
MixedFormatExtendedDataSourceV2Strategy(spark) }
// mixed-format optimizer rules
extensions.injectPreCBORule(OptimizeWriteRule)
diff --git
a/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MixedFormatExtendedDataSourceV2Strategy.scala
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MixedFormatExtendedDataSourceV2Strategy.scala
new file mode 100644
index 000000000..58763db67
--- /dev/null
+++
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MixedFormatExtendedDataSourceV2Strategy.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.iceberg.spark.SparkCatalog
+import org.apache.iceberg.spark.SparkSessionCatalog
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
+import org.apache.spark.sql.catalyst.plans.logical.Call
+import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch
+import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag
+import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable
+import org.apache.spark.sql.catalyst.plans.logical.DropBranch
+import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
+import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
+import org.apache.spark.sql.catalyst.plans.logical.DropTag
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows
+import org.apache.spark.sql.catalyst.plans.logical.NoStatsUnaryNode
+import org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData
+import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
+import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
+import
org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
+import org.apache.spark.sql.catalyst.plans.logical.WriteDelta
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+import org.apache.amoro.UnifiedCatalog
+import org.apache.amoro.spark.{SparkUnifiedCatalog, SparkUnifiedSessionCatalog}
+
+/**
+ * refer apache iceberg project
+ * spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/
+ * datasources/v2/ExtendedDataSourceV2Strategy.scala
+ * @param spark
+ */
+case class MixedFormatExtendedDataSourceV2Strategy(spark: SparkSession)
extends Strategy
+ with PredicateHelper {
+
+ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case c @ Call(procedure, args) =>
+ val input = buildInternalRow(args)
+ CallExec(c.output, procedure, input) :: Nil
+
+ case AddPartitionField(IcebergCatalogAndIdentifier(catalog, ident),
transform, name) =>
+ AddPartitionFieldExec(catalog, ident, transform, name) :: Nil
+
+ case CreateOrReplaceBranch(
+ IcebergCatalogAndIdentifier(catalog, ident),
+ branch,
+ branchOptions,
+ create,
+ replace,
+ ifNotExists) =>
+ CreateOrReplaceBranchExec(
+ catalog,
+ ident,
+ branch,
+ branchOptions,
+ create,
+ replace,
+ ifNotExists) :: Nil
+
+ case CreateOrReplaceTag(
+ IcebergCatalogAndIdentifier(catalog, ident),
+ tag,
+ tagOptions,
+ create,
+ replace,
+ ifNotExists) =>
+ CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, create, replace,
ifNotExists) :: Nil
+
+ case DropBranch(IcebergCatalogAndIdentifier(catalog, ident), branch,
ifExists) =>
+ DropBranchExec(catalog, ident, branch, ifExists) :: Nil
+
+ case DropTag(IcebergCatalogAndIdentifier(catalog, ident), tag, ifExists) =>
+ DropTagExec(catalog, ident, tag, ifExists) :: Nil
+
+ case DropPartitionField(IcebergCatalogAndIdentifier(catalog, ident),
transform) =>
+ DropPartitionFieldExec(catalog, ident, transform) :: Nil
+
+ case ReplacePartitionField(
+ IcebergCatalogAndIdentifier(catalog, ident),
+ transformFrom,
+ transformTo,
+ name) =>
+ ReplacePartitionFieldExec(catalog, ident, transformFrom, transformTo,
name) :: Nil
+
+ case SetIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident),
fields) =>
+ SetIdentifierFieldsExec(catalog, ident, fields) :: Nil
+
+ case DropIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident),
fields) =>
+ DropIdentifierFieldsExec(catalog, ident, fields) :: Nil
+
+ case SetWriteDistributionAndOrdering(
+ IcebergCatalogAndIdentifier(catalog, ident),
+ distributionMode,
+ ordering) =>
+ SetWriteDistributionAndOrderingExec(catalog, ident, distributionMode,
ordering) :: Nil
+
+ case ReplaceIcebergData(_: DataSourceV2Relation, query, r:
DataSourceV2Relation, Some(write)) =>
+ // refresh the cache using the original relation
+ ReplaceDataExec(planLater(query), refreshCache(r), write) :: Nil
+
+ case WriteDelta(_: DataSourceV2Relation, query, r: DataSourceV2Relation,
projs, Some(write)) =>
+ // refresh the cache using the original relation
+ WriteDeltaExec(planLater(query), refreshCache(r), projs, write) :: Nil
+
+ case MergeRows(
+ isSourceRowPresent,
+ isTargetRowPresent,
+ matchedConditions,
+ matchedOutputs,
+ notMatchedConditions,
+ notMatchedOutputs,
+ targetOutput,
+ performCardinalityCheck,
+ emitNotMatchedTargetRows,
+ output,
+ child) =>
+ MergeRowsExec(
+ isSourceRowPresent,
+ isTargetRowPresent,
+ matchedConditions,
+ matchedOutputs,
+ notMatchedConditions,
+ notMatchedOutputs,
+ targetOutput,
+ performCardinalityCheck,
+ emitNotMatchedTargetRows,
+ output,
+ planLater(child)) :: Nil
+
+ case DeleteFromIcebergTable(DataSourceV2ScanRelation(r, _, output, _),
condition, None) =>
+ // the optimizer has already checked that this delete can be handled
using a metadata operation
+ val deleteCond = condition.getOrElse(Literal.TrueLiteral)
+ val predicates = splitConjunctivePredicates(deleteCond)
+ val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates,
output)
+ val filters = normalizedPredicates.flatMap { pred =>
+ val filter = DataSourceStrategy.translateFilter(pred,
supportNestedPredicatePushdown = true)
+ if (filter.isEmpty) {
+ throw
QueryCompilationErrors.cannotTranslateExpressionToSourceFilterError(pred)
+ }
+ filter
+ }.toArray
+ DeleteFromTableExec(r.table.asDeletable, filters, refreshCache(r)) :: Nil
+
+ case NoStatsUnaryNode(child) =>
+ planLater(child) :: Nil
+
+ case _ => Nil
+ }
+
+ private def buildInternalRow(exprs: Seq[Expression]): InternalRow = {
+ val values = new Array[Any](exprs.size)
+ for (index <- exprs.indices) {
+ values(index) = exprs(index).eval()
+ }
+ new GenericInternalRow(values)
+ }
+
+ private def refreshCache(r: DataSourceV2Relation)(): Unit = {
+ spark.sharedState.cacheManager.recacheByPlan(spark, r)
+ }
+
+  /**
+   * Resolves an Iceberg-capable catalog and identifier; besides the plain Iceberg
+   * catalogs it also matches SparkUnifiedCatalog and SparkUnifiedSessionCatalog.
+   */
+ private object IcebergCatalogAndIdentifier {
+ def unapply(identifier: Seq[String]): Option[(TableCatalog, Identifier)] =
{
+ val catalogAndIdentifier = Spark3Util.catalogAndIdentifier(spark,
identifier.asJava)
+ catalogAndIdentifier.catalog match {
+ case icebergCatalog: SparkCatalog =>
+ Some((icebergCatalog, catalogAndIdentifier.identifier))
+ case icebergCatalog: SparkSessionCatalog[_] =>
+ Some((icebergCatalog, catalogAndIdentifier.identifier))
+ case icebergCatalog: SparkUnifiedCatalog =>
+ Some((icebergCatalog, catalogAndIdentifier.identifier))
+ case icebergCatalog: SparkUnifiedSessionCatalog[_] =>
+ Some((icebergCatalog, catalogAndIdentifier.identifier))
+ case _ =>
+ None
+ }
+ }
+ }
+}
diff --git
a/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-runtime-3.3/pom.xml
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-runtime-3.3/pom.xml
index d473df156..8adc59126 100644
---
a/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-runtime-3.3/pom.xml
+++
b/amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-runtime-3.3/pom.xml
@@ -231,6 +231,9 @@
<exclude>
org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameterImpl
</exclude>
+ <exclude>
+
org.apache.spark.sql.connector.iceberg.write.*
+ </exclude>
</excludes>
</relocation>
@@ -268,6 +271,9 @@
<include>
org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy*
</include>
+ <include>
+
org.apache.spark.sql.execution.datasources.v2.MixedFormatExtendedDataSourceV2Strategy*
+ </include>
<include>
org.apache.spark.sql.execution.datasources.v2.MergeIntoExec*
</include>
@@ -283,6 +289,27 @@
<include>
org.apache.spark.sql.execution.datasources.v2.SetWriteDistributionAndOrderingExec*
</include>
                                        <include>
+
org.apache.spark.sql.execution.datasources.v2.MergeRowsExec*
+                                        </include>
                                        <include>
+
org.apache.spark.sql.execution.datasources.v2.CreateOrReplaceBranchExec*
+                                        </include>
                                        <include>
+
org.apache.spark.sql.execution.datasources.v2.CreateOrReplaceTagExec*
+                                        </include>
                                        <include>
+
org.apache.spark.sql.execution.datasources.v2.DropBranchExec*
+                                        </include>
                                        <include>
+
org.apache.spark.sql.execution.datasources.v2.DropTagExec*
+                                        </include>
                                        <include>
+
org.apache.spark.sql.execution.datasources.v2.DropIdentifierFieldsExec*
+                                        </include>
                                        <include>
+
org.apache.spark.sql.execution.datasources.v2.WriteDeltaExec*
+                                        </include>
</includes>
</relocation>
<relocation>
@@ -354,6 +381,12 @@
<include>
org.apache.spark.sql.catalyst.optimizer.RewriteUpdate*
</include>
+ <include>
+
org.apache.spark.sql.catalyst.optimizer.ExtendedSimplifyConditionalsInPredicate*
+ </include>
+ <include>
+
org.apache.spark.sql.catalyst.optimizer.ExtendedReplaceNullWithFalseInPredicate*
+ </include>
</includes>
</relocation>
<relocation>
@@ -371,73 +404,6 @@
</include>
</includes>
</relocation>
- <relocation>
-
<pattern>org.apache.spark.sql.catalyst.plans.logical</pattern>
- <shadedPattern>
-
org.apache.amoro.shade.org.apache.spark.sql.catalyst.plans.logical
- </shadedPattern>
- <includes>
- <include>
-
org.apache.spark.sql.catalyst.plans.logical.AddPartitionField*
- </include>
-
<include>org.apache.spark.sql.catalyst.plans.logical.Call*
- </include>
- <include>
-
org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
- </include>
- <include>
-
org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
- </include>
- <include>
-
org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable
- </include>
- <include>
-
org.apache.spark.sql.catalyst.plans.logical.DynamicFileFilter*
- </include>
- <include>
-
org.apache.spark.sql.catalyst.plans.logical.MergeIntoContext
- </include>
- <include>
-
org.apache.spark.sql.catalyst.plans.logical.MergeIntoIcebergTable
- </include>
- <include>
-
org.apache.spark.sql.catalyst.plans.logical.MergeRows
- </include>
- <include>
-
org.apache.spark.sql.catalyst.plans.logical.NamedArgument*
- </include>
- <include>
-
org.apache.spark.sql.catalyst.plans.logical.PositionalArgument*
- </include>
- <include>
-
org.apache.spark.sql.catalyst.plans.logical.NoStatsUnaryNode
- </include>
- <include>
-
org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData*
- </include>
- <include>
-
org.apache.spark.sql.catalyst.plans.logical.UnresolvedMergeIntoIcebergTable
- </include>
- <include>
-
org.apache.spark.sql.catalyst.plans.logical.RowLevelCommand
- </include>
- <include>
-
org.apache.spark.sql.catalyst.plans.logical.WriteDelta
- </include>
- <include>
-
org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField*
- </include>
- <include>
-
org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields*
- </include>
- <include>
-
org.apache.spark.sql.catalyst.plans.logical.UpdateIcebergTable
- </include>
- <include>
-
org.apache.spark.sql.catalyst.plans.logical.V2WriteCommandLike
- </include>
- </includes>
- </relocation>
<relocation>
<pattern>org.apache.spark.sql.catalyst.utils</pattern>
<shadedPattern>
@@ -462,6 +428,17 @@
<shadedPattern>org.apache.amoro.shade.org.apache.hc
</shadedPattern>
</relocation>
+ <relocation>
+
<pattern>org.apache.spark.sql.execution.dynamicpruning</pattern>
+ <shadedPattern>
+
org.apache.amoro.shade.org.apache.spark.sql.execution.dynamicpruning
+ </shadedPattern>
+ <includes>
+ <include>
+
org.apache.spark.sql.execution.dynamicpruning.RowLevelCommandDynamicPruning*
+ </include>
+ </includes>
+ </relocation>
</relocations>
<finalName>${project.artifactId}-${project.version}</finalName>
</configuration>