This is an automated email from the ASF dual-hosted git repository.

ndipiazza pushed a commit to branch TIKA-4595-dynamic-fetcher-management
in repository https://gitbox.apache.org/repos/asf/tika.git

commit c48d6c1f9209b929ec66f916606cef2079e99bf2
Author: Nicholas DiPiazza <[email protected]>
AuthorDate: Sun Dec 28 09:17:47 2025 -0600

    TIKA-4595: Add dynamic fetcher/emitter management API to PipesClient
    
    - Added SAVE_FETCHER, DELETE_FETCHER, LIST_FETCHERS, GET_FETCHER commands
    - Added SAVE_EMITTER, DELETE_EMITTER, LIST_EMITTERS, GET_EMITTER commands
    - Implemented PipesClient public API methods for runtime configuration
    - Implemented PipesServer command handlers
    - Added deleteComponent() and getComponentConfig() to AbstractComponentManager
    - Added wrapper methods to FetcherManager and EmitterManager
    - Added remove() method to ConfigStore interface and implementations
    - All tests passing
---
 .../tika/pipes/core/AbstractComponentManager.java  |  41 +++
 .../org/apache/tika/pipes/core/PipesClient.java    | 281 +++++++++++++++++++-
 .../apache/tika/pipes/core/config/ConfigStore.java |   9 +
 .../pipes/core/config/InMemoryConfigStore.java     |   5 +
 .../tika/pipes/core/emitter/EmitterManager.java    |  20 ++
 .../tika/pipes/core/fetcher/FetcherManager.java    |  20 ++
 .../apache/tika/pipes/core/server/PipesServer.java | 286 +++++++++++++++++++++
 .../tika/pipes/core/config/LoggingConfigStore.java |   8 +
 8 files changed, 669 insertions(+), 1 deletion(-)

diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
index 859d8dbb2..70de8b78b 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
@@ -309,6 +309,47 @@ public abstract class AbstractComponentManager<T extends TikaExtension,
         configStore.put(componentId, config);
     }
 
+    /**
+     * Deletes a component configuration by ID.
+     * Clears the cached instance and removes the configuration.
+     *
+     * @param componentId the component ID to delete
+     * @throws TikaConfigException if runtime modifications are not allowed or 
component not found
+     */
+    public synchronized void deleteComponent(String componentId) throws 
TikaConfigException {
+        if (!allowRuntimeModifications) {
+            throw new TikaConfigException(
+                    "Runtime modifications are not allowed. " + 
getClass().getSimpleName() +
+                    " must be loaded with allowRuntimeModifications=true to 
use delete" +
+                    getComponentName().substring(0, 
1).toUpperCase(Locale.ROOT) + getComponentName().substring(1) + "()");
+        }
+
+        if (componentId == null) {
+            throw new IllegalArgumentException("Component ID cannot be null");
+        }
+
+        if (!configStore.containsKey(componentId)) {
+            throw new TikaConfigException(
+                    getComponentName().substring(0, 
1).toUpperCase(Locale.ROOT) + 
+                    getComponentName().substring(1) + " with ID '" + 
componentId + "' not found");
+        }
+
+        // Clear cache and remove config
+        componentCache.remove(componentId);
+        configStore.remove(componentId);
+        LOG.debug("Deleted {} config: id={}", getComponentName(), componentId);
+    }
+
+    /**
+     * Gets the configuration for a specific component by ID.
+     *
+     * @param componentId the component ID
+     * @return the component configuration, or null if not found
+     */
+    public ExtensionConfig getComponentConfig(String componentId) {
+        return configStore.get(componentId);
+    }
+
     /**
      * Returns the set of supported component IDs.
      */
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index 151931bdb..5f386c628 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -40,10 +40,12 @@ import java.nio.file.Path;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.HexFormat;
 import java.util.List;
 import java.util.Locale;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -55,6 +57,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.tika.config.TikaTaskTimeout;
+import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.parser.ParseContext;
@@ -64,6 +67,7 @@ import org.apache.tika.pipes.api.emitter.EmitKey;
 import org.apache.tika.pipes.core.emitter.EmitDataImpl;
 import org.apache.tika.pipes.core.server.IntermediateResult;
 import org.apache.tika.pipes.core.server.PipesServer;
