baiyangtx commented on code in PR #2894:
URL: https://github.com/apache/amoro/pull/2894#discussion_r1643624039
##########
amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalogBase.java:
##########
@@ -32,20 +32,20 @@
import java.util.Map;
import java.util.ServiceLoader;
-public class SparkUnifiedSessionCatalog<T extends TableCatalog &
SupportsNamespaces>
+public class SparkUnifiedSessionCatalogBase<T extends TableCatalog &
SupportsNamespaces>
extends SessionCatalogBase<T> implements ProcedureCatalog {
- private final Map<TableFormat, SparkTableFormat> tableFormats =
Maps.newConcurrentMap();
+ protected final Map<TableFormat, SparkTableFormat> tableFormats =
Maps.newConcurrentMap();
@Override
protected TableCatalog buildTargetCatalog(String name,
CaseInsensitiveStringMap options) {
- SparkUnifiedCatalog sparkUnifiedCatalog = new SparkUnifiedCatalog();
- sparkUnifiedCatalog.initialize(name, options);
+ SparkUnifiedCatalogBase sparkUnifiedCatalogBase = new
SparkUnifiedCatalogBase();
+ sparkUnifiedCatalogBase.initialize(name, options);
Review Comment:
How about adding an abstract method in place of `new SparkUnifiedCatalogBase()`
and implementing it in each Spark version module?
##########
amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark;
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import java.util.ServiceLoader;
+
+/** @Auth: hzwangtao6 @Time: 2024/5/24 14:04 @Description: */
+public class SparkUnifiedSessionCatalog<
+ T extends TableCatalog & SupportsNamespaces & FunctionCatalog>
+ extends SparkUnifiedSessionCatalogBase<T> {
+
+ @Override
+ protected TableCatalog buildTargetCatalog(String name,
CaseInsensitiveStringMap options) {
+ SparkUnifiedCatalog sparkUnifiedCatalog = new SparkUnifiedCatalog();
+ sparkUnifiedCatalog.initialize(name, options);
+ ServiceLoader<SparkTableFormat> sparkTableFormats =
ServiceLoader.load(SparkTableFormat.class);
+ for (SparkTableFormat format : sparkTableFormats) {
+ tableFormats.put(format.format(), format);
+ }
+ return sparkUnifiedCatalog;
+ }
+
+ @Override
+ public Procedure loadProcedure(Identifier ident) throws
NoSuchProcedureException {
+ SparkUnifiedCatalog catalog = (SparkUnifiedCatalog) getTargetCatalog();
+ return catalog.loadProcedure(ident);
+ }
Review Comment:
Seems we don't need this override.
##########
amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/pom.xml:
##########
@@ -41,6 +41,7 @@
</properties>
<dependencies>
+
Review Comment:
Unnecessary modification.
##########
amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark;
+
+import org.apache.amoro.TableFormat;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;
+
+/** @Auth: hzwangtao6 @Time: 2024/5/24 14:27 @Description: */
+public class SparkUnifiedCatalog extends SparkUnifiedCatalogBase
+ implements TableCatalog, SupportsNamespaces, ProcedureCatalog,
FunctionCatalog {
+
+ /**
+ * Return a default namespace for the catalog.
+ *
+ * <p>When this catalog is set as the current catalog, the namespace
returned by this method will
+ * be set as the current namespace.
+ *
+ * <p>The namespace returned by this method is not required to exist.
+ *
+ * @return a multi-part namespace
+ */
+ @Override
+ public String[] defaultNamespace() {
+ return super.defaultNamespace();
+ }
Review Comment:
It seems that we need not override this method.
##########
amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark;
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import java.util.ServiceLoader;
+
+/** @Auth: hzwangtao6 @Time: 2024/5/24 14:04 @Description: */
+public class SparkUnifiedSessionCatalog<
+ T extends TableCatalog & SupportsNamespaces & FunctionCatalog>
+ extends SparkUnifiedSessionCatalogBase<T> {
+
+ @Override
+ protected TableCatalog buildTargetCatalog(String name,
CaseInsensitiveStringMap options) {
+ SparkUnifiedCatalog sparkUnifiedCatalog = new SparkUnifiedCatalog();
+ sparkUnifiedCatalog.initialize(name, options);
+ ServiceLoader<SparkTableFormat> sparkTableFormats =
ServiceLoader.load(SparkTableFormat.class);
+ for (SparkTableFormat format : sparkTableFormats) {
+ tableFormats.put(format.format(), format);
+ }
+ return sparkUnifiedCatalog;
+ }
Review Comment:
I suggest overriding only the `new SparkUnifiedCatalog()` part.
##########
amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark;
+
+import org.apache.amoro.TableFormat;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;
+
+/** @Auth: hzwangtao6 @Time: 2024/5/24 14:27 @Description: */
+public class SparkUnifiedCatalog extends SparkUnifiedCatalogBase
+ implements TableCatalog, SupportsNamespaces, ProcedureCatalog,
FunctionCatalog {
+
+ /**
+ * Return a default namespace for the catalog.
+ *
+ * <p>When this catalog is set as the current catalog, the namespace
returned by this method will
+ * be set as the current namespace.
+ *
+ * <p>The namespace returned by this method is not required to exist.
+ *
+ * @return a multi-part namespace
+ */
+ @Override
+ public String[] defaultNamespace() {
+ return super.defaultNamespace();
+ }
+
+ /**
+ * List the functions in a namespace from the catalog.
+ *
+ * <p>If there are no functions in the namespace, implementations should
return an empty array.
+ *
+ * @param namespace a multi-part namespace
+ * @return an array of Identifiers for functions
+ * @throws NoSuchNamespaceException If the namespace does not exist
(optional).
+ */
+ @Override
+ public Identifier[] listFunctions(String[] namespace) throws
NoSuchNamespaceException {
+ TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG);
+ if (tableCatalog instanceof FunctionCatalog) {
+ return ((FunctionCatalog) tableCatalog).listFunctions(namespace);
+ }
+ throw new NoSuchNamespaceException(namespace);
+ }
+
+ /**
+ * Load a function by {@link Identifier identifier} from the catalog.
+ *
+ * @param ident a function identifier
+ * @return an unbound function instance
+ * @throws NoSuchFunctionException If the function doesn't exist
+ */
+ @Override
+ public UnboundFunction loadFunction(Identifier ident) throws
NoSuchFunctionException {
+
+ TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG);
+ if (tableCatalog instanceof FunctionCatalog) {
+ return ((FunctionCatalog) tableCatalog).loadFunction(ident);
+ }
+ throw new NoSuchFunctionException(ident);
+ }
+
+ /**
+ * Returns true if the function exists, false otherwise.
+ *
+ * @param ident
+ * @since 3.3.0
+ */
+ @Override
+ public boolean functionExists(Identifier ident) {
+ return FunctionCatalog.super.functionExists(ident);
+ }
+
+ /**
+ * Drop a namespace from the catalog with cascade mode, recursively dropping
all objects within
+ * the namespace if cascade is true.
+ *
+ * <p>If the catalog implementation does not support this operation, it may
throw {@link
+ * UnsupportedOperationException}.
+ *
+ * @param namespace a multi-part namespace
+ * @param cascade When true, deletes all objects under the namespace
+ * @return true if the namespace was dropped
+ * @throws NoSuchNamespaceException If the namespace does not exist
(optional)
+ * @throws NonEmptyNamespaceException If the namespace is non-empty and
cascade is false
+ * @throws UnsupportedOperationException If drop is not a supported operation
+ */
+ @Override
+ public boolean dropNamespace(String[] namespace, boolean cascade)
+ throws NoSuchNamespaceException, NonEmptyNamespaceException {
+ return false;
+ }
+
+ /**
+ * Load table metadata of a specific version by {@link Identifier
identifier} from the catalog.
+ *
+ * <p>If the catalog supports views and contains a view for the identifier
and not a table, this
+ * must throw {@link NoSuchTableException}.
+ *
+ * @param ident a table identifier
+ * @param version version of the table
+ * @return the table's metadata
+ * @throws NoSuchTableException If the table doesn't exist or is a view
+ */
+ @Override
+ public Table loadTable(Identifier ident, String version) throws
NoSuchTableException {
+ TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG);
+ if (tableCatalog == null) {
+ throw new UnsupportedOperationException("Doesn't support iceberg table
catalog");
+ }
+ return tableCatalog.loadTable(ident, version);
+ }
+
+ /**
+ * Load table metadata at a specific time by {@link Identifier identifier}
from the catalog.
+ *
+ * <p>If the catalog supports views and contains a view for the identifier
and not a table, this
+ * must throw {@link NoSuchTableException}.
+ *
+ * @param ident a table identifier
+ * @param timestamp timestamp of the table, which is microseconds since
1970-01-01 00:00:00 UTC
+ * @return the table's metadata
+ * @throws NoSuchTableException If the table doesn't exist or is a view
+ */
+ @Override
+ public Table loadTable(Identifier ident, long timestamp) throws
NoSuchTableException {
+ TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG);
+ if (tableCatalog == null) {
+ throw new UnsupportedOperationException("Doesn't support iceberg table
catalog");
Review Comment:
What does 'Doesn't support iceberg table catalog' mean?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]