This is an automated email from the ASF dual-hosted git repository.

mchades pushed a commit to branch udf-poc
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 59d7d6374eaa1db44f99265a0b6dcbd76c5b94c0
Author: mchades <[email protected]>
AuthorDate: Wed Dec 17 20:22:28 2025 +0800

    add FileBackedFunctionDispatcher
---
 .../java/org/apache/gravitino/GravitinoEnv.java    |   5 +-
 ...cher.java => FileBackedFunctionDispatcher.java} | 202 +++++++++++++++++++--
 2 files changed, 192 insertions(+), 15 deletions(-)

diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index 8928482227..8861177f01 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -31,11 +31,11 @@ import org.apache.gravitino.auxiliary.AuxiliaryServiceManager;
 import org.apache.gravitino.catalog.CatalogDispatcher;
 import org.apache.gravitino.catalog.CatalogManager;
 import org.apache.gravitino.catalog.CatalogNormalizeDispatcher;
+import org.apache.gravitino.catalog.FileBackedFunctionDispatcher;
 import org.apache.gravitino.catalog.FilesetDispatcher;
 import org.apache.gravitino.catalog.FilesetNormalizeDispatcher;
 import org.apache.gravitino.catalog.FilesetOperationDispatcher;
 import org.apache.gravitino.catalog.FunctionDispatcher;
-import org.apache.gravitino.catalog.InMemoryFunctionDispatcher;
 import org.apache.gravitino.catalog.ModelDispatcher;
 import org.apache.gravitino.catalog.ModelNormalizeDispatcher;
 import org.apache.gravitino.catalog.ModelOperationDispatcher;