+import org.apache.tika.plugins.ExtensionConfig;
 import org.apache.tika.utils.ExceptionUtils;
 import org.apache.tika.utils.ProcessUtils;
 import org.apache.tika.utils.StringUtils;
@@ -77,7 +81,9 @@ import org.apache.tika.utils.StringUtils;
 public class PipesClient implements Closeable {
 
     public enum COMMANDS {
-        PING, ACK, NEW_REQUEST, SHUT_DOWN;
+        PING, ACK, NEW_REQUEST, SHUT_DOWN, 
+        SAVE_FETCHER, DELETE_FETCHER, LIST_FETCHERS, GET_FETCHER,
+        SAVE_EMITTER, DELETE_EMITTER, LIST_EMITTERS, GET_EMITTER;
 
         public byte getByte() {
             return (byte) (ordinal() + 1);
@@ -602,4 +608,277 @@ public class PipesClient implements Closeable {
         return tikaTaskTimeout.getTimeoutMillis();
     }
 
+    // ========== Fetcher Management API ==========
+
+    /**
+     * Save (create or update) a fetcher configuration.
+     * The fetcher will be available immediately for use in subsequent fetch 
operations.
+     * 
+     * @param config the fetcher configuration containing name, plugin ID, and 
parameters
+     * @throws IOException if communication with the server fails
+     * @throws TikaException if the server returns an error (e.g., invalid 
configuration)
+     * @throws InterruptedException if the operation is interrupted
+     */
+    public void saveFetcher(ExtensionConfig config) throws IOException, 
TikaException, InterruptedException {
+        maybeInit();
+        synchronized (lock) {
+            serverTuple.output.write(COMMANDS.SAVE_FETCHER.getByte());
+            serverTuple.output.flush();
+            
+            // Serialize the ExtensionConfig
+            UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream.builder().get();
+            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+                oos.writeObject(config);
+            }
+            byte[] bytes = bos.toByteArray();
+            serverTuple.output.writeInt(bytes.length);
+            serverTuple.output.write(bytes);
+            serverTuple.output.flush();
+            
+            // Read response
+            byte status = serverTuple.input.readByte();
+            int msgLen = serverTuple.input.readInt();
+            byte[] msgBytes = new byte[msgLen];
+            serverTuple.input.readFully(msgBytes);
+            String message = new String(msgBytes, StandardCharsets.UTF_8);
+            
+            if (status != 0) { // 0 = success, 1 = error
+                throw new TikaException("Failed to save fetcher: " + message);
+            }
+            LOG.debug("pipesClientId={}: saved fetcher '{}'", pipesClientId, 
config.id());
+        }
+    }
+
+    /**
+     * Delete a fetcher by its name/ID.
+     * 
+     * @param fetcherId the fetcher name/ID to delete
+     * @throws IOException if communication with the server fails
+     * @throws TikaException if the server returns an error (e.g., fetcher not 
found)
+     * @throws InterruptedException if the operation is interrupted
+     */
+    public void deleteFetcher(String fetcherId) throws IOException, 
TikaException, InterruptedException {
+        maybeInit();
+        synchronized (lock) {
+            serverTuple.output.write(COMMANDS.DELETE_FETCHER.getByte());
+            serverTuple.output.flush();
+            
+            byte[] idBytes = fetcherId.getBytes(StandardCharsets.UTF_8);
+            serverTuple.output.writeInt(idBytes.length);
+            serverTuple.output.write(idBytes);
+            serverTuple.output.flush();
+            
+            // Read response
+            byte status = serverTuple.input.readByte();
+            int msgLen = serverTuple.input.readInt();
+            byte[] msgBytes = new byte[msgLen];
+            serverTuple.input.readFully(msgBytes);
+            String message = new String(msgBytes, StandardCharsets.UTF_8);
+            
+            if (status != 0) {
+                throw new TikaException("Failed to delete fetcher: " + 
message);
+            }
+            LOG.debug("pipesClientId={}: deleted fetcher '{}'", pipesClientId, 
fetcherId);
+        }
+    }
+
+    /**
+     * List all available fetcher IDs (both static from config and dynamically 
added).
+     * 
+     * @return set of fetcher IDs
+     * @throws IOException if communication with the server fails
+     * @throws InterruptedException if the operation is interrupted
+     */
+    public Set<String> listFetchers() throws IOException, InterruptedException 
{
+        maybeInit();
+        synchronized (lock) {
+            serverTuple.output.write(COMMANDS.LIST_FETCHERS.getByte());
+            serverTuple.output.flush();
+            
+            // Read response
+            int count = serverTuple.input.readInt();
+            Set<String> fetcherIds = new HashSet<>(count);
+            for (int i = 0; i < count; i++) {
+                int len = serverTuple.input.readInt();
+                byte[] bytes = new byte[len];
+                serverTuple.input.readFully(bytes);
+                fetcherIds.add(new String(bytes, StandardCharsets.UTF_8));
+            }
+            LOG.debug("pipesClientId={}: listed {} fetchers", pipesClientId, 
count);
+            return fetcherIds;
+        }
+    }
+
+    /**
+     * Get the configuration for a specific fetcher.
+     * 
+     * @param fetcherId the fetcher ID
+     * @return the fetcher configuration, or null if not found
+     * @throws IOException if communication with the server fails
+     * @throws InterruptedException if the operation is interrupted
+     */
+    public ExtensionConfig getFetcherConfig(String fetcherId) throws 
IOException, InterruptedException {
+        maybeInit();
+        synchronized (lock) {
+            serverTuple.output.write(COMMANDS.GET_FETCHER.getByte());
+            serverTuple.output.flush();
+            
+            byte[] idBytes = fetcherId.getBytes(StandardCharsets.UTF_8);
+            serverTuple.output.writeInt(idBytes.length);
+            serverTuple.output.write(idBytes);
+            serverTuple.output.flush();
+            
+            // Read response
+            byte found = serverTuple.input.readByte();
+            if (found == 0) {
+                return null;
+            }
+            
+            int len = serverTuple.input.readInt();
+            byte[] bytes = new byte[len];
+            serverTuple.input.readFully(bytes);
+            
+            try (ObjectInputStream ois = new ObjectInputStream(new 
UnsynchronizedByteArrayInputStream(bytes))) {
+                return (ExtensionConfig) ois.readObject();
+            } catch (ClassNotFoundException e) {
+                throw new IOException("Failed to deserialize ExtensionConfig", 
e);
+            }
+        }
+    }
+
+    // ========== Emitter Management API ==========
+
+    /**
+     * Save (create or update) an emitter configuration.
+     * The emitter will be available immediately for use in subsequent emit 
operations.
+     * 
+     * @param config the emitter configuration containing name, plugin ID, and 
parameters
+     * @throws IOException if communication with the server fails
+     * @throws TikaException if the server returns an error (e.g., invalid 
configuration)
+     * @throws InterruptedException if the operation is interrupted
+     */
+    public void saveEmitter(ExtensionConfig config) throws IOException, 
TikaException, InterruptedException {
+        maybeInit();
+        synchronized (lock) {
+            serverTuple.output.write(COMMANDS.SAVE_EMITTER.getByte());
+            serverTuple.output.flush();
+            
+            UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream.builder().get();
+            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+                oos.writeObject(config);
+            }
+            byte[] bytes = bos.toByteArray();
+            serverTuple.output.writeInt(bytes.length);
+            serverTuple.output.write(bytes);
+            serverTuple.output.flush();
+            
+            byte status = serverTuple.input.readByte();
+            int msgLen = serverTuple.input.readInt();
+            byte[] msgBytes = new byte[msgLen];
+            serverTuple.input.readFully(msgBytes);
+            String message = new String(msgBytes, StandardCharsets.UTF_8);
+            
+            if (status != 0) {
+                throw new TikaException("Failed to save emitter: " + message);
+            }
+            LOG.debug("pipesClientId={}: saved emitter '{}'", pipesClientId, 
config.id());
+        }
+    }
+
+    /**
+     * Delete an emitter by its name/ID.
+     * 
+     * @param emitterId the emitter name/ID to delete
+     * @throws IOException if communication with the server fails
+     * @throws TikaException if the server returns an error (e.g., emitter not 
found)
+     * @throws InterruptedException if the operation is interrupted
+     */
+    public void deleteEmitter(String emitterId) throws IOException, 
TikaException, InterruptedException {
+        maybeInit();
+        synchronized (lock) {
+            serverTuple.output.write(COMMANDS.DELETE_EMITTER.getByte());
+            serverTuple.output.flush();
+            
+            byte[] idBytes = emitterId.getBytes(StandardCharsets.UTF_8);
+            serverTuple.output.writeInt(idBytes.length);
+            serverTuple.output.write(idBytes);
+            serverTuple.output.flush();
+            
+            byte status = serverTuple.input.readByte();
+            int msgLen = serverTuple.input.readInt();
+            byte[] msgBytes = new byte[msgLen];
+            serverTuple.input.readFully(msgBytes);
+            String message = new String(msgBytes, StandardCharsets.UTF_8);
+            
+            if (status != 0) {
+                throw new TikaException("Failed to delete emitter: " + 
message);
+            }
+            LOG.debug("pipesClientId={}: deleted emitter '{}'", pipesClientId, 
emitterId);
+        }
+    }
+
+    /**
+     * List all available emitter IDs (both static from config and dynamically 
added).
+     * 
+     * @return set of emitter IDs
+     * @throws IOException if communication with the server fails
+     * @throws InterruptedException if the operation is interrupted
+     */
+    public Set<String> listEmitters() throws IOException, InterruptedException 
{
+        maybeInit();
+        synchronized (lock) {
+            serverTuple.output.write(COMMANDS.LIST_EMITTERS.getByte());
+            serverTuple.output.flush();
+            
+            int count = serverTuple.input.readInt();
+            Set<String> emitterIds = new HashSet<>(count);
+            for (int i = 0; i < count; i++) {
+                int len = serverTuple.input.readInt();
+                byte[] bytes = new byte[len];
+                serverTuple.input.readFully(bytes);
+                emitterIds.add(new String(bytes, StandardCharsets.UTF_8));
+            }
+            LOG.debug("pipesClientId={}: listed {} emitters", pipesClientId, 
count);
+            return emitterIds;
+        }
+    }
+
+    /**
+     * Get the configuration for a specific emitter.
+     * 
+     * @param emitterId the emitter ID
+     * @return the emitter configuration, or null if not found
+     * @throws IOException if communication with the server fails
+     * @throws InterruptedException if the operation is interrupted
+     */
+    public ExtensionConfig getEmitterConfig(String emitterId) throws 
IOException, InterruptedException {
+        maybeInit();
+        synchronized (lock) {
+            serverTuple.output.write(COMMANDS.GET_EMITTER.getByte());
+            serverTuple.output.flush();
+            
+            byte[] idBytes = emitterId.getBytes(StandardCharsets.UTF_8);
+            serverTuple.output.writeInt(idBytes.length);
+            serverTuple.output.write(idBytes);
+            serverTuple.output.flush();
+            
+            byte found = serverTuple.input.readByte();
+            if (found == 0) {
+                return null;
+            }
+            
+            int len = serverTuple.input.readInt();
+            byte[] bytes = new byte[len];
+            serverTuple.input.readFully(bytes);
+            
+            try (ObjectInputStream ois = new ObjectInputStream(new 
UnsynchronizedByteArrayInputStream(bytes))) {
+                return (ExtensionConfig) ois.readObject();
+            } catch (ClassNotFoundException e) {
+                throw new IOException("Failed to deserialize ExtensionConfig", 
e);
+            }
+        }
+    }
+
+    private final Object[] lock = new Object[0];
+
 }
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java
index 73d73ff7e..5c90fa0b7 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java
@@ -88,4 +88,13 @@ public interface ConfigStore extends TikaExtension {
      * @return the number of configurations
      */
     int size();
+
+    /**
+     * Removes a configuration by ID.
+     *
+     * @param id the configuration ID (must not be null)
+     * @return the removed configuration, or null if not found
+     * @throws NullPointerException if id is null
+     */
+    ExtensionConfig remove(String id);
 }
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/InMemoryConfigStore.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/InMemoryConfigStore.java
index 7b6dab100..90a9c1ce6 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/InMemoryConfigStore.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/InMemoryConfigStore.java
@@ -63,4 +63,9 @@ public class InMemoryConfigStore implements ConfigStore {
     public int size() {
         return store.size();
     }
+
+    @Override
+    public ExtensionConfig remove(String id) {
+        return store.remove(id);
+    }
 }
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
index e424fc35a..256e35b7c 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
@@ -174,4 +174,24 @@ public class EmitterManager extends AbstractComponentManager<Emitter, EmitterFac
     public void saveEmitter(ExtensionConfig config) throws 
