baiyangtx commented on code in PR #2894:
URL: https://github.com/apache/amoro/pull/2894#discussion_r1650262330


##########
amoro-mixed-format/amoro-mixed-format-spark/v3.2/amoro-mixed-format-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark;
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/** @Auth: hzwangtao6 @Time: 2024/5/24 14:04 @Description: */
+public class SparkUnifiedSessionCatalog<
+        T extends TableCatalog & SupportsNamespaces & FunctionCatalog>
+    extends SparkUnifiedSessionCatalogBase<T> {
+
+  @Override
+  protected SparkUnifiedCatalogBase createUnifiedCatalog(
+      String name, CaseInsensitiveStringMap options) {
+    SparkUnifiedCatalog sparkUnifiedCatalog = new SparkUnifiedCatalog();
+    sparkUnifiedCatalog.initialize(name, options);

Review Comment:
   Move the `initialize` call to the base class. Only creating the object here is fine.
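
   A minimal sketch of the shape I have in mind, using simplified stand-in classes rather than the real Amoro/Spark types. `CatalogBase`, `SessionCatalogBase`, `buildTargetCatalog`, and the `Map<String, String>` options are assumptions for illustration only; the real base class may wire this differently.

```java
import java.util.Map;

// Stand-in for SparkUnifiedCatalogBase (hypothetical, not the real Amoro class).
abstract class CatalogBase {
  private String name;
  private Map<String, String> options;

  void initialize(String name, Map<String, String> options) {
    this.name = name;
    this.options = options;
  }

  String name() {
    return name;
  }

  boolean isInitialized() {
    return name != null;
  }
}

// Stand-in for the Spark 3.2 SparkUnifiedCatalog.
class Catalog32 extends CatalogBase {}

// Stand-in for SparkUnifiedSessionCatalogBase (hypothetical).
abstract class SessionCatalogBase {
  // Version-specific subclasses only construct the object.
  protected abstract CatalogBase createUnifiedCatalog();

  // The base class owns initialization, so no subclass can forget it.
  final CatalogBase buildTargetCatalog(String name, Map<String, String> options) {
    CatalogBase catalog = createUnifiedCatalog();
    catalog.initialize(name, options);
    return catalog;
  }
}

// Stand-in for the Spark 3.2 SparkUnifiedSessionCatalog.
class SessionCatalog32 extends SessionCatalogBase {
  @Override
  protected CatalogBase createUnifiedCatalog() {
    return new Catalog32();
  }
}

class InitializeInBaseDemo {
  public static void main(String[] args) {
    CatalogBase catalog =
        new SessionCatalog32().buildTargetCatalog("unified", Map.of("type", "demo"));
    System.out.println(catalog.name() + " initialized: " + catalog.isInitialized());
  }
}
```

   With this split, each Spark version only overrides the factory method, and the `initialize(name, options)` call lives in one place.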



##########
amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark;
+
+import org.apache.amoro.TableFormat;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;
+
+/** @Auth: hzwangtao6 @Time: 2024/5/24 14:27 @Description: */
+public class SparkUnifiedCatalog extends SparkUnifiedCatalogBase
+    implements TableCatalog, SupportsNamespaces, ProcedureCatalog, FunctionCatalog {
+
+  /**
+   * List the functions in a namespace from the catalog.
+   *
+   * <p>If there are no functions in the namespace, implementations should return an empty array.
+   *
+   * @param namespace a multi-part namespace
+   * @return an array of Identifiers for functions
+   * @throws NoSuchNamespaceException If the namespace does not exist (optional).
+   */
+  @Override
+  public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
+    TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG);
+    if (tableCatalog instanceof FunctionCatalog) {
+      return ((FunctionCatalog) tableCatalog).listFunctions(namespace);
+    }
+    throw new NoSuchNamespaceException(namespace);
+  }
+
+  /**
+   * Load a function by {@link Identifier identifier} from the catalog.
+   *
+   * @param ident a function identifier
+   * @return an unbound function instance
+   * @throws NoSuchFunctionException If the function doesn't exist
+   */
+  @Override
+  public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
+
+    TableCatalog tableCatalog = tableCatalog(TableFormat.ICEBERG);
+    if (tableCatalog instanceof FunctionCatalog) {
+      return ((FunctionCatalog) tableCatalog).loadFunction(ident);
+    }
+    throw new NoSuchFunctionException(ident);
+  }
+
+  /**
+   * Returns true if the function exists, false otherwise.
+   *
+   * @param ident
+   * @since 3.3.0
+   */
+  @Override
+  public boolean functionExists(Identifier ident) {
+    return FunctionCatalog.super.functionExists(ident);
+  }

Review Comment:
   It does not seem necessary to override this method.
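
   An override that only delegates to the interface default adds nothing, because the default is inherited anyway. A small sketch with a stand-in interface (`FunctionLookup` and `BucketOnlyCatalog` are hypothetical, and the default body here assumes the usual try-to-load pattern rather than quoting Spark's `FunctionCatalog`):

```java
// Stand-in for a catalog interface with a default existence check (hypothetical).
interface FunctionLookup {
  // Throws IllegalArgumentException when the function is unknown.
  String loadFunction(String name);

  // Default existence check built on top of loadFunction.
  default boolean functionExists(String name) {
    try {
      loadFunction(name);
      return true;
    } catch (IllegalArgumentException e) {
      return false;
    }
  }
}

class BucketOnlyCatalog implements FunctionLookup {
  @Override
  public String loadFunction(String name) {
    if ("bucket".equals(name)) {
      return "bucket-fn";
    }
    throw new IllegalArgumentException("no such function: " + name);
  }
  // No functionExists override: the interface default is inherited as-is.
}

class DefaultMethodDemo {
  public static void main(String[] args) {
    FunctionLookup catalog = new BucketOnlyCatalog();
    System.out.println(catalog.functionExists("bucket"));
    System.out.println(catalog.functionExists("truncate"));
  }
}
```

   Dropping the override here would leave the behavior unchanged and remove a few lines from the class.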



##########
amoro-mixed-format/amoro-mixed-format-spark/v3.3/amoro-mixed-format-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedSessionCatalog.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark;
+
+import org.apache.amoro.TableFormat;
+import org.apache.iceberg.spark.functions.SparkFunctions;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * For TableCatalog in spark 3.3 is different with spark 3.2。 so we define it seperately 1、 we
+ * support the grammar feature of time travel. 2、 support FunctionCatalog
+ */
+public class SparkUnifiedSessionCatalog<
+        T extends TableCatalog & SupportsNamespaces & FunctionCatalog>
+    extends SparkUnifiedSessionCatalogBase<T> {
+
+  @Override
+  protected SparkUnifiedCatalogBase createUnifiedCatalog(
+      String name, CaseInsensitiveStringMap options) {
+    SparkUnifiedCatalog sparkUnifiedCatalog = new SparkUnifiedCatalog();
+    sparkUnifiedCatalog.initialize(name, options);
+    return sparkUnifiedCatalog;
+  }
+
+  @Override
+  public Table loadTable(Identifier ident, String version) throws NoSuchTableException {
+    try {
+      TableCatalog catalog = getTargetCatalog();
+      SparkUnifiedCatalogBase unifiedCatalog = (SparkUnifiedCatalogBase) catalog;
+      return unifiedCatalog.tableCatalog(TableFormat.ICEBERG).loadTable(ident, version);
+    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      return getSessionCatalog().loadTable(ident, version);
+    }
+  }
+
+  @Override
+  public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
+    try {
+      TableCatalog catalog = getTargetCatalog();
+      SparkUnifiedCatalogBase unifiedCatalog = (SparkUnifiedCatalogBase) catalog;
+      return unifiedCatalog.tableCatalog(TableFormat.ICEBERG).loadTable(ident, timestamp);
+    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      return getSessionCatalog().loadTable(ident, timestamp);
+    }
+  }
+
+  /**
+   * List the functions in a namespace from the catalog.
+   *
+   * <p>If there are no functions in the namespace, implementations should return an empty array.
+   *
+   * @param namespace a multi-part namespace
+   * @return an array of Identifiers for functions
+   * @throws NoSuchNamespaceException If the namespace does not exist (optional).
+   */
+  @Override
+  public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
+    SparkUnifiedCatalog catalog = (SparkUnifiedCatalog) getTargetCatalog();
+    return catalog.listFunctions(namespace);
+  }
+
+  @Override
+  public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
+    String[] namespace = ident.namespace();
+    String name = ident.name();
+
+    // Allow for empty namespace, as Spark's storage partitioned joins look up
+    // the corresponding functions to generate transforms for partitioning
+    // with an empty namespace, such as `bucket`.
+    // Otherwise, use `system` namespace.
+    if (namespace.length == 0 || isSystemNamespace(namespace)) {
+      UnboundFunction func = SparkFunctions.load(name);
+      if (func != null) {
+        return func;
+      }
+    }
+
+    throw new NoSuchFunctionException(ident);
+  }
+
+  private static boolean isSystemNamespace(String[] namespace) {
+    return namespace.length == 1 && namespace[0].equalsIgnoreCase("system");
+  }
+
+  @Override
+  public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException {
+    SparkUnifiedCatalogBase catalog = (SparkUnifiedCatalogBase) getTargetCatalog();
+    return catalog.loadProcedure(ident);
+  }

Review Comment:
   This method looks the same as the one for Spark 3.2. Could we move it to spark-common?
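
   Roughly what I mean, sketched with stand-in types (`ProcedureSource`, `SessionCatalogCommon`, and the string-based signatures are hypothetical placeholders for Spark's `Identifier`/`Procedure` and the Amoro base classes). It assumes the shared module can see the procedure types, which would need checking:

```java
// Stand-in for a catalog that can load procedures (hypothetical).
interface ProcedureSource {
  String loadProcedure(String ident);
}

// Would live in spark-common: the delegation is written once.
abstract class SessionCatalogCommon {
  protected abstract ProcedureSource getTargetCatalog();

  public String loadProcedure(String ident) {
    return getTargetCatalog().loadProcedure(ident);
  }
}

// Version-specific classes inherit loadProcedure and add only what differs.
class SessionCatalogV32 extends SessionCatalogCommon {
  @Override
  protected ProcedureSource getTargetCatalog() {
    return ident -> "v3.2:" + ident;
  }
}

class SessionCatalogV33 extends SessionCatalogCommon {
  @Override
  protected ProcedureSource getTargetCatalog() {
    return ident -> "v3.3:" + ident;
  }
}

class SharedLoadProcedureDemo {
  public static void main(String[] args) {
    System.out.println(new SessionCatalogV32().loadProcedure("rewrite"));
    System.out.println(new SessionCatalogV33().loadProcedure("rewrite"));
  }
}
```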


