This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 269381a44bf5 [SPARK-48781][SQL] Add Catalog APIs for loading stored 
procedures
269381a44bf5 is described below

commit 269381a44bf5eac13710dc78a4da5b97c85c248d
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Wed Aug 28 10:16:30 2024 -0700

    [SPARK-48781][SQL] Add Catalog APIs for loading stored procedures
    
    ### What changes were proposed in this pull request?
    
    This PR contains new connector APIs for loading stored procedures per the 
[discussed and 
voted](https://lists.apache.org/thread/w586jr53fxwk4pt9m94b413xyjr1v25m) SPIP 
tracked in [SPARK-44167](https://issues.apache.org/jira/browse/SPARK-44167). It 
is a subset of changes from PR #47183.
    
    ### Why are the changes needed?
    
    Stored procedures are routines invoked via CALL statements. They are 
commonly used for encapsulating non-trivial operations and may contain multiple 
commands, conditional logic, and side effects. These changes are needed to 
allow catalogs to expose custom routines to Spark. This effort aims to enhance 
Spark’s ability to interact with external connectors and allow users to perform 
more operations in plain SQL.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this PR adds new connector APIs.
    
    ### How was this patch tested?
    
    N/A.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #47190 from aokolnychyi/spark-48781.
    
    Authored-by: Anton Okolnychyi <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../sql/connector/catalog/ProcedureCatalog.java    |  37 +++++++
 .../catalog/procedures/BoundProcedure.java         |  57 ++++++++++
 .../connector/catalog/procedures/Procedure.java    |  40 +++++++
 .../catalog/procedures/ProcedureParameter.java     | 119 +++++++++++++++++++++
 .../catalog/procedures/UnboundProcedure.java       |  43 ++++++++
 .../connector/ProcedureParameterImpl.scala         |  29 +++++
 6 files changed, 325 insertions(+)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ProcedureCatalog.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ProcedureCatalog.java
new file mode 100644
index 000000000000..6eaacf340cb8
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ProcedureCatalog.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.catalog.procedures.UnboundProcedure;
+
+/**
+ * A catalog API for working with procedures.
+ *
+ * @since 4.0.0
+ */
+@Evolving
+public interface ProcedureCatalog extends CatalogPlugin {
+  /**
+   * Load a procedure by {@link Identifier identifier} from the catalog.
+   *
+   * @param ident a procedure identifier
+   * @return the loaded unbound procedure
+   */
+  UnboundProcedure loadProcedure(Identifier ident);
+}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/BoundProcedure.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/BoundProcedure.java
new file mode 100644
index 000000000000..99f0836576f8
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/BoundProcedure.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog.procedures;
+
+import java.util.Iterator;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.LocalScan;
+import org.apache.spark.sql.connector.read.Scan;
+
+/**
+ * A procedure that is bound to input types.
+ *
+ * @since 4.0.0
+ */
+@Evolving
+public interface BoundProcedure extends Procedure {
+  /**
+   * Returns parameters of this procedure.
+   */
+  ProcedureParameter[] parameters();
+
+  /**
+   * Indicates whether this procedure is deterministic.
+   */
+  boolean isDeterministic();
+
+  /**
+   * Executes this procedure with the given input.
+   * <p>
+   * Spark validates and rearranges arguments provided in the CALL statement 
to ensure that
+   * the order and data types of the fields in {@code input} match the 
expected order and
+   * types defined by {@link #parameters() parameters}.
+   * <p>
+   * Each procedure can return any number of result sets. Each result set is 
represented by
+   * a {@link Scan scan} that reports the type of records it produces and can 
be used to
+   * collect the output, if needed. If a result set is local and does not require a 
distributed job,
+   * implementations should use {@link LocalScan}.
+   */
+  Iterator<Scan> call(InternalRow input);
+}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/Procedure.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/Procedure.java
new file mode 100644
index 000000000000..4f88d215d319
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/Procedure.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog.procedures;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A base interface for all procedures.
+ *
+ * @since 4.0.0
+ */
+@Evolving
+public interface Procedure {
+  /**
+   * Returns the name of this procedure.
+   */
+  String name();
+
+  /**
+   * Returns the description of this procedure.
+   */
+  default String description() {
+    return getClass().toString();
+  }
+}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/ProcedureParameter.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/ProcedureParameter.java
new file mode 100644
index 000000000000..90d531ae2189
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/ProcedureParameter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog.procedures;
+
+import javax.annotation.Nullable;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.internal.connector.ProcedureParameterImpl;
+import org.apache.spark.sql.types.DataType;
+
+import static 
org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter.Mode.IN;
+
+/**
+ * A {@link Procedure procedure} parameter.
+ *
+ * @since 4.0.0
+ */
+@Evolving
+public interface ProcedureParameter {
+  /**
+   * Creates a builder for an IN procedure parameter.
+   *
+   * @param name the name of the parameter
+   * @param dataType the type of the parameter
+   * @return a builder for the stored procedure parameter
+   */
+  static Builder in(String name, DataType dataType) {
+    return new Builder(IN, name, dataType);
+  }
+
+  /**
+   * Returns the mode of this parameter.
+   */
+  Mode mode();
+
+  /**
+   * Returns the name of this parameter.
+   */
+  String name();
+
+  /**
+   * Returns the data type of this parameter.
+   */
+  DataType dataType();
+
+  /**
+   * Returns the SQL string (Spark SQL dialect) of the default value 
expression of this parameter or
+   * null if not provided.
+   */
+  @Nullable
+  String defaultValueExpression();
+
+  /**
+   * Returns the comment of this parameter or null if not provided.
+   */
+  @Nullable
+  String comment();
+
+  /**
+   * An enum representing procedure parameter modes.
+   */
+  enum Mode {
+    IN,
+    INOUT,
+    OUT
+  }
+
+  class Builder {
+    private final Mode mode;
+    private final String name;
+    private final DataType dataType;
+    private String defaultValueExpression;
+    private String comment;
+
+    private Builder(Mode mode, String name, DataType dataType) {
+      this.mode = mode;
+      this.name = name;
+      this.dataType = dataType;
+    }
+
+    /**
+     * Sets the default value expression of the parameter.
+     */
+    public Builder defaultValue(String defaultValueExpression) {
+      this.defaultValueExpression = defaultValueExpression;
+      return this;
+    }
+
+    /**
+     * Sets the comment of the parameter.
+     */
+    public Builder comment(String comment) {
+      this.comment = comment;
+      return this;
+    }
+
+    /**
+     * Builds the stored procedure parameter.
+     */
+    public ProcedureParameter build() {
+      return new ProcedureParameterImpl(mode, name, dataType, 
defaultValueExpression, comment);
+    }
+  }
+}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/UnboundProcedure.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/UnboundProcedure.java
new file mode 100644
index 000000000000..ee9a09055243
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/UnboundProcedure.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog.procedures;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A procedure that is not bound to input types.
+ *
+ * @since 4.0.0
+ */
+@Evolving
+public interface UnboundProcedure extends Procedure {
+  /**
+   * Binds this procedure to input types.
+   * <p>
+   * If the catalog supports procedure overloading, the implementation is 
expected to pick the best
+   * matching version of the procedure. If overloading is not supported, the 
implementation can
+   * validate if the input types are compatible while binding or delegate that 
to Spark. Regardless,
+   * Spark will always perform the final validation of the arguments and 
rearrange them as needed
+   * based on {@link BoundProcedure#parameters() reported parameters}.
+   *
+   * @param inputType the input types to bind to
+   * @return the bound procedure that is most suitable for the given input 
types
+   */
+  BoundProcedure bind(StructType inputType);
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ProcedureParameterImpl.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ProcedureParameterImpl.scala
new file mode 100644
index 000000000000..01ea48af1537
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ProcedureParameterImpl.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal.connector
+
+import org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter
+import 
org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter.Mode
+import org.apache.spark.sql.types.DataType
+
+case class ProcedureParameterImpl(
+    mode: Mode,
+    name: String,
+    dataType: DataType,
+    defaultValueExpression: String,
+    comment: String) extends ProcedureParameter


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to