This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new c43f000033 TIKA-4616 -- mv fetcher/emitter CRUD to tika-grpc (#2519)
c43f000033 is described below

commit c43f0000339cf326e9a3387d4bccc2a8dc406132
Author: Tim Allison <[email protected]>
AuthorDate: Wed Jan 14 08:11:28 2026 -0500

    TIKA-4616 -- mv fetcher/emitter CRUD to tika-grpc (#2519)
---
 .../apache/tika/pipes/grpc/TikaGrpcServerImpl.java |  58 ++-
 .../org/apache/tika/pipes/core/PipesClient.java    | 394 +------------------
 .../apache/tika/pipes/core/server/PipesServer.java | 426 ---------------------
 3 files changed, 29 insertions(+), 849 deletions(-)

diff --git a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
index 8e5bfe2676..a21c32a77e 100644
--- a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
+++ b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
@@ -81,9 +81,12 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
             .build();
     public static final JsonSchemaGenerator JSON_SCHEMA_GENERATOR = new JsonSchemaGenerator(OBJECT_MAPPER);
 
+    private static final String PIPES_ITERATOR_PREFIX = "pipesIterator:";
+
     PipesConfig pipesConfig;
     PipesClient pipesClient;
     FetcherManager fetcherManager;
+    ConfigStore configStore;
     Path tikaConfigPath;
     PluginManager pluginManager;
 
@@ -125,9 +128,9 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
             pluginManager = new org.pf4j.DefaultPluginManager();
         }
 
-        ConfigStore configStore = createConfigStore();
+        this.configStore = createConfigStore();
 
-        fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig, true, configStore);
+        fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig, true, this.configStore);
     }
 
     private ConfigStore createConfigStore() throws TikaConfigException {
@@ -261,11 +264,8 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
             String factoryName = findFactoryNameForClass(request.getFetcherClass());
             ExtensionConfig config = new ExtensionConfig(request.getFetcherId(), factoryName, request.getFetcherConfigJson());
             
-            // Save to gRPC server's fetcher manager (for schema queries, etc.)
+            // Save to fetcher manager (updates ConfigStore which is shared with PipesServer)
             fetcherManager.saveFetcher(config);
-            
-            // Also save to PipesClient so it propagates to the forked PipesServer
-            pipesClient.saveFetcher(config);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -372,12 +372,8 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
 
     private boolean deleteFetcher(String id) {
         try {
-            // Delete from gRPC server's fetcher manager
+            // Delete from fetcher manager (updates ConfigStore which is shared with PipesServer)
             fetcherManager.deleteFetcher(id);
-            
-            // Also delete from PipesClient so it propagates to the forked PipesServer
-            pipesClient.deleteFetcher(id);
-            
             LOG.info("Successfully deleted fetcher: {}", id);
             return true;
         } catch (Exception e) {
@@ -395,22 +391,22 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
             String iteratorId = request.getIteratorId();
             String iteratorClass = request.getIteratorClass();
             String iteratorConfigJson = request.getIteratorConfigJson();
-            
+
             LOG.info("Saving pipes iterator: id={}, class={}", iteratorId, iteratorClass);
-            
+
             ExtensionConfig config = new ExtensionConfig(iteratorId, iteratorClass, iteratorConfigJson);
-            
-            // Save via PipesClient so it propagates to the forked PipesServer
-            pipesClient.savePipesIterator(config);
-            
+
+            // Save directly to ConfigStore (shared with PipesServer)
+            configStore.put(PIPES_ITERATOR_PREFIX + iteratorId, config);
+
             SavePipesIteratorReply reply = SavePipesIteratorReply.newBuilder()
                     .setMessage("Pipes iterator saved successfully")
                     .build();
             responseObserver.onNext(reply);
             responseObserver.onCompleted();
-            
+
             LOG.info("Successfully saved pipes iterator: {}", iteratorId);
-            
+
         } catch (Exception e) {
             LOG.error("Failed to save pipes iterator", e);
             responseObserver.onError(io.grpc.Status.INTERNAL
@@ -426,16 +422,17 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
         try {
             String iteratorId = request.getIteratorId();
             LOG.info("Getting pipes iterator: {}", iteratorId);
-            
-            ExtensionConfig config = pipesClient.getPipesIteratorConfig(iteratorId);
-            
+
+            // Get directly from ConfigStore (shared with PipesServer)
+            ExtensionConfig config = configStore.get(PIPES_ITERATOR_PREFIX + iteratorId);
+
             if (config == null) {
                 responseObserver.onError(io.grpc.Status.NOT_FOUND
                         .withDescription("Pipes iterator not found: " + iteratorId)
                         .asRuntimeException());
                 return;
             }
-            
+
             GetPipesIteratorReply reply = GetPipesIteratorReply.newBuilder()
                     .setIteratorId(config.id())
                     .setIteratorClass(config.name())
@@ -443,9 +440,9 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
                     .build();
             responseObserver.onNext(reply);
             responseObserver.onCompleted();
-            
+
             LOG.info("Successfully retrieved pipes iterator: {}", iteratorId);
-            
+
         } catch (Exception e) {
             LOG.error("Failed to get pipes iterator", e);
             responseObserver.onError(io.grpc.Status.INTERNAL
@@ -461,17 +458,18 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
         try {
             String iteratorId = request.getIteratorId();
             LOG.info("Deleting pipes iterator: {}", iteratorId);
-            
-            pipesClient.deletePipesIterator(iteratorId);
-            
+
+            // Delete directly from ConfigStore (shared with PipesServer)
+            configStore.remove(PIPES_ITERATOR_PREFIX + iteratorId);
+
             DeletePipesIteratorReply reply = DeletePipesIteratorReply.newBuilder()
                     .setMessage("Pipes iterator deleted successfully")
                     .build();
             responseObserver.onNext(reply);
             responseObserver.onCompleted();
-            
+
             LOG.info("Successfully deleted pipes iterator: {}", iteratorId);
-            
+
         } catch (Exception e) {
             LOG.error("Failed to delete pipes iterator", e);
             responseObserver.onError(io.grpc.Status.INTERNAL
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index 5d1ef5bdf7..fda74a01c3 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -40,12 +40,10 @@ import java.nio.file.Path;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.HexFormat;
 import java.util.List;
 import java.util.Locale;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -57,7 +55,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.tika.config.TikaTaskTimeout;
-import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.parser.ParseContext;
@@ -67,7 +64,6 @@ import org.apache.tika.pipes.api.emitter.EmitKey;
 import org.apache.tika.pipes.core.emitter.EmitDataImpl;
 import org.apache.tika.pipes.core.server.IntermediateResult;
 import org.apache.tika.pipes.core.server.PipesServer;
-import org.apache.tika.plugins.ExtensionConfig;
 import org.apache.tika.utils.ExceptionUtils;
 import org.apache.tika.utils.ProcessUtils;
 import org.apache.tika.utils.StringUtils;
@@ -81,10 +77,7 @@ import org.apache.tika.utils.StringUtils;
 public class PipesClient implements Closeable {
 
     public enum COMMANDS {
-        PING, ACK, NEW_REQUEST, SHUT_DOWN, 
-        SAVE_FETCHER, DELETE_FETCHER, LIST_FETCHERS, GET_FETCHER,
-        SAVE_EMITTER, DELETE_EMITTER, LIST_EMITTERS, GET_EMITTER,
-        SAVE_PIPES_ITERATOR, DELETE_PIPES_ITERATOR, LIST_PIPES_ITERATORS, GET_PIPES_ITERATOR;
+        PING, ACK, NEW_REQUEST, SHUT_DOWN;
 
         public byte getByte() {
             return (byte) (ordinal() + 1);
@@ -608,389 +601,4 @@ public class PipesClient implements Closeable {
         LOG.debug("applying timeout from parseContext: {}ms", tikaTaskTimeout.getTimeoutMillis());
         return tikaTaskTimeout.getTimeoutMillis();
     }
-
-    // ========== Fetcher Management API ==========
-
-    /**
-     * Save (create or update) a fetcher configuration.
-     * The fetcher will be available immediately for use in subsequent fetch 
operations.
-     * 
-     * @param config the fetcher configuration containing name, plugin ID, and 
parameters
-     * @throws IOException if communication with the server fails
-     * @throws TikaException if the server returns an error (e.g., invalid 
configuration)
-     * @throws InterruptedException if the operation is interrupted
-     */
-    public void saveFetcher(ExtensionConfig config) throws IOException, 
TikaException, InterruptedException {
-        maybeInit();
-        synchronized (lock) {
-            serverTuple.output.write(COMMANDS.SAVE_FETCHER.getByte());
-            serverTuple.output.flush();
-            
-            // Serialize the ExtensionConfig
-            UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream.builder().get();
-            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
-                oos.writeObject(config);
-            }
-            byte[] bytes = bos.toByteArray();
-            serverTuple.output.writeInt(bytes.length);
-            serverTuple.output.write(bytes);
-            serverTuple.output.flush();
-            
-            // Read response
-            byte status = serverTuple.input.readByte();
-            int msgLen = serverTuple.input.readInt();
-            byte[] msgBytes = new byte[msgLen];
-            serverTuple.input.readFully(msgBytes);
-            String message = new String(msgBytes, StandardCharsets.UTF_8);
-            
-            if (status != 0) { // 0 = success, 1 = error
-                throw new TikaException("Failed to save fetcher: " + message);
-            }
-            LOG.debug("pipesClientId={}: saved fetcher '{}'", pipesClientId, 
config.id());
-        }
-    }
-
-    /**
-     * Delete a fetcher by its name/ID.
-     * 
-     * @param fetcherId the fetcher name/ID to delete
-     * @throws IOException if communication with the server fails
-     * @throws TikaException if the server returns an error (e.g., fetcher not 
found)
-     * @throws InterruptedException if the operation is interrupted
-     */
-    public void deleteFetcher(String fetcherId) throws IOException, 
TikaException, InterruptedException {
-        maybeInit();
-        synchronized (lock) {
-            serverTuple.output.write(COMMANDS.DELETE_FETCHER.getByte());
-            serverTuple.output.flush();
-            
-            byte[] idBytes = fetcherId.getBytes(StandardCharsets.UTF_8);
-            serverTuple.output.writeInt(idBytes.length);
-            serverTuple.output.write(idBytes);
-            serverTuple.output.flush();
-            
-            // Read response
-            byte status = serverTuple.input.readByte();
-            int msgLen = serverTuple.input.readInt();
-            byte[] msgBytes = new byte[msgLen];
-            serverTuple.input.readFully(msgBytes);
-            String message = new String(msgBytes, StandardCharsets.UTF_8);
-            
-            if (status != 0) {
-                throw new TikaException("Failed to delete fetcher: " + 
message);
-            }
-            LOG.debug("pipesClientId={}: deleted fetcher '{}'", pipesClientId, 
fetcherId);
-        }
-    }
-
-    /**
-     * List all available fetcher IDs (both static from config and dynamically 
added).
-     * 
-     * @return set of fetcher IDs
-     * @throws IOException if communication with the server fails
-     * @throws InterruptedException if the operation is interrupted
-     */
-    public Set<String> listFetchers() throws IOException, InterruptedException 
{
-        maybeInit();
-        synchronized (lock) {
-            serverTuple.output.write(COMMANDS.LIST_FETCHERS.getByte());
-            serverTuple.output.flush();
-            
-            // Read response
-            int count = serverTuple.input.readInt();
-            Set<String> fetcherIds = new HashSet<>(count);
-            for (int i = 0; i < count; i++) {
-                int len = serverTuple.input.readInt();
-                byte[] bytes = new byte[len];
-                serverTuple.input.readFully(bytes);
-                fetcherIds.add(new String(bytes, StandardCharsets.UTF_8));
-            }
-            LOG.debug("pipesClientId={}: listed {} fetchers", pipesClientId, 
count);
-            return fetcherIds;
-        }
-    }
-
-    /**
-     * Get the configuration for a specific fetcher.
-     * 
-     * @param fetcherId the fetcher ID
-     * @return the fetcher configuration, or null if not found
-     * @throws IOException if communication with the server fails
-     * @throws InterruptedException if the operation is interrupted
-     */
-    public ExtensionConfig getFetcherConfig(String fetcherId) throws 
IOException, InterruptedException {
-        maybeInit();
-        synchronized (lock) {
-            serverTuple.output.write(COMMANDS.GET_FETCHER.getByte());
-            serverTuple.output.flush();
-            
-            byte[] idBytes = fetcherId.getBytes(StandardCharsets.UTF_8);
-            serverTuple.output.writeInt(idBytes.length);
-            serverTuple.output.write(idBytes);
-            serverTuple.output.flush();
-            
-            // Read response
-            byte found = serverTuple.input.readByte();
-            if (found == 0) {
-                return null;
-            }
-            
-            int len = serverTuple.input.readInt();
-            byte[] bytes = new byte[len];
-            serverTuple.input.readFully(bytes);
-            
-            try (ObjectInputStream ois = new ObjectInputStream(new 
UnsynchronizedByteArrayInputStream(bytes))) {
-                return (ExtensionConfig) ois.readObject();
-            } catch (ClassNotFoundException e) {
-                throw new IOException("Failed to deserialize ExtensionConfig", 
e);
-            }
-        }
-    }
-
-    // ========== Emitter Management API ==========
-
-    /**
-     * Save (create or update) an emitter configuration.
-     * The emitter will be available immediately for use in subsequent emit 
operations.
-     * 
-     * @param config the emitter configuration containing name, plugin ID, and 
parameters
-     * @throws IOException if communication with the server fails
-     * @throws TikaException if the server returns an error (e.g., invalid 
configuration)
-     * @throws InterruptedException if the operation is interrupted
-     */
-    public void saveEmitter(ExtensionConfig config) throws IOException, 
TikaException, InterruptedException {
-        maybeInit();
-        synchronized (lock) {
-            serverTuple.output.write(COMMANDS.SAVE_EMITTER.getByte());
-            serverTuple.output.flush();
-            
-            UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream.builder().get();
-            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
-                oos.writeObject(config);
-            }
-            byte[] bytes = bos.toByteArray();
-            serverTuple.output.writeInt(bytes.length);
-            serverTuple.output.write(bytes);
-            serverTuple.output.flush();
-            
-            byte status = serverTuple.input.readByte();
-            int msgLen = serverTuple.input.readInt();
-            byte[] msgBytes = new byte[msgLen];
-            serverTuple.input.readFully(msgBytes);
-            String message = new String(msgBytes, StandardCharsets.UTF_8);
-            
-            if (status != 0) {
-                throw new TikaException("Failed to save emitter: " + message);
-            }
-            LOG.debug("pipesClientId={}: saved emitter '{}'", pipesClientId, 
config.id());
-        }
-    }
-
-    /**
-     * Delete an emitter by its name/ID.
-     * 
-     * @param emitterId the emitter name/ID to delete
-     * @throws IOException if communication with the server fails
-     * @throws TikaException if the server returns an error (e.g., emitter not 
found)
-     * @throws InterruptedException if the operation is interrupted
-     */
-    public void deleteEmitter(String emitterId) throws IOException, 
TikaException, InterruptedException {
-        maybeInit();
-        synchronized (lock) {
-            serverTuple.output.write(COMMANDS.DELETE_EMITTER.getByte());
-            serverTuple.output.flush();
-            
-            byte[] idBytes = emitterId.getBytes(StandardCharsets.UTF_8);
-            serverTuple.output.writeInt(idBytes.length);
-            serverTuple.output.write(idBytes);
-            serverTuple.output.flush();
-            
-            byte status = serverTuple.input.readByte();
-            int msgLen = serverTuple.input.readInt();
-            byte[] msgBytes = new byte[msgLen];
-            serverTuple.input.readFully(msgBytes);
-            String message = new String(msgBytes, StandardCharsets.UTF_8);
-            
-            if (status != 0) {
-                throw new TikaException("Failed to delete emitter: " + 
message);
-            }
-            LOG.debug("pipesClientId={}: deleted emitter '{}'", pipesClientId, 
emitterId);
-        }
-    }
-
-    /**
-     * List all available emitter IDs (both static from config and dynamically 
added).
-     * 
-     * @return set of emitter IDs
-     * @throws IOException if communication with the server fails
-     * @throws InterruptedException if the operation is interrupted
-     */
-    public Set<String> listEmitters() throws IOException, InterruptedException 
{
-        maybeInit();
-        synchronized (lock) {
-            serverTuple.output.write(COMMANDS.LIST_EMITTERS.getByte());
-            serverTuple.output.flush();
-            
-            int count = serverTuple.input.readInt();
-            Set<String> emitterIds = new HashSet<>(count);
-            for (int i = 0; i < count; i++) {
-                int len = serverTuple.input.readInt();
-                byte[] bytes = new byte[len];
-                serverTuple.input.readFully(bytes);
-                emitterIds.add(new String(bytes, StandardCharsets.UTF_8));
-            }
-            LOG.debug("pipesClientId={}: listed {} emitters", pipesClientId, 
count);
-            return emitterIds;
-        }
-    }
-
-    /**
-     * Get the configuration for a specific emitter.
-     * 
-     * @param emitterId the emitter ID
-     * @return the emitter configuration, or null if not found
-     * @throws IOException if communication with the server fails
-     * @throws InterruptedException if the operation is interrupted
-     */
-    public ExtensionConfig getEmitterConfig(String emitterId) throws 
IOException, InterruptedException {
-        maybeInit();
-        synchronized (lock) {
-            serverTuple.output.write(COMMANDS.GET_EMITTER.getByte());
-            serverTuple.output.flush();
-            
-            byte[] idBytes = emitterId.getBytes(StandardCharsets.UTF_8);
-            serverTuple.output.writeInt(idBytes.length);
-            serverTuple.output.write(idBytes);
-            serverTuple.output.flush();
-            
-            byte found = serverTuple.input.readByte();
-            if (found == 0) {
-                return null;
-            }
-            
-            int len = serverTuple.input.readInt();
-            byte[] bytes = new byte[len];
-            serverTuple.input.readFully(bytes);
-            
-            try (ObjectInputStream ois = new ObjectInputStream(new 
UnsynchronizedByteArrayInputStream(bytes))) {
-                return (ExtensionConfig) ois.readObject();
-            } catch (ClassNotFoundException e) {
-                throw new IOException("Failed to deserialize ExtensionConfig", 
e);
-            }
-        }
-    }
-
-    // ========== PipesIterator Management API ==========
-
-    /**
-     * Save (create or update) a PipesIterator configuration.
-     * The iterator will be available immediately for use in subsequent 
operations.
-     * 
-     * @param config the iterator configuration containing name, plugin ID, 
and parameters
-     * @throws IOException if communication with the server fails
-     * @throws TikaException if the server returns an error (e.g., invalid 
configuration)
-     * @throws InterruptedException if the operation is interrupted
-     */
-    public void savePipesIterator(ExtensionConfig config) throws IOException, 
TikaException, InterruptedException {
-        maybeInit();
-        synchronized (lock) {
-            serverTuple.output.write(COMMANDS.SAVE_PIPES_ITERATOR.getByte());
-            serverTuple.output.flush();
-            
-            // Serialize the ExtensionConfig
-            UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream.builder().get();
-            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
-                oos.writeObject(config);
-            }
-            byte[] bytes = bos.toByteArray();
-            serverTuple.output.writeInt(bytes.length);
-            serverTuple.output.write(bytes);
-            serverTuple.output.flush();
-            
-            // Read response
-            byte status = serverTuple.input.readByte();
-            int msgLen = serverTuple.input.readInt();
-            byte[] msgBytes = new byte[msgLen];
-            serverTuple.input.readFully(msgBytes);
-            String message = new String(msgBytes, StandardCharsets.UTF_8);
-            
-            if (status != 0) { // 0 = success, 1 = error
-                throw new TikaException("Failed to save pipes iterator: " + 
message);
-            }
-            LOG.debug("pipesClientId={}: saved pipes iterator '{}'", 
pipesClientId, config.id());
-        }
-    }
-
-    /**
-     * Delete a PipesIterator configuration by its ID.
-     * 
-     * @param iteratorId the iterator ID to delete
-     * @throws IOException if communication with the server fails
-     * @throws TikaException if the server returns an error (e.g., iterator 
not found)
-     * @throws InterruptedException if the operation is interrupted
-     */
-    public void deletePipesIterator(String iteratorId) throws IOException, 
TikaException, InterruptedException {
-        maybeInit();
-        synchronized (lock) {
-            serverTuple.output.write(COMMANDS.DELETE_PIPES_ITERATOR.getByte());
-            serverTuple.output.flush();
-            
-            byte[] idBytes = iteratorId.getBytes(StandardCharsets.UTF_8);
-            serverTuple.output.writeInt(idBytes.length);
-            serverTuple.output.write(idBytes);
-            serverTuple.output.flush();
-            
-            // Read response
-            byte status = serverTuple.input.readByte();
-            int msgLen = serverTuple.input.readInt();
-            byte[] msgBytes = new byte[msgLen];
-            serverTuple.input.readFully(msgBytes);
-            String message = new String(msgBytes, StandardCharsets.UTF_8);
-            
-            if (status != 0) {
-                throw new TikaException("Failed to delete pipes iterator: " + 
message);
-            }
-            LOG.debug("pipesClientId={}: deleted pipes iterator '{}'", 
pipesClientId, iteratorId);
-        }
-    }
-
-    /**
-     * Get the configuration for a specific PipesIterator.
-     * 
-     * @param iteratorId the iterator ID
-     * @return the iterator configuration, or null if not found
-     * @throws IOException if communication with the server fails
-     * @throws InterruptedException if the operation is interrupted
-     */
-    public ExtensionConfig getPipesIteratorConfig(String iteratorId) throws 
IOException, InterruptedException {
-        maybeInit();
-        synchronized (lock) {
-            serverTuple.output.write(COMMANDS.GET_PIPES_ITERATOR.getByte());
-            serverTuple.output.flush();
-            
-            byte[] idBytes = iteratorId.getBytes(StandardCharsets.UTF_8);
-            serverTuple.output.writeInt(idBytes.length);
-            serverTuple.output.write(idBytes);
-            serverTuple.output.flush();
-            
-            // Read response
-            byte found = serverTuple.input.readByte();
-            if (found == 0) {
-                return null;
-            }
-            
-            int len = serverTuple.input.readInt();
-            byte[] bytes = new byte[len];
-            serverTuple.input.readFully(bytes);
-            
-            try (ObjectInputStream ois = new ObjectInputStream(new 
UnsynchronizedByteArrayInputStream(bytes))) {
-                return (ExtensionConfig) ois.readObject();
-            } catch (ClassNotFoundException e) {
-                throw new IOException("Failed to deserialize ExtensionConfig", 
e);
-            }
-        }
-    }
-
-    private final Object[] lock = new Object[0];
-
 }
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
index 8c04a110c0..66d5d9ae50 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
@@ -38,7 +38,6 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.HexFormat;
 import java.util.Locale;
-import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -315,30 +314,6 @@ public class PipesServer implements AutoCloseable {
                         //swallow
                     }
                     System.exit(0);
-                } else if (request == 
PipesClient.COMMANDS.SAVE_FETCHER.getByte()) {
-                    handleSaveFetcher();
-                } else if (request == 
PipesClient.COMMANDS.DELETE_FETCHER.getByte()) {
-                    handleDeleteFetcher();
-                } else if (request == 
PipesClient.COMMANDS.LIST_FETCHERS.getByte()) {
-                    handleListFetchers();
-                } else if (request == 
PipesClient.COMMANDS.GET_FETCHER.getByte()) {
-                    handleGetFetcher();
-                } else if (request == 
PipesClient.COMMANDS.SAVE_EMITTER.getByte()) {
-                    handleSaveEmitter();
-                } else if (request == 
PipesClient.COMMANDS.DELETE_EMITTER.getByte()) {
-                    handleDeleteEmitter();
-                } else if (request == 
PipesClient.COMMANDS.LIST_EMITTERS.getByte()) {
-                    handleListEmitters();
-                } else if (request == 
PipesClient.COMMANDS.GET_EMITTER.getByte()) {
-                    handleGetEmitter();
-                } else if (request == 
PipesClient.COMMANDS.SAVE_PIPES_ITERATOR.getByte()) {
-                    handleSavePipesIterator();
-                } else if (request == 
PipesClient.COMMANDS.DELETE_PIPES_ITERATOR.getByte()) {
-                    handleDeletePipesIterator();
-                } else if (request == 
PipesClient.COMMANDS.LIST_PIPES_ITERATORS.getByte()) {
-                    handleListPipesIterators();
-                } else if (request == 
PipesClient.COMMANDS.GET_PIPES_ITERATOR.getByte()) {
-                    handleGetPipesIterator();
                 } else {
                     String msg = String.format(Locale.ROOT,
                             "pipesClientId=%s: Unexpected byte 0x%02x in 
command position. " +
@@ -609,405 +584,4 @@ public class PipesServer implements AutoCloseable {
             exit(1);
         }
     }
-
-    // ========== Fetcher Management Handlers ==========
-
-    private void handleSaveFetcher() {
-        try {
-            // Read ExtensionConfig
-            int len = input.readInt();
-            byte[] bytes = new byte[len];
-            input.readFully(bytes);
-            
-            ExtensionConfig config;
-            try (ObjectInputStream ois = new ObjectInputStream(new 
UnsynchronizedByteArrayInputStream(bytes))) {
-                config = (ExtensionConfig) ois.readObject();
-            }
-            
-            // Save the fetcher
-            fetcherManager.saveFetcher(config);
-            LOG.debug("pipesClientId={}: saved fetcher '{}'", pipesClientId, 
config.id());
-            
-            // Send success response
-            output.writeByte(0); // success
-            String msg = "Fetcher saved successfully";
-            byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
-            output.writeInt(msgBytes.length);
-            output.write(msgBytes);
-            output.flush();
-            
-        } catch (Exception e) {
-            LOG.error("pipesClientId={}: error saving fetcher", pipesClientId, 
e);
-            try {
-                output.writeByte(1); // error
-                String msg = ExceptionUtils.getStackTrace(e);
-                byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
-                output.writeInt(msgBytes.length);
-                output.write(msgBytes);
-                output.flush();
-            } catch (IOException ioe) {
-                LOG.error("pipesClientId={}: failed to send error response", 
pipesClientId, ioe);
-                exit(1);
-            }
-        }
-    }
-
-    private void handleDeleteFetcher() {
-        try {
-            // Read fetcher ID
-            int len = input.readInt();
-            byte[] bytes = new byte[len];
-            input.readFully(bytes);
-            String fetcherId = new String(bytes, StandardCharsets.UTF_8);
-            
-            // Delete the fetcher
-            fetcherManager.deleteFetcher(fetcherId);
-            LOG.debug("pipesClientId={}: deleted fetcher '{}'", pipesClientId, 
fetcherId);
-            
-            // Send success response
-            output.writeByte(0); // success
-            String msg = "Fetcher deleted successfully";
-            byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
-            output.writeInt(msgBytes.length);
-            output.write(msgBytes);
-            output.flush();
-            
-        } catch (Exception e) {
-            LOG.error("pipesClientId={}: error deleting fetcher", 
pipesClientId, e);
-            try {
-                output.writeByte(1); // error
-                String msg = ExceptionUtils.getStackTrace(e);
-                byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
-                output.writeInt(msgBytes.length);
-                output.write(msgBytes);
-                output.flush();
-            } catch (IOException ioe) {
-                LOG.error("pipesClientId={}: failed to send error response", 
pipesClientId, ioe);
-                exit(1);
-            }
-        }
-    }
-
-    private void handleListFetchers() {
-        try {
-            // Get list of fetcher IDs
-            Set<String> fetcherIds = fetcherManager.getSupported();
-            LOG.debug("pipesClientId={}: listing {} fetchers", pipesClientId, 
fetcherIds.size());
-            
-            // Send response
-            output.writeInt(fetcherIds.size());
-            for (String id : fetcherIds) {
-                byte[] idBytes = id.getBytes(StandardCharsets.UTF_8);
-                output.writeInt(idBytes.length);
-                output.write(idBytes);
-            }
-            output.flush();
-            
-        } catch (IOException e) {
-            LOG.error("pipesClientId={}: error listing fetchers", 
pipesClientId, e);
-            exit(1);
-        }
-    }
-
-    private void handleGetFetcher() {
-        try {
-            // Read fetcher ID
-            int len = input.readInt();
-            byte[] bytes = new byte[len];
-            input.readFully(bytes);
-            String fetcherId = new String(bytes, StandardCharsets.UTF_8);
-            
-            // Get fetcher config
-            ExtensionConfig config = fetcherManager.getConfig(fetcherId);
-            
-            if (config == null) {
-                output.writeByte(0); // not found
-                output.flush();
-            } else {
-                output.writeByte(1); // found
-                
-                // Serialize config
-                UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream.builder().get();
-                try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
-                    oos.writeObject(config);
-                }
-                byte[] configBytes = bos.toByteArray();
-                output.writeInt(configBytes.length);
-                output.write(configBytes);
-                output.flush();
-            }
-            LOG.debug("pipesClientId={}: get fetcher '{}' = {}", 
pipesClientId, fetcherId, (config != null ? "found" : "not found"));
-            
-        } catch (IOException e) {
-            LOG.error("pipesClientId={}: error getting fetcher", 
pipesClientId, e);
-            exit(1);
-        }
-    }
-
-    // ========== Emitter Management Handlers ==========
-
-    private void handleSaveEmitter() {
-        try {
-            // Read ExtensionConfig
-            int len = input.readInt();
-            byte[] bytes = new byte[len];
-            input.readFully(bytes);
-            
-            ExtensionConfig config;
-            try (ObjectInputStream ois = new ObjectInputStream(new 
UnsynchronizedByteArrayInputStream(bytes))) {
-                config = (ExtensionConfig) ois.readObject();
-            }
-            
-            // Save the emitter
-            emitterManager.saveEmitter(config);
-            LOG.debug("pipesClientId={}: saved emitter '{}'", pipesClientId, 
config.id());
-            
-            // Send success response
-            output.writeByte(0); // success
-            String msg = "Emitter saved successfully";
-            byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
-            output.writeInt(msgBytes.length);
-            output.write(msgBytes);
-            output.flush();
-            
-        } catch (Exception e) {
-            LOG.error("pipesClientId={}: error saving emitter", pipesClientId, 
e);
-            try {
-                output.writeByte(1); // error
-                String msg = ExceptionUtils.getStackTrace(e);
-                byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
-                output.writeInt(msgBytes.length);
-                output.write(msgBytes);
-                output.flush();
-            } catch (IOException ioe) {
-                LOG.error("pipesClientId={}: failed to send error response", 
pipesClientId, ioe);
-                exit(1);
-            }
-        }
-    }
-
-    private void handleDeleteEmitter() {
-        try {
-            // Read emitter ID
-            int len = input.readInt();
-            byte[] bytes = new byte[len];
-            input.readFully(bytes);
-            String emitterId = new String(bytes, StandardCharsets.UTF_8);
-            
-            // Delete the emitter
-            emitterManager.deleteEmitter(emitterId);
-            LOG.debug("pipesClientId={}: deleted emitter '{}'", pipesClientId, 
emitterId);
-            
-            // Send success response
-            output.writeByte(0); // success
-            String msg = "Emitter deleted successfully";
-            byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
-            output.writeInt(msgBytes.length);
-            output.write(msgBytes);
-            output.flush();
-            
-        } catch (Exception e) {
-            LOG.error("pipesClientId={}: error deleting emitter", 
pipesClientId, e);
-            try {
-                output.writeByte(1); // error
-                String msg = ExceptionUtils.getStackTrace(e);
-                byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
-                output.writeInt(msgBytes.length);
-                output.write(msgBytes);
-                output.flush();
-            } catch (IOException ioe) {
-                LOG.error("pipesClientId={}: failed to send error response", 
pipesClientId, ioe);
-                exit(1);
-            }
-        }
-    }
-
-    private void handleListEmitters() {
-        try {
-            // Get list of emitter IDs
-            Set<String> emitterIds = emitterManager.getSupported();
-            LOG.debug("pipesClientId={}: listing {} emitters", pipesClientId, 
emitterIds.size());
-            
-            // Send response
-            output.writeInt(emitterIds.size());
-            for (String id : emitterIds) {
-                byte[] idBytes = id.getBytes(StandardCharsets.UTF_8);
-                output.writeInt(idBytes.length);
-                output.write(idBytes);
-            }
-            output.flush();
-            
-        } catch (IOException e) {
-            LOG.error("pipesClientId={}: error listing emitters", 
pipesClientId, e);
-            exit(1);
-        }
-    }
-
-    private void handleGetEmitter() {
-        try {
-            // Read emitter ID
-            int len = input.readInt();
-            byte[] bytes = new byte[len];
-            input.readFully(bytes);
-            String emitterId = new String(bytes, StandardCharsets.UTF_8);
-            
-            // Get emitter config
-            ExtensionConfig config = emitterManager.getConfig(emitterId);
-            
-            if (config == null) {
-                output.writeByte(0); // not found
-                output.flush();
-            } else {
-                output.writeByte(1); // found
-                
-                // Serialize config
-                UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream.builder().get();
-                try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
-                    oos.writeObject(config);
-                }
-                byte[] configBytes = bos.toByteArray();
-                output.writeInt(configBytes.length);
-                output.write(configBytes);
-                output.flush();
-            }
-            LOG.debug("pipesClientId={}: get emitter '{}' = {}", 
pipesClientId, emitterId, (config != null ? "found" : "not found"));
-            
-        } catch (IOException e) {
-            LOG.error("pipesClientId={}: error getting emitter", 
pipesClientId, e);
-            exit(1);
-        }
-    }
-
-    // ========== PipesIterator Command Handlers ==========
-    // Note: PipesIterators are primarily used on the client side to generate 
FetchEmitTuples.
-    // Unlike Fetchers and Emitters, they are not component managers in 
PipesServer.
-    // These handlers provide basic ConfigStore operations for consistency.
-    
-    private void handleSavePipesIterator() {
-        try {
-            // Read ExtensionConfig
-            int len = input.readInt();
-            byte[] bytes = new byte[len];
-            input.readFully(bytes);
-            
-            ExtensionConfig config;
-            try (ObjectInputStream ois = new ObjectInputStream(new 
UnsynchronizedByteArrayInputStream(bytes))) {
-                config = (ExtensionConfig) ois.readObject();
-            } catch (ClassNotFoundException e) {
-                throw new IOException("Failed to deserialize ExtensionConfig", 
e);
-            }
-            
-            // Save to ConfigStore
-            configStore.put(PIPES_ITERATOR_PREFIX + config.id(), config);
-            
-            // Send success response
-            output.writeByte(0); // success
-            byte[] msgBytes = "OK".getBytes(StandardCharsets.UTF_8);
-            output.writeInt(msgBytes.length);
-            output.write(msgBytes);
-            output.flush();
-            
-            LOG.debug("pipesClientId={}: saved pipes iterator '{}'", 
pipesClientId, config.id());
-            
-        } catch (Exception e) {
-            LOG.error("pipesClientId={}: error saving pipes iterator", 
pipesClientId, e);
-            try {
-                output.writeByte(1); // error
-                byte[] msgBytes = 
e.getMessage().getBytes(StandardCharsets.UTF_8);
-                output.writeInt(msgBytes.length);
-                output.write(msgBytes);
-                output.flush();
-            } catch (IOException ioException) {
-                LOG.error("pipesClientId={}: error sending error response", 
pipesClientId, ioException);
-            }
-        }
-    }
-    
-    private void handleDeletePipesIterator() {
-        try {
-            // Read iterator ID
-            int len = input.readInt();
-            byte[] bytes = new byte[len];
-            input.readFully(bytes);
-            String iteratorId = new String(bytes, StandardCharsets.UTF_8);
-            
-            // Delete from ConfigStore
-            configStore.remove(PIPES_ITERATOR_PREFIX + iteratorId);
-            
-            // Send success response
-            output.writeByte(0); // success
-            byte[] msgBytes = "OK".getBytes(StandardCharsets.UTF_8);
-            output.writeInt(msgBytes.length);
-            output.write(msgBytes);
-            output.flush();
-            
-            LOG.debug("pipesClientId={}: deleted pipes iterator '{}'", 
pipesClientId, iteratorId);
-            
-        } catch (Exception e) {
-            LOG.error("pipesClientId={}: error deleting pipes iterator", 
pipesClientId, e);
-            try {
-                output.writeByte(1); // error
-                byte[] msgBytes = 
e.getMessage().getBytes(StandardCharsets.UTF_8);
-                output.writeInt(msgBytes.length);
-                output.write(msgBytes);
-                output.flush();
-            } catch (IOException ioException) {
-                LOG.error("pipesClientId={}: error sending error response", 
pipesClientId, ioException);
-            }
-        }
-    }
-    
-    private void handleListPipesIterators() {
-        try {
-            // This is a placeholder - list operation not fully implemented
-            // Would need to iterate ConfigStore keys with 
PIPES_ITERATOR_PREFIX
-            output.writeByte(0); // success
-            byte[] msgBytes = "[]".getBytes(StandardCharsets.UTF_8);
-            output.writeInt(msgBytes.length);
-            output.write(msgBytes);
-            output.flush();
-            LOG.debug("pipesClientId={}: list pipes iterators (placeholder)", 
pipesClientId);
-        } catch (IOException e) {
-            LOG.error("pipesClientId={}: error listing pipes iterators", 
pipesClientId, e);
-            exit(1);
-        }
-    }
-    
-    private void handleGetPipesIterator() {
-        try {
-            // Read iterator ID
-            int len = input.readInt();
-            byte[] bytes = new byte[len];
-            input.readFully(bytes);
-            String iteratorId = new String(bytes, StandardCharsets.UTF_8);
-            
-            // Get from ConfigStore
-            ExtensionConfig config = configStore.get(PIPES_ITERATOR_PREFIX + 
iteratorId);
-            
-            if (config == null) {
-                output.writeByte(0); // not found
-                output.flush();
-            } else {
-                output.writeByte(1); // found
-                
-                // Serialize config
-                UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream.builder().get();
-                try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
-                    oos.writeObject(config);
-                }
-                byte[] configBytes = bos.toByteArray();
-                output.writeInt(configBytes.length);
-                output.write(configBytes);
-                output.flush();
-            }
-            LOG.debug("pipesClientId={}: get pipes iterator '{}' = {}", 
pipesClientId, iteratorId, (config != null ? "found" : "not found"));
-            
-        } catch (IOException e) {
-            LOG.error("pipesClientId={}: error getting pipes iterator", 
pipesClientId, e);
-            exit(1);
-        }
-    }
-    
-    private static final String PIPES_ITERATOR_PREFIX = "pipesIterator:";
-
 }


Reply via email to