This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 46ae50ddbe [#9527] feat(core): Add UDF management framework (#9553)
46ae50ddbe is described below
commit 46ae50ddbebc263fba83acbe1f2b92548699ac7e
Author: mchades <[email protected]>
AuthorDate: Tue Jan 6 16:03:46 2026 +0800
[#9527] feat(core): Add UDF management framework (#9553)
### What changes were proposed in this pull request?
Add UDF management framework
### Why are the changes needed?
Fix: #9527
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
CI pass
---
.../java/org/apache/gravitino/GravitinoEnv.java | 25 ++
.../gravitino/catalog/CapabilityHelpers.java | 6 +-
.../gravitino/catalog/FunctionDispatcher.java | 29 +++
.../catalog/FunctionNormalizeDispatcher.java | 145 ++++++++++++
.../catalog/FunctionOperationDispatcher.java | 254 +++++++++++++++++++++
.../catalog/ManagedFunctionOperations.java | 137 +++++++++++
.../gravitino/connector/capability/Capability.java | 7 +-
7 files changed, 600 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index a4403c8126..a9980df00a 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -35,6 +35,9 @@ import org.apache.gravitino.catalog.CatalogNormalizeDispatcher;
import org.apache.gravitino.catalog.FilesetDispatcher;
import org.apache.gravitino.catalog.FilesetNormalizeDispatcher;
import org.apache.gravitino.catalog.FilesetOperationDispatcher;
+import org.apache.gravitino.catalog.FunctionDispatcher;
+import org.apache.gravitino.catalog.FunctionNormalizeDispatcher;
+import org.apache.gravitino.catalog.FunctionOperationDispatcher;
import org.apache.gravitino.catalog.ModelDispatcher;
import org.apache.gravitino.catalog.ModelNormalizeDispatcher;
import org.apache.gravitino.catalog.ModelOperationDispatcher;
@@ -130,6 +133,8 @@ public class GravitinoEnv {
private ModelDispatcher modelDispatcher;
+ private FunctionDispatcher functionDispatcher;
+
private MetalakeDispatcher metalakeDispatcher;
private CredentialOperationDispatcher credentialOperationDispatcher;
@@ -257,6 +262,15 @@ public class GravitinoEnv {
return modelDispatcher;
}
+ /**
+ * Get the FunctionDispatcher associated with the Gravitino environment.
+ *
+ * @return The FunctionDispatcher instance.
+ */
+ public FunctionDispatcher functionDispatcher() {
+ return functionDispatcher;
+ }
+
/**
* Get the PartitionDispatcher associated with the Gravitino environment.
*
@@ -623,6 +637,17 @@ public class GravitinoEnv {
ModelNormalizeDispatcher modelNormalizeDispatcher =
new ModelNormalizeDispatcher(modelHookDispatcher, catalogManager);
this.modelDispatcher = new ModelEventDispatcher(eventBus,
modelNormalizeDispatcher);
+
+ // TODO: Add FunctionHookDispatcher and FunctionEventDispatcher when needed
+ // The operation chain should be:
+ // FunctionEventDispatcher -> FunctionNormalizeDispatcher -> FunctionHookDispatcher ->
+ // FunctionOperationDispatcher
+ FunctionOperationDispatcher functionOperationDispatcher =
+ new FunctionOperationDispatcher(
+ catalogManager, schemaOperationDispatcher, entityStore,
idGenerator);
+ this.functionDispatcher =
+ new FunctionNormalizeDispatcher(functionOperationDispatcher,
catalogManager);
+
this.statisticDispatcher =
new StatisticEventDispatcher(
eventBus, new StatisticManager(entityStore, idGenerator, config));
diff --git a/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java b/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java
index fe4659c28e..e405cfdd92 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java
@@ -131,7 +131,8 @@ public class CapabilityHelpers {
if (identScope == Capability.Scope.TABLE
|| identScope == Capability.Scope.FILESET
|| identScope == Capability.Scope.TOPIC
- || identScope == Capability.Scope.MODEL) {
+ || identScope == Capability.Scope.MODEL
+ || identScope == Capability.Scope.FUNCTION) {
String schema = namespace.level(namespace.length() - 1);
schema = applyCaseSensitiveOnName(Capability.Scope.SCHEMA, schema,
capabilities);
return Namespace.of(metalake, catalog, schema);
@@ -201,7 +202,8 @@ public class CapabilityHelpers {
String catalog = namespace.level(1);
if (identScope == Capability.Scope.TABLE
|| identScope == Capability.Scope.FILESET
- || identScope == Capability.Scope.TOPIC) {
+ || identScope == Capability.Scope.TOPIC
+ || identScope == Capability.Scope.FUNCTION) {
String schema = namespace.level(namespace.length() - 1);
schema = applyCapabilitiesOnName(Capability.Scope.SCHEMA, schema,
capabilities);
return Namespace.of(metalake, catalog, schema);
diff --git a/core/src/main/java/org/apache/gravitino/catalog/FunctionDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/FunctionDispatcher.java
new file mode 100644
index 0000000000..65c23c920c
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/catalog/FunctionDispatcher.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import org.apache.gravitino.function.FunctionCatalog;
+
+/**
+ * {@code FunctionDispatcher} interface acts as a specialization of the {@link FunctionCatalog}
+ * interface. This interface is designed to potentially add custom behaviors or operations related
+ * to dispatching or handling function-related events or actions that are not covered by the
+ * standard {@code FunctionCatalog} operations.
+ */
+public interface FunctionDispatcher extends FunctionCatalog {}
diff --git a/core/src/main/java/org/apache/gravitino/catalog/FunctionNormalizeDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/FunctionNormalizeDispatcher.java
new file mode 100644
index 0000000000..4930a780aa
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/catalog/FunctionNormalizeDispatcher.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import static org.apache.gravitino.catalog.CapabilityHelpers.applyCapabilities;
+import static
org.apache.gravitino.catalog.CapabilityHelpers.applyCaseSensitive;
+import static org.apache.gravitino.catalog.CapabilityHelpers.getCapability;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.connector.capability.Capability;
+import org.apache.gravitino.exceptions.FunctionAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.exceptions.NoSuchFunctionVersionException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionChange;
+import org.apache.gravitino.function.FunctionColumn;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.rel.types.Type;
+
+/**
+ * {@code FunctionNormalizeDispatcher} normalizes function identifiers and namespaces by applying
+ * case-sensitivity and other naming capabilities before delegating to the underlying dispatcher.
+ */
+public class FunctionNormalizeDispatcher implements FunctionDispatcher {
+ private final CatalogManager catalogManager;
+ private final FunctionDispatcher dispatcher;
+
+ public FunctionNormalizeDispatcher(FunctionDispatcher dispatcher,
CatalogManager catalogManager) {
+ this.dispatcher = dispatcher;
+ this.catalogManager = catalogManager;
+ }
+
+ @Override
+ public NameIdentifier[] listFunctions(Namespace namespace) throws
NoSuchSchemaException {
+ Namespace caseSensitiveNs = normalizeCaseSensitive(namespace);
+ NameIdentifier[] identifiers = dispatcher.listFunctions(caseSensitiveNs);
+ return normalizeCaseSensitive(identifiers);
+ }
+
+ @Override
+ public Function[] listFunctionInfos(Namespace namespace) throws
NoSuchSchemaException {
+ return dispatcher.listFunctionInfos(normalizeCaseSensitive(namespace));
+ }
+
+ @Override
+ public Function getFunction(NameIdentifier ident) throws
NoSuchFunctionException {
+ return dispatcher.getFunction(normalizeCaseSensitive(ident));
+ }
+
+ @Override
+ public Function getFunction(NameIdentifier ident, int version)
+ throws NoSuchFunctionException, NoSuchFunctionVersionException {
+ return dispatcher.getFunction(normalizeCaseSensitive(ident), version);
+ }
+
+ @Override
+ public boolean functionExists(NameIdentifier ident) {
+ return dispatcher.functionExists(normalizeCaseSensitive(ident));
+ }
+
+ @Override
+ public Function registerFunction(
+ NameIdentifier ident,
+ String comment,
+ FunctionType functionType,
+ boolean deterministic,
+ Type returnType,
+ FunctionDefinition[] definitions)
+ throws NoSuchSchemaException, FunctionAlreadyExistsException {
+ return dispatcher.registerFunction(
+ normalizeNameIdentifier(ident),
+ comment,
+ functionType,
+ deterministic,
+ returnType,
+ definitions);
+ }
+
+ @Override
+ public Function registerFunction(
+ NameIdentifier ident,
+ String comment,
+ boolean deterministic,
+ FunctionColumn[] returnColumns,
+ FunctionDefinition[] definitions)
+ throws NoSuchSchemaException, FunctionAlreadyExistsException {
+ return dispatcher.registerFunction(
+ normalizeNameIdentifier(ident), comment, deterministic, returnColumns,
definitions);
+ }
+
+ @Override
+ public Function alterFunction(NameIdentifier ident, FunctionChange...
changes)
+ throws NoSuchFunctionException, IllegalArgumentException {
+ return dispatcher.alterFunction(normalizeCaseSensitive(ident), changes);
+ }
+
+ @Override
+ public boolean dropFunction(NameIdentifier ident) {
+ return dispatcher.dropFunction(normalizeCaseSensitive(ident));
+ }
+
+ private Namespace normalizeCaseSensitive(Namespace namespace) {
+ Capability capabilities =
getCapability(NameIdentifier.of(namespace.levels()), catalogManager);
+ return applyCaseSensitive(namespace, Capability.Scope.FUNCTION,
capabilities);
+ }
+
+ private NameIdentifier normalizeCaseSensitive(NameIdentifier functionIdent) {
+ Capability capabilities = getCapability(functionIdent, catalogManager);
+ return applyCaseSensitive(functionIdent, Capability.Scope.FUNCTION,
capabilities);
+ }
+
+ private NameIdentifier[] normalizeCaseSensitive(NameIdentifier[]
functionIdents) {
+ if (ArrayUtils.isEmpty(functionIdents)) {
+ return functionIdents;
+ }
+
+ Capability capabilities = getCapability(functionIdents[0], catalogManager);
+ return applyCaseSensitive(functionIdents, Capability.Scope.FUNCTION,
capabilities);
+ }
+
+ private NameIdentifier normalizeNameIdentifier(NameIdentifier functionIdent)
{
+ Capability capability = getCapability(functionIdent, catalogManager);
+ return applyCapabilities(functionIdent, Capability.Scope.FUNCTION,
capability);
+ }
+}
diff --git a/core/src/main/java/org/apache/gravitino/catalog/FunctionOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/FunctionOperationDispatcher.java
new file mode 100644
index 0000000000..2392053f69
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/catalog/FunctionOperationDispatcher.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.FunctionAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.exceptions.NoSuchFunctionVersionException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionChange;
+import org.apache.gravitino.function.FunctionColumn;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.lock.LockType;
+import org.apache.gravitino.lock.TreeLockUtils;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.storage.IdGenerator;
+
+/**
+ * {@code FunctionOperationDispatcher} is responsible for dispatching function-related operations.
+ *
+ * <p>Unlike ModelCatalog which manages its own schemas (via ManagedSchemaOperations), functions are
+ * registered under schemas managed by the underlying catalog (e.g., Hive, Iceberg). Therefore, this
+ * dispatcher validates schema existence by calling the underlying catalog's schema operations, then
+ * delegates actual storage operations to {@link ManagedFunctionOperations}.
+ */
+public class FunctionOperationDispatcher extends OperationDispatcher
implements FunctionDispatcher {
+
+ private final SchemaOperationDispatcher schemaOps;
+ private final ManagedFunctionOperations managedFunctionOps;
+
+ /**
+ * Creates a new FunctionOperationDispatcher instance.
+ *
+ * @param catalogManager The CatalogManager instance to be used for function operations.
+ * @param store The EntityStore instance to be used for function operations.
+ * @param idGenerator The IdGenerator instance to be used for function operations.
+ */
+ public FunctionOperationDispatcher(
+ CatalogManager catalogManager,
+ SchemaOperationDispatcher schemaOps,
+ EntityStore store,
+ IdGenerator idGenerator) {
+ super(catalogManager, store, idGenerator);
+ this.schemaOps = schemaOps;
+ this.managedFunctionOps = new ManagedFunctionOperations(store,
idGenerator);
+ }
+
+ /**
+ * List the functions in a namespace from the catalog.
+ *
+ * @param namespace A namespace.
+ * @return An array of function identifiers in the namespace.
+ * @throws NoSuchSchemaException If the schema does not exist.
+ */
+ @Override
+ public NameIdentifier[] listFunctions(Namespace namespace) throws
NoSuchSchemaException {
+ return Arrays.stream(listFunctionInfos(namespace))
+ .map(f -> NameIdentifier.of(namespace, f.name()))
+ .toArray(NameIdentifier[]::new);
+ }
+
+ @Override
+ public Function[] listFunctionInfos(Namespace namespace) throws
NoSuchSchemaException {
+ NameIdentifier schemaIdent = NameIdentifier.of(namespace.levels());
+ // Validate schema exists in the underlying catalog
+ schemaOps.loadSchema(schemaIdent);
+
+ return TreeLockUtils.doWithTreeLock(
+ schemaIdent, LockType.READ, () ->
managedFunctionOps.listFunctionInfos(namespace));
+ }
+
+ /**
+ * Get a function by {@link NameIdentifier} from the catalog. Returns the latest version.
+ *
+ * @param ident A function identifier.
+ * @return The latest version of the function with the given name.
+ * @throws NoSuchFunctionException If the function does not exist.
+ */
+ @Override
+ public Function getFunction(NameIdentifier ident) throws
NoSuchFunctionException {
+ NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+ // Validate schema exists in the underlying catalog
+ if (!schemaOps.schemaExists(schemaIdent)) {
+ throw new NoSuchFunctionException("Schema does not exist: %s",
schemaIdent);
+ }
+
+ return TreeLockUtils.doWithTreeLock(
+ ident, LockType.READ, () -> managedFunctionOps.getFunction(ident));
+ }
+
+ /**
+ * Get a function by {@link NameIdentifier} and version from the catalog.
+ *
+ * @param ident A function identifier.
+ * @param version The function version, counted from 0.
+ * @return The function with the given name and version.
+ * @throws NoSuchFunctionException If the function does not exist.
+ * @throws NoSuchFunctionVersionException If the function version does not exist.
+ */
+ @Override
+ public Function getFunction(NameIdentifier ident, int version)
+ throws NoSuchFunctionException, NoSuchFunctionVersionException {
+ Preconditions.checkArgument(version >= 0, "Function version must be
non-negative");
+ NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+ // Validate schema exists in the underlying catalog
+ if (!schemaOps.schemaExists(schemaIdent)) {
+ throw new NoSuchFunctionException("Schema does not exist: %s",
schemaIdent);
+ }
+
+ return TreeLockUtils.doWithTreeLock(
+ ident, LockType.READ, () -> managedFunctionOps.getFunction(ident,
version));
+ }
+
+ /**
+ * Register a scalar or aggregate function with one or more definitions (overloads).
+ *
+ * @param ident The function identifier.
+ * @param comment The optional function comment.
+ * @param functionType The function type.
+ * @param deterministic Whether the function is deterministic.
+ * @param returnType The return type.
+ * @param definitions The function definitions.
+ * @return The registered function.
+ * @throws NoSuchSchemaException If the schema does not exist.
+ * @throws FunctionAlreadyExistsException If the function already exists.
+ */
+ @Override
+ public Function registerFunction(
+ NameIdentifier ident,
+ String comment,
+ FunctionType functionType,
+ boolean deterministic,
+ Type returnType,
+ FunctionDefinition[] definitions)
+ throws NoSuchSchemaException, FunctionAlreadyExistsException {
+ Preconditions.checkArgument(
+ functionType == FunctionType.SCALAR || functionType ==
FunctionType.AGGREGATE,
+ "This method is for scalar or aggregate functions only");
+ Preconditions.checkArgument(returnType != null, "Return type is required");
+ Preconditions.checkArgument(
+ definitions != null && definitions.length > 0, "At least one
definition is required");
+
+ NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+ // Validate schema exists in the underlying catalog
+ schemaOps.loadSchema(schemaIdent);
+
+ return TreeLockUtils.doWithTreeLock(
+ ident,
+ LockType.WRITE,
+ () ->
+ managedFunctionOps.registerFunction(
+ ident, comment, functionType, deterministic, returnType,
definitions));
+ }
+
+ /**
+ * Register a table-valued function with one or more definitions (overloads).
+ *
+ * @param ident The function identifier.
+ * @param comment The optional function comment.
+ * @param deterministic Whether the function is deterministic.
+ * @param returnColumns The return columns.
+ * @param definitions The function definitions.
+ * @return The registered function.
+ * @throws NoSuchSchemaException If the schema does not exist.
+ * @throws FunctionAlreadyExistsException If the function already exists.
+ */
+ @Override
+ public Function registerFunction(
+ NameIdentifier ident,
+ String comment,
+ boolean deterministic,
+ FunctionColumn[] returnColumns,
+ FunctionDefinition[] definitions)
+ throws NoSuchSchemaException, FunctionAlreadyExistsException {
+ Preconditions.checkArgument(
+ returnColumns != null && returnColumns.length > 0,
+ "At least one return column is required for table-valued function");
+ Preconditions.checkArgument(
+ definitions != null && definitions.length > 0, "At least one
definition is required");
+
+ NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+ // Validate schema exists in the underlying catalog
+ schemaOps.loadSchema(schemaIdent);
+
+ return TreeLockUtils.doWithTreeLock(
+ ident,
+ LockType.WRITE,
+ () ->
+ managedFunctionOps.registerFunction(
+ ident, comment, deterministic, returnColumns, definitions));
+ }
+
+ /**
+ * Applies {@link FunctionChange changes} to a function in the catalog.
+ *
+ * @param ident the {@link NameIdentifier} instance of the function to alter.
+ * @param changes the several {@link FunctionChange} instances to apply to the function.
+ * @return the updated {@link Function} instance.
+ * @throws NoSuchFunctionException If the function does not exist.
+ * @throws IllegalArgumentException If the change is rejected by the implementation.
+ */
+ @Override
+ public Function alterFunction(NameIdentifier ident, FunctionChange...
changes)
+ throws NoSuchFunctionException, IllegalArgumentException {
+ Preconditions.checkArgument(
+ changes != null && changes.length > 0, "At least one change is
required");
+ NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+ if (!schemaOps.schemaExists(schemaIdent)) {
+ throw new NoSuchFunctionException("Schema does not exist: %s",
schemaIdent);
+ }
+
+ return TreeLockUtils.doWithTreeLock(
+ ident, LockType.WRITE, () -> managedFunctionOps.alterFunction(ident,
changes));
+ }
+
+ /**
+ * Drop a function by name.
+ *
+ * @param ident The name identifier of the function.
+ * @return True if the function is deleted, false if the function does not exist.
+ */
+ @Override
+ public boolean dropFunction(NameIdentifier ident) {
+ NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+ if (!schemaOps.schemaExists(schemaIdent)) {
+ return false;
+ }
+
+ return TreeLockUtils.doWithTreeLock(
+ ident, LockType.WRITE, () -> managedFunctionOps.dropFunction(ident));
+ }
+}
diff --git a/core/src/main/java/org/apache/gravitino/catalog/ManagedFunctionOperations.java b/core/src/main/java/org/apache/gravitino/catalog/ManagedFunctionOperations.java
new file mode 100644
index 0000000000..f080f6ee8b
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/catalog/ManagedFunctionOperations.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.FunctionAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.exceptions.NoSuchFunctionVersionException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionCatalog;
+import org.apache.gravitino.function.FunctionChange;
+import org.apache.gravitino.function.FunctionColumn;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.storage.IdGenerator;
+
+/**
+ * {@code ManagedFunctionOperations} provides the storage-level operations for managing functions in
+ * Gravitino's EntityStore.
+ *
+ * <p>This class handles the actual persistence of function metadata, including:
+ *
+ * <ul>
+ * <li>Storing function entities and their versions
+ * <li>Retrieving functions by identifier or version
+ * <li>Updating function metadata
+ * <li>Deleting functions and their versions
+ * </ul>
+ */
+public class ManagedFunctionOperations implements FunctionCatalog {
+
+ @SuppressWarnings("UnusedVariable")
+ private static final int INIT_VERSION = 0;
+
+ @SuppressWarnings("UnusedVariable")
+ private final EntityStore store;
+
+ @SuppressWarnings("UnusedVariable")
+ private final IdGenerator idGenerator;
+
+ /**
+ * Creates a new ManagedFunctionOperations instance.
+ *
+ * @param store The EntityStore instance for function persistence.
+ * @param idGenerator The IdGenerator instance for generating unique IDs.
+ */
+ public ManagedFunctionOperations(EntityStore store, IdGenerator idGenerator)
{
+ this.store = store;
+ this.idGenerator = idGenerator;
+ }
+
+ @Override
+ public NameIdentifier[] listFunctions(Namespace namespace) throws
NoSuchSchemaException {
+ // TODO: Implement when FunctionEntity is available
+ throw new UnsupportedOperationException("listFunctions: FunctionEntity not
yet implemented");
+ }
+
+ @Override
+ public Function[] listFunctionInfos(Namespace namespace) throws
NoSuchSchemaException {
+ // TODO: Implement when FunctionEntity is available
+ throw new UnsupportedOperationException(
+ "listFunctionInfos: FunctionEntity not yet implemented");
+ }
+
+ @Override
+ public Function getFunction(NameIdentifier ident) throws
NoSuchFunctionException {
+ // TODO: Implement when FunctionEntity is available
+ throw new UnsupportedOperationException("getFunction: FunctionEntity not
yet implemented");
+ }
+
+ @Override
+ public Function getFunction(NameIdentifier ident, int version)
+ throws NoSuchFunctionException, NoSuchFunctionVersionException {
+ // TODO: Implement when FunctionEntity is available
+ throw new UnsupportedOperationException(
+ "getFunction with version: FunctionEntity not yet implemented");
+ }
+
+ @Override
+ public Function registerFunction(
+ NameIdentifier ident,
+ String comment,
+ FunctionType functionType,
+ boolean deterministic,
+ Type returnType,
+ FunctionDefinition[] definitions)
+ throws NoSuchSchemaException, FunctionAlreadyExistsException {
+ // TODO: Implement when FunctionEntity is available
+ throw new UnsupportedOperationException("registerFunction: FunctionEntity
not yet implemented");
+ }
+
+ @Override
+ public Function registerFunction(
+ NameIdentifier ident,
+ String comment,
+ boolean deterministic,
+ FunctionColumn[] returnColumns,
+ FunctionDefinition[] definitions)
+ throws NoSuchSchemaException, FunctionAlreadyExistsException {
+ // TODO: Implement when FunctionEntity is available
+ throw new UnsupportedOperationException(
+ "registerFunction for table-valued functions: FunctionEntity not yet
implemented");
+ }
+
+ @Override
+ public Function alterFunction(NameIdentifier ident, FunctionChange...
changes)
+ throws NoSuchFunctionException, IllegalArgumentException {
+ // TODO: Implement when FunctionEntity is available
+ throw new UnsupportedOperationException("alterFunction: FunctionEntity not
yet implemented");
+ }
+
+ @Override
+ public boolean dropFunction(NameIdentifier ident) {
+ // TODO: Implement when FunctionEntity is available
+ throw new UnsupportedOperationException("dropFunction: FunctionEntity not
yet implemented");
+ }
+}
diff --git a/core/src/main/java/org/apache/gravitino/connector/capability/Capability.java b/core/src/main/java/org/apache/gravitino/connector/capability/Capability.java
index 3b33b8f3e2..784946c59e 100644
--- a/core/src/main/java/org/apache/gravitino/connector/capability/Capability.java
+++ b/core/src/main/java/org/apache/gravitino/connector/capability/Capability.java
@@ -40,7 +40,8 @@ public interface Capability {
FILESET,
TOPIC,
PARTITION,
- MODEL
+ MODEL,
+ FUNCTION
}
/**
@@ -144,6 +145,10 @@ public interface Capability {
@Override
public CapabilityResult managedStorage(Scope scope) {
+ if (scope == Scope.FUNCTION) {
+ return CapabilityResult.SUPPORTED;
+ }
+
return CapabilityResult.unsupported(
String.format("The %s entity is not fully managed by Gravitino.",
scope));
}