This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 52eea1a496 [#9531] feat(python-client): define function API models and
exception types (#9943)
52eea1a496 is described below
commit 52eea1a49601657ff371ca176ce37dfde715186e
Author: mchades <[email protected]>
AuthorDate: Wed Feb 11 16:03:13 2026 +0800
[#9531] feat(python-client): define function API models and exception types
(#9943)
### What changes were proposed in this pull request?
Add the complete API layer for function support in the Python client:
- FunctionType enum (SCALAR, AGGREGATE, TABLE)
- FunctionColumn, FunctionParam, FunctionResources value types
- FunctionImpl base class with JavaImpl, PythonImpl, SQLImpl
implementations
- FunctionDefinition combining parameters, return columns, and
implementation
- Function interface with name, type, deterministic flag, and
definitions
- FunctionChange with operations: update comment, add/remove
definitions, add/update/remove implementations
- FunctionCatalog interface for function CRUD operations
- Catalog.as_function_catalog() extension point
- NoSuchFunctionException, FunctionAlreadyExistsException
### Why are the changes needed?
Fix: #9531
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Unit tests will be added in a follow-up PR when the client
implementation is built.
---
.../gravitino/function/FunctionDefinitions.java | 3 +
clients/client-python/gravitino/api/catalog.py | 12 +
.../gravitino/api/function/__init__.py | 16 ++
.../gravitino/api/function/function.py | 63 ++++
.../gravitino/api/function/function_catalog.py | 159 +++++++++++
.../gravitino/api/function/function_change.py | 318 +++++++++++++++++++++
.../gravitino/api/function/function_column.py | 91 ++++++
.../gravitino/api/function/function_definition.py | 191 +++++++++++++
.../gravitino/api/function/function_impl.py | 117 ++++++++
.../gravitino/api/function/function_param.py | 123 ++++++++
.../gravitino/api/function/function_resources.py | 93 ++++++
.../gravitino/api/function/function_type.py | 61 ++++
.../gravitino/api/function/java_impl.py | 169 +++++++++++
.../gravitino/api/function/python_impl.py | 192 +++++++++++++
.../gravitino/api/function/sql_impl.py | 167 +++++++++++
clients/client-python/gravitino/exceptions/base.py | 8 +
16 files changed, 1783 insertions(+)
diff --git
a/api/src/main/java/org/apache/gravitino/function/FunctionDefinitions.java
b/api/src/main/java/org/apache/gravitino/function/FunctionDefinitions.java
index cbe702fa01..1ba1555ff0 100644
--- a/api/src/main/java/org/apache/gravitino/function/FunctionDefinitions.java
+++ b/api/src/main/java/org/apache/gravitino/function/FunctionDefinitions.java
@@ -52,6 +52,7 @@ public final class FunctionDefinitions {
*/
public static FunctionDefinition of(
FunctionParam[] parameters, Type returnType, FunctionImpl[] impls) {
+ Preconditions.checkArgument(returnType != null, "Return type cannot be
null");
return new FunctionDefinitionImpl(parameters, returnType, null, impls);
}
@@ -65,6 +66,8 @@ public final class FunctionDefinitions {
*/
public static FunctionDefinition of(
FunctionParam[] parameters, FunctionColumn[] returnColumns,
FunctionImpl[] impls) {
+ Preconditions.checkArgument(
+ ArrayUtils.isNotEmpty(returnColumns), "Return columns cannot be null
or empty");
return new FunctionDefinitionImpl(parameters, null, returnColumns, impls);
}
diff --git a/clients/client-python/gravitino/api/catalog.py
b/clients/client-python/gravitino/api/catalog.py
index 2014f8a398..34c916f7eb 100644
--- a/clients/client-python/gravitino/api/catalog.py
+++ b/clients/client-python/gravitino/api/catalog.py
@@ -189,6 +189,18 @@ class Catalog(Auditable):
"""
raise UnsupportedOperationException("Catalog does not support model
operations")
+ def as_function_catalog(self) -> "FunctionCatalog": # noqa: F821
+ """
+ Returns:
+ the {@link FunctionCatalog} if the catalog supports function
operations.
+
+ Raises:
+ UnsupportedOperationException if the catalog does not support
function operations.
+ """
+ raise UnsupportedOperationException(
+ "Catalog does not support function operations"
+ )
+
class UnsupportedOperationException(Exception):
pass
diff --git a/clients/client-python/gravitino/api/function/__init__.py
b/clients/client-python/gravitino/api/function/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/clients/client-python/gravitino/api/function/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/clients/client-python/gravitino/api/function/function.py
b/clients/client-python/gravitino/api/function/function.py
new file mode 100644
index 0000000000..4aec6550f4
--- /dev/null
+++ b/clients/client-python/gravitino/api/function/function.py
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from abc import abstractmethod
+from typing import List, Optional
+
+from gravitino.api.auditable import Auditable
+from gravitino.api.function.function_definition import FunctionDefinition
+from gravitino.api.function.function_type import FunctionType
+
+
+class Function(Auditable):
+ """An interface representing a user-defined function under a schema
Namespace.
+
+ A function is a reusable computational unit that can be invoked within
queries across
+ different compute engines. Users can register a function in Gravitino to
manage the
+ function metadata and enable cross-engine function sharing. The typical
use case is
+ to define custom business logic once and reuse it across multiple compute
engines
+ like Spark, Trino, and AI engines.
+
+ A function is characterized by its name, type (scalar for row-by-row
operations,
+ aggregate for group operations, or table-valued for set-returning
operations),
+ whether it is deterministic, and its definitions that contain parameters,
return
+ type or columns (for table function), and implementations for different
runtime engines.
+ """
+
+ @abstractmethod
+ def name(self) -> str:
+ """Returns the function name."""
+ pass
+
+ @abstractmethod
+ def function_type(self) -> FunctionType:
+ """Returns the function type."""
+ pass
+
+ @abstractmethod
+ def deterministic(self) -> bool:
+ """Returns whether the function is deterministic."""
+ pass
+
+ def comment(self) -> Optional[str]:
+ """Returns the optional comment of the function."""
+ return None
+
+ @abstractmethod
+ def definitions(self) -> List[FunctionDefinition]:
+ """Returns the definitions of the function."""
+ pass
diff --git a/clients/client-python/gravitino/api/function/function_catalog.py
b/clients/client-python/gravitino/api/function/function_catalog.py
new file mode 100644
index 0000000000..ef1619520c
--- /dev/null
+++ b/clients/client-python/gravitino/api/function/function_catalog.py
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from abc import ABC, abstractmethod
+from typing import List, Optional
+
+from gravitino.api.function.function import Function
+from gravitino.api.function.function_change import FunctionChange
+from gravitino.api.function.function_definition import FunctionDefinition
+from gravitino.api.function.function_type import FunctionType
+from gravitino.exceptions.base import NoSuchFunctionException
+from gravitino.name_identifier import NameIdentifier
+from gravitino.namespace import Namespace
+
+
+class FunctionCatalog(ABC):
+ """The FunctionCatalog interface defines the public API for managing
functions in a schema."""
+
+ @abstractmethod
+ def list_functions(self, namespace: Namespace) -> List[NameIdentifier]:
+ """List the functions in a namespace from the catalog.
+
+ Args:
+ namespace: A namespace.
+
+ Returns:
+ A list of function identifiers in the namespace.
+
+ Raises:
+ NoSuchSchemaException: If the schema does not exist.
+ """
+ pass
+
+ @abstractmethod
+ def list_function_infos(self, namespace: Namespace) -> List[Function]:
+ """List the functions with details in a namespace from the catalog.
+
+ Args:
+ namespace: A namespace.
+
+ Returns:
+ A list of functions in the namespace.
+
+ Raises:
+ NoSuchSchemaException: If the schema does not exist.
+ """
+ pass
+
+ @abstractmethod
+ def get_function(self, ident: NameIdentifier) -> Function:
+ """Get a function by NameIdentifier from the catalog.
+
+ The identifier only contains the schema and function name. A function
may
+ include multiple definitions (overloads) in the result.
+
+ Args:
+ ident: A function identifier.
+
+ Returns:
+ The function with the given name.
+
+ Raises:
+ NoSuchFunctionException: If the function does not exist.
+ """
+ pass
+
+ def function_exists(self, ident: NameIdentifier) -> bool:
+ """Check if a function with the given name exists in the catalog.
+
+ Args:
+ ident: The function identifier.
+
+ Returns:
+ True if the function exists, False otherwise.
+ """
+ try:
+ self.get_function(ident)
+ return True
+ except NoSuchFunctionException:
+ return False
+
+ @abstractmethod
+ def register_function(
+ self,
+ ident: NameIdentifier,
+ comment: Optional[str],
+ function_type: FunctionType,
+ deterministic: bool,
+ definitions: List[FunctionDefinition],
+ ) -> Function:
+ """Register a function with one or more definitions (overloads).
+
+ Each definition contains its own return type (for scalar/aggregate
functions)
+ or return columns (for table-valued functions).
+
+ Args:
+ ident: The function identifier.
+ comment: The optional function comment.
+ function_type: The function type (SCALAR, AGGREGATE, or TABLE).
+ deterministic: Whether the function is deterministic.
+ definitions: The function definitions, each containing parameters,
+ return type/columns, and implementations.
+
+ Returns:
+ The registered function.
+
+ Raises:
+ NoSuchSchemaException: If the schema does not exist.
+ FunctionAlreadyExistsException: If the function already exists.
+ """
+ pass
+
+ @abstractmethod
+ def alter_function(
+ self, ident: NameIdentifier, *changes: FunctionChange
+ ) -> Function:
+ """Applies FunctionChange changes to a function in the catalog.
+
+ Implementations may reject the changes. If any change is rejected, no
changes
+ should be applied to the function.
+
+ Args:
+ ident: The NameIdentifier instance of the function to alter.
+ changes: The FunctionChange instances to apply to the function.
+
+ Returns:
+ The updated Function instance.
+
+ Raises:
+ NoSuchFunctionException: If the function does not exist.
+ IllegalArgumentException: If the change is rejected by the
implementation.
+ """
+ pass
+
+ @abstractmethod
+ def drop_function(self, ident: NameIdentifier) -> bool:
+ """Drop a function by name.
+
+ Args:
+ ident: The name identifier of the function.
+
+ Returns:
+ True if the function is deleted, False if the function does not
exist.
+ """
+ pass
diff --git a/clients/client-python/gravitino/api/function/function_change.py
b/clients/client-python/gravitino/api/function/function_change.py
new file mode 100644
index 0000000000..86fd408b59
--- /dev/null
+++ b/clients/client-python/gravitino/api/function/function_change.py
@@ -0,0 +1,318 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from abc import ABC
+from typing import List
+
+from gravitino.api.function.function_definition import FunctionDefinition
+from gravitino.api.function.function_impl import FunctionImpl
+from gravitino.api.function.function_param import FunctionParam
+
+
+class FunctionChange(ABC):
+ """Represents a change that can be applied to a function."""
+
+ EMPTY_PARAMS: List[FunctionParam] = []
+ """An empty list of parameters."""
+
+ @staticmethod
+ def update_comment(new_comment: str) -> "FunctionChange":
+ """Create a FunctionChange to update the comment of a function.
+
+ Args:
+ new_comment: The new comment value.
+
+ Returns:
+ The change instance.
+ """
+ return UpdateComment(new_comment)
+
+ @staticmethod
+ def add_definition(definition: FunctionDefinition) -> "FunctionChange":
+ """Create a FunctionChange to add a new definition (overload) to a
function.
+
+ Args:
+ definition: The new definition to add.
+
+ Returns:
+ The change instance.
+ """
+ return AddDefinition(definition)
+
+ @staticmethod
+ def remove_definition(parameters: List[FunctionParam]) -> "FunctionChange":
+ """Create a FunctionChange to remove an existing definition from a
function.
+
+ Args:
+ parameters: The parameters that identify the definition to remove.
+
+ Returns:
+ The change instance.
+ """
+ return RemoveDefinition(parameters)
+
+ @staticmethod
+ def add_impl(
+ parameters: List[FunctionParam], implementation: FunctionImpl
+ ) -> "FunctionChange":
+ """Create a FunctionChange to add an implementation to a specific
definition.
+
+ Args:
+ parameters: The parameters that identify the definition to update.
+ implementation: The implementation to add.
+
+ Returns:
+ The change instance.
+ """
+ return AddImpl(parameters, implementation)
+
+ @staticmethod
+ def update_impl(
+ parameters: List[FunctionParam],
+ runtime: FunctionImpl.RuntimeType,
+ implementation: FunctionImpl,
+ ) -> "FunctionChange":
+ """Create a FunctionChange to update an implementation for a specific
definition.
+
+ Args:
+ parameters: The parameters that identify the definition to update.
+ runtime: The runtime that identifies the implementation to replace.
+ implementation: The new implementation.
+
+ Returns:
+ The change instance.
+ """
+ return UpdateImpl(parameters, runtime, implementation)
+
+ @staticmethod
+ def remove_impl(
+ parameters: List[FunctionParam], runtime: FunctionImpl.RuntimeType
+ ) -> "FunctionChange":
+ """Create a FunctionChange to remove an implementation for a specific
definition.
+
+ Args:
+ parameters: The parameters that identify the definition to update.
+ runtime: The runtime that identifies the implementation to remove.
+
+ Returns:
+ The change instance.
+ """
+ return RemoveImpl(parameters, runtime)
+
+
+class UpdateComment(FunctionChange):
+ """A FunctionChange to update the comment of a function."""
+
+ def __init__(self, new_comment: str):
+ if not new_comment or not new_comment.strip():
+ raise ValueError("New comment cannot be null or empty")
+ self._new_comment = new_comment
+
+ def new_comment(self) -> str:
+ """Returns the new comment of the function."""
+ return self._new_comment
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, UpdateComment):
+ return False
+ return self._new_comment == other._new_comment
+
+ def __hash__(self) -> int:
+ return hash(self._new_comment)
+
+ def __repr__(self) -> str:
+ return f"UpdateComment(newComment='{self._new_comment}')"
+
+
+class AddDefinition(FunctionChange):
+ """A FunctionChange to add a new definition to a function."""
+
+ def __init__(self, definition: FunctionDefinition):
+ if definition is None:
+ raise ValueError("Definition cannot be null")
+ self._definition = definition
+
+ def definition(self) -> FunctionDefinition:
+ """Returns the definition to add."""
+ return self._definition
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, AddDefinition):
+ return False
+ return self._definition == other._definition
+
+ def __hash__(self) -> int:
+ return hash(self._definition)
+
+ def __repr__(self) -> str:
+ return f"AddDefinition(definition={self._definition})"
+
+
+class RemoveDefinition(FunctionChange):
+ """A FunctionChange to remove an existing definition from a function."""
+
+ def __init__(self, parameters: List[FunctionParam]):
+ if parameters is None:
+ raise ValueError("Parameters cannot be null")
+ self._parameters = list(parameters)
+
+ def parameters(self) -> List[FunctionParam]:
+ """Returns the parameters that identify the definition to remove."""
+ return (
+ list(self._parameters) if self._parameters else
FunctionChange.EMPTY_PARAMS
+ )
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, RemoveDefinition):
+ return False
+ return self._parameters == other._parameters
+
+ def __hash__(self) -> int:
+ return hash(tuple(self._parameters))
+
+ def __repr__(self) -> str:
+ return f"RemoveDefinition(parameters={self._parameters})"
+
+
+class AddImpl(FunctionChange):
+ """A FunctionChange to add an implementation to a definition."""
+
+ def __init__(self, parameters: List[FunctionParam], implementation:
FunctionImpl):
+ if parameters is None:
+ raise ValueError("Parameters cannot be null")
+ self._parameters = list(parameters)
+ if implementation is None:
+ raise ValueError("Implementation cannot be null")
+ self._implementation = implementation
+
+ def parameters(self) -> List[FunctionParam]:
+ """Returns the parameters that identify the definition to update."""
+ return (
+ list(self._parameters) if self._parameters else
FunctionChange.EMPTY_PARAMS
+ )
+
+ def implementation(self) -> FunctionImpl:
+ """Returns the implementation to add."""
+ return self._implementation
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, AddImpl):
+ return False
+ return (
+ self._parameters == other._parameters
+ and self._implementation == other._implementation
+ )
+
+ def __hash__(self) -> int:
+ return hash((tuple(self._parameters), self._implementation))
+
+ def __repr__(self) -> str:
+ return (
+ f"AddImpl(parameters={self._parameters}, "
+ f"implementation={self._implementation})"
+ )
+
+
+class UpdateImpl(FunctionChange):
+ """A FunctionChange to replace an implementation for a specific
definition."""
+
+ def __init__(
+ self,
+ parameters: List[FunctionParam],
+ runtime: FunctionImpl.RuntimeType,
+ implementation: FunctionImpl,
+ ):
+ if parameters is None:
+ raise ValueError("Parameters cannot be null")
+ self._parameters = list(parameters)
+ if runtime is None:
+ raise ValueError("Runtime cannot be null")
+ self._runtime = runtime
+ if implementation is None:
+ raise ValueError("Implementation cannot be null")
+ self._implementation = implementation
+ if runtime != implementation.runtime():
+ raise ValueError(
+ "Runtime of implementation must match the runtime being
updated"
+ )
+
+ def parameters(self) -> List[FunctionParam]:
+ """Returns the parameters that identify the definition to update."""
+ return (
+ list(self._parameters) if self._parameters else
FunctionChange.EMPTY_PARAMS
+ )
+
+ def runtime(self) -> FunctionImpl.RuntimeType:
+ """Returns the runtime that identifies the implementation to
replace."""
+ return self._runtime
+
+ def implementation(self) -> FunctionImpl:
+ """Returns the new implementation."""
+ return self._implementation
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, UpdateImpl):
+ return False
+ return (
+ self._parameters == other._parameters
+ and self._runtime == other._runtime
+ and self._implementation == other._implementation
+ )
+
+ def __hash__(self) -> int:
+ return hash((tuple(self._parameters), self._runtime,
self._implementation))
+
+ def __repr__(self) -> str:
+ return (
+ f"UpdateImpl(parameters={self._parameters},
runtime={self._runtime}, "
+ f"implementation={self._implementation})"
+ )
+
+
+class RemoveImpl(FunctionChange):
+ """A FunctionChange to remove an implementation for a specific runtime."""
+
+ def __init__(
+ self, parameters: List[FunctionParam], runtime:
FunctionImpl.RuntimeType
+ ):
+ if parameters is None:
+ raise ValueError("Parameters cannot be null")
+ self._parameters = list(parameters)
+ if runtime is None:
+ raise ValueError("Runtime cannot be null")
+ self._runtime = runtime
+
+ def parameters(self) -> List[FunctionParam]:
+ """Returns the parameters that identify the definition to update."""
+ return (
+ list(self._parameters) if self._parameters else
FunctionChange.EMPTY_PARAMS
+ )
+
+ def runtime(self) -> FunctionImpl.RuntimeType:
+ """Returns the runtime that identifies the implementation to remove."""
+ return self._runtime
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, RemoveImpl):
+ return False
+ return self._parameters == other._parameters and self._runtime ==
other._runtime
+
+ def __hash__(self) -> int:
+ return hash((tuple(self._parameters), self._runtime))
+
+ def __repr__(self) -> str:
+ return f"RemoveImpl(parameters={self._parameters},
runtime={self._runtime})"
diff --git a/clients/client-python/gravitino/api/function/function_column.py
b/clients/client-python/gravitino/api/function/function_column.py
new file mode 100644
index 0000000000..9bc8bd954d
--- /dev/null
+++ b/clients/client-python/gravitino/api/function/function_column.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional
+
+from gravitino.api.rel.types.type import Type
+
+
+class FunctionColumn:
+ """Represents a return column of a table-valued function."""
+
+ def __init__(self, name: str, data_type: Type, comment: Optional[str] =
None):
+ """Create a FunctionColumn instance.
+
+ Args:
+ name: The column name.
+ data_type: The column type.
+ comment: The optional comment of the column.
+
+ Raises:
+ ValueError: If name is null or empty, or data_type is null.
+ """
+ if not name or not name.strip():
+ raise ValueError("Function column name cannot be null or empty")
+ self._name = name
+
+ if data_type is None:
+ raise ValueError("Function column type cannot be null")
+ self._data_type = data_type
+
+ self._comment = comment
+
+ @classmethod
+ def of(
+ cls, name: str, data_type: Type, comment: Optional[str] = None
+ ) -> "FunctionColumn":
+ """Create a FunctionColumn instance.
+
+ Args:
+ name: The column name.
+ data_type: The column type.
+ comment: The optional comment of the column.
+
+ Returns:
+ A FunctionColumn instance.
+ """
+ return cls(name, data_type, comment)
+
+ def name(self) -> str:
+ """Returns the column name."""
+ return self._name
+
+ def data_type(self) -> Type:
+ """Returns the column type."""
+ return self._data_type
+
+ def comment(self) -> Optional[str]:
+ """Returns the optional column comment, None if not provided."""
+ return self._comment
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, FunctionColumn):
+ return False
+ return (
+ self._name == other._name
+ and self._data_type == other._data_type
+ and self._comment == other._comment
+ )
+
+ def __hash__(self) -> int:
+ return hash((self._name, self._data_type, self._comment))
+
+ def __repr__(self) -> str:
+ return (
+ f"FunctionColumn(name='{self._name}', dataType={self._data_type}, "
+ f"comment='{self._comment}')"
+ )
diff --git
a/clients/client-python/gravitino/api/function/function_definition.py
b/clients/client-python/gravitino/api/function/function_definition.py
new file mode 100644
index 0000000000..ef25ffadad
--- /dev/null
+++ b/clients/client-python/gravitino/api/function/function_definition.py
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from abc import ABC, abstractmethod
+from typing import List, Optional
+
+from gravitino.api.function.function_column import FunctionColumn
+from gravitino.api.function.function_impl import FunctionImpl
+from gravitino.api.function.function_param import FunctionParam
+from gravitino.api.rel.types.type import Type
+
+
+class FunctionDefinition(ABC):
+ """A function definition that pairs a specific parameter list with its
implementations.
+
+ A single function can include multiple definitions (overloads), each with
distinct
+ parameters, return types, and implementations.
+
+ For scalar or aggregate functions, use return_type() to specify the return
type.
+ For table-valued functions, use return_columns() to specify the output
columns.
+ """
+
+ EMPTY_COLUMNS: List[FunctionColumn] = []
+ """An empty list of FunctionColumn."""
+
+ @abstractmethod
+ def parameters(self) -> List[FunctionParam]:
+ """Returns the parameters for this definition.
+
+ Returns:
+ The parameters for this definition. May be an empty list for a
no-arg definition.
+ """
+ pass
+
+ def return_type(self) -> Optional[Type]:
+ """The return type for scalar or aggregate function definitions.
+
+ Returns:
+ The return type, or None if this is a table-valued function
definition.
+ """
+ return None
+
+ def return_columns(self) -> List[FunctionColumn]:
+ """The output columns for a table-valued function definition.
+
+ A table-valued function is a function that returns a table instead of a
+ scalar value or an aggregate result. The returned table has a fixed
schema
+ defined by the columns returned from this method.
+
+ Returns:
+ The output columns that define the schema of the table returned by
+ this definition, or an empty list if this is a scalar or aggregate
+ function definition.
+ """
+ return self.EMPTY_COLUMNS
+
+ @abstractmethod
+ def impls(self) -> List[FunctionImpl]:
+ """Returns the implementations associated with this definition."""
+ pass
+
+
+class FunctionDefinitions:
+ """Factory class for creating FunctionDefinition instances."""
+
+ class SimpleFunctionDefinition(FunctionDefinition):
+ """Simple implementation of FunctionDefinition."""
+
+ def __init__(
+ self,
+ parameters: List[FunctionParam],
+ return_type: Optional[Type],
+ return_columns: Optional[List[FunctionColumn]],
+ impls: List[FunctionImpl],
+ ):
+ self._parameters = list(parameters) if parameters else []
+ self._return_type = return_type
+ self._return_columns = (
+ list(return_columns)
+ if return_columns
+ else FunctionDefinition.EMPTY_COLUMNS
+ )
+ if not impls:
+ raise ValueError("Impls cannot be null or empty")
+ self._impls = list(impls)
+
+ def parameters(self) -> List[FunctionParam]:
+ return list(self._parameters)
+
+ def return_type(self) -> Optional[Type]:
+ return self._return_type
+
+ def return_columns(self) -> List[FunctionColumn]:
+ return (
+ list(self._return_columns)
+ if self._return_columns
+ else FunctionDefinition.EMPTY_COLUMNS
+ )
+
+ def impls(self) -> List[FunctionImpl]:
+ return list(self._impls)
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, FunctionDefinition):
+ return False
+ return (
+ self._parameters == list(other.parameters())
+ and self._return_type == other.return_type()
+ and self._return_columns == list(other.return_columns())
+ and self._impls == list(other.impls())
+ )
+
+ def __hash__(self) -> int:
+ return hash(
+ (
+ tuple(self._parameters),
+ self._return_type,
+ tuple(self._return_columns) if self._return_columns else
None,
+ tuple(self._impls),
+ )
+ )
+
+ def __repr__(self) -> str:
+ return (
+ f"FunctionDefinition(parameters={self._parameters}, "
+ f"returnType={self._return_type}, "
+ f"returnColumns={self._return_columns}, "
+ f"impls={self._impls})"
+ )
+
+ @classmethod
+ def of(
+ cls,
+ parameters: List[FunctionParam],
+ return_type: Type,
+ impls: List[FunctionImpl],
+ ) -> FunctionDefinition:
+ """Create a FunctionDefinition instance for a scalar or aggregate
function.
+
+ Args:
+ parameters: The parameters for this definition, may be empty.
+ return_type: The return type for this definition, must not be None.
+ impls: The implementations for this definition, must not be empty.
+
+ Returns:
+ A FunctionDefinition instance.
+
+ Raises:
+ ValueError: If return_type is None.
+ """
+ if return_type is None:
+ raise ValueError("Return type cannot be None")
+ return cls.SimpleFunctionDefinition(parameters, return_type, None,
impls)
+
+ @classmethod
+ def of_table(
+ cls,
+ parameters: List[FunctionParam],
+ return_columns: List[FunctionColumn],
+ impls: List[FunctionImpl],
+ ) -> FunctionDefinition:
+ """Create a FunctionDefinition instance for a table-valued function.
+
+ Args:
+ parameters: The parameters for this definition, may be empty.
+ return_columns: The return columns for this definition, must not
be empty.
+ impls: The implementations for this definition, must not be empty.
+
+ Returns:
+ A FunctionDefinition instance.
+
+ Raises:
+ ValueError: If return_columns is None or empty.
+ """
+ if not return_columns:
+ raise ValueError("Return columns cannot be None or empty")
+ return cls.SimpleFunctionDefinition(parameters, None, return_columns,
impls)
diff --git a/clients/client-python/gravitino/api/function/function_impl.py
b/clients/client-python/gravitino/api/function/function_impl.py
new file mode 100644
index 0000000000..27d03a336b
--- /dev/null
+++ b/clients/client-python/gravitino/api/function/function_impl.py
@@ -0,0 +1,117 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from abc import ABC
+from enum import Enum
+from typing import Dict, Optional
+
+from gravitino.api.function.function_resources import FunctionResources
+
+
+class FunctionImpl(ABC):
+ """Base class of function implementations.
+
+ A function implementation must declare its language and optional external
resources.
+ Concrete implementations are provided by SQLImpl, JavaImpl, and PythonImpl.
+ """
+
+ class Language(Enum):
+ """Supported implementation languages."""
+
+ SQL = "SQL"
+ """SQL implementation."""
+
+ JAVA = "JAVA"
+ """Java implementation."""
+
+ PYTHON = "PYTHON"
+ """Python implementation."""
+
+ class RuntimeType(Enum):
+ """Supported execution runtimes for function implementations."""
+
+ SPARK = "SPARK"
+ """Spark runtime."""
+
+ TRINO = "TRINO"
+ """Trino runtime."""
+
+ @classmethod
+ def from_string(cls, value: str) -> "FunctionImpl.RuntimeType":
+ """Parse a runtime value from string.
+
+ Args:
+ value: Runtime name.
+
+ Returns:
+ Parsed runtime.
+
+ Raises:
+ ValueError: If the runtime is not supported.
+ """
+ if not value or not value.strip():
+ raise ValueError("Function runtime must be set")
+ normalized = value.strip().upper()
+ for runtime in cls:
+ if runtime.name == normalized:
+ return runtime
+ raise ValueError(f"Unsupported function runtime: {value}")
+
+ def __init__(
+ self,
+ language: Language,
+ runtime: RuntimeType,
+ resources: Optional[FunctionResources] = None,
+ properties: Optional[Dict[str, str]] = None,
+ ):
+ """Construct a FunctionImpl.
+
+ Args:
+ language: The language of the function implementation.
+ runtime: The runtime of the function implementation.
+ resources: The resources required by the function implementation.
+ properties: The properties of the function implementation.
+
+ Raises:
+ ValueError: If language or runtime is not set.
+ """
+ if language is None:
+ raise ValueError("Function implementation language must be set")
+ self._language = language
+
+ if runtime is None:
+ raise ValueError("Function runtime must be set")
+ self._runtime = runtime
+
+ self._resources = resources if resources else FunctionResources.empty()
+ self._properties = dict(properties) if properties else {}
+
+ def language(self) -> Language:
+ """Returns the implementation language."""
+ return self._language
+
+ def runtime(self) -> RuntimeType:
+ """Returns the target runtime."""
+ return self._runtime
+
+ def resources(self) -> FunctionResources:
+ """Returns the external resources required by this implementation."""
+ return self._resources
+
+ def properties(self) -> Dict[str, str]:
+ """Returns the additional properties of this implementation."""
+ return dict(self._properties)
diff --git a/clients/client-python/gravitino/api/function/function_param.py
b/clients/client-python/gravitino/api/function/function_param.py
new file mode 100644
index 0000000000..2c37e9a2ed
--- /dev/null
+++ b/clients/client-python/gravitino/api/function/function_param.py
@@ -0,0 +1,123 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from abc import ABC, abstractmethod
+from typing import Optional
+
+from gravitino.api.rel.expressions.expression import Expression
+from gravitino.api.rel.types.type import Type
+
+
+class FunctionParam(ABC):
+ """Represents a function parameter."""
+
+ @abstractmethod
+ def name(self) -> str:
+ """Returns the name of the parameter."""
+ pass
+
+ @abstractmethod
+ def data_type(self) -> Type:
+ """Returns the data type of the parameter."""
+ pass
+
+ def comment(self) -> Optional[str]:
+ """Returns the optional comment of the parameter, None if not
provided."""
+ return None
+
+ def default_value(self) -> Optional[Expression]:
+ """Returns the default value of the parameter if provided, otherwise
None."""
+ return None
+
+
+class FunctionParams:
+ """Factory class for creating FunctionParam instances."""
+
+ class SimpleFunctionParam(FunctionParam):
+ """Simple implementation of FunctionParam."""
+
+ def __init__(
+ self,
+ name: str,
+ data_type: Type,
+ comment: Optional[str] = None,
+ default_value: Optional[Expression] = None,
+ ):
+ if not name or not name.strip():
+ raise ValueError("Parameter name cannot be null or empty")
+ self._name = name
+
+ if data_type is None:
+ raise ValueError("Parameter data type cannot be null")
+ self._data_type = data_type
+
+ self._comment = comment
+ self._default_value = default_value
+
+ def name(self) -> str:
+ return self._name
+
+ def data_type(self) -> Type:
+ return self._data_type
+
+ def comment(self) -> Optional[str]:
+ return self._comment
+
+ def default_value(self) -> Optional[Expression]:
+ return self._default_value
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, FunctionParam):
+ return False
+ return (
+ self._name == other.name()
+ and self._data_type == other.data_type()
+ and self._comment == other.comment()
+ and self._default_value == other.default_value()
+ )
+
+ def __hash__(self) -> int:
+ return hash(
+ (self._name, self._data_type, self._comment,
self._default_value)
+ )
+
+ def __repr__(self) -> str:
+ return (
+ f"FunctionParam(name='{self._name}',
dataType={self._data_type}, "
+ f"comment='{self._comment}',
defaultValue={self._default_value})"
+ )
+
+ @classmethod
+ def of(
+ cls,
+ name: str,
+ data_type: Type,
+ comment: Optional[str] = None,
+ default_value: Optional[Expression] = None,
+ ) -> FunctionParam:
+ """Create a FunctionParam instance.
+
+ Args:
+ name: The parameter name.
+ data_type: The parameter type.
+ comment: The optional comment.
+ default_value: The optional default value.
+
+ Returns:
+ A FunctionParam instance.
+ """
+ return cls.SimpleFunctionParam(name, data_type, comment, default_value)
diff --git a/clients/client-python/gravitino/api/function/function_resources.py
b/clients/client-python/gravitino/api/function/function_resources.py
new file mode 100644
index 0000000000..1887f51fb4
--- /dev/null
+++ b/clients/client-python/gravitino/api/function/function_resources.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List, Optional
+
+
+class FunctionResources:
+ """Represents external resources that are required by a function
implementation."""
+
+ def __init__(
+ self,
+ jars: Optional[List[str]] = None,
+ files: Optional[List[str]] = None,
+ archives: Optional[List[str]] = None,
+ ):
+ """Create a FunctionResources instance.
+
+ Args:
+ jars: The jar resources.
+ files: The file resources.
+ archives: The archive resources.
+ """
+ self._jars = list(jars) if jars else []
+ self._files = list(files) if files else []
+ self._archives = list(archives) if archives else []
+
+ @classmethod
+ def empty(cls) -> "FunctionResources":
+ """Returns an empty FunctionResources instance."""
+ return cls()
+
+ @classmethod
+ def of(
+ cls,
+ jars: Optional[List[str]] = None,
+ files: Optional[List[str]] = None,
+ archives: Optional[List[str]] = None,
+ ) -> "FunctionResources":
+ """Create a FunctionResources instance.
+
+ Args:
+ jars: The jar resources.
+ files: The file resources.
+ archives: The archive resources.
+
+ Returns:
+ A FunctionResources instance.
+ """
+ return cls(jars, files, archives)
+
+ def jars(self) -> List[str]:
+ """Returns the jar resources."""
+ return list(self._jars)
+
+ def files(self) -> List[str]:
+ """Returns the file resources."""
+ return list(self._files)
+
+ def archives(self) -> List[str]:
+ """Returns the archive resources."""
+ return list(self._archives)
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, FunctionResources):
+ return False
+ return (
+ self._jars == other._jars
+ and self._files == other._files
+ and self._archives == other._archives
+ )
+
+ def __hash__(self) -> int:
+ return hash((tuple(self._jars), tuple(self._files),
tuple(self._archives)))
+
+ def __repr__(self) -> str:
+ return (
+ f"FunctionResources(jars={self._jars}, "
+ f"files={self._files}, archives={self._archives})"
+ )
diff --git a/clients/client-python/gravitino/api/function/function_type.py
b/clients/client-python/gravitino/api/function/function_type.py
new file mode 100644
index 0000000000..2c0aac9b8f
--- /dev/null
+++ b/clients/client-python/gravitino/api/function/function_type.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from enum import Enum
+
+
+class FunctionType(Enum):
+ """Function type supported by Gravitino."""
+
+ SCALAR = "scalar"
+ """Scalar function that returns a single value per row."""
+
+ AGGREGATE = "aggregate"
+ """Aggregate function that combines multiple rows into a single value."""
+
+ TABLE = "table"
+ """Table-valued function that returns a table of rows."""
+
+ @classmethod
+ def from_string(cls, type_str: str) -> "FunctionType":
+ """Parse the function type from a string value.
+
+ Args:
+ type_str: The string to parse.
+
+ Returns:
+ The parsed FunctionType.
+
+ Raises:
+ ValueError: If the value cannot be parsed.
+ """
+ if not type_str:
+ raise ValueError("Function type cannot be null or empty")
+
+ type_lower = type_str.lower()
+ if type_lower == "agg":
+ return cls.AGGREGATE
+
+ for func_type in cls:
+ if func_type.value == type_lower:
+ return func_type
+
+ raise ValueError(f"Invalid function type: {type_str}")
+
+ def type_name(self) -> str:
+ """Returns the canonical string representation used by APIs."""
+ return self.value
diff --git a/clients/client-python/gravitino/api/function/java_impl.py
b/clients/client-python/gravitino/api/function/java_impl.py
new file mode 100644
index 0000000000..7fabc88395
--- /dev/null
+++ b/clients/client-python/gravitino/api/function/java_impl.py
@@ -0,0 +1,169 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, Optional
+
+from gravitino.api.function.function_impl import FunctionImpl
+from gravitino.api.function.function_resources import FunctionResources
+from gravitino.utils.precondition import Precondition
+
+
+class JavaImpl(FunctionImpl):
+ """Java implementation with class name."""
+
+ def __init__(
+ self,
+ runtime: FunctionImpl.RuntimeType,
+ class_name: str,
+ resources: Optional[FunctionResources] = None,
+ properties: Optional[Dict[str, str]] = None,
+ ):
+ """Create a JavaImpl instance.
+
+ Args:
+ runtime: The runtime of the function implementation.
+ class_name: The fully qualified class name.
+ resources: The resources required by the function implementation.
+ properties: The properties of the function implementation.
+
+ Raises:
+ ValueError: If class_name is null or empty.
+ """
+ super().__init__(FunctionImpl.Language.JAVA, runtime, resources,
properties)
+ if not class_name or not class_name.strip():
+ raise ValueError("Java class name cannot be null or empty")
+ self._class_name = class_name
+
+ def class_name(self) -> str:
+ """Returns the fully qualified class name."""
+ return self._class_name
+
+ @staticmethod
+ def builder() -> "JavaImpl.Builder":
+ """Returns a new Builder for JavaImpl."""
+ return JavaImpl.Builder()
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, JavaImpl):
+ return False
+ return (
+ self.language() == other.language()
+ and self.runtime() == other.runtime()
+ and self.resources() == other.resources()
+ and self.properties() == other.properties()
+ and self._class_name == other._class_name
+ )
+
+ def __hash__(self) -> int:
+ return hash(
+ (
+ self.language(),
+ self.runtime(),
+ self.resources(),
+ tuple(sorted(self.properties().items())),
+ self._class_name,
+ )
+ )
+
+ def __repr__(self) -> str:
+ return (
+ f"JavaImpl(language={self.language()}, runtime={self.runtime()}, "
+ f"className='{self._class_name}', resources={self.resources()}, "
+ f"properties={self.properties()})"
+ )
+
+ class Builder:
+ """Builder for JavaImpl."""
+
+ def __init__(self):
+ self._runtime: Optional[FunctionImpl.RuntimeType] = None
+ self._class_name: Optional[str] = None
+ self._resources: Optional[FunctionResources] = None
+ self._properties: Optional[Dict[str, str]] = None
+
+ def with_runtime_type(
+ self, runtime: FunctionImpl.RuntimeType
+ ) -> "JavaImpl.Builder":
+ """Sets the runtime type.
+
+ Args:
+ runtime: The runtime of the function implementation.
+
+ Returns:
+ The builder instance.
+ """
+ self._runtime = runtime
+ return self
+
+ def with_class_name(self, class_name: str) -> "JavaImpl.Builder":
+ """Sets the class name.
+
+ Args:
+ class_name: The fully qualified class name.
+
+ Returns:
+ The builder instance.
+ """
+ self._class_name = class_name
+ return self
+
+ def with_resources(self, resources: FunctionResources) ->
"JavaImpl.Builder":
+ """Sets the resources.
+
+ Args:
+ resources: The resources required by the function
implementation.
+
+ Returns:
+ The builder instance.
+ """
+ self._resources = resources
+ return self
+
+ def with_properties(self, properties: Dict[str, str]) ->
"JavaImpl.Builder":
+ """Sets the properties.
+
+ Args:
+ properties: The properties of the function implementation.
+
+ Returns:
+ The builder instance.
+ """
+ self._properties = properties
+ return self
+
+ def build(self) -> "JavaImpl":
+ """Builds a JavaImpl instance.
+
+ Returns:
+ A new JavaImpl instance.
+
+ Raises:
+ IllegalArgumentException: If required fields are not set.
+ """
+ Precondition.check_argument(
+ self._runtime is not None, "Runtime type cannot be null"
+ )
+ Precondition.check_argument(
+ self._class_name is not None, "Class name cannot be null"
+ )
+
+ return JavaImpl(
+ runtime=self._runtime,
+ class_name=self._class_name,
+ resources=self._resources,
+ properties=self._properties,
+ )
diff --git a/clients/client-python/gravitino/api/function/python_impl.py
b/clients/client-python/gravitino/api/function/python_impl.py
new file mode 100644
index 0000000000..f8e3358dbc
--- /dev/null
+++ b/clients/client-python/gravitino/api/function/python_impl.py
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, Optional
+
+from gravitino.api.function.function_impl import FunctionImpl
+from gravitino.api.function.function_resources import FunctionResources
+from gravitino.utils.precondition import Precondition
+
+
+class PythonImpl(FunctionImpl):
+ """Python implementation with handler and optional inline code."""
+
+ def __init__(
+ self,
+ runtime: FunctionImpl.RuntimeType,
+ handler: str,
+ code_block: Optional[str] = None,
+ resources: Optional[FunctionResources] = None,
+ properties: Optional[Dict[str, str]] = None,
+ ):
+ """Create a PythonImpl instance.
+
+ Args:
+ runtime: The runtime of the function implementation.
+ handler: The handler entrypoint.
+ code_block: The Python UDF code block.
+ resources: The resources required by the function implementation.
+ properties: The properties of the function implementation.
+
+ Raises:
+ ValueError: If handler is null or empty.
+ """
+ super().__init__(FunctionImpl.Language.PYTHON, runtime, resources,
properties)
+ if not handler or not handler.strip():
+ raise ValueError("Python handler cannot be null or empty")
+ self._handler = handler
+ self._code_block = code_block
+
+ def handler(self) -> str:
+ """Returns the handler entrypoint."""
+ return self._handler
+
+ def code_block(self) -> Optional[str]:
+ """Returns the Python UDF code block."""
+ return self._code_block
+
+ @staticmethod
+ def builder() -> "PythonImpl.Builder":
+ """Returns a new Builder for PythonImpl."""
+ return PythonImpl.Builder()
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, PythonImpl):
+ return False
+ return (
+ self.language() == other.language()
+ and self.runtime() == other.runtime()
+ and self.resources() == other.resources()
+ and self.properties() == other.properties()
+ and self._handler == other._handler
+ and self._code_block == other._code_block
+ )
+
+ def __hash__(self) -> int:
+ return hash(
+ (
+ self.language(),
+ self.runtime(),
+ self.resources(),
+ tuple(sorted(self.properties().items())),
+ self._handler,
+ self._code_block,
+ )
+ )
+
+ def __repr__(self) -> str:
+ return (
+ f"PythonImpl(language={self.language()}, runtime={self.runtime()},
"
+ f"handler='{self._handler}', codeBlock='{self._code_block}', "
+ f"resources={self.resources()}, properties={self.properties()})"
+ )
+
+ class Builder:
+ """Builder for PythonImpl."""
+
+ def __init__(self):
+ self._runtime: Optional[FunctionImpl.RuntimeType] = None
+ self._handler: Optional[str] = None
+ self._code_block: Optional[str] = None
+ self._resources: Optional[FunctionResources] = None
+ self._properties: Optional[Dict[str, str]] = None
+
+ def with_runtime_type(
+ self, runtime: FunctionImpl.RuntimeType
+ ) -> "PythonImpl.Builder":
+ """Sets the runtime type.
+
+ Args:
+ runtime: The runtime of the function implementation.
+
+ Returns:
+ The builder instance.
+ """
+ self._runtime = runtime
+ return self
+
+ def with_handler(self, handler: str) -> "PythonImpl.Builder":
+ """Sets the handler.
+
+ Args:
+ handler: The handler entrypoint.
+
+ Returns:
+ The builder instance.
+ """
+ self._handler = handler
+ return self
+
+ def with_code_block(self, code_block: str) -> "PythonImpl.Builder":
+ """Sets the code block.
+
+ Args:
+ code_block: The Python UDF code block.
+
+ Returns:
+ The builder instance.
+ """
+ self._code_block = code_block
+ return self
+
+ def with_resources(self, resources: FunctionResources) ->
"PythonImpl.Builder":
+ """Sets the resources.
+
+ Args:
+ resources: The resources required by the function
implementation.
+
+ Returns:
+ The builder instance.
+ """
+ self._resources = resources
+ return self
+
+ def with_properties(self, properties: Dict[str, str]) ->
"PythonImpl.Builder":
+ """Sets the properties.
+
+ Args:
+ properties: The properties of the function implementation.
+
+ Returns:
+ The builder instance.
+ """
+ self._properties = properties
+ return self
+
+ def build(self) -> "PythonImpl":
+ """Builds a PythonImpl instance.
+
+ Returns:
+ A new PythonImpl instance.
+
+ Raises:
+ IllegalArgumentException: If required fields are not set.
+ """
+ Precondition.check_argument(
+ self._runtime is not None, "Runtime type cannot be null"
+ )
+ Precondition.check_argument(
+ self._handler is not None, "Handler cannot be null"
+ )
+
+ return PythonImpl(
+ runtime=self._runtime,
+ handler=self._handler,
+ code_block=self._code_block,
+ resources=self._resources,
+ properties=self._properties,
+ )
diff --git a/clients/client-python/gravitino/api/function/sql_impl.py
b/clients/client-python/gravitino/api/function/sql_impl.py
new file mode 100644
index 0000000000..860a77b011
--- /dev/null
+++ b/clients/client-python/gravitino/api/function/sql_impl.py
@@ -0,0 +1,167 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, Optional
+
+from gravitino.api.function.function_impl import FunctionImpl
+from gravitino.api.function.function_resources import FunctionResources
+from gravitino.utils.precondition import Precondition
+
+
+class SQLImpl(FunctionImpl):
+ """SQL implementation with runtime and SQL body."""
+
+ def __init__(
+ self,
+ runtime: FunctionImpl.RuntimeType,
+ sql: str,
+ resources: Optional[FunctionResources] = None,
+ properties: Optional[Dict[str, str]] = None,
+ ):
+ """Create a SQLImpl instance.
+
+ Args:
+ runtime: The runtime of the function implementation.
+ sql: The SQL that defines the function.
+ resources: The resources required by the function implementation.
+ properties: The properties of the function implementation.
+
+ Raises:
+ ValueError: If sql is null or empty.
+ """
+ super().__init__(FunctionImpl.Language.SQL, runtime, resources,
properties)
+ if not sql or not sql.strip():
+ raise ValueError("SQL text cannot be null or empty")
+ self._sql = sql
+
+ def sql(self) -> str:
+ """Returns the SQL that defines the function."""
+ return self._sql
+
+ @staticmethod
+ def builder() -> "SQLImpl.Builder":
+ """Returns a new Builder for SQLImpl."""
+ return SQLImpl.Builder()
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, SQLImpl):
+ return False
+ return (
+ self.language() == other.language()
+ and self.runtime() == other.runtime()
+ and self.resources() == other.resources()
+ and self.properties() == other.properties()
+ and self._sql == other._sql
+ )
+
+ def __hash__(self) -> int:
+ return hash(
+ (
+ self.language(),
+ self.runtime(),
+ self.resources(),
+ tuple(sorted(self.properties().items())),
+ self._sql,
+ )
+ )
+
+ def __repr__(self) -> str:
+ return (
+ f"SQLImpl(language={self.language()}, runtime={self.runtime()}, "
+ f"sql='{self._sql}', resources={self.resources()}, "
+ f"properties={self.properties()})"
+ )
+
+ class Builder:
+ """Builder for SQLImpl."""
+
+ def __init__(self):
+ self._runtime: Optional[FunctionImpl.RuntimeType] = None
+ self._sql: Optional[str] = None
+ self._resources: Optional[FunctionResources] = None
+ self._properties: Optional[Dict[str, str]] = None
+
+ def with_runtime_type(
+ self, runtime: FunctionImpl.RuntimeType
+ ) -> "SQLImpl.Builder":
+ """Sets the runtime type.
+
+ Args:
+ runtime: The runtime of the function implementation.
+
+ Returns:
+ The builder instance.
+ """
+ self._runtime = runtime
+ return self
+
+ def with_sql(self, sql: str) -> "SQLImpl.Builder":
+ """Sets the SQL text.
+
+ Args:
+ sql: The SQL that defines the function.
+
+ Returns:
+ The builder instance.
+ """
+ self._sql = sql
+ return self
+
+ def with_resources(self, resources: FunctionResources) ->
"SQLImpl.Builder":
+ """Sets the resources.
+
+ Args:
+ resources: The resources required by the function
implementation.
+
+ Returns:
+ The builder instance.
+ """
+ self._resources = resources
+ return self
+
+ def with_properties(self, properties: Dict[str, str]) ->
"SQLImpl.Builder":
+ """Sets the properties.
+
+ Args:
+ properties: The properties of the function implementation.
+
+ Returns:
+ The builder instance.
+ """
+ self._properties = properties
+ return self
+
+ def build(self) -> "SQLImpl":
+ """Builds a SQLImpl instance.
+
+ Returns:
+ A new SQLImpl instance.
+
+ Raises:
+ IllegalArgumentException: If required fields are not set.
+ """
+ Precondition.check_argument(
+ self._runtime is not None, "Runtime type cannot be null"
+ )
+ Precondition.check_argument(self._sql is not None, "SQL cannot be
null")
+
+ return SQLImpl(
+ runtime=self._runtime,
+ sql=self._sql,
+ resources=self._resources,
+ properties=self._properties,
+ )
diff --git a/clients/client-python/gravitino/exceptions/base.py
b/clients/client-python/gravitino/exceptions/base.py
index a0fd09012d..6a60eb1c6a 100644
--- a/clients/client-python/gravitino/exceptions/base.py
+++ b/clients/client-python/gravitino/exceptions/base.py
@@ -219,3 +219,11 @@ class
PartitionAlreadyExistsException(AlreadyExistsException):
class TableAlreadyExistsException(AlreadyExistsException):
"""An exception thrown when a table already exists."""
+
+
+class NoSuchFunctionException(NotFoundException):
+ """An exception thrown when a function with specified name is not found."""
+
+
+class FunctionAlreadyExistsException(AlreadyExistsException):
+ """An exception thrown when a function already exists."""