TikaConfigException, IOException {
         saveComponent(config);
     }
+
+    /**
+     * Deletes an emitter configuration by ID.
+     *
+     * @param emitterId the emitter ID to delete
+     * @throws TikaConfigException if runtime modifications are not allowed or 
emitter not found
+     */
+    public void deleteEmitter(String emitterId) throws TikaConfigException {
+        deleteComponent(emitterId);
+    }
+
+    /**
+     * Gets the configuration for a specific emitter by ID.
+     *
+     * @param emitterId the emitter ID
+     * @return the emitter configuration, or null if not found
+     */
+    public ExtensionConfig getConfig(String emitterId) {
+        return getComponentConfig(emitterId);
+    }
 }
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
index e282b8495..4f4e9a199 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
@@ -173,4 +173,24 @@ public class FetcherManager extends AbstractComponentManager<Fetcher, FetcherFac
     public void saveFetcher(ExtensionConfig config) throws 
TikaConfigException, IOException {
         saveComponent(config);
     }
+
+    /**
+     * Deletes a fetcher configuration by ID.
+     *
+     * @param fetcherId the fetcher ID to delete
+     * @throws TikaConfigException if runtime modifications are not allowed or 
fetcher not found
+     */
+    public void deleteFetcher(String fetcherId) throws TikaConfigException {
+        deleteComponent(fetcherId);
+    }
+
+    /**
+     * Gets the configuration for a specific fetcher by ID.
+     *
+     * @param fetcherId the fetcher ID
+     * @return the fetcher configuration, or null if not found
+     */
+    public ExtensionConfig getConfig(String fetcherId) {
+        return getComponentConfig(fetcherId);
+    }
 }
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
index dd09db768..6715db1b6 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
@@ -38,6 +38,7 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.HexFormat;
 import java.util.Locale;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -72,6 +73,7 @@ import org.apache.tika.pipes.core.PipesClient;
 import org.apache.tika.pipes.core.PipesConfig;
 import org.apache.tika.pipes.core.emitter.EmitterManager;
 import org.apache.tika.pipes.core.fetcher.FetcherManager;
