This is an automated email from the ASF dual-hosted git repository.

ndipiazza pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 34b60d6f7 TIKA-4595: Dynamic Fetcher/Emitter Management with 
ConfigStore Support (#2489)
34b60d6f7 is described below

commit 34b60d6f7f4deacbc765968bad606e7a5a89cceb
Author: Nicholas DiPiazza <[email protected]>
AuthorDate: Mon Dec 29 02:44:06 2025 -0600

    TIKA-4595: Dynamic Fetcher/Emitter Management with ConfigStore Support 
(#2489)
    
    * TIKA-4595: Add dynamic fetcher/emitter management API to PipesClient
    
    - Added SAVE_FETCHER, DELETE_FETCHER, LIST_FETCHERS, GET_FETCHER commands
    - Added SAVE_EMITTER, DELETE_EMITTER, LIST_EMITTERS, GET_EMITTER commands
    - Implemented PipesClient public API methods for runtime configuration
    - Implemented PipesServer command handlers
    - Added deleteComponent() and getComponentConfig() to 
AbstractComponentManager
    - Added wrapper methods to FetcherManager and EmitterManager
    - Added remove() method to ConfigStore interface and implementations
    - All tests passing
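
    Each of these commands travels over the PipesClient/PipesServer socket as a
    single command byte followed by length-prefixed payloads, as the PipesClient
    diff below shows. A minimal, dependency-free sketch of that framing (class
    and method names here are illustrative, not Tika's):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class WireFramingSketch {

    // Write a command byte followed by a length-prefixed UTF-8 payload.
    public static byte[] writeFrame(byte command, String payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.write(command);
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
        out.flush();
        return bos.toByteArray();
    }

    // Read the frame back: command byte, then length, then payload bytes.
    public static String readFrame(byte[] frame, byte expectedCommand) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
        byte command = in.readByte();
        if (command != expectedCommand) {
            throw new IOException("unexpected command: " + command);
        }
        int len = in.readInt();
        byte[] bytes = new byte[len];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        byte saveFetcher = 5; // hypothetical ordinal-based command byte
        byte[] frame = writeFrame(saveFetcher, "{\"id\":\"my-fetcher\"}");
        System.out.println(readFrame(frame, saveFetcher));
    }
}
```

    The length prefix lets the reader allocate the payload buffer up front and
    call readFully(), which is the same pattern the new PipesClient methods use.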
    
    * Fix gRPC saveFetcher to propagate to PipesClient's forked PipesServer
    
    - saveFetcher now calls both fetcherManager.saveFetcher() and 
pipesClient.saveFetcher()
    - This ensures fetchers are available in the forked PipesServer process
    - Implemented deleteFetcher to call both managers as well
    - Fixes FetcherNotFoundException when using dynamic fetchers via gRPC
    
    The issue was that fetchers saved via gRPC were only stored in the gRPC
    server's FetcherManager, but when pipesClient.process() forks a new
    PipesServer process, that process has its own FetcherManager and doesn't
    have access to the dynamically created fetchers. Now both are updated.
    
    * Add file-based ConfigStore for shared config between gRPC and PipesServer
    
    - Created FileBasedConfigStore that persists to JSON file
    - Created FileBasedConfigStoreFactory with @Extension annotation
    - Updated PipesServer.initializeResources() to create and use ConfigStore
    - Both gRPC server and forked PipesServer can now share fetcher configs via 
file
    
    This enables dynamic fetcher management across JVM processes:
    1. gRPC saves fetcher → writes to config file
    2. PipesServer starts → reads from same file
    3. Both JVMs share the same fetcher configuration
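
    A store of this shape can be sketched with the JDK alone. The commit's
    FileBasedConfigStore persists JSON; this sketch substitutes
    java.util.Properties purely to stay dependency-free, and all names in it
    are illustrative:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class FileBackedStoreSketch {
    private final Path file;

    public FileBackedStoreSketch(Path file) {
        this.file = file;
    }

    // Load-modify-save so a write from one JVM is visible to readers in another.
    public synchronized void put(String key, String value) throws IOException {
        Properties props = load();
        props.setProperty(key, value);
        try (OutputStream os = Files.newOutputStream(file)) {
            props.store(os, "shared config");
        }
    }

    public synchronized String get(String key) throws IOException {
        return load().getProperty(key);
    }

    private Properties load() throws IOException {
        Properties props = new Properties();
        if (Files.exists(file)) {
            try (InputStream is = Files.newInputStream(file)) {
                props.load(is);
            }
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("config-store", ".properties");
        new FileBackedStoreSketch(file).put("my-fetcher", "{\"basePath\":\"/data\"}");
        // A second instance (standing in for the forked PipesServer) re-reads the file.
        System.out.println(new FileBackedStoreSketch(file).get("my-fetcher"));
    }
}
```

    Re-reading the file on every get() is what makes writes from one process
    visible to the other without any coordination beyond the shared path.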
    
    * Handle 'file' ConfigStore type as built-in (not plugin)
    
    - Added direct handling for 'file' type in 
ConfigStoreFactory.createConfigStore()
    - File-based store is in core, not a plugin, so it needs special handling

    - Avoids ClassNotFoundException when trying to load 'file' as a class name
    - Also added remove() method to IgniteConfigStore for interface compliance
    
    * Make ExtensionConfig Serializable for socket communication
    
    ExtensionConfig is sent over sockets between PipesClient and PipesServer,
    so it needs to implement Serializable. Records can implement Serializable
    and all fields (String, String, String) are already serializable.
    
    Fixes NotSerializableException when calling saveFetcher via gRPC.
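
    Since Java 16, records serialize through their canonical constructor, so
    `implements Serializable` plus serializable components is all that's
    required. A self-contained round trip with a stand-in record (the real
    class is org.apache.tika.plugins.ExtensionConfig):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class RecordSerializationSketch {

    // Stand-in for ExtensionConfig: three String fields, all serializable.
    record Config(String id, String name, String json) implements Serializable {}

    static Config roundTrip(Config in) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(in);
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (Config) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Config original = new Config("my-fetcher", "file-system-fetcher", "{}");
        // Records compare by component values, so equals() holds after the round trip.
        System.out.println(roundTrip(original).equals(original));
    }
}
```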
    
    * Add embedded Ignite server architecture
    
    - Created IgniteStoreServer class that runs as embedded server
    - TikaGrpcServer starts Ignite server on startup (if ignite ConfigStore 
configured)
    - IgniteConfigStore now acts as client-only (clientMode=true)
    - No external Ignite dependency needed in Docker/Kubernetes
    - Server runs in background daemon thread within tika-grpc process
    - Clients (gRPC + forked PipesServer) connect to embedded server
    
    Architecture:
      ┌────────────────────────────────┐
      │      tika-grpc Process         │
      │  ┌──────────────────────────┐  │
      │  │  IgniteStoreServer       │  │
      │  │  (server mode, daemon)   │  │
      │  └────────▲─────────────────┘  │
      │           │                    │
      │  ┌────────┴─────────────────┐  │
      │  │ IgniteConfigStore        │  │
      │  │ (client mode)            │  │
      │  └──────────────────────────┘  │
      └────────────────────────────────┘
               ▲
               │ (client connection)
               │
      ┌────────┴─────────────────┐
      │  PipesServer (forked)    │
      │  IgniteConfigStore       │
      │  (client mode)           │
      └──────────────────────────┘
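
    For reference, the parameters that startIgniteServer() reads from the
    store config (see the TikaGrpcServerImpl diff below) correspond to a
    params block like this; the values shown are the defaults from that code:

```json
{
  "cacheName": "tika-config-store",
  "cacheMode": "REPLICATED",
  "igniteInstanceName": "TikaIgniteServer"
}
```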
    
    * Configure Ignite work directory to /tmp
    
    - Set workDirectory to /tmp/ignite-work in IgniteStoreServer
    - Set workDirectory to /tmp/ignite-work in IgniteConfigStore
    - Avoids 'Work directory does not exist and cannot be created: /work' error
    - Uses system property ignite.work.dir if set, defaults to /tmp/ignite-work
    - Ensures Ignite can write to work directory in Docker containers
    
    * Use /var/cache/tika/ignite-work for Ignite work directory
    
    - Changed from /tmp/ignite-work to /var/cache/tika/ignite-work
    - Aligns with Tika's standard cache location
    - /var/cache/tika is already used for plugins and other Tika cache data
    
    * Use plugin classloader for Ignite server startup
    
    - Find Ignite plugin's classloader from plugin manager
    - Load IgniteStoreServer and CacheMode using plugin classloader
    - Fixes NoClassDefFoundError for H2 classes
    - Ensures all Ignite dependencies (including H2) are available
    - Plugin classloader has all dependencies from lib/ directory
    
    * Enable peer class loading in Ignite client
    
    - Set setPeerClassLoadingEnabled(true) in IgniteConfigStore
    - Must match server configuration
    - Fixes: Remote node has peer class loading enabled flag different from 
local
    - Both server and client now have peerClassLoading=true
    
    * Disable peer class loading to fix classloader conflicts
    
    - Set setPeerClassLoadingEnabled(false) on both server and client
    - Fixes ClassCastException due to class loaded by different classloaders
    - Server uses plugin classloader, client uses app classloader
    - Peer class loading causes the same class to be loaded by both, creating conflicts
    - We don't need peer class loading for our use case
    
    * Move Ignite dependencies directly into tika-grpc
    
    - Made tika-pipes-ignite a required (non-optional) dependency of tika-grpc
    - Added ignite.version and h2.version properties
    - Removed reflection-based classloader lookup
    - Direct instantiation of IgniteStoreServer
    - Avoids all PF4J plugin classloader issues
    - Ignite classes now on main classpath
    
    * Disable Ignite Object Input Filter autoconfiguration
    
    - Set IGNITE_ENABLE_OBJECT_INPUT_FILTER_AUTOCONFIGURATION=false
    - Fixes: Failed to autoconfigure Ignite Object Input Filter
    - Ignite was conflicting with existing serialization filter
    - Apply in both IgniteStoreServer and IgniteConfigStore
    
    * Add Ignite filter autoconfiguration disable to client too
    
    * Add Ignite as built-in ConfigStore type
    
    - Handle 'ignite' type directly in ConfigStoreFactory
    - Load IgniteConfigStoreFactory via reflection
    - Works in forked PipesServer without plugin system
    - Matches pattern used for 'file' type
    - Fixes: ClassNotFoundException: ignite in forked process
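
    The dispatch pattern described here — known built-in type names handled
    directly, anything else treated as a class name and loaded reflectively —
    can be sketched as follows (all names hypothetical; Tika's actual logic
    lives in ConfigStoreFactory.createConfigStore()):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StoreFactorySketch {

    public interface Store {
        void put(String key, String value);
        String get(String key);
    }

    public static class InMemoryStore implements Store {
        private final Map<String, String> map = new ConcurrentHashMap<>();
        public void put(String key, String value) { map.put(key, value); }
        public String get(String key) { return map.get(key); }
    }

    // Known short names are handled directly; anything else is treated as a
    // fully qualified class name and loaded reflectively. Passing a bare type
    // name like "ignite" straight to Class.forName() is what produced the
    // ClassNotFoundException this change fixes.
    public static Store create(String type) throws ReflectiveOperationException {
        if ("memory".equalsIgnoreCase(type)) {
            return new InMemoryStore();
        }
        return (Store) Class.forName(type).getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        Store builtIn = create("memory");
        builtIn.put("k", "v");
        System.out.println(builtIn.get("k"));
        // The reflective path still works when given a real binary class name.
        Store reflective = create(InMemoryStore.class.getName());
        System.out.println(reflective.getClass().getSimpleName());
    }
}
```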
    
    * Add PipesIterator management API with save, get, and delete operations
    
    * Fix Ignite tests to use temp directory and server mode
    
    * Fix TikaGrpcServerTest - update assertion for delete operation now that 
it's implemented
---
 tika-grpc/pom.xml                                  |   3 +-
 .../apache/tika/pipes/grpc/TikaGrpcServerImpl.java | 153 ++++++-
 tika-grpc/src/main/proto/tika.proto                |  53 +++
 .../apache/tika/pipes/grpc/TikaGrpcServerTest.java |   6 +-
 .../tika/pipes/core/AbstractComponentManager.java  |  41 ++
 .../org/apache/tika/pipes/core/PipesClient.java    | 393 +++++++++++++++++-
 .../apache/tika/pipes/core/config/ConfigStore.java |   9 +
 .../tika/pipes/core/config/ConfigStoreFactory.java |  28 ++
 .../pipes/core/config/FileBasedConfigStore.java    | 165 ++++++++
 .../core/config/FileBasedConfigStoreFactory.java   |  60 +++
 .../pipes/core/config/InMemoryConfigStore.java     |   5 +
 .../tika/pipes/core/emitter/EmitterManager.java    |  20 +
 .../tika/pipes/core/fetcher/FetcherManager.java    |  20 +
 .../apache/tika/pipes/core/server/PipesServer.java | 457 ++++++++++++++++++++-
 .../tika/pipes/core/config/LoggingConfigStore.java |   8 +
 .../tika/pipes/ignite/IgniteConfigStore.java       |  42 +-
 .../pipes/ignite/server/IgniteStoreServer.java     | 118 ++++++
 .../tika/pipes/ignite/IgniteConfigStoreTest.java   |  10 +
 .../org/apache/tika/plugins/ExtensionConfig.java   |   4 +-
 19 files changed, 1575 insertions(+), 20 deletions(-)

diff --git a/tika-grpc/pom.xml b/tika-grpc/pom.xml
index 88d919936..e0b72a877 100644
--- a/tika-grpc/pom.xml
+++ b/tika-grpc/pom.xml
@@ -38,6 +38,8 @@
     <asarkar-grpc-test.version>2.0.0</asarkar-grpc-test.version>
     <awaitility.version>4.3.0</awaitility.version>
     <j2objc-annotations.version>3.1</j2objc-annotations.version>
+    <ignite.version>2.17.0</ignite.version>
+    <h2.version>1.4.197</h2.version>
   </properties>
 
   <dependencyManagement>
@@ -232,7 +234,6 @@
       <groupId>org.apache.tika</groupId>
       <artifactId>tika-pipes-ignite</artifactId>
       <version>${project.version}</version>
-      <optional>true</optional>
     </dependency>
     <dependency>
       <groupId>org.apache.tika</groupId>
diff --git 
a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java 
b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
index aa102a42a..0d3dc1b3c 100644
--- a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
+++ b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
@@ -39,16 +39,22 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.tika.DeleteFetcherReply;
 import org.apache.tika.DeleteFetcherRequest;
+import org.apache.tika.DeletePipesIteratorReply;
+import org.apache.tika.DeletePipesIteratorRequest;
 import org.apache.tika.FetchAndParseReply;
 import org.apache.tika.FetchAndParseRequest;
 import org.apache.tika.GetFetcherConfigJsonSchemaReply;
 import org.apache.tika.GetFetcherConfigJsonSchemaRequest;
 import org.apache.tika.GetFetcherReply;
 import org.apache.tika.GetFetcherRequest;
+import org.apache.tika.GetPipesIteratorReply;
+import org.apache.tika.GetPipesIteratorRequest;
 import org.apache.tika.ListFetchersReply;
 import org.apache.tika.ListFetchersRequest;
 import org.apache.tika.SaveFetcherReply;
 import org.apache.tika.SaveFetcherRequest;
+import org.apache.tika.SavePipesIteratorReply;
+import org.apache.tika.SavePipesIteratorRequest;
 import org.apache.tika.TikaGrpc;
 import org.apache.tika.config.ConfigContainer;
 import org.apache.tika.config.loader.TikaJsonConfig;
@@ -131,11 +137,43 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
         ExtensionConfig storeConfig = new ExtensionConfig(
             configStoreType, configStoreType, configStoreParams);
 
+        // If using Ignite, start the embedded server first
+        if ("ignite".equalsIgnoreCase(configStoreType)) {
+            startIgniteServer(storeConfig);
+        }
+
         return ConfigStoreFactory.createConfigStore(
                 pluginManager,
                 configStoreType,
                 storeConfig);
     }
+    
+    private void startIgniteServer(ExtensionConfig config) {
+        try {
+            LOG.info("Starting embedded Ignite server for ConfigStore");
+            
+            // Parse config to get Ignite settings
+            com.fasterxml.jackson.databind.ObjectMapper mapper = new 
com.fasterxml.jackson.databind.ObjectMapper();
+            com.fasterxml.jackson.databind.JsonNode params = 
mapper.readTree(config.json());
+            
+            String cacheName = params.has("cacheName") ? 
params.get("cacheName").asText() : "tika-config-store";
+            String cacheMode = params.has("cacheMode") ? 
params.get("cacheMode").asText() : "REPLICATED";
+            String instanceName = params.has("igniteInstanceName") ? 
params.get("igniteInstanceName").asText() : "TikaIgniteServer";
+            
+            // Direct instantiation - no reflection needed
+            org.apache.ignite.cache.CacheMode mode = 
org.apache.ignite.cache.CacheMode.valueOf(cacheMode);
+            org.apache.tika.pipes.ignite.server.IgniteStoreServer server = 
+                new 
org.apache.tika.pipes.ignite.server.IgniteStoreServer(cacheName, mode, 
instanceName);
+            
+            server.startAsync();
+            
+            LOG.info("Embedded Ignite server started successfully");
+            
+        } catch (Exception e) {
+            LOG.error("Failed to start embedded Ignite server", e);
+            throw new RuntimeException("Failed to start Ignite server", e);
+        }
+    }
 
     @Override
     public void fetchAndParseServerSideStreaming(FetchAndParseRequest request,
@@ -225,7 +263,12 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
         try {
             String factoryName = 
findFactoryNameForClass(request.getFetcherClass());
             ExtensionConfig config = new 
ExtensionConfig(request.getFetcherId(), factoryName, 
request.getFetcherConfigJson());
+            
+            // Save to gRPC server's fetcher manager (for schema queries, etc.)
             fetcherManager.saveFetcher(config);
+            
+            // Also save to PipesClient so it propagates to the forked 
PipesServer
+            pipesClient.saveFetcher(config);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -331,7 +374,113 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
     }
 
     private boolean deleteFetcher(String id) {
-        LOG.warn("Deleting fetchers is not supported in the current 
implementation");
-        return false;
+        try {
+            // Delete from gRPC server's fetcher manager
+            fetcherManager.deleteFetcher(id);
+            
+            // Also delete from PipesClient so it propagates to the forked 
PipesServer
+            pipesClient.deleteFetcher(id);
+            
+            LOG.info("Successfully deleted fetcher: {}", id);
+            return true;
+        } catch (Exception e) {
+            LOG.error("Failed to delete fetcher: {}", id, e);
+            return false;
+        }
+    }
+    
+    // ========== PipesIterator RPC Methods ==========
+    
+    @Override
+    public void savePipesIterator(SavePipesIteratorRequest request,
+                                  StreamObserver<SavePipesIteratorReply> 
responseObserver) {
+        try {
+            String iteratorId = request.getIteratorId();
+            String iteratorClass = request.getIteratorClass();
+            String iteratorConfigJson = request.getIteratorConfigJson();
+            
+            LOG.info("Saving pipes iterator: id={}, class={}", iteratorId, 
iteratorClass);
+            
+            ExtensionConfig config = new ExtensionConfig(iteratorId, 
iteratorClass, iteratorConfigJson);
+            
+            // Save via PipesClient so it propagates to the forked PipesServer
+            pipesClient.savePipesIterator(config);
+            
+            SavePipesIteratorReply reply = SavePipesIteratorReply.newBuilder()
+                    .setMessage("Pipes iterator saved successfully")
+                    .build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+            
+            LOG.info("Successfully saved pipes iterator: {}", iteratorId);
+            
+        } catch (Exception e) {
+            LOG.error("Failed to save pipes iterator", e);
+            responseObserver.onError(io.grpc.Status.INTERNAL
+                    .withDescription("Failed to save pipes iterator: " + 
e.getMessage())
+                    .withCause(e)
+                    .asRuntimeException());
+        }
+    }
+    
+    @Override
+    public void getPipesIterator(GetPipesIteratorRequest request,
+                                 StreamObserver<GetPipesIteratorReply> 
responseObserver) {
+        try {
+            String iteratorId = request.getIteratorId();
+            LOG.info("Getting pipes iterator: {}", iteratorId);
+            
+            ExtensionConfig config = 
pipesClient.getPipesIteratorConfig(iteratorId);
+            
+            if (config == null) {
+                responseObserver.onError(io.grpc.Status.NOT_FOUND
+                        .withDescription("Pipes iterator not found: " + 
iteratorId)
+                        .asRuntimeException());
+                return;
+            }
+            
+            GetPipesIteratorReply reply = GetPipesIteratorReply.newBuilder()
+                    .setIteratorId(config.id())
+                    .setIteratorClass(config.name())
+                    .setIteratorConfigJson(config.json())
+                    .build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+            
+            LOG.info("Successfully retrieved pipes iterator: {}", iteratorId);
+            
+        } catch (Exception e) {
+            LOG.error("Failed to get pipes iterator", e);
+            responseObserver.onError(io.grpc.Status.INTERNAL
+                    .withDescription("Failed to get pipes iterator: " + 
e.getMessage())
+                    .withCause(e)
+                    .asRuntimeException());
+        }
+    }
+    
+    @Override
+    public void deletePipesIterator(DeletePipesIteratorRequest request,
+                                    StreamObserver<DeletePipesIteratorReply> 
responseObserver) {
+        try {
+            String iteratorId = request.getIteratorId();
+            LOG.info("Deleting pipes iterator: {}", iteratorId);
+            
+            pipesClient.deletePipesIterator(iteratorId);
+            
+            DeletePipesIteratorReply reply = 
DeletePipesIteratorReply.newBuilder()
+                    .setMessage("Pipes iterator deleted successfully")
+                    .build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+            
+            LOG.info("Successfully deleted pipes iterator: {}", iteratorId);
+            
+        } catch (Exception e) {
+            LOG.error("Failed to delete pipes iterator", e);
+            responseObserver.onError(io.grpc.Status.INTERNAL
+                    .withDescription("Failed to delete pipes iterator: " + 
e.getMessage())
+                    .withCause(e)
+                    .asRuntimeException());
+        }
     }
 }
diff --git a/tika-grpc/src/main/proto/tika.proto 
b/tika-grpc/src/main/proto/tika.proto
index 572ded7ab..aeb614dec 100644
--- a/tika-grpc/src/main/proto/tika.proto
+++ b/tika-grpc/src/main/proto/tika.proto
@@ -59,6 +59,19 @@ service Tika   {
     Get the Fetcher Config schema for a given fetcher class.
   */
   rpc GetFetcherConfigJsonSchema(GetFetcherConfigJsonSchemaRequest) returns 
(GetFetcherConfigJsonSchemaReply) {}
+  
+  /* 
+    Save a pipes iterator to the iterator store.   
+  */
+  rpc SavePipesIterator(SavePipesIteratorRequest) returns 
(SavePipesIteratorReply) {}
+  /* 
+    Get a pipes iterator's data from the iterator store.   
+  */
+  rpc GetPipesIterator(GetPipesIteratorRequest) returns 
(GetPipesIteratorReply) {}
+  /* 
+    Delete a pipes iterator from the iterator store.   
+  */
+  rpc DeletePipesIterator(DeletePipesIteratorRequest) returns 
(DeletePipesIteratorReply) {}
 }
 
 message SaveFetcherRequest {
@@ -143,3 +156,43 @@ message GetFetcherConfigJsonSchemaReply {
   // The json schema that describes the fetcher config in string format.
   string fetcher_config_json_schema = 1;
 }
+
+// ========== PipesIterator Messages ==========
+
+message SavePipesIteratorRequest {
+  // A unique identifier for each pipes iterator. If this already exists, 
operation will overwrite existing.
+  string iterator_id = 1;
+  // The full java class name of the pipes iterator class.
+  string iterator_class = 2;
+  // JSON string of the pipes iterator config object.
+  string iterator_config_json = 3;
+}
+
+message SavePipesIteratorReply {
+  // Status message
+  string message = 1;
+}
+
+message GetPipesIteratorRequest {
+  // The pipes iterator ID to retrieve
+  string iterator_id = 1;
+}
+
+message GetPipesIteratorReply {
+  // The pipes iterator ID
+  string iterator_id = 1;
+  // The full java class name of the pipes iterator
+  string iterator_class = 2;
+  // JSON string of the pipes iterator config object
+  string iterator_config_json = 3;
+}
+
+message DeletePipesIteratorRequest {
+  // The pipes iterator ID to delete
+  string iterator_id = 1;
+}
+
+message DeletePipesIteratorReply {
+  // Status message
+  string message = 1;
+}
diff --git 
a/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java 
b/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
index 15c3ec39b..0b89604bf 100644
--- a/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
+++ b/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
@@ -184,15 +184,15 @@ public class TikaGrpcServerTest {
             assertEquals(FileSystemFetcher.class.getName(), 
getFetcherReply.getFetcherClass());
         }
 
-        // delete fetchers - note: delete is not currently supported
+        // delete fetchers
         for (int i = 0; i < NUM_FETCHERS_TO_CREATE; ++i) {
             String fetcherId = createFetcherId(i);
             DeleteFetcherReply deleteFetcherReply = 
blockingStub.deleteFetcher(DeleteFetcherRequest
                     .newBuilder()
                     .setFetcherId(fetcherId)
                     .build());
-            // Delete is not supported, so this will return false
-            Assertions.assertFalse(deleteFetcherReply.getSuccess());
+            // Delete is now supported and should succeed
+            Assertions.assertTrue(deleteFetcherReply.getSuccess());
         }
     }
 
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
index 859d8dbb2..70de8b78b 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
@@ -309,6 +309,47 @@ public abstract class AbstractComponentManager<T extends 
TikaExtension,
         configStore.put(componentId, config);
     }
 
+    /**
+     * Deletes a component configuration by ID.
+     * Clears the cached instance and removes the configuration.
+     *
+     * @param componentId the component ID to delete
+     * @throws TikaConfigException if runtime modifications are not allowed or 
component not found
+     */
+    public synchronized void deleteComponent(String componentId) throws 
TikaConfigException {
+        if (!allowRuntimeModifications) {
+            throw new TikaConfigException(
+                    "Runtime modifications are not allowed. " + 
getClass().getSimpleName() +
+                    " must be loaded with allowRuntimeModifications=true to 
use delete" +
+                    getComponentName().substring(0, 
1).toUpperCase(Locale.ROOT) + getComponentName().substring(1) + "()");
+        }
+
+        if (componentId == null) {
+            throw new IllegalArgumentException("Component ID cannot be null");
+        }
+
+        if (!configStore.containsKey(componentId)) {
+            throw new TikaConfigException(
+                    getComponentName().substring(0, 
1).toUpperCase(Locale.ROOT) + 
+                    getComponentName().substring(1) + " with ID '" + 
componentId + "' not found");
+        }
+
+        // Clear cache and remove config
+        componentCache.remove(componentId);
+        configStore.remove(componentId);
+        LOG.debug("Deleted {} config: id={}", getComponentName(), componentId);
+    }
+
+    /**
+     * Gets the configuration for a specific component by ID.
+     *
+     * @param componentId the component ID
+     * @return the component configuration, or null if not found
+     */
+    public ExtensionConfig getComponentConfig(String componentId) {
+        return configStore.get(componentId);
+    }
+
     /**
      * Returns the set of supported component IDs.
      */
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index 151931bdb..5d1ef5bdf 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -40,10 +40,12 @@ import java.nio.file.Path;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.HexFormat;
 import java.util.List;
 import java.util.Locale;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -55,6 +57,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.tika.config.TikaTaskTimeout;
+import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.parser.ParseContext;
@@ -64,6 +67,7 @@ import org.apache.tika.pipes.api.emitter.EmitKey;
 import org.apache.tika.pipes.core.emitter.EmitDataImpl;
 import org.apache.tika.pipes.core.server.IntermediateResult;
 import org.apache.tika.pipes.core.server.PipesServer;
+import org.apache.tika.plugins.ExtensionConfig;
 import org.apache.tika.utils.ExceptionUtils;
 import org.apache.tika.utils.ProcessUtils;
 import org.apache.tika.utils.StringUtils;
@@ -77,7 +81,10 @@ import org.apache.tika.utils.StringUtils;
 public class PipesClient implements Closeable {
 
     public enum COMMANDS {
-        PING, ACK, NEW_REQUEST, SHUT_DOWN;
+        PING, ACK, NEW_REQUEST, SHUT_DOWN, 
+        SAVE_FETCHER, DELETE_FETCHER, LIST_FETCHERS, GET_FETCHER,
+        SAVE_EMITTER, DELETE_EMITTER, LIST_EMITTERS, GET_EMITTER,
+        SAVE_PIPES_ITERATOR, DELETE_PIPES_ITERATOR, LIST_PIPES_ITERATORS, 
GET_PIPES_ITERATOR;
 
         public byte getByte() {
             return (byte) (ordinal() + 1);
@@ -602,4 +609,388 @@ public class PipesClient implements Closeable {
         return tikaTaskTimeout.getTimeoutMillis();
     }
 
+    // ========== Fetcher Management API ==========
+
+    /**
+     * Save (create or update) a fetcher configuration.
+     * The fetcher will be available immediately for use in subsequent fetch 
operations.
+     * 
+     * @param config the fetcher configuration containing name, plugin ID, and 
parameters
+     * @throws IOException if communication with the server fails
+     * @throws TikaException if the server returns an error (e.g., invalid 
configuration)
+     * @throws InterruptedException if the operation is interrupted
+     */
+    public void saveFetcher(ExtensionConfig config) throws IOException, 
TikaException, InterruptedException {
+        maybeInit();
+        synchronized (lock) {
+            serverTuple.output.write(COMMANDS.SAVE_FETCHER.getByte());
+            serverTuple.output.flush();
+            
+            // Serialize the ExtensionConfig
+            UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream.builder().get();
+            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+                oos.writeObject(config);
+            }
+            byte[] bytes = bos.toByteArray();
+            serverTuple.output.writeInt(bytes.length);
+            serverTuple.output.write(bytes);
+            serverTuple.output.flush();
+            
+            // Read response
+            byte status = serverTuple.input.readByte();
+            int msgLen = serverTuple.input.readInt();
+            byte[] msgBytes = new byte[msgLen];
+            serverTuple.input.readFully(msgBytes);
+            String message = new String(msgBytes, StandardCharsets.UTF_8);
+            
+            if (status != 0) { // 0 = success, 1 = error
+                throw new TikaException("Failed to save fetcher: " + message);
+            }
+            LOG.debug("pipesClientId={}: saved fetcher '{}'", pipesClientId, 
config.id());
+        }
+    }
+
+    /**
+     * Delete a fetcher by its name/ID.
+     * 
+     * @param fetcherId the fetcher name/ID to delete
+     * @throws IOException if communication with the server fails
+     * @throws TikaException if the server returns an error (e.g., fetcher not 
found)
+     * @throws InterruptedException if the operation is interrupted
+     */
+    public void deleteFetcher(String fetcherId) throws IOException, 
TikaException, InterruptedException {
+        maybeInit();
+        synchronized (lock) {
+            serverTuple.output.write(COMMANDS.DELETE_FETCHER.getByte());
+            serverTuple.output.flush();
+            
+            byte[] idBytes = fetcherId.getBytes(StandardCharsets.UTF_8);
+            serverTuple.output.writeInt(idBytes.length);
+            serverTuple.output.write(idBytes);
+            serverTuple.output.flush();
+            
+            // Read response
+            byte status = serverTuple.input.readByte();
+            int msgLen = serverTuple.input.readInt();
+            byte[] msgBytes = new byte[msgLen];
+            serverTuple.input.readFully(msgBytes);
+            String message = new String(msgBytes, StandardCharsets.UTF_8);
+            
+            if (status != 0) {
+                throw new TikaException("Failed to delete fetcher: " + 
message);
+            }
+            LOG.debug("pipesClientId={}: deleted fetcher '{}'", pipesClientId, 
fetcherId);
+        }
+    }
+
+    /**
+     * List all available fetcher IDs (both static from config and dynamically 
added).
+     * 
+     * @return set of fetcher IDs
+     * @throws IOException if communication with the server fails
+     * @throws InterruptedException if the operation is interrupted
+     */
+    public Set<String> listFetchers() throws IOException, InterruptedException 
{
+        maybeInit();
+        synchronized (lock) {
+            serverTuple.output.write(COMMANDS.LIST_FETCHERS.getByte());
+            serverTuple.output.flush();
+            
+            // Read response
+            int count = serverTuple.input.readInt();
+            Set<String> fetcherIds = new HashSet<>(count);
+            for (int i = 0; i < count; i++) {
+                int len = serverTuple.input.readInt();
+                byte[] bytes = new byte[len];
+                serverTuple.input.readFully(bytes);
+                fetcherIds.add(new String(bytes, StandardCharsets.UTF_8));
+            }
+            LOG.debug("pipesClientId={}: listed {} fetchers", pipesClientId, 
count);
+            return fetcherIds;
+        }
+    }
+
+    /**
+     * Get the configuration for a specific fetcher.
+     * 
+     * @param fetcherId the fetcher ID
+     * @return the fetcher configuration, or null if not found
+     * @throws IOException if communication with the server fails
+     * @throws InterruptedException if the operation is interrupted
+     */
+    public ExtensionConfig getFetcherConfig(String fetcherId) throws 
IOException, InterruptedException {
+        maybeInit();
+        synchronized (lock) {
+            serverTuple.output.write(COMMANDS.GET_FETCHER.getByte());
+            serverTuple.output.flush();
+            
+            byte[] idBytes = fetcherId.getBytes(StandardCharsets.UTF_8);
+            serverTuple.output.writeInt(idBytes.length);
+            serverTuple.output.write(idBytes);
+            serverTuple.output.flush();
+            
+            // Read response
+            byte found = serverTuple.input.readByte();
+            if (found == 0) {
+                return null;
+            }
+            
+            int len = serverTuple.input.readInt();
+            byte[] bytes = new byte[len];
+            serverTuple.input.readFully(bytes);
+            
+            try (ObjectInputStream ois = new ObjectInputStream(new UnsynchronizedByteArrayInputStream(bytes))) {
+                return (ExtensionConfig) ois.readObject();
+            } catch (ClassNotFoundException e) {
+                throw new IOException("Failed to deserialize ExtensionConfig", e);
+            }
+        }
+    }
+
+    // ========== Emitter Management API ==========
+
+    /**
+     * Save (create or update) an emitter configuration.
+     * The emitter will be available immediately for use in subsequent emit operations.
+     * 
+     * @param config the emitter configuration containing name, plugin ID, and parameters
+     * @throws IOException if communication with the server fails
+     * @throws TikaException if the server returns an error (e.g., invalid configuration)
+     * @throws InterruptedException if the operation is interrupted
+     */
+    public void saveEmitter(ExtensionConfig config) throws IOException, TikaException, InterruptedException {
+        maybeInit();
+        synchronized (lock) {
+            serverTuple.output.write(COMMANDS.SAVE_EMITTER.getByte());
+            serverTuple.output.flush();
+            
+            UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get();
+            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+                oos.writeObject(config);
+            }
+            byte[] bytes = bos.toByteArray();
+            serverTuple.output.writeInt(bytes.length);
+            serverTuple.output.write(bytes);
+            serverTuple.output.flush();
+            
+            byte status = serverTuple.input.readByte();
+            int msgLen = serverTuple.input.readInt();
+            byte[] msgBytes = new byte[msgLen];
+            serverTuple.input.readFully(msgBytes);
+            String message = new String(msgBytes, StandardCharsets.UTF_8);
+            
+            if (status != 0) {
+                throw new TikaException("Failed to save emitter: " + message);
+            }
+            LOG.debug("pipesClientId={}: saved emitter '{}'", pipesClientId, config.id());
+        }
+    }
+
+    /**
+     * Delete an emitter by its name/ID.
+     * 
+     * @param emitterId the emitter name/ID to delete
+     * @throws IOException if communication with the server fails
+     * @throws TikaException if the server returns an error (e.g., emitter not found)
+     * @throws InterruptedException if the operation is interrupted
+     */
+    public void deleteEmitter(String emitterId) throws IOException, TikaException, InterruptedException {
+        maybeInit();
+        synchronized (lock) {
+            serverTuple.output.write(COMMANDS.DELETE_EMITTER.getByte());
+            serverTuple.output.flush();
+            
+            byte[] idBytes = emitterId.getBytes(StandardCharsets.UTF_8);
+            serverTuple.output.writeInt(idBytes.length);
+            serverTuple.output.write(idBytes);
+            serverTuple.output.flush();
+            
+            byte status = serverTuple.input.readByte();
+            int msgLen = serverTuple.input.readInt();
+            byte[] msgBytes = new byte[msgLen];
+            serverTuple.input.readFully(msgBytes);
+            String message = new String(msgBytes, StandardCharsets.UTF_8);
+            
+            if (status != 0) {
+                throw new TikaException("Failed to delete emitter: " + message);
+            }
+            LOG.debug("pipesClientId={}: deleted emitter '{}'", pipesClientId, emitterId);
+        }
+    }
+
+    /**
+     * List all available emitter IDs (both static from config and dynamically added).
+     * 
+     * @return set of emitter IDs
+     * @throws IOException if communication with the server fails
+     * @throws InterruptedException if the operation is interrupted
+     */
+    public Set<String> listEmitters() throws IOException, InterruptedException {
+        maybeInit();
+        synchronized (lock) {
+            serverTuple.output.write(COMMANDS.LIST_EMITTERS.getByte());
+            serverTuple.output.flush();
+            
+            int count = serverTuple.input.readInt();
+            Set<String> emitterIds = new HashSet<>(count);
+            for (int i = 0; i < count; i++) {
+                int len = serverTuple.input.readInt();
+                byte[] bytes = new byte[len];
+                serverTuple.input.readFully(bytes);
+                emitterIds.add(new String(bytes, StandardCharsets.UTF_8));
+            }
+            LOG.debug("pipesClientId={}: listed {} emitters", pipesClientId, count);
+            return emitterIds;
+        }
+    }
+
+    /**
+     * Get the configuration for a specific emitter.
+     * 
+     * @param emitterId the emitter ID
+     * @return the emitter configuration, or null if not found
+     * @throws IOException if communication with the server fails
+     * @throws InterruptedException if the operation is interrupted
+     */
+    public ExtensionConfig getEmitterConfig(String emitterId) throws IOException, InterruptedException {
+        maybeInit();
+        synchronized (lock) {
+            serverTuple.output.write(COMMANDS.GET_EMITTER.getByte());
+            serverTuple.output.flush();
+            
+            byte[] idBytes = emitterId.getBytes(StandardCharsets.UTF_8);
+            serverTuple.output.writeInt(idBytes.length);
+            serverTuple.output.write(idBytes);
+            serverTuple.output.flush();
+            
+            byte found = serverTuple.input.readByte();
+            if (found == 0) {
+                return null;
+            }
+            
+            int len = serverTuple.input.readInt();
+            byte[] bytes = new byte[len];
+            serverTuple.input.readFully(bytes);
+            
+            try (ObjectInputStream ois = new ObjectInputStream(new UnsynchronizedByteArrayInputStream(bytes))) {
+                return (ExtensionConfig) ois.readObject();
+            } catch (ClassNotFoundException e) {
+                throw new IOException("Failed to deserialize ExtensionConfig", e);
+            }
+        }
+    }
+
+    // ========== PipesIterator Management API ==========
+
+    /**
+     * Save (create or update) a PipesIterator configuration.
+     * The iterator will be available immediately for use in subsequent operations.
+     * 
+     * @param config the iterator configuration containing name, plugin ID, and parameters
+     * @throws IOException if communication with the server fails
+     * @throws TikaException if the server returns an error (e.g., invalid configuration)
+     * @throws InterruptedException if the operation is interrupted
+     */
+    public void savePipesIterator(ExtensionConfig config) throws IOException, TikaException, InterruptedException {
+        maybeInit();
+        synchronized (lock) {
+            serverTuple.output.write(COMMANDS.SAVE_PIPES_ITERATOR.getByte());
+            serverTuple.output.flush();
+            
+            // Serialize the ExtensionConfig
+            UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get();
+            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+                oos.writeObject(config);
+            }
+            byte[] bytes = bos.toByteArray();
+            serverTuple.output.writeInt(bytes.length);
+            serverTuple.output.write(bytes);
+            serverTuple.output.flush();
+            
+            // Read response
+            byte status = serverTuple.input.readByte();
+            int msgLen = serverTuple.input.readInt();
+            byte[] msgBytes = new byte[msgLen];
+            serverTuple.input.readFully(msgBytes);
+            String message = new String(msgBytes, StandardCharsets.UTF_8);
+            
+            if (status != 0) { // 0 = success, 1 = error
+                throw new TikaException("Failed to save pipes iterator: " + message);
+            }
+            LOG.debug("pipesClientId={}: saved pipes iterator '{}'", pipesClientId, config.id());
+        }
+    }
+
+    /**
+     * Delete a PipesIterator configuration by its ID.
+     * 
+     * @param iteratorId the iterator ID to delete
+     * @throws IOException if communication with the server fails
+     * @throws TikaException if the server returns an error (e.g., iterator not found)
+     * @throws InterruptedException if the operation is interrupted
+     */
+    public void deletePipesIterator(String iteratorId) throws IOException, TikaException, InterruptedException {
+        maybeInit();
+        synchronized (lock) {
+            serverTuple.output.write(COMMANDS.DELETE_PIPES_ITERATOR.getByte());
+            serverTuple.output.flush();
+            
+            byte[] idBytes = iteratorId.getBytes(StandardCharsets.UTF_8);
+            serverTuple.output.writeInt(idBytes.length);
+            serverTuple.output.write(idBytes);
+            serverTuple.output.flush();
+            
+            // Read response
+            byte status = serverTuple.input.readByte();
+            int msgLen = serverTuple.input.readInt();
+            byte[] msgBytes = new byte[msgLen];
+            serverTuple.input.readFully(msgBytes);
+            String message = new String(msgBytes, StandardCharsets.UTF_8);
+            
+            if (status != 0) {
+                throw new TikaException("Failed to delete pipes iterator: " + message);
+            }
+            LOG.debug("pipesClientId={}: deleted pipes iterator '{}'", pipesClientId, iteratorId);
+        }
+    }
+
+    /**
+     * Get the configuration for a specific PipesIterator.
+     * 
+     * @param iteratorId the iterator ID
+     * @return the iterator configuration, or null if not found
+     * @throws IOException if communication with the server fails
+     * @throws InterruptedException if the operation is interrupted
+     */
+    public ExtensionConfig getPipesIteratorConfig(String iteratorId) throws IOException, InterruptedException {
+        maybeInit();
+        synchronized (lock) {
+            serverTuple.output.write(COMMANDS.GET_PIPES_ITERATOR.getByte());
+            serverTuple.output.flush();
+            
+            byte[] idBytes = iteratorId.getBytes(StandardCharsets.UTF_8);
+            serverTuple.output.writeInt(idBytes.length);
+            serverTuple.output.write(idBytes);
+            serverTuple.output.flush();
+            
+            // Read response
+            byte found = serverTuple.input.readByte();
+            if (found == 0) {
+                return null;
+            }
+            
+            int len = serverTuple.input.readInt();
+            byte[] bytes = new byte[len];
+            serverTuple.input.readFully(bytes);
+            
+            try (ObjectInputStream ois = new ObjectInputStream(new UnsynchronizedByteArrayInputStream(bytes))) {
+                return (ExtensionConfig) ois.readObject();
+            } catch (ClassNotFoundException e) {
+                throw new IOException("Failed to deserialize ExtensionConfig", e);
+            }
+        }
+    }
+
+    private final Object[] lock = new Object[0];
+
 }
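The new PipesClient methods above all share one wire format: a single command byte followed by a length-prefixed UTF-8 payload, read back with `readFully` so short reads cannot truncate the message. The sketch below illustrates that framing in isolation with only the JDK; `FramingSketch` and the command value `0x10` are made up for illustration and are not Tika's actual command bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class FramingSketch {
    static final byte DELETE_FETCHER = 0x10; // hypothetical command byte

    // One frame: command byte, then int length, then UTF-8 payload bytes.
    static byte[] encode(byte command, String id) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.write(command);
        byte[] idBytes = id.getBytes(StandardCharsets.UTF_8);
        out.writeInt(idBytes.length); // length prefix
        out.write(idBytes);           // payload
        out.flush();
        return bos.toByteArray();
    }

    static String decodePayload(byte[] frame) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
        in.readByte();                // consume the command byte
        int len = in.readInt();
        byte[] bytes = new byte[len];
        in.readFully(bytes);          // readFully blocks until all bytes arrive
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        byte[] frame = encode(DELETE_FETCHER, "my-fetcher");
        System.out.println(decodePayload(frame)); // round-trips the fetcher id
    }
}
```

The same length-prefix discipline applies to the serialized `ExtensionConfig` payloads, which is why both sides always write the length before the bytes and flush after each frame.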
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java
index 73d73ff7e..5c90fa0b7 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java
@@ -88,4 +88,13 @@ public interface ConfigStore extends TikaExtension {
      * @return the number of configurations
      */
     int size();
+
+    /**
+     * Removes a configuration by ID.
+     *
+     * @param id the configuration ID (must not be null)
+     * @return the removed configuration, or null if not found
+     * @throws NullPointerException if id is null
+     */
+    ExtensionConfig remove(String id);
 }
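The `remove(String id)` contract added to ConfigStore mirrors `java.util.Map.remove`: return the removed configuration, return null when the id is absent, and reject a null id. A minimal self-contained sketch of that contract, using a `Cfg` record as a stand-in for `ExtensionConfig`:

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class RemoveContractSketch {
    // Stand-in for ExtensionConfig; the real type carries name, plugin id, and JSON params.
    record Cfg(String id, String pluginId) {}

    static final ConcurrentHashMap<String, Cfg> STORE = new ConcurrentHashMap<>();

    static Cfg remove(String id) {
        Objects.requireNonNull(id, "id must not be null"); // NPE per the contract
        return STORE.remove(id); // null when no mapping existed
    }

    public static void main(String[] args) {
        STORE.put("fs", new Cfg("fs", "filesystem-fetcher"));
        System.out.println(remove("fs").pluginId()); // removed config is returned
        System.out.println(remove("fs"));            // second remove finds nothing
    }
}
```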
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStoreFactory.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStoreFactory.java
index 1393ace0b..ff60ff7c9 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStoreFactory.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStoreFactory.java
@@ -58,6 +58,34 @@ public interface ConfigStoreFactory extends TikaExtensionFactory<ConfigStore> {
             return store;
         }
         
+        // Handle built-in types directly (not plugins)
+        if ("file".equalsIgnoreCase(configStoreType)) {
+            LOG.info("Creating FileBasedConfigStore");
+            FileBasedConfigStoreFactory factory = new FileBasedConfigStoreFactory();
+            try {
+                ExtensionConfig config = extensionConfig != null ? extensionConfig :
+                    new ExtensionConfig(configStoreType, configStoreType, "{}");
+                return factory.buildExtension(config);
+            } catch (IOException e) {
+                throw new TikaConfigException("Failed to create FileBasedConfigStore", e);
+            }
+        }
+        
+        if ("ignite".equalsIgnoreCase(configStoreType)) {
+            LOG.info("Creating IgniteConfigStore");
+            try {
+                Class<?> factoryClass = Class.forName("org.apache.tika.pipes.ignite.IgniteConfigStoreFactory");
+                ConfigStoreFactory factory = (ConfigStoreFactory) factoryClass.getDeclaredConstructor().newInstance();
+                ExtensionConfig config = extensionConfig != null ? extensionConfig :
+                    new ExtensionConfig(configStoreType, configStoreType, "{}");
+                return factory.buildExtension(config);
+            } catch (ClassNotFoundException e) {
+                throw new TikaConfigException("Ignite ConfigStore requested but tika-pipes-ignite not on classpath", e);
+            } catch (Exception e) {
+                throw new TikaConfigException("Failed to create IgniteConfigStore", e);
+            }
+        }
+        
         Map<String, ConfigStoreFactory> factoryMap = loadAllConfigStoreFactoryExtensions(pluginManager);
 
         ConfigStoreFactory factory = factoryMap.get(configStoreType);
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/FileBasedConfigStore.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/FileBasedConfigStore.java
new file mode 100644
index 000000000..ee0de07fd
--- /dev/null
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/FileBasedConfigStore.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.config;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.plugins.ExtensionConfig;
+
+/**
+ * File-based implementation of {@link ConfigStore} that persists configurations to a JSON file.
+ * This allows multiple JVM processes to share configuration through the filesystem.
+ * Thread-safe and suitable for multi-process deployments where PipesClient forks PipesServer.
+ */
+public class FileBasedConfigStore implements ConfigStore {
+    
+    private static final Logger LOG = LoggerFactory.getLogger(FileBasedConfigStore.class);
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    
+    private final Path configFile;
+    private final ConcurrentHashMap<String, ExtensionConfig> cache = new ConcurrentHashMap<>();
+    private ExtensionConfig extensionConfig;
+    
+    public FileBasedConfigStore(Path configFile) {
+        this.configFile = configFile;
+    }
+
+    @Override
+    public ExtensionConfig getExtensionConfig() {
+        return extensionConfig;
+    }
+
+    public void setExtensionConfig(ExtensionConfig extensionConfig) {
+        this.extensionConfig = extensionConfig;
+    }
+
+    @Override
+    public void init() throws Exception {
+        // Create parent directories if they don't exist
+        if (configFile.getParent() != null) {
+            Files.createDirectories(configFile.getParent());
+        }
+        
+        // Load existing configs if file exists
+        if (Files.exists(configFile)) {
+            loadFromFile();
+            LOG.info("Loaded {} configurations from {}", cache.size(), configFile);
+        } else {
+            LOG.info("Config file does not exist yet, will be created on first save: {}", configFile);
+        }
+    }
+
+    @Override
+    public synchronized void put(String id, ExtensionConfig config) {
+        cache.put(id, config);
+        saveToFile();
+    }
+
+    @Override
+    public ExtensionConfig get(String id) {
+        // Reload from file to get latest changes from other processes
+        try {
+            loadFromFile();
+        } catch (IOException e) {
+            LOG.warn("Failed to reload config from file, using cache", e);
+        }
+        return cache.get(id);
+    }
+
+    @Override
+    public boolean containsKey(String id) {
+        try {
+            loadFromFile();
+        } catch (IOException e) {
+            LOG.warn("Failed to reload config from file, using cache", e);
+        }
+        return cache.containsKey(id);
+    }
+
+    @Override
+    public Set<String> keySet() {
+        try {
+            loadFromFile();
+        } catch (IOException e) {
+            LOG.warn("Failed to reload config from file, using cache", e);
+        }
+        return Set.copyOf(cache.keySet());
+    }
+
+    @Override
+    public int size() {
+        try {
+            loadFromFile();
+        } catch (IOException e) {
+            LOG.warn("Failed to reload config from file, using cache", e);
+        }
+        return cache.size();
+    }
+
+    @Override
+    public synchronized ExtensionConfig remove(String id) {
+        ExtensionConfig removed = cache.remove(id);
+        if (removed != null) {
+            saveToFile();
+        }
+        return removed;
+    }
+
+    private synchronized void loadFromFile() throws IOException {
+        if (!Files.exists(configFile)) {
+            return;
+        }
+        
+        try {
+            Map<String, ExtensionConfig> loaded = OBJECT_MAPPER.readValue(
+                configFile.toFile(),
+                new TypeReference<Map<String, ExtensionConfig>>() {}
+            );
+            cache.clear();
+            cache.putAll(loaded);
+        } catch (IOException e) {
+            LOG.error("Failed to load configurations from {}", configFile, e);
+            throw e;
+        }
+    }
+
+    private synchronized void saveToFile() {
+        try {
+            // Write to temp file first, then atomic rename
+            Path tempFile = Paths.get(configFile.toString() + ".tmp");
+            OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
+                .writeValue(tempFile.toFile(), cache);
+            Files.move(tempFile, configFile, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
+            LOG.debug("Saved {} configurations to {}", cache.size(), configFile);
+        } catch (IOException e) {
+            LOG.error("Failed to save configurations to {}", configFile, e);
+            throw new RuntimeException("Failed to save config store", e);
+        }
+    }
+}
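FileBasedConfigStore's `saveToFile()` relies on the classic write-temp-then-rename pattern so a reader in another process never observes a half-written config file. A stripped-down sketch of just that pattern, with plain strings standing in for the Jackson-serialized config map:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicWriteSketch {
    static void atomicWrite(Path target, String content) throws IOException {
        // Write the full content to a sibling temp file first.
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
        // Then rename over the target; on POSIX filesystems this is one atomic step.
        Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING,
                StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path target = Files.createTempDirectory("cfg").resolve("config-store.json");
        atomicWrite(target, "{\"fs\":{\"pluginId\":\"filesystem-fetcher\"}}");
        atomicWrite(target, "{}"); // overwriting an existing file is also atomic
        System.out.println(Files.readString(target));
    }
}
```

Note that the rename is atomic per write, but this alone does not serialize concurrent writers across processes; last writer wins, which matches the store's reload-on-read behavior.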
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/FileBasedConfigStoreFactory.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/FileBasedConfigStoreFactory.java
new file mode 100644
index 000000000..59274b0c4
--- /dev/null
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/FileBasedConfigStoreFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.config;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.pf4j.Extension;
+
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.plugins.ExtensionConfig;
+
+/**
+ * Factory for creating FileBasedConfigStore instances.
+ */
+@Extension
+public class FileBasedConfigStoreFactory implements ConfigStoreFactory {
+    
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    
+    @Override
+    public String getName() {
+        return "file";
+    }
+
+    @Override
+    public ConfigStore buildExtension(ExtensionConfig config) throws TikaConfigException, IOException {
+        try {
+            JsonNode params = OBJECT_MAPPER.readTree(config.json());
+            
+            String filePath = params.has("path") ? params.get("path").asText() : "config-store.json";
+            Path path = Paths.get(filePath);
+            
+            FileBasedConfigStore store = new FileBasedConfigStore(path);
+            store.setExtensionConfig(config);
+            store.init();
+            
+            return store;
+        } catch (Exception e) {
+            throw new TikaConfigException("Failed to create FileBasedConfigStore", e);
+        }
+    }
+}
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/InMemoryConfigStore.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/InMemoryConfigStore.java
index 7b6dab100..90a9c1ce6 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/InMemoryConfigStore.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/InMemoryConfigStore.java
@@ -63,4 +63,9 @@ public class InMemoryConfigStore implements ConfigStore {
     public int size() {
         return store.size();
     }
+
+    @Override
+    public ExtensionConfig remove(String id) {
+        return store.remove(id);
+    }
 }
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
index e424fc35a..256e35b7c 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
@@ -174,4 +174,24 @@ public class EmitterManager extends AbstractComponentManager<Emitter, EmitterFac
     public void saveEmitter(ExtensionConfig config) throws TikaConfigException, IOException {
         saveComponent(config);
     }
+
+    /**
+     * Deletes an emitter configuration by ID.
+     *
+     * @param emitterId the emitter ID to delete
+     * @throws TikaConfigException if runtime modifications are not allowed or emitter not found
+     */
+    public void deleteEmitter(String emitterId) throws TikaConfigException {
+        deleteComponent(emitterId);
+    }
+
+    /**
+     * Gets the configuration for a specific emitter by ID.
+     *
+     * @param emitterId the emitter ID
+     * @return the emitter configuration, or null if not found
+     */
+    public ExtensionConfig getConfig(String emitterId) {
+        return getComponentConfig(emitterId);
+    }
 }
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
index e282b8495..4f4e9a199 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
@@ -173,4 +173,24 @@ public class FetcherManager extends AbstractComponentManager<Fetcher, FetcherFac
     public void saveFetcher(ExtensionConfig config) throws TikaConfigException, IOException {
         saveComponent(config);
     }
+
+    /**
+     * Deletes a fetcher configuration by ID.
+     *
+     * @param fetcherId the fetcher ID to delete
+     * @throws TikaConfigException if runtime modifications are not allowed or fetcher not found
+     */
+    public void deleteFetcher(String fetcherId) throws TikaConfigException {
+        deleteComponent(fetcherId);
+    }
+
+    /**
+     * Gets the configuration for a specific fetcher by ID.
+     *
+     * @param fetcherId the fetcher ID
+     * @return the fetcher configuration, or null if not found
+     */
+    public ExtensionConfig getConfig(String fetcherId) {
+        return getComponentConfig(fetcherId);
+    }
 }
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
index dd09db768..8a90b4d89 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
@@ -38,6 +38,7 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.HexFormat;
 import java.util.Locale;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -70,8 +71,11 @@ import org.apache.tika.pipes.core.EmitStrategy;
 import org.apache.tika.pipes.core.EmitStrategyConfig;
 import org.apache.tika.pipes.core.PipesClient;
 import org.apache.tika.pipes.core.PipesConfig;
+import org.apache.tika.pipes.core.config.ConfigStore;
+import org.apache.tika.pipes.core.config.ConfigStoreFactory;
 import org.apache.tika.pipes.core.emitter.EmitterManager;
 import org.apache.tika.pipes.core.fetcher.FetcherManager;
+import org.apache.tika.plugins.ExtensionConfig;
 import org.apache.tika.plugins.TikaPluginManager;
 import org.apache.tika.serialization.ParseContextUtils;
 import org.apache.tika.utils.ExceptionUtils;
@@ -150,6 +154,7 @@ public class PipesServer implements AutoCloseable {
     private RecursiveParserWrapper rMetaParser;
     private FetcherManager fetcherManager;
     private EmitterManager emitterManager;
+    private ConfigStore configStore;
     private final ExecutorService executorService = Executors.newSingleThreadExecutor();
     private final ExecutorCompletionService<PipesResult> executorCompletionService = new ExecutorCompletionService<>(executorService);
     private final EmitStrategy emitStrategy;
@@ -306,6 +311,30 @@ public class PipesServer implements AutoCloseable {
                         //swallow
                     }
                     System.exit(0);
+                } else if (request == PipesClient.COMMANDS.SAVE_FETCHER.getByte()) {
+                    handleSaveFetcher();
+                } else if (request == PipesClient.COMMANDS.DELETE_FETCHER.getByte()) {
+                    handleDeleteFetcher();
+                } else if (request == PipesClient.COMMANDS.LIST_FETCHERS.getByte()) {
+                    handleListFetchers();
+                } else if (request == PipesClient.COMMANDS.GET_FETCHER.getByte()) {
+                    handleGetFetcher();
+                } else if (request == PipesClient.COMMANDS.SAVE_EMITTER.getByte()) {
+                    handleSaveEmitter();
+                } else if (request == PipesClient.COMMANDS.DELETE_EMITTER.getByte()) {
+                    handleDeleteEmitter();
+                } else if (request == PipesClient.COMMANDS.LIST_EMITTERS.getByte()) {
+                    handleListEmitters();
+                } else if (request == PipesClient.COMMANDS.GET_EMITTER.getByte()) {
+                    handleGetEmitter();
+                } else if (request == PipesClient.COMMANDS.SAVE_PIPES_ITERATOR.getByte()) {
+                    handleSavePipesIterator();
+                } else if (request == PipesClient.COMMANDS.DELETE_PIPES_ITERATOR.getByte()) {
+                    handleDeletePipesIterator();
+                } else if (request == PipesClient.COMMANDS.LIST_PIPES_ITERATORS.getByte()) {
+                    handleListPipesIterators();
+                } else if (request == PipesClient.COMMANDS.GET_PIPES_ITERATOR.getByte()) {
+                    handleGetPipesIterator();
                 } else {
                     String msg = String.format(Locale.ROOT,
                            "pipesClientId=%s: Unexpected byte 0x%02x in command position. " +
@@ -454,10 +483,12 @@ public class PipesServer implements AutoCloseable {
         TikaJsonConfig tikaJsonConfig = tikaLoader.getConfig();
         TikaPluginManager tikaPluginManager = TikaPluginManager.load(tikaJsonConfig);
 
-        //TODO allowed named configurations in tika config
-        this.fetcherManager = FetcherManager.load(tikaPluginManager, 
tikaJsonConfig);
-        // Always initialize emitters to support runtime overrides via 
ParseContext
-        this.emitterManager = EmitterManager.load(tikaPluginManager, 
tikaJsonConfig);
+        // Create ConfigStore if specified in pipesConfig
+        this.configStore = createConfigStore(pipesConfig, tikaPluginManager);
+
+        // Load managers with ConfigStore to enable runtime modifications
+        this.fetcherManager = FetcherManager.load(tikaPluginManager, tikaJsonConfig, true, configStore);
+        this.emitterManager = EmitterManager.load(tikaPluginManager, tikaJsonConfig, true, configStore);
         this.autoDetectParser = (AutoDetectParser) tikaLoader.loadAutoDetectParser();
         // Get the digester for pre-parse digesting of container documents.
         // If user configured skipContainerDocumentDigest=false (the default), PipesServer
@@ -484,6 +515,24 @@ public class PipesServer implements AutoCloseable {
         this.rMetaParser = new RecursiveParserWrapper(autoDetectParser);
     }
 
+    private ConfigStore createConfigStore(PipesConfig pipesConfig, TikaPluginManager tikaPluginManager) throws TikaException {
+        String configStoreType = pipesConfig.getConfigStoreType();
+        String configStoreParams = pipesConfig.getConfigStoreParams();
+        
+        if (configStoreType == null || "memory".equals(configStoreType)) {
+            // Use default in-memory store (no persistence)
+            return null;
+        }
+        
+        ExtensionConfig storeConfig = new ExtensionConfig(
+            configStoreType, configStoreType, configStoreParams);
+        
+        return ConfigStoreFactory.createConfigStore(
+                tikaPluginManager,
+                configStoreType,
+                storeConfig);
+    }
+
 
     private void write(PROCESSING_STATUS processingStatus, PipesResult pipesResult) {
         try {
@@ -556,4 +605,404 @@ public class PipesServer implements AutoCloseable {
         }
     }
 
+    // ========== Fetcher Management Handlers ==========
+
+    private void handleSaveFetcher() {
+        try {
+            // Read ExtensionConfig
+            int len = input.readInt();
+            byte[] bytes = new byte[len];
+            input.readFully(bytes);
+            
+            ExtensionConfig config;
+            try (ObjectInputStream ois = new ObjectInputStream(new UnsynchronizedByteArrayInputStream(bytes))) {
+                config = (ExtensionConfig) ois.readObject();
+            }
+            
+            // Save the fetcher
+            fetcherManager.saveFetcher(config);
+            LOG.debug("pipesClientId={}: saved fetcher '{}'", pipesClientId, config.id());
+            
+            // Send success response
+            output.writeByte(0); // success
+            String msg = "Fetcher saved successfully";
+            byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
+            output.writeInt(msgBytes.length);
+            output.write(msgBytes);
+            output.flush();
+            
+        } catch (Exception e) {
+            LOG.error("pipesClientId={}: error saving fetcher", pipesClientId, e);
+            try {
+                output.writeByte(1); // error
+                String msg = ExceptionUtils.getStackTrace(e);
+                byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
+                output.writeInt(msgBytes.length);
+                output.write(msgBytes);
+                output.flush();
+            } catch (IOException ioe) {
+                LOG.error("pipesClientId={}: failed to send error response", pipesClientId, ioe);
+                exit(1);
+            }
+        }
+    }
+
+    private void handleDeleteFetcher() {
+        try {
+            // Read fetcher ID
+            int len = input.readInt();
+            byte[] bytes = new byte[len];
+            input.readFully(bytes);
+            String fetcherId = new String(bytes, StandardCharsets.UTF_8);
+            
+            // Delete the fetcher
+            fetcherManager.deleteFetcher(fetcherId);
+            LOG.debug("pipesClientId={}: deleted fetcher '{}'", pipesClientId, fetcherId);
+            
+            // Send success response
+            output.writeByte(0); // success
+            String msg = "Fetcher deleted successfully";
+            byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
+            output.writeInt(msgBytes.length);
+            output.write(msgBytes);
+            output.flush();
+            
+        } catch (Exception e) {
+            LOG.error("pipesClientId={}: error deleting fetcher", pipesClientId, e);
+            try {
+                output.writeByte(1); // error
+                String msg = ExceptionUtils.getStackTrace(e);
+                byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
+                output.writeInt(msgBytes.length);
+                output.write(msgBytes);
+                output.flush();
+            } catch (IOException ioe) {
+                LOG.error("pipesClientId={}: failed to send error response", pipesClientId, ioe);
+                exit(1);
+            }
+        }
+    }
+
+    private void handleListFetchers() {
+        try {
+            // Get list of fetcher IDs
+            Set<String> fetcherIds = fetcherManager.getSupported();
+            LOG.debug("pipesClientId={}: listing {} fetchers", pipesClientId, fetcherIds.size());
+            
+            // Send response
+            output.writeInt(fetcherIds.size());
+            for (String id : fetcherIds) {
+                byte[] idBytes = id.getBytes(StandardCharsets.UTF_8);
+                output.writeInt(idBytes.length);
+                output.write(idBytes);
+            }
+            output.flush();
+            
+        } catch (IOException e) {
+            LOG.error("pipesClientId={}: error listing fetchers", pipesClientId, e);
+            exit(1);
+        }
+    }
+
+    private void handleGetFetcher() {
+        try {
+            // Read fetcher ID
+            int len = input.readInt();
+            byte[] bytes = new byte[len];
+            input.readFully(bytes);
+            String fetcherId = new String(bytes, StandardCharsets.UTF_8);
+            
+            // Get fetcher config
+            ExtensionConfig config = fetcherManager.getConfig(fetcherId);
+            
+            if (config == null) {
+                output.writeByte(0); // not found
+                output.flush();
+            } else {
+                output.writeByte(1); // found
+                
+                // Serialize config
+                UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get();
+                try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+                    oos.writeObject(config);
+                }
+                byte[] configBytes = bos.toByteArray();
+                output.writeInt(configBytes.length);
+                output.write(configBytes);
+                output.flush();
+            }
+            LOG.debug("pipesClientId={}: get fetcher '{}' = {}", pipesClientId, fetcherId, (config != null ? "found" : "not found"));
+            
+        } catch (IOException e) {
+            LOG.error("pipesClientId={}: error getting fetcher", pipesClientId, e);
+            exit(1);
+        }
+    }
+
+    // ========== Emitter Management Handlers ==========
+
+    private void handleSaveEmitter() {
+        try {
+            // Read ExtensionConfig
+            int len = input.readInt();
+            byte[] bytes = new byte[len];
+            input.readFully(bytes);
+            
+            ExtensionConfig config;
+            try (ObjectInputStream ois = new ObjectInputStream(new UnsynchronizedByteArrayInputStream(bytes))) {
+                config = (ExtensionConfig) ois.readObject();
+            }
+            
+            // Save the emitter
+            emitterManager.saveEmitter(config);
+            LOG.debug("pipesClientId={}: saved emitter '{}'", pipesClientId, config.id());
+            
+            // Send success response
+            output.writeByte(0); // success
+            String msg = "Emitter saved successfully";
+            byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
+            output.writeInt(msgBytes.length);
+            output.write(msgBytes);
+            output.flush();
+            
+        } catch (Exception e) {
+            LOG.error("pipesClientId={}: error saving emitter", pipesClientId, e);
+            try {
+                output.writeByte(1); // error
+                String msg = ExceptionUtils.getStackTrace(e);
+                byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
+                output.writeInt(msgBytes.length);
+                output.write(msgBytes);
+                output.flush();
+            } catch (IOException ioe) {
+                LOG.error("pipesClientId={}: failed to send error response", pipesClientId, ioe);
+                exit(1);
+            }
+        }
+    }
+
+    private void handleDeleteEmitter() {
+        try {
+            // Read emitter ID
+            int len = input.readInt();
+            byte[] bytes = new byte[len];
+            input.readFully(bytes);
+            String emitterId = new String(bytes, StandardCharsets.UTF_8);
+            
+            // Delete the emitter
+            emitterManager.deleteEmitter(emitterId);
+            LOG.debug("pipesClientId={}: deleted emitter '{}'", pipesClientId, emitterId);
+            
+            // Send success response
+            output.writeByte(0); // success
+            String msg = "Emitter deleted successfully";
+            byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
+            output.writeInt(msgBytes.length);
+            output.write(msgBytes);
+            output.flush();
+            
+        } catch (Exception e) {
+            LOG.error("pipesClientId={}: error deleting emitter", pipesClientId, e);
+            try {
+                output.writeByte(1); // error
+                String msg = ExceptionUtils.getStackTrace(e);
+                byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
+                output.writeInt(msgBytes.length);
+                output.write(msgBytes);
+                output.flush();
+            } catch (IOException ioe) {
+                LOG.error("pipesClientId={}: failed to send error response", pipesClientId, ioe);
+                exit(1);
+            }
+        }
+    }
+
+    private void handleListEmitters() {
+        try {
+            // Get list of emitter IDs
+            Set<String> emitterIds = emitterManager.getSupported();
+            LOG.debug("pipesClientId={}: listing {} emitters", pipesClientId, emitterIds.size());
+            
+            // Send response
+            output.writeInt(emitterIds.size());
+            for (String id : emitterIds) {
+                byte[] idBytes = id.getBytes(StandardCharsets.UTF_8);
+                output.writeInt(idBytes.length);
+                output.write(idBytes);
+            }
+            output.flush();
+            
+        } catch (IOException e) {
+            LOG.error("pipesClientId={}: error listing emitters", pipesClientId, e);
+            exit(1);
+        }
+    }
+
+    private void handleGetEmitter() {
+        try {
+            // Read emitter ID
+            int len = input.readInt();
+            byte[] bytes = new byte[len];
+            input.readFully(bytes);
+            String emitterId = new String(bytes, StandardCharsets.UTF_8);
+            
+            // Get emitter config
+            ExtensionConfig config = emitterManager.getConfig(emitterId);
+            
+            if (config == null) {
+                output.writeByte(0); // not found
+                output.flush();
+            } else {
+                output.writeByte(1); // found
+                
+                // Serialize config
+                UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get();
+                try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+                    oos.writeObject(config);
+                }
+                byte[] configBytes = bos.toByteArray();
+                output.writeInt(configBytes.length);
+                output.write(configBytes);
+                output.flush();
+            }
+            LOG.debug("pipesClientId={}: get emitter '{}' = {}", pipesClientId, emitterId, (config != null ? "found" : "not found"));
+            
+        } catch (IOException e) {
+            LOG.error("pipesClientId={}: error getting emitter", pipesClientId, e);
+            exit(1);
+        }
+    }
+
+    // ========== PipesIterator Command Handlers ==========
+    // Note: PipesIterators are primarily used on the client side to generate FetchEmitTuples.
+    // Unlike Fetchers and Emitters, they are not component managers in PipesServer.
+    // These handlers provide basic ConfigStore operations for consistency.
+    
+    private void handleSavePipesIterator() {
+        try {
+            // Read ExtensionConfig
+            int len = input.readInt();
+            byte[] bytes = new byte[len];
+            input.readFully(bytes);
+            
+            ExtensionConfig config;
+            try (ObjectInputStream ois = new ObjectInputStream(new UnsynchronizedByteArrayInputStream(bytes))) {
+                config = (ExtensionConfig) ois.readObject();
+            } catch (ClassNotFoundException e) {
+                throw new IOException("Failed to deserialize ExtensionConfig", e);
+            }
+            
+            // Save to ConfigStore
+            configStore.put(PIPES_ITERATOR_PREFIX + config.id(), config);
+            
+            // Send success response
+            output.writeByte(0); // success
+            byte[] msgBytes = "OK".getBytes(StandardCharsets.UTF_8);
+            output.writeInt(msgBytes.length);
+            output.write(msgBytes);
+            output.flush();
+            
+            LOG.debug("pipesClientId={}: saved pipes iterator '{}'", pipesClientId, config.id());
+            
+        } catch (Exception e) {
+            LOG.error("pipesClientId={}: error saving pipes iterator", pipesClientId, e);
+            try {
+                output.writeByte(1); // error
+                byte[] msgBytes = String.valueOf(e.getMessage()).getBytes(StandardCharsets.UTF_8);
+                output.writeInt(msgBytes.length);
+                output.write(msgBytes);
+                output.flush();
+            } catch (IOException ioException) {
+                LOG.error("pipesClientId={}: error sending error response", pipesClientId, ioException);
+            }
+        }
+    }
+    
+    private void handleDeletePipesIterator() {
+        try {
+            // Read iterator ID
+            int len = input.readInt();
+            byte[] bytes = new byte[len];
+            input.readFully(bytes);
+            String iteratorId = new String(bytes, StandardCharsets.UTF_8);
+            
+            // Delete from ConfigStore
+            configStore.remove(PIPES_ITERATOR_PREFIX + iteratorId);
+            
+            // Send success response
+            output.writeByte(0); // success
+            byte[] msgBytes = "OK".getBytes(StandardCharsets.UTF_8);
+            output.writeInt(msgBytes.length);
+            output.write(msgBytes);
+            output.flush();
+            
+            LOG.debug("pipesClientId={}: deleted pipes iterator '{}'", pipesClientId, iteratorId);
+            
+        } catch (Exception e) {
+            LOG.error("pipesClientId={}: error deleting pipes iterator", pipesClientId, e);
+            try {
+                output.writeByte(1); // error
+                byte[] msgBytes = String.valueOf(e.getMessage()).getBytes(StandardCharsets.UTF_8);
+                output.writeInt(msgBytes.length);
+                output.write(msgBytes);
+                output.flush();
+            } catch (IOException ioException) {
+                LOG.error("pipesClientId={}: error sending error response", pipesClientId, ioException);
+            }
+        }
+    }
+    
+    private void handleListPipesIterators() {
+        try {
+            // This is a placeholder - list operation not fully implemented
+            // Would need to iterate ConfigStore keys with PIPES_ITERATOR_PREFIX
+            output.writeByte(0); // success
+            byte[] msgBytes = "[]".getBytes(StandardCharsets.UTF_8);
+            output.writeInt(msgBytes.length);
+            output.write(msgBytes);
+            output.flush();
+            LOG.debug("pipesClientId={}: list pipes iterators (placeholder)", pipesClientId);
+        } catch (IOException e) {
+            LOG.error("pipesClientId={}: error listing pipes iterators", pipesClientId, e);
+            exit(1);
+        }
+    }
+    
+    private void handleGetPipesIterator() {
+        try {
+            // Read iterator ID
+            int len = input.readInt();
+            byte[] bytes = new byte[len];
+            input.readFully(bytes);
+            String iteratorId = new String(bytes, StandardCharsets.UTF_8);
+            
+            // Get from ConfigStore
+            ExtensionConfig config = configStore.get(PIPES_ITERATOR_PREFIX + iteratorId);
+            
+            if (config == null) {
+                output.writeByte(0); // not found
+                output.flush();
+            } else {
+                output.writeByte(1); // found
+                
+                // Serialize config
+                UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get();
+                try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+                    oos.writeObject(config);
+                }
+                byte[] configBytes = bos.toByteArray();
+                output.writeInt(configBytes.length);
+                output.write(configBytes);
+                output.flush();
+            }
+            LOG.debug("pipesClientId={}: get pipes iterator '{}' = {}", pipesClientId, iteratorId, (config != null ? "found" : "not found"));
+            
+        } catch (IOException e) {
+            LOG.error("pipesClientId={}: error getting pipes iterator", pipesClientId, e);
+            exit(1);
+        }
+    }
+    
+    private static final String PIPES_ITERATOR_PREFIX = "pipesIterator:";
+
 }
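Every handler in the hunk above uses the same framing: a one-byte status (0 = success, 1 = error), a 4-byte big-endian length from `writeInt`, then the UTF-8 or object-serialized payload, with `readInt`/`readFully` mirroring it on the other side. A stand-alone sketch of that convention using only `java.io` — the class and method names below are illustrative, not Tika APIs:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class FramingSketch {

    // Status byte, then 4-byte big-endian length, then UTF-8 payload --
    // the same writeByte/writeInt/write sequence the handlers use.
    static byte[] encode(int status, String payload) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeByte(status);
            byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
            out.writeInt(bytes.length);
            out.write(bytes);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Mirror of the readByte/readInt/readFully sequence on the receiving side.
    static String decode(byte[] frame) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
            int status = in.readByte(); // 0 = success, 1 = error in the handlers
            int len = in.readInt();
            byte[] bytes = new byte[len];
            in.readFully(bytes);
            return status + ":" + new String(bytes, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(decode(encode(0, "Fetcher saved successfully")));
    }
}
```

Because `DataOutputStream.writeInt` is big-endian by definition, both processes agree on the length prefix without any extra negotiation.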
diff --git a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/config/LoggingConfigStore.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/config/LoggingConfigStore.java
index 8ef299b35..07e901e2e 100644
--- a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/config/LoggingConfigStore.java
+++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/config/LoggingConfigStore.java
@@ -83,4 +83,12 @@ public class LoggingConfigStore implements ConfigStore {
             return store.size();
         }
     }
+
+    @Override
+    public ExtensionConfig remove(String id) {
+        LOG.debug("ConfigStore: Removing config with id={}", id);
+        synchronized (store) {
+            return store.remove(id);
+        }
+    }
 }
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/IgniteConfigStore.java b/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/IgniteConfigStore.java
index e2f8a6c74..05becb62b 100644
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/IgniteConfigStore.java
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/IgniteConfigStore.java
@@ -60,6 +60,7 @@ public class IgniteConfigStore implements ConfigStore {
     private boolean autoClose = true;
     private ExtensionConfig extensionConfig;
     private boolean closed = false;
+    private boolean clientMode = true;  // Default to client mode
 
     public IgniteConfigStore() {
     }
@@ -96,18 +97,30 @@ public class IgniteConfigStore implements ConfigStore {
         LOG.info("Initializing IgniteConfigStore with cache: {}, mode: {}, instance: {}",
                 cacheName, cacheMode, igniteInstanceName);
 
+        // Disable Ignite's Object Input Filter autoconfiguration to avoid conflicts
+        System.setProperty("IGNITE_ENABLE_OBJECT_INPUT_FILTER_AUTOCONFIGURATION", "false");
+
         IgniteConfiguration cfg = new IgniteConfiguration();
-        cfg.setIgniteInstanceName(igniteInstanceName);
-        cfg.setClientMode(false);
+        cfg.setIgniteInstanceName(igniteInstanceName + (clientMode ? "-Client" : ""));
+        cfg.setClientMode(clientMode);
+        cfg.setPeerClassLoadingEnabled(false);  // Disable to avoid classloader conflicts
+        
+        // Set work directory to /var/cache/tika to match Tika's cache location
+        cfg.setWorkDirectory(System.getProperty("ignite.work.dir", "/var/cache/tika/ignite-work"));
 
         ignite = Ignition.start(cfg);
 
-        CacheConfiguration<String, ExtensionConfigDTO> cacheCfg = new CacheConfiguration<>(cacheName);
-        cacheCfg.setCacheMode(cacheMode);
-        cacheCfg.setBackups(cacheMode == CacheMode.PARTITIONED ? 1 : 0);
-
-        cache = ignite.getOrCreateCache(cacheCfg);
-        LOG.info("IgniteConfigStore initialized successfully");
+        // Get cache (it should already exist on the server)
+        cache = ignite.cache(cacheName);
+        if (cache == null) {
+            // If not found, create it (shouldn't happen if server started first)
+            LOG.warn("Cache {} not found on server, creating it", cacheName);
+            CacheConfiguration<String, ExtensionConfigDTO> cacheCfg = new CacheConfiguration<>(cacheName);
+            cacheCfg.setCacheMode(cacheMode);
+            cacheCfg.setBackups(cacheMode == CacheMode.PARTITIONED ? 1 : 0);
+            cache = ignite.getOrCreateCache(cacheCfg);
+        }
+        LOG.info("IgniteConfigStore initialized successfully as client");
     }
 
     @Override
@@ -155,6 +168,15 @@ public class IgniteConfigStore implements ConfigStore {
         return cache.size();
     }
 
+    @Override
+    public ExtensionConfig remove(String id) {
+        if (cache == null) {
+            throw new IllegalStateException("IgniteConfigStore not initialized. Call init() first.");
+        }
+        ExtensionConfigDTO removed = cache.getAndRemove(id);
+        return removed != null ? removed.toExtensionConfig() : null;
+    }
+
     public void close() {
         if (ignite != null && autoClose) {
             LOG.info("Closing IgniteConfigStore");
@@ -180,4 +202,8 @@ public class IgniteConfigStore implements ConfigStore {
     public void setAutoClose(boolean autoClose) {
         this.autoClose = autoClose;
     }
+
+    public void setClientMode(boolean clientMode) {
+        this.clientMode = clientMode;
+    }
 }
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/server/IgniteStoreServer.java b/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/server/IgniteStoreServer.java
new file mode 100644
index 000000000..8d8759ac8
--- /dev/null
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/server/IgniteStoreServer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.ignite.server;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.pipes.ignite.ExtensionConfigDTO;
+
+/**
+ * Embedded Ignite server that hosts the distributed cache.
+ * This runs as a background thread within the tika-grpc process.
+ * Tika gRPC and forked PipesServer instances connect as clients.
+ */
+public class IgniteStoreServer implements AutoCloseable {
+    
+    private static final Logger LOG = LoggerFactory.getLogger(IgniteStoreServer.class);
+    private static final String DEFAULT_CACHE_NAME = "tika-config-store";
+    private static final String DEFAULT_INSTANCE_NAME = "TikaIgniteServer";
+    
+    private Ignite ignite;
+    private final String cacheName;
+    private final CacheMode cacheMode;
+    private final String instanceName;
+    
+    public IgniteStoreServer() {
+        this(DEFAULT_CACHE_NAME, CacheMode.REPLICATED, DEFAULT_INSTANCE_NAME);
+    }
+    
+    public IgniteStoreServer(String cacheName, CacheMode cacheMode, String instanceName) {
+        this.cacheName = cacheName;
+        this.cacheMode = cacheMode;
+        this.instanceName = instanceName;
+    }
+    
+    /**
+     * Start the Ignite server node in a background daemon thread.
+     */
+    public void startAsync() {
+        Thread serverThread = new Thread(() -> {
+            try {
+                start();
+            } catch (Exception e) {
+                LOG.error("Failed to start Ignite server", e);
+            }
+        }, "IgniteServerThread");
+        serverThread.setDaemon(true);
+        serverThread.start();
+        
+        // Wait for server to initialize
+        try {
+            Thread.sleep(3000);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+    
+    private void start() throws Exception {
+        LOG.info("Starting Ignite server: instance={}, cache={}, mode={}", 
+            instanceName, cacheName, cacheMode);
+        
+        // Disable Ignite's Object Input Filter autoconfiguration to avoid conflicts
+        System.setProperty("IGNITE_ENABLE_OBJECT_INPUT_FILTER_AUTOCONFIGURATION", "false");
+        
+        IgniteConfiguration cfg = new IgniteConfiguration();
+        cfg.setIgniteInstanceName(instanceName);
+        cfg.setClientMode(false); // Server mode
+        cfg.setPeerClassLoadingEnabled(false); // Disable to avoid classloader conflicts
+        
+        // Set work directory to /var/cache/tika to match Tika's cache location
+        cfg.setWorkDirectory(System.getProperty("ignite.work.dir", "/var/cache/tika/ignite-work"));
+        
+        ignite = Ignition.start(cfg);
+        
+        CacheConfiguration<String, ExtensionConfigDTO> cacheCfg = 
+            new CacheConfiguration<>(cacheName);
+        cacheCfg.setCacheMode(cacheMode);
+        cacheCfg.setBackups(cacheMode == CacheMode.PARTITIONED ? 1 : 0);
+        
+        IgniteCache<String, ExtensionConfigDTO> cache = ignite.getOrCreateCache(cacheCfg);
+        
+        LOG.info("Ignite server started successfully with cache: {}", cache.getName());
+        LOG.info("Ignite topology: {} nodes", ignite.cluster().nodes().size());
+    }
+    
+    public boolean isRunning() {
+        return ignite != null;
+    }
+    
+    @Override
+    public void close() {
+        if (ignite != null) {
+            LOG.info("Stopping Ignite server: {}", instanceName);
+            ignite.close();
+            ignite = null;
+        }
+    }
+}
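`startAsync()` above sleeps a fixed three seconds and hopes the node is up by then. A common alternative is to have the startup thread signal a `CountDownLatch`, so the caller blocks only as long as startup actually takes and gets a timeout signal if it never completes. The sketch below demonstrates the pattern against a stand-in `start()` rather than the real `Ignition.start()` call; all names are illustrative:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchStartupSketch {

    private final CountDownLatch ready = new CountDownLatch(1);
    private volatile boolean running;

    // Stand-in for the real startup work (Ignition.start(cfg) in the class above).
    private void start() {
        running = true;
        ready.countDown(); // signal readiness instead of relying on a fixed sleep
    }

    // Launch startup on a daemon thread; block the caller only until the
    // startup thread signals, or until the timeout expires.
    public boolean startAsync(long timeoutMillis) {
        Thread t = new Thread(this::start, "IgniteServerThread");
        t.setDaemon(true);
        t.start();
        try {
            return ready.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public boolean isRunning() {
        return running;
    }

    public static void main(String[] args) {
        LatchStartupSketch server = new LatchStartupSketch();
        System.out.println(server.startAsync(5000) && server.isRunning());
    }
}
```

The boolean return also gives callers a way to distinguish "started" from "timed out", which a fixed sleep cannot.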
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/test/java/org/apache/tika/pipes/ignite/IgniteConfigStoreTest.java b/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/test/java/org/apache/tika/pipes/ignite/IgniteConfigStoreTest.java
index bc72ad0b5..41a520e1f 100644
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/test/java/org/apache/tika/pipes/ignite/IgniteConfigStoreTest.java
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/test/java/org/apache/tika/pipes/ignite/IgniteConfigStoreTest.java
@@ -23,20 +23,30 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.nio.file.Path;
+
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.tika.plugins.ExtensionConfig;
 
 public class IgniteConfigStoreTest {
 
+    @TempDir
+    private Path tempDir;
+    
     private IgniteConfigStore store;
 
     @BeforeEach
     public void setUp() throws Exception {
+        // Set the work directory for Ignite to use the temp directory
+        System.setProperty("ignite.work.dir", tempDir.toString());
+        
         store = new IgniteConfigStore();
         store.setIgniteInstanceName("TestIgniteInstance-" + System.currentTimeMillis());
+        store.setClientMode(false);  // Run as server for tests
         store.init();
     }
 
diff --git a/tika-plugins-core/src/main/java/org/apache/tika/plugins/ExtensionConfig.java b/tika-plugins-core/src/main/java/org/apache/tika/plugins/ExtensionConfig.java
index c393ab0fc..52bc5d3c0 100644
--- a/tika-plugins-core/src/main/java/org/apache/tika/plugins/ExtensionConfig.java
+++ b/tika-plugins-core/src/main/java/org/apache/tika/plugins/ExtensionConfig.java
@@ -16,6 +16,8 @@
  */
 package org.apache.tika.plugins;
 
+import java.io.Serializable;
+
 import org.apache.tika.config.JsonConfig;
 
 /**
@@ -25,6 +27,6 @@ import org.apache.tika.config.JsonConfig;
  * @param name       the plugin type name
  * @param json       the raw JSON configuration string for the plugin to parse
  */
-public record ExtensionConfig(String id, String name, String json) implements JsonConfig {
+public record ExtensionConfig(String id, String name, String json) implements JsonConfig, Serializable {
 
 }
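Marking the record `Serializable` is what lets PipesServer's handlers move it through `ObjectOutputStream`/`ObjectInputStream`. Records serialize from their component values and deserialize through the canonical constructor, so a round-trip yields an equal but distinct instance. A self-contained sketch with a local stand-in record (not the real `ExtensionConfig`); the `Config` name and sample values are hypothetical:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class RecordRoundTrip {

    // Local stand-in mirroring ExtensionConfig's shape; not the real class.
    record Config(String id, String name, String json) implements Serializable {}

    static Config roundTrip(Config in) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(in); // same pattern as the SAVE/GET handlers
            }
            try (ObjectInputStream ois = new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray()))) {
                return (Config) ois.readObject();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Config original = new Config("fs-1", "file-fetcher", "{\"basePath\":\"/tmp\"}");
        Config copy = roundTrip(original);
        // Records deserialize through the canonical constructor and
        // compare by components, so the copy is equal but not identical.
        System.out.println(copy.equals(original) && copy != original);
    }
}
```

Because record deserialization always runs the canonical constructor, any validation placed there also applies to objects arriving over the pipes channel.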
