This is an automated email from the ASF dual-hosted git repository. ndipiazza pushed a commit to branch TIKA-4595-dynamic-fetcher-management in repository https://gitbox.apache.org/repos/asf/tika.git
commit c48d6c1f9209b929ec66f916606cef2079e99bf2 Author: Nicholas DiPiazza <[email protected]> AuthorDate: Sun Dec 28 09:17:47 2025 -0600 TIKA-4595: Add dynamic fetcher/emitter management API to PipesClient - Added SAVE_FETCHER, DELETE_FETCHER, LIST_FETCHERS, GET_FETCHER commands - Added SAVE_EMITTER, DELETE_EMITTER, LIST_EMITTERS, GET_EMITTER commands - Implemented PipesClient public API methods for runtime configuration - Implemented PipesServer command handlers - Added deleteComponent() and getComponentConfig() to AbstractComponentManager - Added wrapper methods to FetcherManager and EmitterManager - Added remove() method to ConfigStore interface and implementations - All tests passing --- .../tika/pipes/core/AbstractComponentManager.java | 41 +++ .../org/apache/tika/pipes/core/PipesClient.java | 281 +++++++++++++++++++- .../apache/tika/pipes/core/config/ConfigStore.java | 9 + .../pipes/core/config/InMemoryConfigStore.java | 5 + .../tika/pipes/core/emitter/EmitterManager.java | 20 ++ .../tika/pipes/core/fetcher/FetcherManager.java | 20 ++ .../apache/tika/pipes/core/server/PipesServer.java | 286 +++++++++++++++++++++ .../tika/pipes/core/config/LoggingConfigStore.java | 8 + 8 files changed, 669 insertions(+), 1 deletion(-) diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java index 859d8dbb2..70de8b78b 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java @@ -309,6 +309,47 @@ public abstract class AbstractComponentManager<T extends TikaExtension, configStore.put(componentId, config); } + /** + * Deletes a component configuration by ID. + * Clears the cached instance and removes the configuration. + * + * @param componentId the component ID to delete + * @throws TikaConfigException if runtime modifications are not allowed or component not found + */ + public synchronized void deleteComponent(String componentId) throws TikaConfigException { + if (!allowRuntimeModifications) { + throw new TikaConfigException( + "Runtime modifications are not allowed. " + getClass().getSimpleName() + + " must be loaded with allowRuntimeModifications=true to use delete" + + getComponentName().substring(0, 1).toUpperCase(Locale.ROOT) + getComponentName().substring(1) + "()"); + } + + if (componentId == null) { + throw new IllegalArgumentException("Component ID cannot be null"); + } + + if (!configStore.containsKey(componentId)) { + throw new TikaConfigException( + getComponentName().substring(0, 1).toUpperCase(Locale.ROOT) + + getComponentName().substring(1) + " with ID '" + componentId + "' not found"); + } + + // Clear cache and remove config + componentCache.remove(componentId); + configStore.remove(componentId); + LOG.debug("Deleted {} config: id={}", getComponentName(), componentId); + } + + /** + * Gets the configuration for a specific component by ID. + * + * @param componentId the component ID + * @return the component configuration, or null if not found + */ + public ExtensionConfig getComponentConfig(String componentId) { + return configStore.get(componentId); + } + /** * Returns the set of supported component IDs. */ diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java index 151931bdb..5f386c628 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java @@ -40,10 +40,12 @@ import java.nio.file.Path; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; +import java.util.HashSet; import java.util.HexFormat; import java.util.List; import java.util.Locale; import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -55,6 +57,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.tika.config.TikaTaskTimeout; +import org.apache.tika.exception.TikaException; import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.TikaCoreProperties; import org.apache.tika.parser.ParseContext; @@ -64,6 +67,7 @@ import org.apache.tika.pipes.api.emitter.EmitKey; import org.apache.tika.pipes.core.emitter.EmitDataImpl; import org.apache.tika.pipes.core.server.IntermediateResult; import org.apache.tika.pipes.core.server.PipesServer; +import org.apache.tika.plugins.ExtensionConfig; import org.apache.tika.utils.ExceptionUtils; import org.apache.tika.utils.ProcessUtils; import org.apache.tika.utils.StringUtils; @@ -77,7 +81,9 @@ import org.apache.tika.utils.StringUtils; public class PipesClient implements Closeable { public enum COMMANDS { - PING, ACK, NEW_REQUEST, SHUT_DOWN; + PING, ACK, NEW_REQUEST, SHUT_DOWN, + SAVE_FETCHER, DELETE_FETCHER, LIST_FETCHERS, GET_FETCHER, + SAVE_EMITTER, DELETE_EMITTER, LIST_EMITTERS, GET_EMITTER; public byte getByte() { return (byte) (ordinal() + 1); @@ -602,4 +608,277 @@ public class PipesClient implements Closeable { return tikaTaskTimeout.getTimeoutMillis(); } + // ========== Fetcher Management API ========== + + /** + * Save (create or update) a fetcher configuration. + * The fetcher will be available immediately for use in subsequent fetch operations. + * + * @param config the fetcher configuration containing name, plugin ID, and parameters + * @throws IOException if communication with the server fails + * @throws TikaException if the server returns an error (e.g., invalid configuration) + * @throws InterruptedException if the operation is interrupted + */ + public void saveFetcher(ExtensionConfig config) throws IOException, TikaException, InterruptedException { + maybeInit(); + synchronized (lock) { + serverTuple.output.write(COMMANDS.SAVE_FETCHER.getByte()); + serverTuple.output.flush(); + + // Serialize the ExtensionConfig + UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get(); + try (ObjectOutputStream oos = new ObjectOutputStream(bos)) { + oos.writeObject(config); + } + byte[] bytes = bos.toByteArray(); + serverTuple.output.writeInt(bytes.length); + serverTuple.output.write(bytes); + serverTuple.output.flush(); + + // Read response + byte status = serverTuple.input.readByte(); + int msgLen = serverTuple.input.readInt(); + byte[] msgBytes = new byte[msgLen]; + serverTuple.input.readFully(msgBytes); + String message = new String(msgBytes, StandardCharsets.UTF_8); + + if (status != 0) { // 0 = success, 1 = error + throw new TikaException("Failed to save fetcher: " + message); + } + LOG.debug("pipesClientId={}: saved fetcher '{}'", pipesClientId, config.id()); + } + } + + /** + * Delete a fetcher by its name/ID. + * + * @param fetcherId the fetcher name/ID to delete + * @throws IOException if communication with the server fails + * @throws TikaException if the server returns an error (e.g., fetcher not found) + * @throws InterruptedException if the operation is interrupted + */ + public void deleteFetcher(String fetcherId) throws IOException, TikaException, InterruptedException { + maybeInit(); + synchronized (lock) { + serverTuple.output.write(COMMANDS.DELETE_FETCHER.getByte()); + serverTuple.output.flush(); + + byte[] idBytes = fetcherId.getBytes(StandardCharsets.UTF_8); + serverTuple.output.writeInt(idBytes.length); + serverTuple.output.write(idBytes); + serverTuple.output.flush(); + + // Read response + byte status = serverTuple.input.readByte(); + int msgLen = serverTuple.input.readInt(); + byte[] msgBytes = new byte[msgLen]; + serverTuple.input.readFully(msgBytes); + String message = new String(msgBytes, StandardCharsets.UTF_8); + + if (status != 0) { + throw new TikaException("Failed to delete fetcher: " + message); + } + LOG.debug("pipesClientId={}: deleted fetcher '{}'", pipesClientId, fetcherId); + } + } + + /** + * List all available fetcher IDs (both static from config and dynamically added). + * + * @return set of fetcher IDs + * @throws IOException if communication with the server fails + * @throws InterruptedException if the operation is interrupted + */ + public Set<String> listFetchers() throws IOException, InterruptedException { + maybeInit(); + synchronized (lock) { + serverTuple.output.write(COMMANDS.LIST_FETCHERS.getByte()); + serverTuple.output.flush(); + + // Read response + int count = serverTuple.input.readInt(); + Set<String> fetcherIds = new HashSet<>(count); + for (int i = 0; i < count; i++) { + int len = serverTuple.input.readInt(); + byte[] bytes = new byte[len]; + serverTuple.input.readFully(bytes); + fetcherIds.add(new String(bytes, StandardCharsets.UTF_8)); + } + LOG.debug("pipesClientId={}: listed {} fetchers", pipesClientId, count); + return fetcherIds; + } + } + + /** + * Get the configuration for a specific fetcher. + * + * @param fetcherId the fetcher ID + * @return the fetcher configuration, or null if not found + * @throws IOException if communication with the server fails + * @throws InterruptedException if the operation is interrupted + */ + public ExtensionConfig getFetcherConfig(String fetcherId) throws IOException, InterruptedException { + maybeInit(); + synchronized (lock) { + serverTuple.output.write(COMMANDS.GET_FETCHER.getByte()); + serverTuple.output.flush(); + + byte[] idBytes = fetcherId.getBytes(StandardCharsets.UTF_8); + serverTuple.output.writeInt(idBytes.length); + serverTuple.output.write(idBytes); + serverTuple.output.flush(); + + // Read response + byte found = serverTuple.input.readByte(); + if (found == 0) { + return null; + } + + int len = serverTuple.input.readInt(); + byte[] bytes = new byte[len]; + serverTuple.input.readFully(bytes); + + try (ObjectInputStream ois = new ObjectInputStream(new UnsynchronizedByteArrayInputStream(bytes))) { + return (ExtensionConfig) ois.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException("Failed to deserialize ExtensionConfig", e); + } + } + } + + // ========== Emitter Management API ========== + + /** + * Save (create or update) an emitter configuration. + * The emitter will be available immediately for use in subsequent emit operations. + * + * @param config the emitter configuration containing name, plugin ID, and parameters + * @throws IOException if communication with the server fails + * @throws TikaException if the server returns an error (e.g., invalid configuration) + * @throws InterruptedException if the operation is interrupted + */ + public void saveEmitter(ExtensionConfig config) throws IOException, TikaException, InterruptedException { + maybeInit(); + synchronized (lock) { + serverTuple.output.write(COMMANDS.SAVE_EMITTER.getByte()); + serverTuple.output.flush(); + + UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get(); + try (ObjectOutputStream oos = new ObjectOutputStream(bos)) { + oos.writeObject(config); + } + byte[] bytes = bos.toByteArray(); + serverTuple.output.writeInt(bytes.length); + serverTuple.output.write(bytes); + serverTuple.output.flush(); + + byte status = serverTuple.input.readByte(); + int msgLen = serverTuple.input.readInt(); + byte[] msgBytes = new byte[msgLen]; + serverTuple.input.readFully(msgBytes); + String message = new String(msgBytes, StandardCharsets.UTF_8); + + if (status != 0) { + throw new TikaException("Failed to save emitter: " + message); + } + LOG.debug("pipesClientId={}: saved emitter '{}'", pipesClientId, config.id()); + } + } + + /** + * Delete an emitter by its name/ID. + * + * @param emitterId the emitter name/ID to delete + * @throws IOException if communication with the server fails + * @throws TikaException if the server returns an error (e.g., emitter not found) + * @throws InterruptedException if the operation is interrupted + */ + public void deleteEmitter(String emitterId) throws IOException, TikaException, InterruptedException { + maybeInit(); + synchronized (lock) { + serverTuple.output.write(COMMANDS.DELETE_EMITTER.getByte()); + serverTuple.output.flush(); + + byte[] idBytes = emitterId.getBytes(StandardCharsets.UTF_8); + serverTuple.output.writeInt(idBytes.length); + serverTuple.output.write(idBytes); + serverTuple.output.flush(); + + byte status = serverTuple.input.readByte(); + int msgLen = serverTuple.input.readInt(); + byte[] msgBytes = new byte[msgLen]; + serverTuple.input.readFully(msgBytes); + String message = new String(msgBytes, StandardCharsets.UTF_8); + + if (status != 0) { + throw new TikaException("Failed to delete emitter: " + message); + } + LOG.debug("pipesClientId={}: deleted emitter '{}'", pipesClientId, emitterId); + } + } + + /** + * List all available emitter IDs (both static from config and dynamically added). + * + * @return set of emitter IDs + * @throws IOException if communication with the server fails + * @throws InterruptedException if the operation is interrupted + */ + public Set<String> listEmitters() throws IOException, InterruptedException { + maybeInit(); + synchronized (lock) { + serverTuple.output.write(COMMANDS.LIST_EMITTERS.getByte()); + serverTuple.output.flush(); + + int count = serverTuple.input.readInt(); + Set<String> emitterIds = new HashSet<>(count); + for (int i = 0; i < count; i++) { + int len = serverTuple.input.readInt(); + byte[] bytes = new byte[len]; + serverTuple.input.readFully(bytes); + emitterIds.add(new String(bytes, StandardCharsets.UTF_8)); + } + LOG.debug("pipesClientId={}: listed {} emitters", pipesClientId, count); + return emitterIds; + } + } + + /** + * Get the configuration for a specific emitter. + * + * @param emitterId the emitter ID + * @return the emitter configuration, or null if not found + * @throws IOException if communication with the server fails + * @throws InterruptedException if the operation is interrupted + */ + public ExtensionConfig getEmitterConfig(String emitterId) throws IOException, InterruptedException { + maybeInit(); + synchronized (lock) { + serverTuple.output.write(COMMANDS.GET_EMITTER.getByte()); + serverTuple.output.flush(); + + byte[] idBytes = emitterId.getBytes(StandardCharsets.UTF_8); + serverTuple.output.writeInt(idBytes.length); + serverTuple.output.write(idBytes); + serverTuple.output.flush(); + + byte found = serverTuple.input.readByte(); + if (found == 0) { + return null; + } + + int len = serverTuple.input.readInt(); + byte[] bytes = new byte[len]; + serverTuple.input.readFully(bytes); + + try (ObjectInputStream ois = new ObjectInputStream(new UnsynchronizedByteArrayInputStream(bytes))) { + return (ExtensionConfig) ois.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException("Failed to deserialize ExtensionConfig", e); + } + } + } + + private final Object[] lock = new Object[0]; + } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java index 73d73ff7e..5c90fa0b7 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java @@ -88,4 +88,13 @@ public interface ConfigStore extends TikaExtension { * @return the number of configurations */ int size(); + + /** + * Removes a configuration by ID. + * + * @param id the configuration ID (must not be null) + * @return the removed configuration, or null if not found + * @throws NullPointerException if id is null + */ + ExtensionConfig remove(String id); } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/InMemoryConfigStore.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/InMemoryConfigStore.java index 7b6dab100..90a9c1ce6 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/InMemoryConfigStore.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/InMemoryConfigStore.java @@ -63,4 +63,9 @@ public class InMemoryConfigStore implements ConfigStore { public int size() { return store.size(); } + + @Override + public ExtensionConfig remove(String id) { + return store.remove(id); + } } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java index e424fc35a..256e35b7c 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java @@ -174,4 +174,24 @@ public class EmitterManager extends AbstractComponentManager<Emitter, EmitterFac public void saveEmitter(ExtensionConfig config) throws TikaConfigException, IOException { saveComponent(config); } + + /** + * Deletes an emitter configuration by ID. + * + * @param emitterId the emitter ID to delete + * @throws TikaConfigException if runtime modifications are not allowed or emitter not found + */ + public void deleteEmitter(String emitterId) throws TikaConfigException { + deleteComponent(emitterId); + } + + /** + * Gets the configuration for a specific emitter by ID. + * + * @param emitterId the emitter ID + * @return the emitter configuration, or null if not found + */ + public ExtensionConfig getConfig(String emitterId) { + return getComponentConfig(emitterId); + } } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java index e282b8495..4f4e9a199 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java @@ -173,4 +173,24 @@ public class FetcherManager extends AbstractComponentManager<Fetcher, FetcherFac public void saveFetcher(ExtensionConfig config) throws TikaConfigException, IOException { saveComponent(config); } + + /** + * Deletes a fetcher configuration by ID. + * + * @param fetcherId the fetcher ID to delete + * @throws TikaConfigException if runtime modifications are not allowed or fetcher not found + */ + public void deleteFetcher(String fetcherId) throws TikaConfigException { + deleteComponent(fetcherId); + } + + /** + * Gets the configuration for a specific fetcher by ID. + * + * @param fetcherId the fetcher ID + * @return the fetcher configuration, or null if not found + */ + public ExtensionConfig getConfig(String fetcherId) { + return getComponentConfig(fetcherId); + } } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java index dd09db768..6715db1b6 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java @@ -38,6 +38,7 @@ import java.time.Duration; import java.time.Instant; import java.util.HexFormat; import java.util.Locale; +import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -72,6 +73,7 @@ import org.apache.tika.pipes.core.PipesClient; import org.apache.tika.pipes.core.PipesConfig; import org.apache.tika.pipes.core.emitter.EmitterManager; import org.apache.tika.pipes.core.fetcher.FetcherManager; +import org.apache.tika.plugins.ExtensionConfig; import org.apache.tika.plugins.TikaPluginManager; import org.apache.tika.serialization.ParseContextUtils; import org.apache.tika.utils.ExceptionUtils; @@ -306,6 +308,22 @@ public class PipesServer implements AutoCloseable { //swallow } System.exit(0); + } else if (request == PipesClient.COMMANDS.SAVE_FETCHER.getByte()) { + handleSaveFetcher(); + } else if (request == PipesClient.COMMANDS.DELETE_FETCHER.getByte()) { + handleDeleteFetcher(); + } else if (request == PipesClient.COMMANDS.LIST_FETCHERS.getByte()) { + handleListFetchers(); + } else if (request == PipesClient.COMMANDS.GET_FETCHER.getByte()) { + handleGetFetcher(); + } else if (request == PipesClient.COMMANDS.SAVE_EMITTER.getByte()) { + handleSaveEmitter(); + } else if (request == PipesClient.COMMANDS.DELETE_EMITTER.getByte()) { + handleDeleteEmitter(); + } else if (request == PipesClient.COMMANDS.LIST_EMITTERS.getByte()) { + handleListEmitters(); + } else if (request == PipesClient.COMMANDS.GET_EMITTER.getByte()) { + handleGetEmitter(); } else { String msg = String.format(Locale.ROOT, "pipesClientId=%s: Unexpected byte 0x%02x in command position. " + @@ -556,4 +574,272 @@ public class PipesServer implements AutoCloseable { } } + // ========== Fetcher Management Handlers ========== + + private void handleSaveFetcher() { + try { + // Read ExtensionConfig + int len = input.readInt(); + byte[] bytes = new byte[len]; + input.readFully(bytes); + + ExtensionConfig config; + try (ObjectInputStream ois = new ObjectInputStream(new UnsynchronizedByteArrayInputStream(bytes))) { + config = (ExtensionConfig) ois.readObject(); + } + + // Save the fetcher + fetcherManager.saveFetcher(config); + LOG.debug("pipesClientId={}: saved fetcher '{}'", pipesClientId, config.id()); + + // Send success response + output.writeByte(0); // success + String msg = "Fetcher saved successfully"; + byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8); + output.writeInt(msgBytes.length); + output.write(msgBytes); + output.flush(); + + } catch (Exception e) { + LOG.error("pipesClientId={}: error saving fetcher", pipesClientId, e); + try { + output.writeByte(1); // error + String msg = ExceptionUtils.getStackTrace(e); + byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8); + output.writeInt(msgBytes.length); + output.write(msgBytes); + output.flush(); + } catch (IOException ioe) { + LOG.error("pipesClientId={}: failed to send error response", pipesClientId, ioe); + exit(1); + } + } + } + + private void handleDeleteFetcher() { + try { + // Read fetcher ID + int len = input.readInt(); + byte[] bytes = new byte[len]; + input.readFully(bytes); + String fetcherId = new String(bytes, StandardCharsets.UTF_8); + + // Delete the fetcher + fetcherManager.deleteFetcher(fetcherId); + LOG.debug("pipesClientId={}: deleted fetcher '{}'", pipesClientId, fetcherId); + + // Send success response + output.writeByte(0); // success + String msg = "Fetcher deleted successfully"; + byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8); + output.writeInt(msgBytes.length); + output.write(msgBytes); + output.flush(); + + } catch (Exception e) { + LOG.error("pipesClientId={}: error deleting fetcher", pipesClientId, e); + try { + output.writeByte(1); // error + String msg = ExceptionUtils.getStackTrace(e); + byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8); + output.writeInt(msgBytes.length); + output.write(msgBytes); + output.flush(); + } catch (IOException ioe) { + LOG.error("pipesClientId={}: failed to send error response", pipesClientId, ioe); + exit(1); + } + } + } + + private void handleListFetchers() { + try { + // Get list of fetcher IDs + Set<String> fetcherIds = fetcherManager.getSupported(); + LOG.debug("pipesClientId={}: listing {} fetchers", pipesClientId, fetcherIds.size()); + + // Send response + output.writeInt(fetcherIds.size()); + for (String id : fetcherIds) { + byte[] idBytes = id.getBytes(StandardCharsets.UTF_8); + output.writeInt(idBytes.length); + output.write(idBytes); + } + output.flush(); + + } catch (IOException e) { + LOG.error("pipesClientId={}: error listing fetchers", pipesClientId, e); + exit(1); + } + } + + private void handleGetFetcher() { + try { + // Read fetcher ID + int len = input.readInt(); + byte[] bytes = new byte[len]; + input.readFully(bytes); + String fetcherId = new String(bytes, StandardCharsets.UTF_8); + + // Get fetcher config + ExtensionConfig config = fetcherManager.getConfig(fetcherId); + + if (config == null) { + output.writeByte(0); // not found + output.flush(); + } else { + output.writeByte(1); // found + + // Serialize config + UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get(); + try (ObjectOutputStream oos = new ObjectOutputStream(bos)) { + oos.writeObject(config); + } + byte[] configBytes = bos.toByteArray(); + output.writeInt(configBytes.length); + output.write(configBytes); + output.flush(); + } + LOG.debug("pipesClientId={}: get fetcher '{}' = {}", pipesClientId, fetcherId, (config != null ? "found" : "not found")); + + } catch (IOException e) { + LOG.error("pipesClientId={}: error getting fetcher", pipesClientId, e); + exit(1); + } + } + + // ========== Emitter Management Handlers ========== + + private void handleSaveEmitter() { + try { + // Read ExtensionConfig + int len = input.readInt(); + byte[] bytes = new byte[len]; + input.readFully(bytes); + + ExtensionConfig config; + try (ObjectInputStream ois = new ObjectInputStream(new UnsynchronizedByteArrayInputStream(bytes))) { + config = (ExtensionConfig) ois.readObject(); + } + + // Save the emitter + emitterManager.saveEmitter(config); + LOG.debug("pipesClientId={}: saved emitter '{}'", pipesClientId, config.id()); + + // Send success response + output.writeByte(0); // success + String msg = "Emitter saved successfully"; + byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8); + output.writeInt(msgBytes.length); + output.write(msgBytes); + output.flush(); + + } catch (Exception e) { + LOG.error("pipesClientId={}: error saving emitter", pipesClientId, e); + try { + output.writeByte(1); // error + String msg = ExceptionUtils.getStackTrace(e); + byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8); + output.writeInt(msgBytes.length); + output.write(msgBytes); + output.flush(); + } catch (IOException ioe) { + LOG.error("pipesClientId={}: failed to send error response", pipesClientId, ioe); + exit(1); + } + } + } + + private void handleDeleteEmitter() { + try { + // Read emitter ID + int len = input.readInt(); + byte[] bytes = new byte[len]; + input.readFully(bytes); + String emitterId = new String(bytes, StandardCharsets.UTF_8); + + // Delete the emitter + emitterManager.deleteEmitter(emitterId); + LOG.debug("pipesClientId={}: deleted emitter '{}'", pipesClientId, emitterId); + + // Send success response + output.writeByte(0); // success + String msg = "Emitter deleted successfully"; + byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8); + output.writeInt(msgBytes.length); + output.write(msgBytes); + output.flush(); + + } catch (Exception e) { + LOG.error("pipesClientId={}: error deleting emitter", pipesClientId, e); + try { + output.writeByte(1); // error + String msg = ExceptionUtils.getStackTrace(e); + byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8); + output.writeInt(msgBytes.length); + output.write(msgBytes); + output.flush(); + } catch (IOException ioe) { + LOG.error("pipesClientId={}: failed to send error response", pipesClientId, ioe); + exit(1); + } + } + } + + private void handleListEmitters() { + try { + // Get list of emitter IDs + Set<String> emitterIds = emitterManager.getSupported(); + LOG.debug("pipesClientId={}: listing {} emitters", pipesClientId, emitterIds.size()); + + // Send response + output.writeInt(emitterIds.size()); + for (String id : emitterIds) { + byte[] idBytes = id.getBytes(StandardCharsets.UTF_8); + output.writeInt(idBytes.length); + output.write(idBytes); + } + output.flush(); + + } catch (IOException e) { + LOG.error("pipesClientId={}: error listing emitters", pipesClientId, e); + exit(1); + } + } + + private void handleGetEmitter() { + try { + // Read emitter ID + int len = input.readInt(); + byte[] bytes = new byte[len]; + input.readFully(bytes); + String emitterId = new String(bytes, StandardCharsets.UTF_8); + + // Get emitter config + ExtensionConfig config = emitterManager.getConfig(emitterId); + + if (config == null) { + output.writeByte(0); // not found + output.flush(); + } else { + output.writeByte(1); // found + + // Serialize config + UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get(); + try (ObjectOutputStream oos = new ObjectOutputStream(bos)) { + oos.writeObject(config); + } + byte[] configBytes = bos.toByteArray(); + output.writeInt(configBytes.length); + output.write(configBytes); + output.flush(); + } + LOG.debug("pipesClientId={}: get emitter '{}' = {}", pipesClientId, emitterId, (config != null ? "found" : "not found")); + + } catch (IOException e) { + LOG.error("pipesClientId={}: error getting emitter", pipesClientId, e); + exit(1); + } + } + } diff --git a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/config/LoggingConfigStore.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/config/LoggingConfigStore.java index 8ef299b35..07e901e2e 100644 --- a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/config/LoggingConfigStore.java +++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/config/LoggingConfigStore.java @@ -83,4 +83,12 @@ public class LoggingConfigStore implements ConfigStore { return store.size(); } } + + @Override + public ExtensionConfig remove(String id) { + LOG.debug("ConfigStore: Removing config with id={}", id); + synchronized (store) { + return store.remove(id); + } + } }