+import org.apache.tika.plugins.ExtensionConfig;
 import org.apache.tika.plugins.TikaPluginManager;
 import org.apache.tika.serialization.ParseContextUtils;
 import org.apache.tika.utils.ExceptionUtils;
@@ -306,6 +308,22 @@ public class PipesServer implements AutoCloseable {
                         //swallow
                     }
                     System.exit(0);
+                } else if (request == 
PipesClient.COMMANDS.SAVE_FETCHER.getByte()) {
+                    handleSaveFetcher();
+                } else if (request == 
PipesClient.COMMANDS.DELETE_FETCHER.getByte()) {
+                    handleDeleteFetcher();
+                } else if (request == 
PipesClient.COMMANDS.LIST_FETCHERS.getByte()) {
+                    handleListFetchers();
+                } else if (request == 
PipesClient.COMMANDS.GET_FETCHER.getByte()) {
+                    handleGetFetcher();
+                } else if (request == 
PipesClient.COMMANDS.SAVE_EMITTER.getByte()) {
+                    handleSaveEmitter();
+                } else if (request == 
PipesClient.COMMANDS.DELETE_EMITTER.getByte()) {
+                    handleDeleteEmitter();
+                } else if (request == 
PipesClient.COMMANDS.LIST_EMITTERS.getByte()) {
+                    handleListEmitters();
+                } else if (request == 
PipesClient.COMMANDS.GET_EMITTER.getByte()) {
+                    handleGetEmitter();
                 } else {
                     String msg = String.format(Locale.ROOT,
                             "pipesClientId=%s: Unexpected byte 0x%02x in 
command position. " +
@@ -556,4 +574,272 @@ public class PipesServer implements AutoCloseable {
         }
     }
 
+    // ========== Fetcher Management Handlers ==========
+
+    private void handleSaveFetcher() {
+        try {
+            // Read ExtensionConfig
+            int len = input.readInt();
+            byte[] bytes = new byte[len];
+            input.readFully(bytes);
+            
+            ExtensionConfig config;
+            try (ObjectInputStream ois = new ObjectInputStream(new 
UnsynchronizedByteArrayInputStream(bytes))) {
+                config = (ExtensionConfig) ois.readObject();
+            }
+            
+            // Save the fetcher
+            fetcherManager.saveFetcher(config);
+            LOG.debug("pipesClientId={}: saved fetcher '{}'", pipesClientId, 
config.id());
+            
+            // Send success response
+            output.writeByte(0); // success
+            String msg = "Fetcher saved successfully";
+            byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
+            output.writeInt(msgBytes.length);
+            output.write(msgBytes);
+            output.flush();
+            
+        } catch (Exception e) {
+            LOG.error("pipesClientId={}: error saving fetcher", pipesClientId, 
e);
+            try {
+                output.writeByte(1); // error
+                String msg = ExceptionUtils.getStackTrace(e);
+                byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
+                output.writeInt(msgBytes.length);
+                output.write(msgBytes);
+                output.flush();
+            } catch (IOException ioe) {
+                LOG.error("pipesClientId={}: failed to send error response", 
pipesClientId, ioe);
+                exit(1);
+            }
+        }
+    }
+
+    private void handleDeleteFetcher() {
+        try {
+            // Read fetcher ID
+            int len = input.readInt();
+            byte[] bytes = new byte[len];
+            input.readFully(bytes);
+            String fetcherId = new String(bytes, StandardCharsets.UTF_8);
+            
+            // Delete the fetcher
+            fetcherManager.deleteFetcher(fetcherId);
+            LOG.debug("pipesClientId={}: deleted fetcher '{}'", pipesClientId, 
fetcherId);
+            
+            // Send success response
+            output.writeByte(0); // success
+            String msg = "Fetcher deleted successfully";
+            byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
+            output.writeInt(msgBytes.length);
+            output.write(msgBytes);
+            output.flush();
+            
+        } catch (Exception e) {
+            LOG.error("pipesClientId={}: error deleting fetcher", 
pipesClientId, e);
+            try {
+                output.writeByte(1); // error
+                String msg = ExceptionUtils.getStackTrace(e);
+                byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
+                output.writeInt(msgBytes.length);
+                output.write(msgBytes);
+                output.flush();
+            } catch (IOException ioe) {
+                LOG.error("pipesClientId={}: failed to send error response", 
pipesClientId, ioe);
+                exit(1);
+            }
+        }
+    }
+
+    private void handleListFetchers() {
+        try {
+            // Get list of fetcher IDs
+            Set<String> fetcherIds = fetcherManager.getSupported();
+            LOG.debug("pipesClientId={}: listing {} fetchers", pipesClientId, 
fetcherIds.size());
+            
+            // Send response
+            output.writeInt(fetcherIds.size());
+            for (String id : fetcherIds) {
+                byte[] idBytes = id.getBytes(StandardCharsets.UTF_8);
+                output.writeInt(idBytes.length);
+                output.write(idBytes);
+            }
+            output.flush();
+            
+        } catch (IOException e) {
+            LOG.error("pipesClientId={}: error listing fetchers", 
pipesClientId, e);
+            exit(1);
+        }
+    }
+
+    private void handleGetFetcher() {
+        try {
+            // Read fetcher ID
+            int len = input.readInt();
+            byte[] bytes = new byte[len];
+            input.readFully(bytes);
+            String fetcherId = new String(bytes, StandardCharsets.UTF_8);
+            
+            // Get fetcher config
+            ExtensionConfig config = fetcherManager.getConfig(fetcherId);
+            
+            if (config == null) {
+                output.writeByte(0); // not found
+                output.flush();
+            } else {
+                output.writeByte(1); // found
+                
+                // Serialize config
+                UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream.builder().get();
+                try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+                    oos.writeObject(config);
+                }
+                byte[] configBytes = bos.toByteArray();
+                output.writeInt(configBytes.length);
+                output.write(configBytes);
+                output.flush();
+            }
+            LOG.debug("pipesClientId={}: get fetcher '{}' = {}", 
pipesClientId, fetcherId, (config != null ? "found" : "not found"));
+            
+        } catch (IOException e) {
+            LOG.error("pipesClientId={}: error getting fetcher", 
pipesClientId, e);
+            exit(1);
+        }
+    }
+
+    // ========== Emitter Management Handlers ==========
+
+    private void handleSaveEmitter() { // Handles a save-emitter request: reads a serialized ExtensionConfig from input, saves it via emitterManager, then writes a status reply to output
+        try {
+            // Read ExtensionConfig: an int byte count followed by that many bytes of Java-serialized data
+            int len = input.readInt(); // NOTE(review): not range-checked; a negative value makes the allocation below throw, which lands in the catch (Exception) branch
+            byte[] bytes = new byte[len];
+            input.readFully(bytes);
+            // NOTE(review): native Java deserialization follows; only safe if whatever writes to input is trusted -- confirm
+            ExtensionConfig config;
+            try (ObjectInputStream ois = new ObjectInputStream(new 
UnsynchronizedByteArrayInputStream(bytes))) {
+                config = (ExtensionConfig) ois.readObject();
+            }
+            
+            // Save the emitter
+            emitterManager.saveEmitter(config);
+            LOG.debug("pipesClientId={}: saved emitter '{}'", pipesClientId, 
config.id());
+            
+            // Send success response: status byte, then the message as int length + UTF-8 bytes
+            output.writeByte(0); // success
+            String msg = "Emitter saved successfully";
+            byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
+            output.writeInt(msgBytes.length);
+            output.write(msgBytes);
+            output.flush();
+            
+        } catch (Exception e) { // any failure while reading, deserializing or saving is reported back rather than ending the method silently
+            LOG.error("pipesClientId={}: error saving emitter", pipesClientId, 
e);
+            try {
+                output.writeByte(1); // error; same framing as the success reply
+                String msg = ExceptionUtils.getStackTrace(e); // the full stack trace is sent as the message
+                byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
+                output.writeInt(msgBytes.length);
+                output.write(msgBytes);
+                output.flush();
+            } catch (IOException ioe) { // the error reply itself could not be written
+                LOG.error("pipesClientId={}: failed to send error response", 
pipesClientId, ioe);
+                exit(1); // exit(...) is defined outside this hunk; presumably terminates the server process -- confirm
+            }
+        }
+    }
+
+    private void handleDeleteEmitter() { // Handles a delete-emitter request: reads an emitter id from input, deletes it via emitterManager, then writes a status reply to output
+        try {
+            // Read emitter ID: an int byte count followed by that many UTF-8 bytes
+            int len = input.readInt(); // NOTE(review): not range-checked; a negative value makes the allocation below throw, which lands in the catch (Exception) branch
+            byte[] bytes = new byte[len];
+            input.readFully(bytes);
+            String emitterId = new String(bytes, StandardCharsets.UTF_8);
+            
+            // Delete the emitter
+            emitterManager.deleteEmitter(emitterId); // NOTE(review): success is reported whenever this returns normally; whether an unknown id throws is not visible here
+            LOG.debug("pipesClientId={}: deleted emitter '{}'", pipesClientId, 
emitterId);
+            
+            // Send success response: status byte, then the message as int length + UTF-8 bytes
+            output.writeByte(0); // success
+            String msg = "Emitter deleted successfully";
+            byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
+            output.writeInt(msgBytes.length);
+            output.write(msgBytes);
+            output.flush();
+            
+        } catch (Exception e) { // any failure while reading or deleting is reported back rather than ending the method silently
+            LOG.error("pipesClientId={}: error deleting emitter", 
pipesClientId, e);
+            try {
+                output.writeByte(1); // error; same framing as the success reply
+                String msg = ExceptionUtils.getStackTrace(e); // the full stack trace is sent as the message
+                byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
+                output.writeInt(msgBytes.length);
+                output.write(msgBytes);
+                output.flush();
+            } catch (IOException ioe) { // the error reply itself could not be written
+                LOG.error("pipesClientId={}: failed to send error response", 
pipesClientId, ioe);
+                exit(1); // exit(...) is defined outside this hunk; presumably terminates the server process -- confirm
+            }
+        }
+    }
+
+    private void handleListEmitters() { // Handles a list-emitters request: writes the ids returned by emitterManager.getSupported() to output
+        try {
+            // Get list of emitter IDs
+            Set<String> emitterIds = emitterManager.getSupported();
+            LOG.debug("pipesClientId={}: listing {} emitters", pipesClientId, 
emitterIds.size());
+            
+            // Send response: the id count as an int, then each id as int length + UTF-8 bytes (no leading status byte, unlike the save/delete replies)
+            output.writeInt(emitterIds.size());
+            for (String id : emitterIds) {
+                byte[] idBytes = id.getBytes(StandardCharsets.UTF_8);
+                output.writeInt(idBytes.length);
+                output.write(idBytes);
+            }
+            output.flush();
+            
+        } catch (IOException e) { // NOTE(review): only IOException is handled; a runtime exception from getSupported() would propagate to the caller
+            LOG.error("pipesClientId={}: error listing emitters", 
pipesClientId, e);
+            exit(1); // exit(...) is defined outside this hunk; presumably terminates the server process -- confirm
+        }
+    }
+
+    private void handleGetEmitter() { // Handles a get-emitter request: reads an emitter id from input and writes its ExtensionConfig, or a not-found marker, to output
+        try {
+            // Read emitter ID: an int byte count followed by that many UTF-8 bytes
+            int len = input.readInt(); // NOTE(review): not range-checked; a negative value makes the allocation below throw an unchecked exception that this method does not catch
+            byte[] bytes = new byte[len];
+            input.readFully(bytes);
+            String emitterId = new String(bytes, StandardCharsets.UTF_8);
+            
+            // Get emitter config
+            ExtensionConfig config = emitterManager.getConfig(emitterId);
+            // Reply status byte here is 0 = not found, 1 = found; note this differs from the save/delete handlers, where 0 = success and 1 = error
+            if (config == null) {
+                output.writeByte(0); // not found
+                output.flush();
+            } else {
+                output.writeByte(1); // found
+                // NOTE(review): the found byte is written before serialization; if writeObject fails, the IOException handler below runs with that byte already written and no payload following
+                // Serialize config
+                UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream.builder().get();
+                try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+                    oos.writeObject(config);
+                }
+                byte[] configBytes = bos.toByteArray();
+                output.writeInt(configBytes.length); // payload framing: int length, then the Java-serialized config bytes
+                output.write(configBytes);
+                output.flush();
+            }
+            LOG.debug("pipesClientId={}: get emitter '{}' = {}", 
pipesClientId, emitterId, (config != null ? "found" : "not found"));
+            
+        } catch (IOException e) { // NOTE(review): only IOException is handled; a runtime exception from getConfig() would propagate to the caller
+            LOG.error("pipesClientId={}: error getting emitter", 
pipesClientId, e);
+            exit(1); // exit(...) is defined outside this hunk; presumably terminates the server process -- confirm
+        }
+    }
+
 }
diff --git 
a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/config/LoggingConfigStore.java
 
b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/config/LoggingConfigStore.java
index 8ef299b35..07e901e2e 100644
--- 
a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/config/LoggingConfigStore.java
+++ 
b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/config/LoggingConfigStore.java
@@ -83,4 +83,12 @@ public class LoggingConfigStore implements ConfigStore {
             return store.size();
         }
     }
+
+    @Override
+    public ExtensionConfig remove(String id) { // Logs the removal at debug level, then delegates to store.remove(id) and returns its result
+        LOG.debug("ConfigStore: Removing config with id={}", id);
+        synchronized (store) { // same locking on store as the surrounding context lines of this class use
+            return store.remove(id); // NOTE(review): store's type is declared outside this hunk; presumably a Map, so this would be null when id is absent -- confirm
+        }
+    }
 }


Reply via email to