baiyangtx commented on code in PR #2894: URL: https://github.com/apache/amoro/pull/2894#discussion_r1650262330
########## amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java: ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.spark; + +import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException; +import org.apache.spark.sql.connector.catalog.FunctionCatalog; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.SupportsNamespaces; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.Procedure; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +/** @Auth: hzwangtao6 @Time: 2024/5/24 14:04 @Description: */ +public class SparkUnifiedSessionCatalog< + T extends TableCatalog & SupportsNamespaces & FunctionCatalog> + extends SparkUnifiedSessionCatalogBase<T> { + + @Override + protected SparkUnifiedCatalogBase createUnifiedCatalog( + String name, CaseInsensitiveStringMap options) { + SparkUnifiedCatalog sparkUnifiedCatalog = new SparkUnifiedCatalog(); + sparkUnifiedCatalog.initialize(name, options); Review Comment: Move call `initialize` to base. 
Only creating the object is needed here. ########## amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.amoro.spark; + +import org.apache.amoro.TableFormat; +import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException; +import org.apache.spark.sql.connector.catalog.FunctionCatalog; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.SupportsNamespaces; +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog; + +/** @Auth: hzwangtao6 @Time: 2024/5/24 14:27 @Description: */ +public class SparkUnifiedCatalog extends SparkUnifiedCatalogBase + implements TableCatalog, SupportsNamespaces, ProcedureCatalog, FunctionCatalog { + + /** + * List the functions in a namespace from the catalog. + * + * <p>If there are no functions in the namespace, implementations should return an empty array. + * + * @param namespace a multi-part namespace + * @return an array of Identifiers for functions + * @throws NoSuchNamespaceException If the namespace does not exist (optional). + */ + @Override + public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException { + TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG); + if (tableCatalog instanceof FunctionCatalog) { + return ((FunctionCatalog) tableCatalog).listFunctions(namespace); + } + throw new NoSuchNamespaceException(namespace); + } + + /** + * Load a function by {@link Identifier identifier} from the catalog. 
+ * + * @param ident a function identifier + * @return an unbound function instance + * @throws NoSuchFunctionException If the function doesn't exist + */ + @Override + public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException { + + TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG); + if (tableCatalog instanceof FunctionCatalog) { + return ((FunctionCatalog) tableCatalog).loadFunction(ident); + } + throw new NoSuchFunctionException(ident); + } + + /** + * Returns true if the function exists, false otherwise. + * + * @param ident + * @since 3.3.0 + */ + @Override + public boolean functionExists(Identifier ident) { + return FunctionCatalog.super.functionExists(ident); + } Review Comment: Overriding this method seems unnecessary: it only delegates to `FunctionCatalog.super.functionExists`, so the inherited default already does the same thing. ########## amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java: ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.amoro.spark; + +import org.apache.amoro.TableFormat; +import org.apache.iceberg.spark.functions.SparkFunctions; +import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException; +import org.apache.spark.sql.connector.catalog.FunctionCatalog; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.SupportsNamespaces; +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; +import org.apache.spark.sql.connector.iceberg.catalog.Procedure; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +/** + * For TableCatalog in spark 3.3 is different with spark 3.2。 so we define it seperately 1、 we + * support the grammar feature of time travel. 
2、 support FunctionCatalog + */ +public class SparkUnifiedSessionCatalog< + T extends TableCatalog & SupportsNamespaces & FunctionCatalog> + extends SparkUnifiedSessionCatalogBase<T> { + + @Override + protected SparkUnifiedCatalogBase createUnifiedCatalog( + String name, CaseInsensitiveStringMap options) { + SparkUnifiedCatalog sparkUnifiedCatalog = new SparkUnifiedCatalog(); + sparkUnifiedCatalog.initialize(name, options); + return sparkUnifiedCatalog; + } + + @Override + public Table loadTable(Identifier ident, String version) throws NoSuchTableException { + try { + TableCatalog catalog = getTargetCatalog(); + SparkUnifiedCatalogBase unifiedCatalog = (SparkUnifiedCatalogBase) catalog; + return unifiedCatalog.tableCatalog(TableFormat.ICEBERG).loadTable(ident, version); + } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { + return getSessionCatalog().loadTable(ident, version); + } + } + + @Override + public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException { + try { + TableCatalog catalog = getTargetCatalog(); + SparkUnifiedCatalogBase unifiedCatalog = (SparkUnifiedCatalogBase) catalog; + return unifiedCatalog.tableCatalog(TableFormat.ICEBERG).loadTable(ident, timestamp); + } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { + return getSessionCatalog().loadTable(ident, timestamp); + } + } + + /** + * List the functions in a namespace from the catalog. + * + * <p>If there are no functions in the namespace, implementations should return an empty array. + * + * @param namespace a multi-part namespace + * @return an array of Identifiers for functions + * @throws NoSuchNamespaceException If the namespace does not exist (optional). 
+ */ + @Override + public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException { + SparkUnifiedCatalog catalog = (SparkUnifiedCatalog) getTargetCatalog(); + return catalog.listFunctions(namespace); + } + + @Override + public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException { + String[] namespace = ident.namespace(); + String name = ident.name(); + + // Allow for empty namespace, as Spark's storage partitioned joins look up + // the corresponding functions to generate transforms for partitioning + // with an empty namespace, such as `bucket`. + // Otherwise, use `system` namespace. + if (namespace.length == 0 || isSystemNamespace(namespace)) { + UnboundFunction func = SparkFunctions.load(name); + if (func != null) { + return func; + } + } + + throw new NoSuchFunctionException(ident); + } + + private static boolean isSystemNamespace(String[] namespace) { + return namespace.length == 1 && namespace[0].equalsIgnoreCase("system"); + } + + @Override + public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException { + SparkUnifiedCatalogBase catalog = (SparkUnifiedCatalogBase) getTargetCatalog(); + return catalog.loadProcedure(ident); + } Review Comment: This method looks the same as the Spark 3.2 version. Could we move it to spark-common? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
