This is an automated email from the ASF dual-hosted git repository. ndipiazza pushed a commit to branch file-based-config-store in repository https://gitbox.apache.org/repos/asf/tika.git
commit dab0ac754b6d07d361689d8082b0f0c484704f0d Author: Nicholas DiPiazza <[email protected]> AuthorDate: Sun Dec 28 10:27:06 2025 -0600 Add a file-based ConfigStore to share config between the gRPC server and PipesServer - Created FileBasedConfigStore, which persists configs to a JSON file - Created FileBasedConfigStoreFactory with the @Extension annotation - Updated PipesServer.initializeResources() to create and use a ConfigStore - The gRPC server and the forked PipesServer can now share fetcher configs via that file. This enables dynamic fetcher management across JVM processes: 1. the gRPC server saves a fetcher, which is written to the config file; 2. PipesServer starts and reads the same file; 3. both JVMs share the same fetcher configuration. --- .../pipes/core/config/FileBasedConfigStore.java | 165 +++++++++++++++++++++ .../core/config/FileBasedConfigStoreFactory.java | 60 ++++++++ .../apache/tika/pipes/core/server/PipesServer.java | 30 +++- 3 files changed, 251 insertions(+), 4 deletions(-) diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/FileBasedConfigStore.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/FileBasedConfigStore.java new file mode 100644 index 000000000..ee0de07fd --- /dev/null +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/FileBasedConfigStore.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.core.config; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.tika.plugins.ExtensionConfig; + +/** + * File-based implementation of {@link ConfigStore} that persists configurations to a JSON file. + * This allows multiple JVM processes to share configuration through the filesystem. + * Thread-safe and suitable for multi-process deployments where PipesClient forks PipesServer. 
+ */ +public class FileBasedConfigStore implements ConfigStore { + + private static final Logger LOG = LoggerFactory.getLogger(FileBasedConfigStore.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final Path configFile; + private final ConcurrentHashMap<String, ExtensionConfig> cache = new ConcurrentHashMap<>(); + private ExtensionConfig extensionConfig; + + public FileBasedConfigStore(Path configFile) { + this.configFile = configFile; + } + + @Override + public ExtensionConfig getExtensionConfig() { + return extensionConfig; + } + + public void setExtensionConfig(ExtensionConfig extensionConfig) { + this.extensionConfig = extensionConfig; + } + + @Override + public void init() throws Exception { + // Create parent directories if they don't exist + if (configFile.getParent() != null) { + Files.createDirectories(configFile.getParent()); + } + + // Load existing configs if file exists + if (Files.exists(configFile)) { + loadFromFile(); + LOG.info("Loaded {} configurations from {}", cache.size(), configFile); + } else { + LOG.info("Config file does not exist yet, will be created on first save: {}", configFile); + } + } + + @Override + public synchronized void put(String id, ExtensionConfig config) { + cache.put(id, config); + saveToFile(); + } + + @Override + public ExtensionConfig get(String id) { + // Reload from file to get latest changes from other processes + try { + loadFromFile(); + } catch (IOException e) { + LOG.warn("Failed to reload config from file, using cache", e); + } + return cache.get(id); + } + + @Override + public boolean containsKey(String id) { + try { + loadFromFile(); + } catch (IOException e) { + LOG.warn("Failed to reload config from file, using cache", e); + } + return cache.containsKey(id); + } + + @Override + public Set<String> keySet() { + try { + loadFromFile(); + } catch (IOException e) { + LOG.warn("Failed to reload config from file, using cache", e); + } + return Set.copyOf(cache.keySet()); + 
} + + @Override + public int size() { + try { + loadFromFile(); + } catch (IOException e) { + LOG.warn("Failed to reload config from file, using cache", e); + } + return cache.size(); + } + + @Override + public synchronized ExtensionConfig remove(String id) { + ExtensionConfig removed = cache.remove(id); + if (removed != null) { + saveToFile(); + } + return removed; + } + + private synchronized void loadFromFile() throws IOException { + if (!Files.exists(configFile)) { + return; + } + + try { + Map<String, ExtensionConfig> loaded = OBJECT_MAPPER.readValue( + configFile.toFile(), + new TypeReference<Map<String, ExtensionConfig>>() {} + ); + cache.clear(); + cache.putAll(loaded); + } catch (IOException e) { + LOG.error("Failed to load configurations from {}", configFile, e); + throw e; + } + } + + private synchronized void saveToFile() { + try { + // Write to temp file first, then atomic rename + Path tempFile = Paths.get(configFile.toString() + ".tmp"); + OBJECT_MAPPER.writerWithDefaultPrettyPrinter() + .writeValue(tempFile.toFile(), cache); + Files.move(tempFile, configFile, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE); + LOG.debug("Saved {} configurations to {}", cache.size(), configFile); + } catch (IOException e) { + LOG.error("Failed to save configurations to {}", configFile, e); + throw new RuntimeException("Failed to save config store", e); + } + } +} diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/FileBasedConfigStoreFactory.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/FileBasedConfigStoreFactory.java new file mode 100644 index 000000000..59274b0c4 --- /dev/null +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/FileBasedConfigStoreFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.core.config; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.pf4j.Extension; + +import org.apache.tika.exception.TikaConfigException; +import org.apache.tika.plugins.ExtensionConfig; + +/** + * Factory for creating FileBasedConfigStore instances. + * <p> + * Registered under the name {@code "file"}. The optional {@code path} field of the extension's + * JSON names the backing file and defaults to {@code config-store.json}. + * NOTE(review): a relative path resolves against each JVM's working directory. The gRPC server + * and a forked PipesServer therefore share one file only if their working directories match or + * the path is absolute. Confirm this for the deployment. + */ +@Extension +public class FileBasedConfigStoreFactory implements ConfigStoreFactory { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Override + public String getName() { + return "file"; + } + + @Override + public ConfigStore buildExtension(ExtensionConfig config) throws TikaConfigException, IOException { + try { + JsonNode params = OBJECT_MAPPER.readTree(config.json()); /* NOTE(review): PipesServer.createConfigStore forwards getConfigStoreParams() with no null check. If json() can be null here, readTree presumably throws, and the catch below reports it as a TikaConfigException instead of falling back to the default path. Confirm. */ + + String filePath = params.has("path") ?
params.get("path").asText() : "config-store.json"; + Path path = Paths.get(filePath); + + FileBasedConfigStore store = new FileBasedConfigStore(path); + store.setExtensionConfig(config); + store.init(); + + return store; + } catch (Exception e) { /* Every failure in the try block surfaces as a TikaConfigException, including parsing the params, creating directories in init() and reading an existing file. */ + throw new TikaConfigException("Failed to create FileBasedConfigStore", e); + } + } +} diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java index 6715db1b6..58c73c4cf 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java @@ -71,6 +71,8 @@ import org.apache.tika.pipes.core.EmitStrategy; import org.apache.tika.pipes.core.EmitStrategyConfig; import org.apache.tika.pipes.core.PipesClient; import org.apache.tika.pipes.core.PipesConfig; +import org.apache.tika.pipes.core.config.ConfigStore; +import org.apache.tika.pipes.core.config.ConfigStoreFactory; import org.apache.tika.pipes.core.emitter.EmitterManager; import org.apache.tika.pipes.core.fetcher.FetcherManager; import org.apache.tika.plugins.ExtensionConfig; @@ -472,10 +474,12 @@ public class PipesServer implements AutoCloseable { TikaJsonConfig tikaJsonConfig = tikaLoader.getConfig(); TikaPluginManager tikaPluginManager = TikaPluginManager.load(tikaJsonConfig); - //TODO allowed named configurations in tika config - this.fetcherManager = FetcherManager.load(tikaPluginManager, tikaJsonConfig); - // Always initialize emitters to support runtime overrides via ParseContext - this.emitterManager = EmitterManager.load(tikaPluginManager, tikaJsonConfig); + // Create ConfigStore if specified in pipesConfig + ConfigStore configStore = createConfigStore(pipesConfig, tikaPluginManager); + + // Load managers with ConfigStore to enable runtime modifications + this.fetcherManager =
FetcherManager.load(tikaPluginManager, tikaJsonConfig, true, configStore); + this.emitterManager = EmitterManager.load(tikaPluginManager, tikaJsonConfig, true, configStore); this.autoDetectParser = (AutoDetectParser) tikaLoader.loadAutoDetectParser(); // Get the digester for pre-parse digesting of container documents. // If user configured skipContainerDocumentDigest=false (the default), PipesServer @@ -502,6 +506,24 @@ public class PipesServer implements AutoCloseable { this.rMetaParser = new RecursiveParserWrapper(autoDetectParser); } + /** + * Builds the ConfigStore selected by {@code pipesConfig.getConfigStoreType()} via + * {@link ConfigStoreFactory#createConfigStore}. + * + * @return the configured store, or {@code null} when no type is set or the type is + * {@code "memory"}; in that case the managers fall back to the default in-memory store + * NOTE(review): configStoreParams is passed through even when null. The third ExtensionConfig + * argument is presumably what FileBasedConfigStoreFactory reads as config.json(). Confirm it + * cannot be null, or default it to "{}". + */ + private ConfigStore createConfigStore(PipesConfig pipesConfig, TikaPluginManager tikaPluginManager) throws TikaException { + String configStoreType = pipesConfig.getConfigStoreType(); + String configStoreParams = pipesConfig.getConfigStoreParams(); + + if (configStoreType == null || "memory".equals(configStoreType)) { + // Use default in-memory store (no persistence) + return null; + } + + ExtensionConfig storeConfig = new ExtensionConfig( + configStoreType, configStoreType, configStoreParams); + + return ConfigStoreFactory.createConfigStore( + tikaPluginManager, + configStoreType, + storeConfig); + } + private void write(PROCESSING_STATUS processingStatus, PipesResult pipesResult) { try {