@@ -561,8 +561,7 @@ public class GravitinoEnv {
     TableNormalizeDispatcher tableNormalizeDispatcher =
         new TableNormalizeDispatcher(tableHookDispatcher, catalogManager);
     this.tableDispatcher = new TableEventDispatcher(eventBus, tableNormalizeDispatcher);
-    // TODO: Wire actual function dispatcher implementation once function management is available.
-    this.functionDispatcher = new InMemoryFunctionDispatcher();
+    this.functionDispatcher = new FileBackedFunctionDispatcher();
 
     // TODO: We can install hooks when we need, we only supports ownership post hook,
     //  partition doesn't have ownership, so we don't need it now.
diff --git a/core/src/main/java/org/apache/gravitino/catalog/InMemoryFunctionDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/FileBackedFunctionDispatcher.java
similarity index 60%
rename from core/src/main/java/org/apache/gravitino/catalog/InMemoryFunctionDispatcher.java
rename to core/src/main/java/org/apache/gravitino/catalog/FileBackedFunctionDispatcher.java
index db91514958..34d1ca4012 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/InMemoryFunctionDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/FileBackedFunctionDispatcher.java
@@ -18,16 +18,33 @@
  */
 package org.apache.gravitino.catalog;
 
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.Reader;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.AtomicMoveNotSupportedException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Objects;
 import java.util.TreeMap;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Audit;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.dto.function.FunctionDTO;
 import org.apache.gravitino.exceptions.FunctionAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchFunctionException;
 import org.apache.gravitino.exceptions.NoSuchFunctionVersionException;
@@ -39,17 +56,40 @@ import org.apache.gravitino.function.FunctionImpl;
 import org.apache.gravitino.function.FunctionParam;
 import org.apache.gravitino.function.FunctionSignature;
 import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.json.JsonUtils;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.utils.PrincipalUtils;
 
 /**
- * An in-memory {@link FunctionDispatcher} that stores function metadata in memory. This is a
- * temporary implementation intended for wiring REST paths before a persistent backend is added.
+ * A file-backed {@link FunctionDispatcher} that caches function metadata in memory and persists it
+ * under {@code ${GRAVITINO_HOME}/data/functions}.
  */
-public class InMemoryFunctionDispatcher implements FunctionDispatcher {
+public class FileBackedFunctionDispatcher implements FunctionDispatcher {
+
+  private static final String FUNCTIONS_DATA_DIR = "data";
+  private static final String FUNCTIONS_DIR_NAME = "functions";
+  private static final String FUNCTIONS_FILE_NAME = "functions.json";
 
   private final Map<NameIdentifier, Map<FunctionSignature, NavigableMap<Integer, StoredFunction>>>
       functions = new HashMap<>();
+  private final Path storageFile;
+
+  public FileBackedFunctionDispatcher() {
+    Path storageDir = resolveStorageDir();
+    if (Files.exists(storageDir) && !Files.isDirectory(storageDir)) {
+      throw new IllegalStateException(
+          String.format("Functions storage path is not a directory: %s", storageDir));
+    }
+    try {
+      Files.createDirectories(storageDir);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          String.format("Failed to create functions storage directory: %s", storageDir), e);
+    }
+    this.storageFile = storageDir.resolve(FUNCTIONS_FILE_NAME);
+    loadFromDisk();
+  }
 
   @Override
   public synchronized NameIdentifier[] listFunctions(Namespace namespace)
@@ -59,12 +99,21 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher {
         .toArray(NameIdentifier[]::new);
   }
 
+  @Override
+  public synchronized Function[] listFunctionInfos(Namespace namespace)
+      throws NoSuchSchemaException { // NO SONAR: kept for interface compatibility
+    return functions.entrySet().stream()
+        .filter(entry -> entry.getKey().namespace().equals(namespace))
+        .flatMap(entry -> Arrays.stream(latestFunctions(entry.getValue().values())))
+        .toArray(Function[]::new);
+  }
+
   @Override
   public synchronized Function[] getFunction(NameIdentifier ident) throws NoSuchFunctionException {
     Map<FunctionSignature, NavigableMap<Integer, StoredFunction>> bySignature =
         functions.get(ident);
     if (bySignature == null || bySignature.isEmpty()) {
-      throw new NoSuchFunctionException("Function not found: " + ident);
+      throw new NoSuchFunctionException("Function not found: %s", ident);
     }
     return latestFunctions(bySignature.values());
   }
@@ -75,7 +124,7 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher {
     Map<FunctionSignature, NavigableMap<Integer, StoredFunction>> bySignature =
         functions.get(ident);
     if (bySignature == null || bySignature.isEmpty()) {
-      throw new NoSuchFunctionException("Function not found: " + ident);
+      throw new NoSuchFunctionException("Function not found: %s", ident);
     }
 
     Function[] matched =
@@ -85,7 +134,7 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher {
             .toArray(Function[]::new);
     if (matched.length == 0) {
       throw new NoSuchFunctionVersionException(
-          "Function version " + version + " not found for " + ident);
+          "Function version %s not found for %s", version, ident);
     }
     return matched;
   }
@@ -143,7 +192,7 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher {
     Map<FunctionSignature, NavigableMap<Integer, StoredFunction>> bySignature =
         functions.get(ident);
     if (bySignature == null || bySignature.isEmpty()) {
-      throw new NoSuchFunctionException("Function not found: " + ident);
+      throw new NoSuchFunctionException("Function not found: %s", ident);
     }
 
     FunctionSignature targetSignature = resolveTargetSignature(bySignature, changes);
@@ -169,14 +218,28 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher {
     }
 
     StoredFunction updated =
-        latest.newVersion(latest.version() + 1, comment, impls, latest.auditInfo());
+        latest.newVersion(
+            latest.version() + 1,
+            comment,
+            impls,
+            AuditInfo.builder()
+                .withCreator(latest.auditInfo().creator())
+                .withCreateTime(latest.auditInfo().createTime())
+                .withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                .withLastModifiedTime(Instant.now())
+                .build());
     versions.put(updated.version(), updated);
+    persistToDisk();
     return updated;
   }
 
   @Override
   public synchronized boolean deleteFunction(NameIdentifier ident) {
-    return functions.remove(ident) != null;
+    boolean removed = functions.remove(ident) != null;
+    if (removed) {
+      persistToDisk();
+    }
+    return removed;
   }
 
   @Override
@@ -190,6 +253,9 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher {
     if (bySignature.isEmpty()) {
       functions.remove(ident);
     }
+    if (removed) {
+      persistToDisk();
+    }
     return removed;
   }
 
@@ -207,7 +273,7 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher {
         functions.computeIfAbsent(ident, ignored -> new HashMap<>());
     if (bySignature.containsKey(signature)) {
       throw new FunctionAlreadyExistsException(
-          "Function already exists with signature: " + signature);
+          "Function already exists with signature: %s", signature);
     }
 
     StoredFunction stored =
@@ -220,10 +286,14 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher {
             returnColumns,
             functionImpls,
             1,
-            AuditInfo.EMPTY);
+            AuditInfo.builder()
+                .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+                .withCreateTime(Instant.now())
+                .build());
     NavigableMap<Integer, StoredFunction> versions = new TreeMap<>();
     versions.put(stored.version(), stored);
     bySignature.put(signature, versions);
+    persistToDisk();
     return stored;
   }
 
@@ -251,7 +321,7 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher {
     }
 
     if (!bySignature.containsKey(target)) {
-      throw new NoSuchFunctionException("Function not found for signature: " + target);
+      throw new NoSuchFunctionException("Function not found for signature: %s", target);
     }
 
     return target;
