This is an automated email from the ASF dual-hosted git repository. mchades pushed a commit to branch udf-poc in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit 59d7d6374eaa1db44f99265a0b6dcbd76c5b94c0 Author: mchades <[email protected]> AuthorDate: Wed Dec 17 20:22:28 2025 +0800 add FileBackedFunctionDispatcher --- .../java/org/apache/gravitino/GravitinoEnv.java | 5 +- ...cher.java => FileBackedFunctionDispatcher.java} | 202 +++++++++++++++++++-- 2 files changed, 192 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java index 8928482227..8861177f01 100644 --- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java +++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java @@ -31,11 +31,11 @@ import org.apache.gravitino.auxiliary.AuxiliaryServiceManager; import org.apache.gravitino.catalog.CatalogDispatcher; import org.apache.gravitino.catalog.CatalogManager; import org.apache.gravitino.catalog.CatalogNormalizeDispatcher; +import org.apache.gravitino.catalog.FileBackedFunctionDispatcher; import org.apache.gravitino.catalog.FilesetDispatcher; import org.apache.gravitino.catalog.FilesetNormalizeDispatcher; import org.apache.gravitino.catalog.FilesetOperationDispatcher; import org.apache.gravitino.catalog.FunctionDispatcher; -import org.apache.gravitino.catalog.InMemoryFunctionDispatcher; import org.apache.gravitino.catalog.ModelDispatcher; import org.apache.gravitino.catalog.ModelNormalizeDispatcher; import org.apache.gravitino.catalog.ModelOperationDispatcher; @@ -561,8 +561,7 @@ public class GravitinoEnv { TableNormalizeDispatcher tableNormalizeDispatcher = new TableNormalizeDispatcher(tableHookDispatcher, catalogManager); this.tableDispatcher = new TableEventDispatcher(eventBus, tableNormalizeDispatcher); - // TODO: Wire actual function dispatcher implementation once function management is available. - this.functionDispatcher = new InMemoryFunctionDispatcher(); + this.functionDispatcher = new FileBackedFunctionDispatcher(); // TODO: We can install hooks when we need, we only supports ownership post hook, // partition doesn't have ownership, so we don't need it now. diff --git a/core/src/main/java/org/apache/gravitino/catalog/InMemoryFunctionDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/FileBackedFunctionDispatcher.java similarity index 60% rename from core/src/main/java/org/apache/gravitino/catalog/InMemoryFunctionDispatcher.java rename to core/src/main/java/org/apache/gravitino/catalog/FileBackedFunctionDispatcher.java index db91514958..34d1ca4012 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/InMemoryFunctionDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/FileBackedFunctionDispatcher.java @@ -18,16 +18,33 @@ */ package org.apache.gravitino.catalog; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.io.Reader; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.nio.file.AtomicMoveNotSupportedException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import java.time.Instant; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Objects; import java.util.TreeMap; +import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.Audit; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; +import org.apache.gravitino.dto.function.FunctionDTO; import org.apache.gravitino.exceptions.FunctionAlreadyExistsException; import org.apache.gravitino.exceptions.NoSuchFunctionException; import org.apache.gravitino.exceptions.NoSuchFunctionVersionException; @@ -39,17 +56,40 @@ import org.apache.gravitino.function.FunctionImpl; import org.apache.gravitino.function.FunctionParam; import org.apache.gravitino.function.FunctionSignature; import org.apache.gravitino.function.FunctionType; +import org.apache.gravitino.json.JsonUtils; import org.apache.gravitino.meta.AuditInfo; import org.apache.gravitino.rel.types.Type; +import org.apache.gravitino.utils.PrincipalUtils; /** - * An in-memory {@link FunctionDispatcher} that stores function metadata in memory. This is a - * temporary implementation intended for wiring REST paths before a persistent backend is added. + * A file-backed {@link FunctionDispatcher} that caches function metadata in memory and persists it + * under {@code ${GRAVITINO_HOME}/data/functions}. */ -public class InMemoryFunctionDispatcher implements FunctionDispatcher { +public class FileBackedFunctionDispatcher implements FunctionDispatcher { + + private static final String FUNCTIONS_DATA_DIR = "data"; + private static final String FUNCTIONS_DIR_NAME = "functions"; + private static final String FUNCTIONS_FILE_NAME = "functions.json"; private final Map<NameIdentifier, Map<FunctionSignature, NavigableMap<Integer, StoredFunction>>> functions = new HashMap<>(); + private final Path storageFile; + + public FileBackedFunctionDispatcher() { + Path storageDir = resolveStorageDir(); + if (Files.exists(storageDir) && !Files.isDirectory(storageDir)) { + throw new IllegalStateException( + String.format("Functions storage path is not a directory: %s", storageDir)); + } + try { + Files.createDirectories(storageDir); + } catch (IOException e) { + throw new IllegalStateException( + String.format("Failed to create functions storage directory: %s", storageDir), e); + } + this.storageFile = storageDir.resolve(FUNCTIONS_FILE_NAME); + loadFromDisk(); + } @Override public synchronized NameIdentifier[] listFunctions(Namespace namespace) @@ -59,12 +99,21 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher { .toArray(NameIdentifier[]::new); } + @Override + public synchronized Function[] listFunctionInfos(Namespace namespace) + throws NoSuchSchemaException { // NO SONAR: kept for interface compatibility + return functions.entrySet().stream() + .filter(entry -> entry.getKey().namespace().equals(namespace)) + .flatMap(entry -> Arrays.stream(latestFunctions(entry.getValue().values()))) + .toArray(Function[]::new); + } + @Override public synchronized Function[] getFunction(NameIdentifier ident) throws NoSuchFunctionException { Map<FunctionSignature, NavigableMap<Integer, StoredFunction>> bySignature = functions.get(ident); if (bySignature == null || bySignature.isEmpty()) { - throw new NoSuchFunctionException("Function not found: " + ident); + throw new NoSuchFunctionException("Function not found: %s", ident); } return latestFunctions(bySignature.values()); } @@ -75,7 +124,7 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher { Map<FunctionSignature, NavigableMap<Integer, StoredFunction>> bySignature = functions.get(ident); if (bySignature == null || bySignature.isEmpty()) { - throw new NoSuchFunctionException("Function not found: " + ident); + throw new NoSuchFunctionException("Function not found: %s", ident); } Function[] matched = @@ -85,7 +134,7 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher { .toArray(Function[]::new); if (matched.length == 0) { throw new NoSuchFunctionVersionException( - "Function version " + version + " not found for " + ident); + "Function version %s not found for %s", version, ident); } return matched; } @@ -143,7 +192,7 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher { Map<FunctionSignature, NavigableMap<Integer, StoredFunction>> bySignature = functions.get(ident); if (bySignature == null || bySignature.isEmpty()) { - throw new NoSuchFunctionException("Function not found: " + ident); + throw new NoSuchFunctionException("Function not found: %s", ident); } FunctionSignature targetSignature = resolveTargetSignature(bySignature, changes); @@ -169,14 +218,28 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher { } StoredFunction updated = - latest.newVersion(latest.version() + 1, comment, impls, latest.auditInfo()); + latest.newVersion( + latest.version() + 1, + comment, + impls, + AuditInfo.builder() + .withCreator(latest.auditInfo().creator()) + .withCreateTime(latest.auditInfo().createTime()) + .withLastModifier(PrincipalUtils.getCurrentPrincipal().getName()) + .withLastModifiedTime(Instant.now()) + .build()); versions.put(updated.version(), updated); + persistToDisk(); return updated; } @Override public synchronized boolean deleteFunction(NameIdentifier ident) { - return functions.remove(ident) != null; + boolean removed = functions.remove(ident) != null; + if (removed) { + persistToDisk(); + } + return removed; } @Override @@ -190,6 +253,9 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher { if (bySignature.isEmpty()) { functions.remove(ident); } + if (removed) { + persistToDisk(); + } return removed; } @@ -207,7 +273,7 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher { functions.computeIfAbsent(ident, ignored -> new HashMap<>()); if (bySignature.containsKey(signature)) { throw new FunctionAlreadyExistsException( - "Function already exists with signature: " + signature); + "Function already exists with signature: %s", signature); } StoredFunction stored = @@ -220,10 +286,14 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher { returnColumns, functionImpls, 1, - AuditInfo.EMPTY); + AuditInfo.builder() + .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) + .withCreateTime(Instant.now()) + .build()); NavigableMap<Integer, StoredFunction> versions = new TreeMap<>(); versions.put(stored.version(), stored); bySignature.put(signature, versions); + persistToDisk(); return stored; } @@ -251,7 +321,7 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher { } if (!bySignature.containsKey(target)) { - throw new NoSuchFunctionException("Function not found for signature: " + target); + throw new NoSuchFunctionException("Function not found for signature: %s", target); } return target; @@ -265,6 +335,114 @@ public class InMemoryFunctionDispatcher implements FunctionDispatcher { .toArray(Function[]::new); } + private Path resolveStorageDir() { + String gravitinoHome = System.getenv("GRAVITINO_HOME"); + boolean isTest = Boolean.parseBoolean(System.getenv("GRAVITINO_TEST")); + if (isTest) { + String itProjectDir = System.getenv("IT_PROJECT_DIR"); + if (StringUtils.isNotBlank(itProjectDir)) { + return Paths.get(itProjectDir, FUNCTIONS_DATA_DIR, FUNCTIONS_DIR_NAME); + } + return Paths.get( + System.getProperty("java.io.tmpdir"), + "gravitino-test", + FUNCTIONS_DATA_DIR, + FUNCTIONS_DIR_NAME); + } + Preconditions.checkArgument(StringUtils.isNotBlank(gravitinoHome), "GRAVITINO_HOME not set"); + return Paths.get(gravitinoHome, FUNCTIONS_DATA_DIR, FUNCTIONS_DIR_NAME); + } + + private void loadFromDisk() { + if (!Files.exists(storageFile)) { + return; + } + try (Reader reader = Files.newBufferedReader(storageFile, StandardCharsets.UTF_8)) { + PersistedFunctionRecord[] records = + JsonUtils.objectMapper().readValue(reader, PersistedFunctionRecord[].class); + functions.clear(); + for (PersistedFunctionRecord record : records) { + if (record == null || record.function == null || record.identifier == null) { + continue; + } + NameIdentifier identifier = NameIdentifier.parse(record.identifier); + StoredFunction stored = toStoredFunction(record.function); + Map<FunctionSignature, NavigableMap<Integer, StoredFunction>> bySignature = + functions.computeIfAbsent(identifier, ignored -> new HashMap<>()); + NavigableMap<Integer, StoredFunction> versions = + bySignature.computeIfAbsent(stored.signature(), ignored -> new TreeMap<>()); + versions.put(stored.version(), stored); + } + } catch (IOException e) { + throw new IllegalStateException( + String.format("Failed to load function metadata from %s", storageFile), e); + } + } + + private void persistToDisk() { + List<PersistedFunctionRecord> records = new ArrayList<>(); + for (Map.Entry<NameIdentifier, Map<FunctionSignature, NavigableMap<Integer, StoredFunction>>> + entry : functions.entrySet()) { + String identifier = entry.getKey().toString(); + for (NavigableMap<Integer, StoredFunction> versions : entry.getValue().values()) { + for (StoredFunction function : versions.values()) { + records.add(new PersistedFunctionRecord(identifier, FunctionDTO.fromFunction(function))); + } + } + } + try { + Files.createDirectories(storageFile.getParent()); + Path tempFile = + Files.createTempFile(storageFile.getParent(), FUNCTIONS_DIR_NAME, ".json.tmp"); + try (Writer writer = + Files.newBufferedWriter( + tempFile, + StandardCharsets.UTF_8, + StandardOpenOption.TRUNCATE_EXISTING, + StandardOpenOption.WRITE)) { + JsonUtils.objectMapper().writeValue(writer, records); + } + try { + Files.move( + tempFile, + storageFile, + StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING); + } catch (AtomicMoveNotSupportedException e) { + Files.move(tempFile, storageFile, StandardCopyOption.REPLACE_EXISTING); + } + } catch (IOException e) { + throw new IllegalStateException( + String.format("Failed to persist function metadata to %s", storageFile), e); + } + } + + private StoredFunction toStoredFunction(Function function) { + return new StoredFunction( + function.signature(), + function.functionType(), + function.deterministic(), + function.comment(), + function.returnType(), + function.returnColumns(), + function.impls(), + function.version(), + function.auditInfo()); + } + + @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY) + private static final class PersistedFunctionRecord { + private String identifier; + private FunctionDTO function; + + PersistedFunctionRecord() {} + + PersistedFunctionRecord(String identifier, FunctionDTO function) { + this.identifier = identifier; + this.function = function; + } + } + private static final class StoredFunction implements Function { private final FunctionSignature signature; private final FunctionType functionType;
