This is an automated email from the ASF dual-hosted git repository.

ndipiazza pushed a commit to branch file-based-config-store
in repository https://gitbox.apache.org/repos/asf/tika.git

commit dab0ac754b6d07d361689d8082b0f0c484704f0d
Author: Nicholas DiPiazza <[email protected]>
AuthorDate: Sun Dec 28 10:27:06 2025 -0600

    Add file-based ConfigStore for shared config between gRPC and PipesServer
    
    - Created FileBasedConfigStore that persists to JSON file
    - Created FileBasedConfigStoreFactory with @Extension annotation
    - Updated PipesServer.initializeResources() to create and use ConfigStore
    - Both the gRPC server and the forked PipesServer can now share fetcher configs via a shared file
    
    This enables dynamic fetcher management across JVM processes:
    1. gRPC saves fetcher → writes to config file
    2. PipesServer starts → reads from same file
    3. Both JVMs share the same fetcher configuration
---
 .../pipes/core/config/FileBasedConfigStore.java    | 165 +++++++++++++++++++++
 .../core/config/FileBasedConfigStoreFactory.java   |  60 ++++++++
 .../apache/tika/pipes/core/server/PipesServer.java |  30 +++-
 3 files changed, 251 insertions(+), 4 deletions(-)

diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/FileBasedConfigStore.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/FileBasedConfigStore.java
new file mode 100644
index 000000000..ee0de07fd
--- /dev/null
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/FileBasedConfigStore.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.config;
+
+import java.io.IOException;
+import java.nio.file.AtomicMoveNotSupportedException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.plugins.ExtensionConfig;
+
+/**
+ * File-based implementation of {@link ConfigStore} that persists 
configurations to a JSON file.
+ * This allows multiple JVM processes to share configuration through the 
filesystem.
+ * Thread-safe and suitable for multi-process deployments where PipesClient 
forks PipesServer.
+ */
+public class FileBasedConfigStore implements ConfigStore {
+    
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileBasedConfigStore.class);
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    
+    private final Path configFile;
+    private final ConcurrentHashMap<String, ExtensionConfig> cache = new 
ConcurrentHashMap<>();
+    private ExtensionConfig extensionConfig;
+    
+    public FileBasedConfigStore(Path configFile) {
+        this.configFile = configFile;
+    }
+
+    @Override
+    public ExtensionConfig getExtensionConfig() {
+        return extensionConfig;
+    }
+
+    public void setExtensionConfig(ExtensionConfig extensionConfig) {
+        this.extensionConfig = extensionConfig;
+    }
+
+    @Override
+    public void init() throws Exception {
+        // Create parent directories if they don't exist
+        if (configFile.getParent() != null) {
+            Files.createDirectories(configFile.getParent());
+        }
+        
+        // Load existing configs if file exists
+        if (Files.exists(configFile)) {
+            loadFromFile();
+            LOG.info("Loaded {} configurations from {}", cache.size(), 
configFile);
+        } else {
+            LOG.info("Config file does not exist yet, will be created on first 
save: {}", configFile);
+        }
+    }
+
+    @Override
+    public synchronized void put(String id, ExtensionConfig config) {
+        cache.put(id, config);
+        saveToFile();
+    }
+
+    @Override
+    public ExtensionConfig get(String id) {
+        // Reload from file to get latest changes from other processes
+        try {
+            loadFromFile();
+        } catch (IOException e) {
+            LOG.warn("Failed to reload config from file, using cache", e);
+        }
+        return cache.get(id);
+    }
+
+    @Override
+    public boolean containsKey(String id) {
+        try {
+            loadFromFile();
+        } catch (IOException e) {
+            LOG.warn("Failed to reload config from file, using cache", e);
+        }
+        return cache.containsKey(id);
+    }
+
+    @Override
+    public Set<String> keySet() {
+        try {
+            loadFromFile();
+        } catch (IOException e) {
+            LOG.warn("Failed to reload config from file, using cache", e);
+        }
+        return Set.copyOf(cache.keySet());
+    }
+
+    @Override
+    public int size() {
+        try {
+            loadFromFile();
+        } catch (IOException e) {
+            LOG.warn("Failed to reload config from file, using cache", e);
+        }
+        return cache.size();
+    }
+
+    @Override
+    public synchronized ExtensionConfig remove(String id) {
+        ExtensionConfig removed = cache.remove(id);
+        if (removed != null) {
+            saveToFile();
+        }
+        return removed;
+    }
+
+    private synchronized void loadFromFile() throws IOException {
+        if (!Files.exists(configFile)) {
+            return;
+        }
+        
+        try {
+            Map<String, ExtensionConfig> loaded = OBJECT_MAPPER.readValue(
+                configFile.toFile(),
+                new TypeReference<Map<String, ExtensionConfig>>() {}
+            );
+            cache.clear();
+            cache.putAll(loaded);
+        } catch (IOException e) {
+            LOG.error("Failed to load configurations from {}", configFile, e);
+            throw e;
+        }
+    }
+
+    private synchronized void saveToFile() {
+        try {
+            // Write to temp file first, then atomic rename
+            Path tempFile = Paths.get(configFile.toString() + ".tmp");
+            OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
+                .writeValue(tempFile.toFile(), cache);
+            Files.move(tempFile, configFile, 
StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
+            LOG.debug("Saved {} configurations to {}", cache.size(), 
configFile);
+        } catch (IOException e) {
+            LOG.error("Failed to save configurations to {}", configFile, e);
+            throw new RuntimeException("Failed to save config store", e);
+        }
+    }
+}
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/FileBasedConfigStoreFactory.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/FileBasedConfigStoreFactory.java
new file mode 100644
index 000000000..59274b0c4
--- /dev/null
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/FileBasedConfigStoreFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.config;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.pf4j.Extension;
+
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.plugins.ExtensionConfig;
+
+/**
+ * Factory for creating {@link FileBasedConfigStore} instances, registered under the name "file".
+ *
+ * <p>Recognized JSON parameters:
+ * <ul>
+ *   <li>{@code path}: location of the shared JSON file (default {@code config-store.json}).</li>
+ * </ul>
+ * Missing, null or blank JSON (or a missing, null or blank {@code path}) selects the default.
+ * A relative path is resolved against each JVM's own working directory, so processes that must
+ * share the store should either run in the same directory or be given an absolute path.
+ */
+@Extension
+public class FileBasedConfigStoreFactory implements ConfigStoreFactory {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final String DEFAULT_PATH = "config-store.json";
+
+    @Override
+    public String getName() {
+        return "file";
+    }
+
+    /**
+     * Builds and initializes a {@link FileBasedConfigStore} from {@code config.json()}.
+     *
+     * @param config extension config whose JSON may contain a {@code path}
+     * @return an initialized store
+     * @throws TikaConfigException if the JSON cannot be parsed or the store cannot be initialized
+     */
+    @Override
+    public ConfigStore buildExtension(ExtensionConfig config) throws TikaConfigException, IOException {
+        try {
+            Path path = Paths.get(resolvePath(config));
+
+            FileBasedConfigStore store = new FileBasedConfigStore(path);
+            store.setExtensionConfig(config);
+            store.init();
+
+            return store;
+        } catch (TikaConfigException e) {
+            // Already descriptive; do not wrap it a second time.
+            throw e;
+        } catch (Exception e) {
+            throw new TikaConfigException("Failed to create FileBasedConfigStore", e);
+        }
+    }
+
+    /**
+     * Extracts the configured file path, falling back to {@link #DEFAULT_PATH}.
+     */
+    private static String resolvePath(ExtensionConfig config) throws IOException {
+        String json = config == null ? null : config.json();
+        if (json == null || json.isBlank()) {
+            return DEFAULT_PATH;
+        }
+        // path() never returns null. It yields a MissingNode when the key is absent or the
+        // root is not an object.
+        JsonNode pathNode = OBJECT_MAPPER.readTree(json).path("path");
+        if (pathNode.isMissingNode() || pathNode.isNull() || pathNode.asText().isBlank()) {
+            return DEFAULT_PATH;
+        }
+        return pathNode.asText();
+    }
+}
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
index 6715db1b6..58c73c4cf 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
@@ -71,6 +71,8 @@ import org.apache.tika.pipes.core.EmitStrategy;
 import org.apache.tika.pipes.core.EmitStrategyConfig;
 import org.apache.tika.pipes.core.PipesClient;
 import org.apache.tika.pipes.core.PipesConfig;
+import org.apache.tika.pipes.core.config.ConfigStore;
+import org.apache.tika.pipes.core.config.ConfigStoreFactory;
 import org.apache.tika.pipes.core.emitter.EmitterManager;
 import org.apache.tika.pipes.core.fetcher.FetcherManager;
 import org.apache.tika.plugins.ExtensionConfig;
@@ -472,10 +474,12 @@ public class PipesServer implements AutoCloseable {
         TikaJsonConfig tikaJsonConfig = tikaLoader.getConfig();
         TikaPluginManager tikaPluginManager = 
TikaPluginManager.load(tikaJsonConfig);
 
-        //TODO allowed named configurations in tika config
-        this.fetcherManager = FetcherManager.load(tikaPluginManager, 
tikaJsonConfig);
-        // Always initialize emitters to support runtime overrides via 
ParseContext
-        this.emitterManager = EmitterManager.load(tikaPluginManager, 
tikaJsonConfig);
+        // Create ConfigStore if specified in pipesConfig
+        ConfigStore configStore = createConfigStore(pipesConfig, 
tikaPluginManager);
+
+        // Load managers with ConfigStore to enable runtime modifications
+        this.fetcherManager = FetcherManager.load(tikaPluginManager, 
tikaJsonConfig, true, configStore);
+        this.emitterManager = EmitterManager.load(tikaPluginManager, 
tikaJsonConfig, true, configStore);
         this.autoDetectParser = (AutoDetectParser) 
tikaLoader.loadAutoDetectParser();
         // Get the digester for pre-parse digesting of container documents.
         // If user configured skipContainerDocumentDigest=false (the default), 
PipesServer
@@ -502,6 +506,24 @@ public class PipesServer implements AutoCloseable {
         this.rMetaParser = new RecursiveParserWrapper(autoDetectParser);
     }
 
+    /**
+     * Creates the {@link ConfigStore} selected by {@code pipesConfig}, if any.
+     *
+     * @param pipesConfig supplies the store type and its parameters
+     * @param tikaPluginManager used to locate the store's factory
+     * @return the configured store, or {@code null} when no type is set or the type is "memory"
+     *         (the default in-memory behavior, no persistence)
+     * @throws TikaException propagated from {@code ConfigStoreFactory.createConfigStore}
+     */
+    private ConfigStore createConfigStore(PipesConfig pipesConfig, TikaPluginManager tikaPluginManager) throws TikaException {
+        String configStoreType = pipesConfig.getConfigStoreType();
+        if (configStoreType == null || "memory".equals(configStoreType)) {
+            // Use default in-memory store (no persistence)
+            return null;
+        }
+
+        // Store factories parse these params as JSON (FileBasedConfigStoreFactory calls
+        // readTree on them). An unset value would otherwise reach them as null and fail.
+        String configStoreParams = pipesConfig.getConfigStoreParams();
+        if (configStoreParams == null || configStoreParams.isBlank()) {
+            configStoreParams = "{}";
+        }
+
+        ExtensionConfig storeConfig = new ExtensionConfig(configStoreType, configStoreType, configStoreParams);
+        return ConfigStoreFactory.createConfigStore(tikaPluginManager, configStoreType, storeConfig);
+    }
+
 
     private void write(PROCESSING_STATUS processingStatus, PipesResult 
pipesResult) {
         try {

Reply via email to