@@ -265,6 +335,114 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher {
         .toArray(Function[]::new);
   }
 
+  private Path resolveStorageDir() {
+    String gravitinoHome = System.getenv("GRAVITINO_HOME");
+    boolean isTest = Boolean.parseBoolean(System.getenv("GRAVITINO_TEST"));
+    if (isTest) {
+      String itProjectDir = System.getenv("IT_PROJECT_DIR");
+      if (StringUtils.isNotBlank(itProjectDir)) {
+        return Paths.get(itProjectDir, FUNCTIONS_DATA_DIR, FUNCTIONS_DIR_NAME);
+      }
+      return Paths.get(
+          System.getProperty("java.io.tmpdir"),
+          "gravitino-test",
+          FUNCTIONS_DATA_DIR,
+          FUNCTIONS_DIR_NAME);
+    }
+    Preconditions.checkArgument(StringUtils.isNotBlank(gravitinoHome), "GRAVITINO_HOME not set");
+    return Paths.get(gravitinoHome, FUNCTIONS_DATA_DIR, FUNCTIONS_DIR_NAME);
+  }
+
+  private void loadFromDisk() {
+    if (!Files.exists(storageFile)) {
+      return;
+    }
+    try (Reader reader = Files.newBufferedReader(storageFile, StandardCharsets.UTF_8)) {
+      PersistedFunctionRecord[] records =
+          JsonUtils.objectMapper().readValue(reader, PersistedFunctionRecord[].class);
+      functions.clear();
+      for (PersistedFunctionRecord record : records) {
+        if (record == null || record.function == null || record.identifier == null) {
+          continue;
+        }
+        NameIdentifier identifier = NameIdentifier.parse(record.identifier);
+        StoredFunction stored = toStoredFunction(record.function);
+        Map<FunctionSignature, NavigableMap<Integer, StoredFunction>> bySignature =
+            functions.computeIfAbsent(identifier, ignored -> new HashMap<>());
+        NavigableMap<Integer, StoredFunction> versions =
+            bySignature.computeIfAbsent(stored.signature(), ignored -> new TreeMap<>());
+        versions.put(stored.version(), stored);
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          String.format("Failed to load function metadata from %s", storageFile), e);
+    }
+  }
+
+  private void persistToDisk() {
+    List<PersistedFunctionRecord> records = new ArrayList<>();
+    for (Map.Entry<NameIdentifier, Map<FunctionSignature, NavigableMap<Integer, StoredFunction>>>
+        entry : functions.entrySet()) {
+      String identifier = entry.getKey().toString();
+      for (NavigableMap<Integer, StoredFunction> versions : entry.getValue().values()) {
+        for (StoredFunction function : versions.values()) {
+          records.add(new PersistedFunctionRecord(identifier, FunctionDTO.fromFunction(function)));
+        }
+      }
+    }
+    try {
+      Files.createDirectories(storageFile.getParent());
+      Path tempFile =
+          Files.createTempFile(storageFile.getParent(), FUNCTIONS_DIR_NAME, ".json.tmp");
+      try (Writer writer =
+          Files.newBufferedWriter(
+              tempFile,
+              StandardCharsets.UTF_8,
+              StandardOpenOption.TRUNCATE_EXISTING,
+              StandardOpenOption.WRITE)) {
+        JsonUtils.objectMapper().writeValue(writer, records);
+      }
+      try {
+        Files.move(
+            tempFile,
+            storageFile,
+            StandardCopyOption.ATOMIC_MOVE,
+            StandardCopyOption.REPLACE_EXISTING);
+      } catch (AtomicMoveNotSupportedException e) {
+        Files.move(tempFile, storageFile, StandardCopyOption.REPLACE_EXISTING);
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          String.format("Failed to persist function metadata to %s", storageFile), e);
+    }
+  }
+
+  private StoredFunction toStoredFunction(Function function) {
+    return new StoredFunction(
+        function.signature(),
+        function.functionType(),
+        function.deterministic(),
+        function.comment(),
+        function.returnType(),
+        function.returnColumns(),
+        function.impls(),
+        function.version(),
+        function.auditInfo());
+  }
+
+  @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
+  private static final class PersistedFunctionRecord {
+    private String identifier;
+    private FunctionDTO function;
+
+    PersistedFunctionRecord() {}
+
+    PersistedFunctionRecord(String identifier, FunctionDTO function) {
+      this.identifier = identifier;
+      this.function = function;
+    }
+  }
+
   private static final class StoredFunction implements Function {
     private final FunctionSignature signature;
     private final FunctionType functionType;

Reply via email to