This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 269381a44bf5 [SPARK-48781][SQL] Add Catalog APIs for loading stored procedures
269381a44bf5 is described below
commit 269381a44bf5eac13710dc78a4da5b97c85c248d
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Wed Aug 28 10:16:30 2024 -0700
[SPARK-48781][SQL] Add Catalog APIs for loading stored procedures
### What changes were proposed in this pull request?
This PR contains new connector APIs for loading stored procedures, per the
[discussed and voted](https://lists.apache.org/thread/w586jr53fxwk4pt9m94b413xyjr1v25m) SPIP
tracked in [SPARK-44167](https://issues.apache.org/jira/browse/SPARK-44167). It
is a subset of the changes from PR #47183.
### Why are the changes needed?
Stored procedures are routines invoked via CALL statements. They are
commonly used for encapsulating non-trivial operations and may contain multiple
commands, conditional logic, and side effects. These changes are needed to
allow catalogs to expose custom routines to Spark. This effort aims to enhance
Spark’s ability to interact with external connectors and allow users to perform
more operations in plain SQL.
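
The load, bind, and call flow that these APIs describe can be sketched roughly as follows. This is a minimal, self-contained illustration, not the code added by this PR: every `Sketch*` type, the `sum` procedure, and the `invoke` helper are hypothetical stand-ins that replace Spark's `Identifier`, `StructType`, `InternalRow`, and `Scan` with plain Java types so that the example compiles without Spark on the classpath.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified stand-ins for illustration only; these are NOT the Spark interfaces.
interface SketchProcedure {
  String name();
}

interface SketchBoundProcedure extends SketchProcedure {
  List<String> parameterNames();                // stands in for ProcedureParameter[] parameters()
  boolean isDeterministic();
  Iterator<List<Object>> call(Object[] input);  // stands in for Iterator<Scan> call(InternalRow)
}

interface SketchUnboundProcedure extends SketchProcedure {
  SketchBoundProcedure bind(List<Class<?>> inputTypes);  // stands in for bind(StructType)
}

interface SketchProcedureCatalog {
  SketchUnboundProcedure loadProcedure(String ident);    // stands in for loadProcedure(Identifier)
}

// Hypothetical unbound procedure: checks the input types while binding.
final class SumUnbound implements SketchUnboundProcedure {
  public String name() { return "sum"; }

  public SketchBoundProcedure bind(List<Class<?>> inputTypes) {
    if (!inputTypes.equals(List.of(Integer.class, Integer.class))) {
      throw new IllegalArgumentException("sum expects (int, int) but got " + inputTypes);
    }
    return new SumBound();
  }
}

// Hypothetical bound procedure: returns exactly one result set holding the sum.
final class SumBound implements SketchBoundProcedure {
  public String name() { return "sum"; }
  public List<String> parameterNames() { return List.of("left", "right"); }
  public boolean isDeterministic() { return true; }

  public Iterator<List<Object>> call(Object[] input) {
    int total = (Integer) input[0] + (Integer) input[1];
    // Each result set is modeled here as a plain list of values.
    List<Object> resultSet = List.<Object>of(total);
    return List.of(resultSet).iterator();
  }
}

final class ProcedureSketch {
  // A hypothetical catalog that exposes a single procedure named "sum".
  static SketchProcedureCatalog catalog() {
    return ident -> {
      if (!ident.equals("sum")) {
        throw new IllegalArgumentException("Unknown procedure: " + ident);
      }
      return new SumUnbound();
    };
  }

  // Load by identifier, bind to the argument types, call, and collect all result sets.
  static List<List<Object>> invoke(SketchProcedureCatalog catalog, String ident, Object... args) {
    List<Class<?>> types = new ArrayList<>();
    for (Object arg : args) {
      types.add(arg.getClass());
    }
    SketchBoundProcedure bound = catalog.loadProcedure(ident).bind(types);
    List<List<Object>> resultSets = new ArrayList<>();
    bound.call(args).forEachRemaining(resultSets::add);
    return resultSets;
  }

  public static void main(String[] args) {
    System.out.println(invoke(catalog(), "sum", 2, 3));  // prints [[5]]
  }
}
```

In this sketch the catalog itself rejects incompatible input types during `bind`. The interfaces in this PR also allow a connector to delegate that validation to Spark.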
### Does this PR introduce _any_ user-facing change?
Yes, this PR adds new connector APIs.
### How was this patch tested?
N/A.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47190 from aokolnychyi/spark-48781.
Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sql/connector/catalog/ProcedureCatalog.java | 37 +++++++
.../catalog/procedures/BoundProcedure.java | 57 ++++++++++
.../connector/catalog/procedures/Procedure.java | 40 +++++++
.../catalog/procedures/ProcedureParameter.java | 119 +++++++++++++++++++++
.../catalog/procedures/UnboundProcedure.java | 43 ++++++++
.../connector/ProcedureParameterImpl.scala | 29 +++++
6 files changed, 325 insertions(+)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ProcedureCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ProcedureCatalog.java
new file mode 100644
index 000000000000..6eaacf340cb8
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ProcedureCatalog.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.catalog.procedures.UnboundProcedure;
+
+/**
+ * A catalog API for working with procedures.
+ *
+ * @since 4.0.0
+ */
+@Evolving
+public interface ProcedureCatalog extends CatalogPlugin {
+ /**
+ * Load a procedure by {@link Identifier identifier} from the catalog.
+ *
+ * @param ident a procedure identifier
+ * @return the loaded unbound procedure
+ */
+ UnboundProcedure loadProcedure(Identifier ident);
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/BoundProcedure.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/BoundProcedure.java
new file mode 100644
index 000000000000..99f0836576f8
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/BoundProcedure.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog.procedures;
+
+import java.util.Iterator;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.LocalScan;
+import org.apache.spark.sql.connector.read.Scan;
+
+/**
+ * A procedure that is bound to input types.
+ *
+ * @since 4.0.0
+ */
+@Evolving
+public interface BoundProcedure extends Procedure {
+ /**
+ * Returns parameters of this procedure.
+ */
+ ProcedureParameter[] parameters();
+
+ /**
+ * Indicates whether this procedure is deterministic.
+ */
+ boolean isDeterministic();
+
+ /**
+ * Executes this procedure with the given input.
+ * <p>
+ * Spark validates and rearranges arguments provided in the CALL statement to ensure that
+ * the order and data types of the fields in {@code input} match the expected order and
+ * types defined by {@link #parameters() parameters}.
+ * <p>
+ * Each procedure can return any number of result sets. Each result set is represented by
+ * a {@link Scan scan} that reports the type of records it produces and can be used to
+ * collect the output, if needed. If a result set is local and does not require a distributed
+ * job, implementations should use {@link LocalScan}.
+ */
+ Iterator<Scan> call(InternalRow input);
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/Procedure.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/Procedure.java
new file mode 100644
index 000000000000..4f88d215d319
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/Procedure.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog.procedures;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A base interface for all procedures.
+ *
+ * @since 4.0.0
+ */
+@Evolving
+public interface Procedure {
+ /**
+ * Returns the name of this procedure.
+ */
+ String name();
+
+ /**
+ * Returns the description of this procedure.
+ */
+ default String description() {
+ return getClass().toString();
+ }
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/ProcedureParameter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/ProcedureParameter.java
new file mode 100644
index 000000000000..90d531ae2189
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/ProcedureParameter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog.procedures;
+
+import javax.annotation.Nullable;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.internal.connector.ProcedureParameterImpl;
+import org.apache.spark.sql.types.DataType;
+
+import static org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter.Mode.IN;
+
+/**
+ * A {@link Procedure procedure} parameter.
+ *
+ * @since 4.0.0
+ */
+@Evolving
+public interface ProcedureParameter {
+ /**
+ * Creates a builder for an IN procedure parameter.
+ *
+ * @param name the name of the parameter
+ * @param dataType the type of the parameter
+ * @return a builder for the stored procedure parameter
+ */
+ static Builder in(String name, DataType dataType) {
+ return new Builder(IN, name, dataType);
+ }
+
+ /**
+ * Returns the mode of this parameter.
+ */
+ Mode mode();
+
+ /**
+ * Returns the name of this parameter.
+ */
+ String name();
+
+ /**
+ * Returns the data type of this parameter.
+ */
+ DataType dataType();
+
+ /**
+ * Returns the SQL string (Spark SQL dialect) of the default value expression of this parameter or
+ * null if not provided.
+ */
+ @Nullable
+ String defaultValueExpression();
+
+ /**
+ * Returns the comment of this parameter or null if not provided.
+ */
+ @Nullable
+ String comment();
+
+ /**
+ * An enum representing procedure parameter modes.
+ */
+ enum Mode {
+ IN,
+ INOUT,
+ OUT
+ }
+
+ class Builder {
+ private final Mode mode;
+ private final String name;
+ private final DataType dataType;
+ private String defaultValueExpression;
+ private String comment;
+
+ private Builder(Mode mode, String name, DataType dataType) {
+ this.mode = mode;
+ this.name = name;
+ this.dataType = dataType;
+ }
+
+ /**
+ * Sets the default value expression of the parameter.
+ */
+ public Builder defaultValue(String defaultValueExpression) {
+ this.defaultValueExpression = defaultValueExpression;
+ return this;
+ }
+
+ /**
+ * Sets the comment of the parameter.
+ */
+ public Builder comment(String comment) {
+ this.comment = comment;
+ return this;
+ }
+
+ /**
+ * Builds the stored procedure parameter.
+ */
+ public ProcedureParameter build() {
+ return new ProcedureParameterImpl(mode, name, dataType, defaultValueExpression, comment);
+ }
+ }
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/UnboundProcedure.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/UnboundProcedure.java
new file mode 100644
index 000000000000..ee9a09055243
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/procedures/UnboundProcedure.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog.procedures;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A procedure that is not bound to input types.
+ *
+ * @since 4.0.0
+ */
+@Evolving
+public interface UnboundProcedure extends Procedure {
+ /**
+ * Binds this procedure to input types.
+ * <p>
+ * If the catalog supports procedure overloading, the implementation is expected to pick the best
+ * matching version of the procedure. If overloading is not supported, the implementation can
+ * validate if the input types are compatible while binding or delegate that to Spark. Regardless,
+ * Spark will always perform the final validation of the arguments and rearrange them as needed
+ * based on {@link BoundProcedure#parameters() reported parameters}.
+ *
+ * @param inputType the input types to bind to
+ * @return the bound procedure that is most suitable for the given input types
+ */
+ BoundProcedure bind(StructType inputType);
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ProcedureParameterImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ProcedureParameterImpl.scala
new file mode 100644
index 000000000000..01ea48af1537
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ProcedureParameterImpl.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal.connector
+
+import org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter
+import org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter.Mode
+import org.apache.spark.sql.types.DataType
+
+case class ProcedureParameterImpl(
+ mode: Mode,
+ name: String,
+ dataType: DataType,
+ defaultValueExpression: String,
+ comment: String) extends ProcedureParameter