This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch TIKA-4616 in repository https://gitbox.apache.org/repos/asf/tika.git
commit 432875766ed0f603f94e4ce0de793fb0ff4955bd Author: tallison <[email protected]> AuthorDate: Thu Jan 8 20:03:19 2026 -0500 TIKA-4616 -- mv fetcher/emitter CRUD to tika-grpc --- .../apache/tika/pipes/grpc/TikaGrpcServerImpl.java | 58 ++- .../org/apache/tika/pipes/core/PipesClient.java | 394 +------------------ .../apache/tika/pipes/core/server/PipesServer.java | 426 --------------------- 3 files changed, 29 insertions(+), 849 deletions(-) diff --git a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java index 8e5bfe2676..a21c32a77e 100644 --- a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java +++ b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java @@ -81,9 +81,12 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase { .build(); public static final JsonSchemaGenerator JSON_SCHEMA_GENERATOR = new JsonSchemaGenerator(OBJECT_MAPPER); + private static final String PIPES_ITERATOR_PREFIX = "pipesIterator:"; + PipesConfig pipesConfig; PipesClient pipesClient; FetcherManager fetcherManager; + ConfigStore configStore; Path tikaConfigPath; PluginManager pluginManager; @@ -125,9 +128,9 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase { pluginManager = new org.pf4j.DefaultPluginManager(); } - ConfigStore configStore = createConfigStore(); + this.configStore = createConfigStore(); - fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig, true, configStore); + fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig, true, this.configStore); } private ConfigStore createConfigStore() throws TikaConfigException { @@ -261,11 +264,8 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase { String factoryName = findFactoryNameForClass(request.getFetcherClass()); ExtensionConfig config = new ExtensionConfig(request.getFetcherId(), factoryName, request.getFetcherConfigJson()); - // Save to gRPC server's fetcher manager (for schema queries, etc.) + // Save to fetcher manager (updates ConfigStore which is shared with PipesServer) fetcherManager.saveFetcher(config); - - // Also save to PipesClient so it propagates to the forked PipesServer - pipesClient.saveFetcher(config); } catch (Exception e) { throw new RuntimeException(e); } @@ -372,12 +372,8 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase { private boolean deleteFetcher(String id) { try { - // Delete from gRPC server's fetcher manager + // Delete from fetcher manager (updates ConfigStore which is shared with PipesServer) fetcherManager.deleteFetcher(id); - - // Also delete from PipesClient so it propagates to the forked PipesServer - pipesClient.deleteFetcher(id); - LOG.info("Successfully deleted fetcher: {}", id); return true; } catch (Exception e) { @@ -395,22 +391,22 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase { String iteratorId = request.getIteratorId(); String iteratorClass = request.getIteratorClass(); String iteratorConfigJson = request.getIteratorConfigJson(); - + LOG.info("Saving pipes iterator: id={}, class={}", iteratorId, iteratorClass); - + ExtensionConfig config = new ExtensionConfig(iteratorId, iteratorClass, iteratorConfigJson); - - // Save via PipesClient so it propagates to the forked PipesServer - pipesClient.savePipesIterator(config); - + + // Save directly to ConfigStore (shared with PipesServer) + configStore.put(PIPES_ITERATOR_PREFIX + iteratorId, config); + SavePipesIteratorReply reply = SavePipesIteratorReply.newBuilder() .setMessage("Pipes iterator saved successfully") .build(); responseObserver.onNext(reply); responseObserver.onCompleted(); - + LOG.info("Successfully saved pipes iterator: {}", iteratorId); - + } catch (Exception e) { LOG.error("Failed to save pipes iterator", e); responseObserver.onError(io.grpc.Status.INTERNAL @@ -426,16 +422,17 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase { try { String iteratorId = request.getIteratorId(); LOG.info("Getting pipes iterator: {}", iteratorId); - - ExtensionConfig config = pipesClient.getPipesIteratorConfig(iteratorId); - + + // Get directly from ConfigStore (shared with PipesServer) + ExtensionConfig config = configStore.get(PIPES_ITERATOR_PREFIX + iteratorId); + if (config == null) { responseObserver.onError(io.grpc.Status.NOT_FOUND .withDescription("Pipes iterator not found: " + iteratorId) .asRuntimeException()); return; } - + GetPipesIteratorReply reply = GetPipesIteratorReply.newBuilder() .setIteratorId(config.id()) .setIteratorClass(config.name()) @@ -443,9 +440,9 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase { .build(); responseObserver.onNext(reply); responseObserver.onCompleted(); - + LOG.info("Successfully retrieved pipes iterator: {}", iteratorId); - + } catch (Exception e) { LOG.error("Failed to get pipes iterator", e); responseObserver.onError(io.grpc.Status.INTERNAL @@ -461,17 +458,18 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase { try { String iteratorId = request.getIteratorId(); LOG.info("Deleting pipes iterator: {}", iteratorId); - - pipesClient.deletePipesIterator(iteratorId); - + + // Delete directly from ConfigStore (shared with PipesServer) + configStore.remove(PIPES_ITERATOR_PREFIX + iteratorId); + DeletePipesIteratorReply reply = DeletePipesIteratorReply.newBuilder() .setMessage("Pipes iterator deleted successfully") .build(); responseObserver.onNext(reply); responseObserver.onCompleted(); - + LOG.info("Successfully deleted pipes iterator: {}", iteratorId); - + } catch (Exception e) { LOG.error("Failed to delete pipes iterator", e); responseObserver.onError(io.grpc.Status.INTERNAL diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java index 5d1ef5bdf7..fda74a01c3 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java @@ -40,12 +40,10 @@ import java.nio.file.Path; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; -import java.util.HashSet; import java.util.HexFormat; import java.util.List; import java.util.Locale; import java.util.Optional; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -57,7 +55,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.tika.config.TikaTaskTimeout; -import org.apache.tika.exception.TikaException; import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.TikaCoreProperties; import org.apache.tika.parser.ParseContext; @@ -67,7 +64,6 @@ import org.apache.tika.pipes.api.emitter.EmitKey; import org.apache.tika.pipes.core.emitter.EmitDataImpl; import org.apache.tika.pipes.core.server.IntermediateResult; import org.apache.tika.pipes.core.server.PipesServer; -import org.apache.tika.plugins.ExtensionConfig; import org.apache.tika.utils.ExceptionUtils; import org.apache.tika.utils.ProcessUtils; import org.apache.tika.utils.StringUtils; @@ -81,10 +77,7 @@ import org.apache.tika.utils.StringUtils; public class PipesClient implements Closeable { public enum COMMANDS { - PING, ACK, NEW_REQUEST, SHUT_DOWN, - SAVE_FETCHER, DELETE_FETCHER, LIST_FETCHERS, GET_FETCHER, - SAVE_EMITTER, DELETE_EMITTER, LIST_EMITTERS, GET_EMITTER, - SAVE_PIPES_ITERATOR, DELETE_PIPES_ITERATOR, LIST_PIPES_ITERATORS, GET_PIPES_ITERATOR; + PING, ACK, NEW_REQUEST, SHUT_DOWN; public byte getByte() { return (byte) (ordinal() + 1); @@ -608,389 +601,4 @@ public class PipesClient implements Closeable { LOG.debug("applying timeout from parseContext: {}ms", tikaTaskTimeout.getTimeoutMillis()); return tikaTaskTimeout.getTimeoutMillis(); } - - // ========== Fetcher Management API ========== - - /** - * Save (create or update) a fetcher configuration. - * The fetcher will be available immediately for use in subsequent fetch operations. - * - * @param config the fetcher configuration containing name, plugin ID, and parameters - * @throws IOException if communication with the server fails - * @throws TikaException if the server returns an error (e.g., invalid configuration) - * @throws InterruptedException if the operation is interrupted - */ - public void saveFetcher(ExtensionConfig config) throws IOException, TikaException, InterruptedException { - maybeInit(); - synchronized (lock) { - serverTuple.output.write(COMMANDS.SAVE_FETCHER.getByte()); - serverTuple.output.flush(); - - // Serialize the ExtensionConfig - UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get(); - try (ObjectOutputStream oos = new ObjectOutputStream(bos)) { - oos.writeObject(config); - } - byte[] bytes = bos.toByteArray(); - serverTuple.output.writeInt(bytes.length); - serverTuple.output.write(bytes); - serverTuple.output.flush(); - - // Read response - byte status = serverTuple.input.readByte(); - int msgLen = serverTuple.input.readInt(); - byte[] msgBytes = new byte[msgLen]; - serverTuple.input.readFully(msgBytes); - String message = new String(msgBytes, StandardCharsets.UTF_8); - - if (status != 0) { // 0 = success, 1 = error - throw new TikaException("Failed to save fetcher: " + message); - } - LOG.debug("pipesClientId={}: saved fetcher '{}'", pipesClientId, config.id()); - } - } - - /** - * Delete a fetcher by its name/ID. - * - * @param fetcherId the fetcher name/ID to delete - * @throws IOException if communication with the server fails - * @throws TikaException if the server returns an error (e.g., fetcher not found) - * @throws InterruptedException if the operation is interrupted - */ - public void deleteFetcher(String fetcherId) throws IOException, TikaException, InterruptedException { - maybeInit(); - synchronized (lock) { - serverTuple.output.write(COMMANDS.DELETE_FETCHER.getByte()); - serverTuple.output.flush(); - - byte[] idBytes = fetcherId.getBytes(StandardCharsets.UTF_8); - serverTuple.output.writeInt(idBytes.length); - serverTuple.output.write(idBytes); - serverTuple.output.flush(); - - // Read response - byte status = serverTuple.input.readByte(); - int msgLen = serverTuple.input.readInt(); - byte[] msgBytes = new byte[msgLen]; - serverTuple.input.readFully(msgBytes); - String message = new String(msgBytes, StandardCharsets.UTF_8); - - if (status != 0) { - throw new TikaException("Failed to delete fetcher: " + message); - } - LOG.debug("pipesClientId={}: deleted fetcher '{}'", pipesClientId, fetcherId); - } - } - - /** - * List all available fetcher IDs (both static from config and dynamically added). - * - * @return set of fetcher IDs - * @throws IOException if communication with the server fails - * @throws InterruptedException if the operation is interrupted - */ - public Set<String> listFetchers() throws IOException, InterruptedException { - maybeInit(); - synchronized (lock) { - serverTuple.output.write(COMMANDS.LIST_FETCHERS.getByte()); - serverTuple.output.flush(); - - // Read response - int count = serverTuple.input.readInt(); - Set<String> fetcherIds = new HashSet<>(count); - for (int i = 0; i < count; i++) { - int len = serverTuple.input.readInt(); - byte[] bytes = new byte[len]; - serverTuple.input.readFully(bytes); - fetcherIds.add(new String(bytes, StandardCharsets.UTF_8)); - } - LOG.debug("pipesClientId={}: listed {} fetchers", pipesClientId, count); - return fetcherIds; - } - } - - /** - * Get the configuration for a specific fetcher. - * - * @param fetcherId the fetcher ID - * @return the fetcher configuration, or null if not found - * @throws IOException if communication with the server fails - * @throws InterruptedException if the operation is interrupted - */ - public ExtensionConfig getFetcherConfig(String fetcherId) throws IOException, InterruptedException { - maybeInit(); - synchronized (lock) { - serverTuple.output.write(COMMANDS.GET_FETCHER.getByte()); - serverTuple.output.flush(); - - byte[] idBytes = fetcherId.getBytes(StandardCharsets.UTF_8); - serverTuple.output.writeInt(idBytes.length); - serverTuple.output.write(idBytes); - serverTuple.output.flush(); - - // Read response - byte found = serverTuple.input.readByte(); - if (found == 0) { - return null; - } - - int len = serverTuple.input.readInt(); - byte[] bytes = new byte[len]; - serverTuple.input.readFully(bytes); - - try (ObjectInputStream ois = new ObjectInputStream(new UnsynchronizedByteArrayInputStream(bytes))) { - return (ExtensionConfig) ois.readObject(); - } catch (ClassNotFoundException e) { - throw new IOException("Failed to deserialize ExtensionConfig", e); - } - } - } - - // ========== Emitter Management API ========== - - /** - * Save (create or update) an emitter configuration. - * The emitter will be available immediately for use in subsequent emit operations. - * - * @param config the emitter configuration containing name, plugin ID, and parameters - * @throws IOException if communication with the server fails - * @throws TikaException if the server returns an error (e.g., invalid configuration) - * @throws InterruptedException if the operation is interrupted - */ - public void saveEmitter(ExtensionConfig config) throws IOException, TikaException, InterruptedException { - maybeInit(); - synchronized (lock) { - serverTuple.output.write(COMMANDS.SAVE_EMITTER.getByte()); - serverTuple.output.flush(); - - UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get(); - try (ObjectOutputStream oos = new ObjectOutputStream(bos)) { - oos.writeObject(config); - } - byte[] bytes = bos.toByteArray(); - serverTuple.output.writeInt(bytes.length); - serverTuple.output.write(bytes); - serverTuple.output.flush(); - - byte status = serverTuple.input.readByte(); - int msgLen = serverTuple.input.readInt(); - byte[] msgBytes = new byte[msgLen]; - serverTuple.input.readFully(msgBytes); - String message = new String(msgBytes, StandardCharsets.UTF_8); - - if (status != 0) { - throw new TikaException("Failed to save emitter: " + message); - } - LOG.debug("pipesClientId={}: saved emitter '{}'", pipesClientId, config.id()); - } - } - - /** - * Delete an emitter by its name/ID. - * - * @param emitterId the emitter name/ID to delete - * @throws IOException if communication with the server fails - * @throws TikaException if the server returns an error (e.g., emitter not found) - * @throws InterruptedException if the operation is interrupted - */ - public void deleteEmitter(String emitterId) throws IOException, TikaException, InterruptedException { - maybeInit(); - synchronized (lock) { - serverTuple.output.write(COMMANDS.DELETE_EMITTER.getByte()); - serverTuple.output.flush(); - - byte[] idBytes = emitterId.getBytes(StandardCharsets.UTF_8); - serverTuple.output.writeInt(idBytes.length); - serverTuple.output.write(idBytes); - serverTuple.output.flush(); - - byte status = serverTuple.input.readByte(); - int msgLen = serverTuple.input.readInt(); - byte[] msgBytes = new byte[msgLen]; - serverTuple.input.readFully(msgBytes); - String message = new String(msgBytes, StandardCharsets.UTF_8); - - if (status != 0) { - throw new TikaException("Failed to delete emitter: " + message); - } - LOG.debug("pipesClientId={}: deleted emitter '{}'", pipesClientId, emitterId); - } - } - - /** - * List all available emitter IDs (both static from config and dynamically added). - * - * @return set of emitter IDs - * @throws IOException if communication with the server fails - * @throws InterruptedException if the operation is interrupted - */ - public Set<String> listEmitters() throws IOException, InterruptedException { - maybeInit(); - synchronized (lock) { - serverTuple.output.write(COMMANDS.LIST_EMITTERS.getByte()); - serverTuple.output.flush(); - - int count = serverTuple.input.readInt(); - Set<String> emitterIds = new HashSet<>(count); - for (int i = 0; i < count; i++) { - int len = serverTuple.input.readInt(); - byte[] bytes = new byte[len]; - serverTuple.input.readFully(bytes); - emitterIds.add(new String(bytes, StandardCharsets.UTF_8)); - } - LOG.debug("pipesClientId={}: listed {} emitters", pipesClientId, count); - return emitterIds; - } - } - - /** - * Get the configuration for a specific emitter. - * - * @param emitterId the emitter ID - * @return the emitter configuration, or null if not found - * @throws IOException if communication with the server fails - * @throws InterruptedException if the operation is interrupted - */ - public ExtensionConfig getEmitterConfig(String emitterId) throws IOException, InterruptedException { - maybeInit(); - synchronized (lock) { - serverTuple.output.write(COMMANDS.GET_EMITTER.getByte()); - serverTuple.output.flush(); - - byte[] idBytes = emitterId.getBytes(StandardCharsets.UTF_8); - serverTuple.output.writeInt(idBytes.length); - serverTuple.output.write(idBytes); - serverTuple.output.flush(); - - byte found = serverTuple.input.readByte(); - if (found == 0) { - return null; - } - - int len = serverTuple.input.readInt(); - byte[] bytes = new byte[len]; - serverTuple.input.readFully(bytes); - - try (ObjectInputStream ois = new ObjectInputStream(new UnsynchronizedByteArrayInputStream(bytes))) { - return (ExtensionConfig) ois.readObject(); - } catch (ClassNotFoundException e) { - throw new IOException("Failed to deserialize ExtensionConfig", e); - } - } - } - - // ========== PipesIterator Management API ========== - - /** - * Save (create or update) a PipesIterator configuration. - * The iterator will be available immediately for use in subsequent operations. - * - * @param config the iterator configuration containing name, plugin ID, and parameters - * @throws IOException if communication with the server fails - * @throws TikaException if the server returns an error (e.g., invalid configuration) - * @throws InterruptedException if the operation is interrupted - */ - public void savePipesIterator(ExtensionConfig config) throws IOException, TikaException, InterruptedException { - maybeInit(); - synchronized (lock) { - serverTuple.output.write(COMMANDS.SAVE_PIPES_ITERATOR.getByte()); - serverTuple.output.flush(); - - // Serialize the ExtensionConfig - UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get(); - try (ObjectOutputStream oos = new ObjectOutputStream(bos)) { - oos.writeObject(config); - } - byte[] bytes = bos.toByteArray(); - serverTuple.output.writeInt(bytes.length); - serverTuple.output.write(bytes); - serverTuple.output.flush(); - - // Read response - byte status = serverTuple.input.readByte(); - int msgLen = serverTuple.input.readInt(); - byte[] msgBytes = new byte[msgLen]; - serverTuple.input.readFully(msgBytes); - String message = new String(msgBytes, StandardCharsets.UTF_8); - - if (status != 0) { // 0 = success, 1 = error - throw new TikaException("Failed to save pipes iterator: " + message); - } - LOG.debug("pipesClientId={}: saved pipes iterator '{}'", pipesClientId, config.id()); - } - } - - /** - * Delete a PipesIterator configuration by its ID. - * - * @param iteratorId the iterator ID to delete - * @throws IOException if communication with the server fails - * @throws TikaException if the server returns an error (e.g., iterator not found) - * @throws InterruptedException if the operation is interrupted - */ - public void deletePipesIterator(String iteratorId) throws IOException, TikaException, InterruptedException { - maybeInit(); - synchronized (lock) { - serverTuple.output.write(COMMANDS.DELETE_PIPES_ITERATOR.getByte()); - serverTuple.output.flush(); - - byte[] idBytes = iteratorId.getBytes(StandardCharsets.UTF_8); - serverTuple.output.writeInt(idBytes.length); - serverTuple.output.write(idBytes); - serverTuple.output.flush(); - - // Read response - byte status = serverTuple.input.readByte(); - int msgLen = serverTuple.input.readInt(); - byte[] msgBytes = new byte[msgLen]; - serverTuple.input.readFully(msgBytes); - String message = new String(msgBytes, StandardCharsets.UTF_8); - - if (status != 0) { - throw new TikaException("Failed to delete pipes iterator: " + message); - } - LOG.debug("pipesClientId={}: deleted pipes iterator '{}'", pipesClientId, iteratorId); - } - } - - /** - * Get the configuration for a specific PipesIterator. - * - * @param iteratorId the iterator ID - * @return the iterator configuration, or null if not found - * @throws IOException if communication with the server fails - * @throws InterruptedException if the operation is interrupted - */ - public ExtensionConfig getPipesIteratorConfig(String iteratorId) throws IOException, InterruptedException { - maybeInit(); - synchronized (lock) { - serverTuple.output.write(COMMANDS.GET_PIPES_ITERATOR.getByte()); - serverTuple.output.flush(); - - byte[] idBytes = iteratorId.getBytes(StandardCharsets.UTF_8); - serverTuple.output.writeInt(idBytes.length); - serverTuple.output.write(idBytes); - serverTuple.output.flush(); - - // Read response - byte found = serverTuple.input.readByte(); - if (found == 0) { - return null; - } - - int len = serverTuple.input.readInt(); - byte[] bytes = new byte[len]; - serverTuple.input.readFully(bytes); - - try (ObjectInputStream ois = new ObjectInputStream(new UnsynchronizedByteArrayInputStream(bytes))) { - return (ExtensionConfig) ois.readObject(); - } catch (ClassNotFoundException e) { - throw new IOException("Failed to deserialize ExtensionConfig", e); - } - } - } - - private final Object[] lock = new Object[0]; - } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java index 8c04a110c0..66d5d9ae50 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java @@ -38,7 +38,6 @@ import java.time.Duration; import java.time.Instant; import java.util.HexFormat; import java.util.Locale; -import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -315,30 +314,6 @@ public class PipesServer implements AutoCloseable { //swallow } System.exit(0); - } else if (request == PipesClient.COMMANDS.SAVE_FETCHER.getByte()) { - handleSaveFetcher(); - } else if (request == PipesClient.COMMANDS.DELETE_FETCHER.getByte()) { - handleDeleteFetcher(); - } else if (request == PipesClient.COMMANDS.LIST_FETCHERS.getByte()) { - handleListFetchers(); - } else if (request == PipesClient.COMMANDS.GET_FETCHER.getByte()) { - handleGetFetcher(); - } else if (request == PipesClient.COMMANDS.SAVE_EMITTER.getByte()) { - handleSaveEmitter(); - } else if (request == PipesClient.COMMANDS.DELETE_EMITTER.getByte()) { - handleDeleteEmitter(); - } else if (request == PipesClient.COMMANDS.LIST_EMITTERS.getByte()) { - handleListEmitters(); - } else if (request == PipesClient.COMMANDS.GET_EMITTER.getByte()) { - handleGetEmitter(); - } else if (request == PipesClient.COMMANDS.SAVE_PIPES_ITERATOR.getByte()) { - handleSavePipesIterator(); - } else if (request == PipesClient.COMMANDS.DELETE_PIPES_ITERATOR.getByte()) { - handleDeletePipesIterator(); - } else if (request == PipesClient.COMMANDS.LIST_PIPES_ITERATORS.getByte()) { - handleListPipesIterators(); - } else if (request == PipesClient.COMMANDS.GET_PIPES_ITERATOR.getByte()) { - handleGetPipesIterator(); } else { String msg = String.format(Locale.ROOT, "pipesClientId=%s: Unexpected byte 0x%02x in command position. " + @@ -609,405 +584,4 @@ public class PipesServer implements AutoCloseable { exit(1); } } - - // ========== Fetcher Management Handlers ========== - - private void handleSaveFetcher() { - try { - // Read ExtensionConfig - int len = input.readInt(); - byte[] bytes = new byte[len]; - input.readFully(bytes); - - ExtensionConfig config; - try (ObjectInputStream ois = new ObjectInputStream(new UnsynchronizedByteArrayInputStream(bytes))) { - config = (ExtensionConfig) ois.readObject(); - } - - // Save the fetcher - fetcherManager.saveFetcher(config); - LOG.debug("pipesClientId={}: saved fetcher '{}'", pipesClientId, config.id()); - - // Send success response - output.writeByte(0); // success - String msg = "Fetcher saved successfully"; - byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8); - output.writeInt(msgBytes.length); - output.write(msgBytes); - output.flush(); - - } catch (Exception e) { - LOG.error("pipesClientId={}: error saving fetcher", pipesClientId, e); - try { - output.writeByte(1); // error - String msg = ExceptionUtils.getStackTrace(e); - byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8); - output.writeInt(msgBytes.length); - output.write(msgBytes); - output.flush(); - } catch (IOException ioe) { - LOG.error("pipesClientId={}: failed to send error response", pipesClientId, ioe); - exit(1); - } - } - } - - private void handleDeleteFetcher() { - try { - // Read fetcher ID - int len = input.readInt(); - byte[] bytes = new byte[len]; - input.readFully(bytes); - String fetcherId = new String(bytes, StandardCharsets.UTF_8); - - // Delete the fetcher - fetcherManager.deleteFetcher(fetcherId); - LOG.debug("pipesClientId={}: deleted fetcher '{}'", pipesClientId, fetcherId); - - // Send success response - output.writeByte(0); // success - String msg = "Fetcher deleted successfully"; - byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8); - output.writeInt(msgBytes.length); - output.write(msgBytes); - output.flush(); - - } catch (Exception e) { - LOG.error("pipesClientId={}: error deleting fetcher", pipesClientId, e); - try { - output.writeByte(1); // error - String msg = ExceptionUtils.getStackTrace(e); - byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8); - output.writeInt(msgBytes.length); - output.write(msgBytes); - output.flush(); - } catch (IOException ioe) { - LOG.error("pipesClientId={}: failed to send error response", pipesClientId, ioe); - exit(1); - } - } - } - - private void handleListFetchers() { - try { - // Get list of fetcher IDs - Set<String> fetcherIds = fetcherManager.getSupported(); - LOG.debug("pipesClientId={}: listing {} fetchers", pipesClientId, fetcherIds.size()); - - // Send response - output.writeInt(fetcherIds.size()); - for (String id : fetcherIds) { - byte[] idBytes = id.getBytes(StandardCharsets.UTF_8); - output.writeInt(idBytes.length); - output.write(idBytes); - } - output.flush(); - - } catch (IOException e) { - LOG.error("pipesClientId={}: error listing fetchers", pipesClientId, e); - exit(1); - } - } - - private void handleGetFetcher() { - try { - // Read fetcher ID - int len = input.readInt(); - byte[] bytes = new byte[len]; - input.readFully(bytes); - String fetcherId = new String(bytes, StandardCharsets.UTF_8); - - // Get fetcher config - ExtensionConfig config = fetcherManager.getConfig(fetcherId); - - if (config == null) { - output.writeByte(0); // not found - output.flush(); - } else { - output.writeByte(1); // found - - // Serialize config - UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get(); - try (ObjectOutputStream oos = new ObjectOutputStream(bos)) { - oos.writeObject(config); - } - byte[] configBytes = bos.toByteArray(); - output.writeInt(configBytes.length); - output.write(configBytes); - output.flush(); - } - LOG.debug("pipesClientId={}: get fetcher '{}' = {}", pipesClientId, fetcherId, (config != null ? "found" : "not found")); - - } catch (IOException e) { - LOG.error("pipesClientId={}: error getting fetcher", pipesClientId, e); - exit(1); - } - } - - // ========== Emitter Management Handlers ========== - - private void handleSaveEmitter() { - try { - // Read ExtensionConfig - int len = input.readInt(); - byte[] bytes = new byte[len]; - input.readFully(bytes); - - ExtensionConfig config; - try (ObjectInputStream ois = new ObjectInputStream(new UnsynchronizedByteArrayInputStream(bytes))) { - config = (ExtensionConfig) ois.readObject(); - } - - // Save the emitter - emitterManager.saveEmitter(config); - LOG.debug("pipesClientId={}: saved emitter '{}'", pipesClientId, config.id()); - - // Send success response - output.writeByte(0); // success - String msg = "Emitter saved successfully"; - byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8); - output.writeInt(msgBytes.length); - output.write(msgBytes); - output.flush(); - - } catch (Exception e) { - LOG.error("pipesClientId={}: error saving emitter", pipesClientId, e); - try { - output.writeByte(1); // error - String msg = ExceptionUtils.getStackTrace(e); - byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8); - output.writeInt(msgBytes.length); - output.write(msgBytes); - output.flush(); - } catch (IOException ioe) { - LOG.error("pipesClientId={}: failed to send error response", pipesClientId, ioe); - exit(1); - } - } - } - - private void handleDeleteEmitter() { - try { - // Read emitter ID - int len = input.readInt(); - byte[] bytes = new byte[len]; - input.readFully(bytes); - String emitterId = new String(bytes, StandardCharsets.UTF_8); - - // Delete the emitter - emitterManager.deleteEmitter(emitterId); - LOG.debug("pipesClientId={}: deleted emitter '{}'", pipesClientId, emitterId); - - // Send success response - output.writeByte(0); // success - String msg = "Emitter deleted successfully"; - byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8); - output.writeInt(msgBytes.length); - output.write(msgBytes); - output.flush(); - - } catch (Exception e) { - LOG.error("pipesClientId={}: error deleting emitter", pipesClientId, e); - try { - output.writeByte(1); // error - String msg = ExceptionUtils.getStackTrace(e); - byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8); - output.writeInt(msgBytes.length); - output.write(msgBytes); - output.flush(); - } catch (IOException ioe) { - LOG.error("pipesClientId={}: failed to send error response", pipesClientId, ioe); - exit(1); - } - } - } - - private void handleListEmitters() { - try { - // Get list of emitter IDs - Set<String> emitterIds = emitterManager.getSupported(); - LOG.debug("pipesClientId={}: listing {} emitters", pipesClientId, emitterIds.size()); - - // Send response - output.writeInt(emitterIds.size()); - for (String id : emitterIds) { - byte[] idBytes = id.getBytes(StandardCharsets.UTF_8); - output.writeInt(idBytes.length); - output.write(idBytes); - } - output.flush(); - - } catch (IOException e) { - LOG.error("pipesClientId={}: error listing emitters", pipesClientId, e); - exit(1); - } - } - - private void handleGetEmitter() { - try { - // Read emitter ID - int len = input.readInt(); - byte[] bytes = new byte[len]; - input.readFully(bytes); - String emitterId = new String(bytes, StandardCharsets.UTF_8); - - // Get emitter config - ExtensionConfig config = emitterManager.getConfig(emitterId); - - if (config == null) { - output.writeByte(0); // not found - output.flush(); - } else { - output.writeByte(1); // found - - // Serialize config - UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get(); - try (ObjectOutputStream oos = new ObjectOutputStream(bos)) { - oos.writeObject(config); - } - byte[] configBytes = bos.toByteArray(); - output.writeInt(configBytes.length); - output.write(configBytes); - output.flush(); - } - LOG.debug("pipesClientId={}: get emitter '{}' = {}", pipesClientId, emitterId, (config != null ? "found" : "not found")); - - } catch (IOException e) { - LOG.error("pipesClientId={}: error getting emitter", pipesClientId, e); - exit(1); - } - } - - // ========== PipesIterator Command Handlers ========== - // Note: PipesIterators are primarily used on the client side to generate FetchEmitTuples. - // Unlike Fetchers and Emitters, they are not component managers in PipesServer. - // These handlers provide basic ConfigStore operations for consistency. - - private void handleSavePipesIterator() { - try { - // Read ExtensionConfig - int len = input.readInt(); - byte[] bytes = new byte[len]; - input.readFully(bytes); - - ExtensionConfig config; - try (ObjectInputStream ois = new ObjectInputStream(new UnsynchronizedByteArrayInputStream(bytes))) { - config = (ExtensionConfig) ois.readObject(); - } catch (ClassNotFoundException e) { - throw new IOException("Failed to deserialize ExtensionConfig", e); - } - - // Save to ConfigStore - configStore.put(PIPES_ITERATOR_PREFIX + config.id(), config); - - // Send success response - output.writeByte(0); // success - byte[] msgBytes = "OK".getBytes(StandardCharsets.UTF_8); - output.writeInt(msgBytes.length); - output.write(msgBytes); - output.flush(); - - LOG.debug("pipesClientId={}: saved pipes iterator '{}'", pipesClientId, config.id()); - - } catch (Exception e) { - LOG.error("pipesClientId={}: error saving pipes iterator", pipesClientId, e); - try { - output.writeByte(1); // error - byte[] msgBytes = e.getMessage().getBytes(StandardCharsets.UTF_8); - output.writeInt(msgBytes.length); - output.write(msgBytes); - output.flush(); - } catch (IOException ioException) { - LOG.error("pipesClientId={}: error sending error response", pipesClientId, ioException); - } - } - } - - private void handleDeletePipesIterator() { - try { - // Read iterator ID - int len = input.readInt(); - byte[] bytes = new byte[len]; - input.readFully(bytes); - String iteratorId = new String(bytes, StandardCharsets.UTF_8); - - // Delete from ConfigStore - configStore.remove(PIPES_ITERATOR_PREFIX + iteratorId); - - // Send success response - output.writeByte(0); // success - byte[] msgBytes = "OK".getBytes(StandardCharsets.UTF_8); - output.writeInt(msgBytes.length); - output.write(msgBytes); - output.flush(); - - LOG.debug("pipesClientId={}: deleted pipes iterator '{}'", pipesClientId, iteratorId); - - } catch (Exception e) { - LOG.error("pipesClientId={}: error deleting pipes iterator", pipesClientId, e); - try { - output.writeByte(1); // error - byte[] msgBytes = e.getMessage().getBytes(StandardCharsets.UTF_8); - output.writeInt(msgBytes.length); - output.write(msgBytes); - output.flush(); - } catch (IOException ioException) { - LOG.error("pipesClientId={}: error sending error response", pipesClientId, ioException); - } - } - } - - private void handleListPipesIterators() { - try { - // This is a placeholder - list operation not fully implemented - // Would need to iterate ConfigStore keys with PIPES_ITERATOR_PREFIX - output.writeByte(0); // success - byte[] msgBytes = "[]".getBytes(StandardCharsets.UTF_8); - output.writeInt(msgBytes.length); - output.write(msgBytes); - output.flush(); - LOG.debug("pipesClientId={}: list pipes iterators (placeholder)", pipesClientId); - } catch (IOException e) { - LOG.error("pipesClientId={}: error listing pipes iterators", pipesClientId, e); - exit(1); - } - } - - private void handleGetPipesIterator() { - try { - // Read iterator ID - int len = input.readInt(); - byte[] bytes = new byte[len]; - input.readFully(bytes); - String iteratorId = new String(bytes, StandardCharsets.UTF_8); - - // Get from ConfigStore - ExtensionConfig config = configStore.get(PIPES_ITERATOR_PREFIX + iteratorId); - - if (config == null) { - output.writeByte(0); // not found - output.flush(); - } else { - output.writeByte(1); // found - - // Serialize config - UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get(); - try (ObjectOutputStream oos = new ObjectOutputStream(bos)) { - oos.writeObject(config); - } - byte[] configBytes = bos.toByteArray(); - output.writeInt(configBytes.length); - output.write(configBytes); - output.flush(); - } - LOG.debug("pipesClientId={}: get pipes iterator '{}' = {}", pipesClientId, iteratorId, (config != null ? "found" : "not found")); - - } catch (IOException e) { - LOG.error("pipesClientId={}: error getting pipes iterator", pipesClientId, e); - exit(1); - } - } - - private static final String PIPES_ITERATOR_PREFIX = "pipesIterator:"; - }
