This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new c43f000033 TIKA-4616 -- mv fetcher/emitter CRUD to tika-grpc (#2519)
c43f000033 is described below
commit c43f0000339cf326e9a3387d4bccc2a8dc406132
Author: Tim Allison <[email protected]>
AuthorDate: Wed Jan 14 08:11:28 2026 -0500
TIKA-4616 -- mv fetcher/emitter CRUD to tika-grpc (#2519)
---
.../apache/tika/pipes/grpc/TikaGrpcServerImpl.java | 58 ++-
.../org/apache/tika/pipes/core/PipesClient.java | 394 +------------------
.../apache/tika/pipes/core/server/PipesServer.java | 426 ---------------------
3 files changed, 29 insertions(+), 849 deletions(-)
diff --git a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
index 8e5bfe2676..a21c32a77e 100644
--- a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
+++ b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
@@ -81,9 +81,12 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
.build();
public static final JsonSchemaGenerator JSON_SCHEMA_GENERATOR = new
JsonSchemaGenerator(OBJECT_MAPPER);
+ private static final String PIPES_ITERATOR_PREFIX = "pipesIterator:";
+
PipesConfig pipesConfig;
PipesClient pipesClient;
FetcherManager fetcherManager;
+ ConfigStore configStore;
Path tikaConfigPath;
PluginManager pluginManager;
@@ -125,9 +128,9 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
pluginManager = new org.pf4j.DefaultPluginManager();
}
- ConfigStore configStore = createConfigStore();
+ this.configStore = createConfigStore();
- fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig,
true, configStore);
+ fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig,
true, this.configStore);
}
private ConfigStore createConfigStore() throws TikaConfigException {
@@ -261,11 +264,8 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
String factoryName =
findFactoryNameForClass(request.getFetcherClass());
ExtensionConfig config = new
ExtensionConfig(request.getFetcherId(), factoryName,
request.getFetcherConfigJson());
- // Save to gRPC server's fetcher manager (for schema queries, etc.)
+ // Save to fetcher manager (updates ConfigStore which is shared with PipesServer)
fetcherManager.saveFetcher(config);
-
- // Also save to PipesClient so it propagates to the forked
PipesServer
- pipesClient.saveFetcher(config);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -372,12 +372,8 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
private boolean deleteFetcher(String id) {
try {
- // Delete from gRPC server's fetcher manager
+ // Delete from fetcher manager (updates ConfigStore which is shared with PipesServer)
fetcherManager.deleteFetcher(id);
-
- // Also delete from PipesClient so it propagates to the forked
PipesServer
- pipesClient.deleteFetcher(id);
-
LOG.info("Successfully deleted fetcher: {}", id);
return true;
} catch (Exception e) {
@@ -395,22 +391,22 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
String iteratorId = request.getIteratorId();
String iteratorClass = request.getIteratorClass();
String iteratorConfigJson = request.getIteratorConfigJson();
-
+
LOG.info("Saving pipes iterator: id={}, class={}", iteratorId,
iteratorClass);
-
+
ExtensionConfig config = new ExtensionConfig(iteratorId,
iteratorClass, iteratorConfigJson);
-
- // Save via PipesClient so it propagates to the forked PipesServer
- pipesClient.savePipesIterator(config);
-
+
+ // Save directly to ConfigStore (shared with PipesServer)
+ configStore.put(PIPES_ITERATOR_PREFIX + iteratorId, config);
+
SavePipesIteratorReply reply = SavePipesIteratorReply.newBuilder()
.setMessage("Pipes iterator saved successfully")
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
-
+
LOG.info("Successfully saved pipes iterator: {}", iteratorId);
-
+
} catch (Exception e) {
LOG.error("Failed to save pipes iterator", e);
responseObserver.onError(io.grpc.Status.INTERNAL
@@ -426,16 +422,17 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
try {
String iteratorId = request.getIteratorId();
LOG.info("Getting pipes iterator: {}", iteratorId);
-
- ExtensionConfig config =
pipesClient.getPipesIteratorConfig(iteratorId);
-
+
+ // Get directly from ConfigStore (shared with PipesServer)
+ ExtensionConfig config = configStore.get(PIPES_ITERATOR_PREFIX +
iteratorId);
+
if (config == null) {
responseObserver.onError(io.grpc.Status.NOT_FOUND
.withDescription("Pipes iterator not found: " +
iteratorId)
.asRuntimeException());
return;
}
-
+
GetPipesIteratorReply reply = GetPipesIteratorReply.newBuilder()
.setIteratorId(config.id())
.setIteratorClass(config.name())
@@ -443,9 +440,9 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
-
+
LOG.info("Successfully retrieved pipes iterator: {}", iteratorId);
-
+
} catch (Exception e) {
LOG.error("Failed to get pipes iterator", e);
responseObserver.onError(io.grpc.Status.INTERNAL
@@ -461,17 +458,18 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
try {
String iteratorId = request.getIteratorId();
LOG.info("Deleting pipes iterator: {}", iteratorId);
-
- pipesClient.deletePipesIterator(iteratorId);
-
+
+ // Delete directly from ConfigStore (shared with PipesServer)
+ configStore.remove(PIPES_ITERATOR_PREFIX + iteratorId);
+
DeletePipesIteratorReply reply =
DeletePipesIteratorReply.newBuilder()
.setMessage("Pipes iterator deleted successfully")
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
-
+
LOG.info("Successfully deleted pipes iterator: {}", iteratorId);
-
+
} catch (Exception e) {
LOG.error("Failed to delete pipes iterator", e);
responseObserver.onError(io.grpc.Status.INTERNAL
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index 5d1ef5bdf7..fda74a01c3 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -40,12 +40,10 @@ import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.HexFormat;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -57,7 +55,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tika.config.TikaTaskTimeout;
-import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.parser.ParseContext;
@@ -67,7 +64,6 @@ import org.apache.tika.pipes.api.emitter.EmitKey;
import org.apache.tika.pipes.core.emitter.EmitDataImpl;
import org.apache.tika.pipes.core.server.IntermediateResult;
import org.apache.tika.pipes.core.server.PipesServer;
-import org.apache.tika.plugins.ExtensionConfig;
import org.apache.tika.utils.ExceptionUtils;
import org.apache.tika.utils.ProcessUtils;
import org.apache.tika.utils.StringUtils;
@@ -81,10 +77,7 @@ import org.apache.tika.utils.StringUtils;
public class PipesClient implements Closeable {
public enum COMMANDS {
- PING, ACK, NEW_REQUEST, SHUT_DOWN,
- SAVE_FETCHER, DELETE_FETCHER, LIST_FETCHERS, GET_FETCHER,
- SAVE_EMITTER, DELETE_EMITTER, LIST_EMITTERS, GET_EMITTER,
- SAVE_PIPES_ITERATOR, DELETE_PIPES_ITERATOR, LIST_PIPES_ITERATORS,
GET_PIPES_ITERATOR;
+ PING, ACK, NEW_REQUEST, SHUT_DOWN;
public byte getByte() {
return (byte) (ordinal() + 1);
@@ -608,389 +601,4 @@ public class PipesClient implements Closeable {
LOG.debug("applying timeout from parseContext: {}ms",
tikaTaskTimeout.getTimeoutMillis());
return tikaTaskTimeout.getTimeoutMillis();
}
-
- // ========== Fetcher Management API ==========
-
- /**
- * Save (create or update) a fetcher configuration.
- * The fetcher will be available immediately for use in subsequent fetch
operations.
- *
- * @param config the fetcher configuration containing name, plugin ID, and
parameters
- * @throws IOException if communication with the server fails
- * @throws TikaException if the server returns an error (e.g., invalid
configuration)
- * @throws InterruptedException if the operation is interrupted
- */
- public void saveFetcher(ExtensionConfig config) throws IOException,
TikaException, InterruptedException {
- maybeInit();
- synchronized (lock) {
- serverTuple.output.write(COMMANDS.SAVE_FETCHER.getByte());
- serverTuple.output.flush();
-
- // Serialize the ExtensionConfig
- UnsynchronizedByteArrayOutputStream bos =
UnsynchronizedByteArrayOutputStream.builder().get();
- try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
- oos.writeObject(config);
- }
- byte[] bytes = bos.toByteArray();
- serverTuple.output.writeInt(bytes.length);
- serverTuple.output.write(bytes);
- serverTuple.output.flush();
-
- // Read response
- byte status = serverTuple.input.readByte();
- int msgLen = serverTuple.input.readInt();
- byte[] msgBytes = new byte[msgLen];
- serverTuple.input.readFully(msgBytes);
- String message = new String(msgBytes, StandardCharsets.UTF_8);
-
- if (status != 0) { // 0 = success, 1 = error
- throw new TikaException("Failed to save fetcher: " + message);
- }
- LOG.debug("pipesClientId={}: saved fetcher '{}'", pipesClientId,
config.id());
- }
- }
-
- /**
- * Delete a fetcher by its name/ID.
- *
- * @param fetcherId the fetcher name/ID to delete
- * @throws IOException if communication with the server fails
- * @throws TikaException if the server returns an error (e.g., fetcher not
found)
- * @throws InterruptedException if the operation is interrupted
- */
- public void deleteFetcher(String fetcherId) throws IOException,
TikaException, InterruptedException {
- maybeInit();
- synchronized (lock) {
- serverTuple.output.write(COMMANDS.DELETE_FETCHER.getByte());
- serverTuple.output.flush();
-
- byte[] idBytes = fetcherId.getBytes(StandardCharsets.UTF_8);
- serverTuple.output.writeInt(idBytes.length);
- serverTuple.output.write(idBytes);
- serverTuple.output.flush();
-
- // Read response
- byte status = serverTuple.input.readByte();
- int msgLen = serverTuple.input.readInt();
- byte[] msgBytes = new byte[msgLen];
- serverTuple.input.readFully(msgBytes);
- String message = new String(msgBytes, StandardCharsets.UTF_8);
-
- if (status != 0) {
- throw new TikaException("Failed to delete fetcher: " +
message);
- }
- LOG.debug("pipesClientId={}: deleted fetcher '{}'", pipesClientId,
fetcherId);
- }
- }
-
- /**
- * List all available fetcher IDs (both static from config and dynamically
added).
- *
- * @return set of fetcher IDs
- * @throws IOException if communication with the server fails
- * @throws InterruptedException if the operation is interrupted
- */
- public Set<String> listFetchers() throws IOException, InterruptedException
{
- maybeInit();
- synchronized (lock) {
- serverTuple.output.write(COMMANDS.LIST_FETCHERS.getByte());
- serverTuple.output.flush();
-
- // Read response
- int count = serverTuple.input.readInt();
- Set<String> fetcherIds = new HashSet<>(count);
- for (int i = 0; i < count; i++) {
- int len = serverTuple.input.readInt();
- byte[] bytes = new byte[len];
- serverTuple.input.readFully(bytes);
- fetcherIds.add(new String(bytes, StandardCharsets.UTF_8));
- }
- LOG.debug("pipesClientId={}: listed {} fetchers", pipesClientId,
count);
- return fetcherIds;
- }
- }
-
- /**
- * Get the configuration for a specific fetcher.
- *
- * @param fetcherId the fetcher ID
- * @return the fetcher configuration, or null if not found
- * @throws IOException if communication with the server fails
- * @throws InterruptedException if the operation is interrupted
- */
- public ExtensionConfig getFetcherConfig(String fetcherId) throws
IOException, InterruptedException {
- maybeInit();
- synchronized (lock) {
- serverTuple.output.write(COMMANDS.GET_FETCHER.getByte());
- serverTuple.output.flush();
-
- byte[] idBytes = fetcherId.getBytes(StandardCharsets.UTF_8);
- serverTuple.output.writeInt(idBytes.length);
- serverTuple.output.write(idBytes);
- serverTuple.output.flush();
-
- // Read response
- byte found = serverTuple.input.readByte();
- if (found == 0) {
- return null;
- }
-
- int len = serverTuple.input.readInt();
- byte[] bytes = new byte[len];
- serverTuple.input.readFully(bytes);
-
- try (ObjectInputStream ois = new ObjectInputStream(new
UnsynchronizedByteArrayInputStream(bytes))) {
- return (ExtensionConfig) ois.readObject();
- } catch (ClassNotFoundException e) {
- throw new IOException("Failed to deserialize ExtensionConfig",
e);
- }
- }
- }
-
- // ========== Emitter Management API ==========
-
- /**
- * Save (create or update) an emitter configuration.
- * The emitter will be available immediately for use in subsequent emit
operations.
- *
- * @param config the emitter configuration containing name, plugin ID, and
parameters
- * @throws IOException if communication with the server fails
- * @throws TikaException if the server returns an error (e.g., invalid
configuration)
- * @throws InterruptedException if the operation is interrupted
- */
- public void saveEmitter(ExtensionConfig config) throws IOException,
TikaException, InterruptedException {
- maybeInit();
- synchronized (lock) {
- serverTuple.output.write(COMMANDS.SAVE_EMITTER.getByte());
- serverTuple.output.flush();
-
- UnsynchronizedByteArrayOutputStream bos =
UnsynchronizedByteArrayOutputStream.builder().get();
- try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
- oos.writeObject(config);
- }
- byte[] bytes = bos.toByteArray();
- serverTuple.output.writeInt(bytes.length);
- serverTuple.output.write(bytes);
- serverTuple.output.flush();
-
- byte status = serverTuple.input.readByte();
- int msgLen = serverTuple.input.readInt();
- byte[] msgBytes = new byte[msgLen];
- serverTuple.input.readFully(msgBytes);
- String message = new String(msgBytes, StandardCharsets.UTF_8);
-
- if (status != 0) {
- throw new TikaException("Failed to save emitter: " + message);
- }
- LOG.debug("pipesClientId={}: saved emitter '{}'", pipesClientId,
config.id());
- }
- }
-
- /**
- * Delete an emitter by its name/ID.
- *
- * @param emitterId the emitter name/ID to delete
- * @throws IOException if communication with the server fails
- * @throws TikaException if the server returns an error (e.g., emitter not
found)
- * @throws InterruptedException if the operation is interrupted
- */
- public void deleteEmitter(String emitterId) throws IOException,
TikaException, InterruptedException {
- maybeInit();
- synchronized (lock) {
- serverTuple.output.write(COMMANDS.DELETE_EMITTER.getByte());
- serverTuple.output.flush();
-
- byte[] idBytes = emitterId.getBytes(StandardCharsets.UTF_8);
- serverTuple.output.writeInt(idBytes.length);
- serverTuple.output.write(idBytes);
- serverTuple.output.flush();
-
- byte status = serverTuple.input.readByte();
- int msgLen = serverTuple.input.readInt();
- byte[] msgBytes = new byte[msgLen];
- serverTuple.input.readFully(msgBytes);
- String message = new String(msgBytes, StandardCharsets.UTF_8);
-
- if (status != 0) {
- throw new TikaException("Failed to delete emitter: " +
message);
- }
- LOG.debug("pipesClientId={}: deleted emitter '{}'", pipesClientId,
emitterId);
- }
- }
-
- /**
- * List all available emitter IDs (both static from config and dynamically
added).
- *
- * @return set of emitter IDs
- * @throws IOException if communication with the server fails
- * @throws InterruptedException if the operation is interrupted
- */
- public Set<String> listEmitters() throws IOException, InterruptedException
{
- maybeInit();
- synchronized (lock) {
- serverTuple.output.write(COMMANDS.LIST_EMITTERS.getByte());
- serverTuple.output.flush();
-
- int count = serverTuple.input.readInt();
- Set<String> emitterIds = new HashSet<>(count);
- for (int i = 0; i < count; i++) {
- int len = serverTuple.input.readInt();
- byte[] bytes = new byte[len];
- serverTuple.input.readFully(bytes);
- emitterIds.add(new String(bytes, StandardCharsets.UTF_8));
- }
- LOG.debug("pipesClientId={}: listed {} emitters", pipesClientId,
count);
- return emitterIds;
- }
- }
-
- /**
- * Get the configuration for a specific emitter.
- *
- * @param emitterId the emitter ID
- * @return the emitter configuration, or null if not found
- * @throws IOException if communication with the server fails
- * @throws InterruptedException if the operation is interrupted
- */
- public ExtensionConfig getEmitterConfig(String emitterId) throws
IOException, InterruptedException {
- maybeInit();
- synchronized (lock) {
- serverTuple.output.write(COMMANDS.GET_EMITTER.getByte());
- serverTuple.output.flush();
-
- byte[] idBytes = emitterId.getBytes(StandardCharsets.UTF_8);
- serverTuple.output.writeInt(idBytes.length);
- serverTuple.output.write(idBytes);
- serverTuple.output.flush();
-
- byte found = serverTuple.input.readByte();
- if (found == 0) {
- return null;
- }
-
- int len = serverTuple.input.readInt();
- byte[] bytes = new byte[len];
- serverTuple.input.readFully(bytes);
-
- try (ObjectInputStream ois = new ObjectInputStream(new
UnsynchronizedByteArrayInputStream(bytes))) {
- return (ExtensionConfig) ois.readObject();
- } catch (ClassNotFoundException e) {
- throw new IOException("Failed to deserialize ExtensionConfig",
e);
- }
- }
- }
-
- // ========== PipesIterator Management API ==========
-
- /**
- * Save (create or update) a PipesIterator configuration.
- * The iterator will be available immediately for use in subsequent
operations.
- *
- * @param config the iterator configuration containing name, plugin ID,
and parameters
- * @throws IOException if communication with the server fails
- * @throws TikaException if the server returns an error (e.g., invalid
configuration)
- * @throws InterruptedException if the operation is interrupted
- */
- public void savePipesIterator(ExtensionConfig config) throws IOException,
TikaException, InterruptedException {
- maybeInit();
- synchronized (lock) {
- serverTuple.output.write(COMMANDS.SAVE_PIPES_ITERATOR.getByte());
- serverTuple.output.flush();
-
- // Serialize the ExtensionConfig
- UnsynchronizedByteArrayOutputStream bos =
UnsynchronizedByteArrayOutputStream.builder().get();
- try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
- oos.writeObject(config);
- }
- byte[] bytes = bos.toByteArray();
- serverTuple.output.writeInt(bytes.length);
- serverTuple.output.write(bytes);
- serverTuple.output.flush();
-
- // Read response
- byte status = serverTuple.input.readByte();
- int msgLen = serverTuple.input.readInt();
- byte[] msgBytes = new byte[msgLen];
- serverTuple.input.readFully(msgBytes);
- String message = new String(msgBytes, StandardCharsets.UTF_8);
-
- if (status != 0) { // 0 = success, 1 = error
- throw new TikaException("Failed to save pipes iterator: " +
message);
- }
- LOG.debug("pipesClientId={}: saved pipes iterator '{}'",
pipesClientId, config.id());
- }
- }
-
- /**
- * Delete a PipesIterator configuration by its ID.
- *
- * @param iteratorId the iterator ID to delete
- * @throws IOException if communication with the server fails
- * @throws TikaException if the server returns an error (e.g., iterator
not found)
- * @throws InterruptedException if the operation is interrupted
- */
- public void deletePipesIterator(String iteratorId) throws IOException,
TikaException, InterruptedException {
- maybeInit();
- synchronized (lock) {
- serverTuple.output.write(COMMANDS.DELETE_PIPES_ITERATOR.getByte());
- serverTuple.output.flush();
-
- byte[] idBytes = iteratorId.getBytes(StandardCharsets.UTF_8);
- serverTuple.output.writeInt(idBytes.length);
- serverTuple.output.write(idBytes);
- serverTuple.output.flush();
-
- // Read response
- byte status = serverTuple.input.readByte();
- int msgLen = serverTuple.input.readInt();
- byte[] msgBytes = new byte[msgLen];
- serverTuple.input.readFully(msgBytes);
- String message = new String(msgBytes, StandardCharsets.UTF_8);
-
- if (status != 0) {
- throw new TikaException("Failed to delete pipes iterator: " +
message);
- }
- LOG.debug("pipesClientId={}: deleted pipes iterator '{}'",
pipesClientId, iteratorId);
- }
- }
-
- /**
- * Get the configuration for a specific PipesIterator.
- *
- * @param iteratorId the iterator ID
- * @return the iterator configuration, or null if not found
- * @throws IOException if communication with the server fails
- * @throws InterruptedException if the operation is interrupted
- */
- public ExtensionConfig getPipesIteratorConfig(String iteratorId) throws
IOException, InterruptedException {
- maybeInit();
- synchronized (lock) {
- serverTuple.output.write(COMMANDS.GET_PIPES_ITERATOR.getByte());
- serverTuple.output.flush();
-
- byte[] idBytes = iteratorId.getBytes(StandardCharsets.UTF_8);
- serverTuple.output.writeInt(idBytes.length);
- serverTuple.output.write(idBytes);
- serverTuple.output.flush();
-
- // Read response
- byte found = serverTuple.input.readByte();
- if (found == 0) {
- return null;
- }
-
- int len = serverTuple.input.readInt();
- byte[] bytes = new byte[len];
- serverTuple.input.readFully(bytes);
-
- try (ObjectInputStream ois = new ObjectInputStream(new
UnsynchronizedByteArrayInputStream(bytes))) {
- return (ExtensionConfig) ois.readObject();
- } catch (ClassNotFoundException e) {
- throw new IOException("Failed to deserialize ExtensionConfig",
e);
- }
- }
- }
-
- private final Object[] lock = new Object[0];
-
}
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
index 8c04a110c0..66d5d9ae50 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
@@ -38,7 +38,6 @@ import java.time.Duration;
import java.time.Instant;
import java.util.HexFormat;
import java.util.Locale;
-import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -315,30 +314,6 @@ public class PipesServer implements AutoCloseable {
//swallow
}
System.exit(0);
- } else if (request ==
PipesClient.COMMANDS.SAVE_FETCHER.getByte()) {
- handleSaveFetcher();
- } else if (request ==
PipesClient.COMMANDS.DELETE_FETCHER.getByte()) {
- handleDeleteFetcher();
- } else if (request ==
PipesClient.COMMANDS.LIST_FETCHERS.getByte()) {
- handleListFetchers();
- } else if (request ==
PipesClient.COMMANDS.GET_FETCHER.getByte()) {
- handleGetFetcher();
- } else if (request ==
PipesClient.COMMANDS.SAVE_EMITTER.getByte()) {
- handleSaveEmitter();
- } else if (request ==
PipesClient.COMMANDS.DELETE_EMITTER.getByte()) {
- handleDeleteEmitter();
- } else if (request ==
PipesClient.COMMANDS.LIST_EMITTERS.getByte()) {
- handleListEmitters();
- } else if (request ==
PipesClient.COMMANDS.GET_EMITTER.getByte()) {
- handleGetEmitter();
- } else if (request ==
PipesClient.COMMANDS.SAVE_PIPES_ITERATOR.getByte()) {
- handleSavePipesIterator();
- } else if (request ==
PipesClient.COMMANDS.DELETE_PIPES_ITERATOR.getByte()) {
- handleDeletePipesIterator();
- } else if (request ==
PipesClient.COMMANDS.LIST_PIPES_ITERATORS.getByte()) {
- handleListPipesIterators();
- } else if (request ==
PipesClient.COMMANDS.GET_PIPES_ITERATOR.getByte()) {
- handleGetPipesIterator();
} else {
String msg = String.format(Locale.ROOT,
"pipesClientId=%s: Unexpected byte 0x%02x in
command position. " +
@@ -609,405 +584,4 @@ public class PipesServer implements AutoCloseable {
exit(1);
}
}
-
- // ========== Fetcher Management Handlers ==========
-
- private void handleSaveFetcher() {
- try {
- // Read ExtensionConfig
- int len = input.readInt();
- byte[] bytes = new byte[len];
- input.readFully(bytes);
-
- ExtensionConfig config;
- try (ObjectInputStream ois = new ObjectInputStream(new
UnsynchronizedByteArrayInputStream(bytes))) {
- config = (ExtensionConfig) ois.readObject();
- }
-
- // Save the fetcher
- fetcherManager.saveFetcher(config);
- LOG.debug("pipesClientId={}: saved fetcher '{}'", pipesClientId,
config.id());
-
- // Send success response
- output.writeByte(0); // success
- String msg = "Fetcher saved successfully";
- byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
- output.writeInt(msgBytes.length);
- output.write(msgBytes);
- output.flush();
-
- } catch (Exception e) {
- LOG.error("pipesClientId={}: error saving fetcher", pipesClientId,
e);
- try {
- output.writeByte(1); // error
- String msg = ExceptionUtils.getStackTrace(e);
- byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
- output.writeInt(msgBytes.length);
- output.write(msgBytes);
- output.flush();
- } catch (IOException ioe) {
- LOG.error("pipesClientId={}: failed to send error response",
pipesClientId, ioe);
- exit(1);
- }
- }
- }
-
- private void handleDeleteFetcher() {
- try {
- // Read fetcher ID
- int len = input.readInt();
- byte[] bytes = new byte[len];
- input.readFully(bytes);
- String fetcherId = new String(bytes, StandardCharsets.UTF_8);
-
- // Delete the fetcher
- fetcherManager.deleteFetcher(fetcherId);
- LOG.debug("pipesClientId={}: deleted fetcher '{}'", pipesClientId,
fetcherId);
-
- // Send success response
- output.writeByte(0); // success
- String msg = "Fetcher deleted successfully";
- byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
- output.writeInt(msgBytes.length);
- output.write(msgBytes);
- output.flush();
-
- } catch (Exception e) {
- LOG.error("pipesClientId={}: error deleting fetcher",
pipesClientId, e);
- try {
- output.writeByte(1); // error
- String msg = ExceptionUtils.getStackTrace(e);
- byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
- output.writeInt(msgBytes.length);
- output.write(msgBytes);
- output.flush();
- } catch (IOException ioe) {
- LOG.error("pipesClientId={}: failed to send error response",
pipesClientId, ioe);
- exit(1);
- }
- }
- }
-
- private void handleListFetchers() {
- try {
- // Get list of fetcher IDs
- Set<String> fetcherIds = fetcherManager.getSupported();
- LOG.debug("pipesClientId={}: listing {} fetchers", pipesClientId,
fetcherIds.size());
-
- // Send response
- output.writeInt(fetcherIds.size());
- for (String id : fetcherIds) {
- byte[] idBytes = id.getBytes(StandardCharsets.UTF_8);
- output.writeInt(idBytes.length);
- output.write(idBytes);
- }
- output.flush();
-
- } catch (IOException e) {
- LOG.error("pipesClientId={}: error listing fetchers",
pipesClientId, e);
- exit(1);
- }
- }
-
- private void handleGetFetcher() {
- try {
- // Read fetcher ID
- int len = input.readInt();
- byte[] bytes = new byte[len];
- input.readFully(bytes);
- String fetcherId = new String(bytes, StandardCharsets.UTF_8);
-
- // Get fetcher config
- ExtensionConfig config = fetcherManager.getConfig(fetcherId);
-
- if (config == null) {
- output.writeByte(0); // not found
- output.flush();
- } else {
- output.writeByte(1); // found
-
- // Serialize config
- UnsynchronizedByteArrayOutputStream bos =
UnsynchronizedByteArrayOutputStream.builder().get();
- try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
- oos.writeObject(config);
- }
- byte[] configBytes = bos.toByteArray();
- output.writeInt(configBytes.length);
- output.write(configBytes);
- output.flush();
- }
- LOG.debug("pipesClientId={}: get fetcher '{}' = {}",
pipesClientId, fetcherId, (config != null ? "found" : "not found"));
-
- } catch (IOException e) {
- LOG.error("pipesClientId={}: error getting fetcher",
pipesClientId, e);
- exit(1);
- }
- }
-
- // ========== Emitter Management Handlers ==========
-
- private void handleSaveEmitter() {
- try {
- // Read ExtensionConfig
- int len = input.readInt();
- byte[] bytes = new byte[len];
- input.readFully(bytes);
-
- ExtensionConfig config;
- try (ObjectInputStream ois = new ObjectInputStream(new
UnsynchronizedByteArrayInputStream(bytes))) {
- config = (ExtensionConfig) ois.readObject();
- }
-
- // Save the emitter
- emitterManager.saveEmitter(config);
- LOG.debug("pipesClientId={}: saved emitter '{}'", pipesClientId,
config.id());
-
- // Send success response
- output.writeByte(0); // success
- String msg = "Emitter saved successfully";
- byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
- output.writeInt(msgBytes.length);
- output.write(msgBytes);
- output.flush();
-
- } catch (Exception e) {
- LOG.error("pipesClientId={}: error saving emitter", pipesClientId,
e);
- try {
- output.writeByte(1); // error
- String msg = ExceptionUtils.getStackTrace(e);
- byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
- output.writeInt(msgBytes.length);
- output.write(msgBytes);
- output.flush();
- } catch (IOException ioe) {
- LOG.error("pipesClientId={}: failed to send error response",
pipesClientId, ioe);
- exit(1);
- }
- }
- }
-
- private void handleDeleteEmitter() {
- try {
- // Read emitter ID
- int len = input.readInt();
- byte[] bytes = new byte[len];
- input.readFully(bytes);
- String emitterId = new String(bytes, StandardCharsets.UTF_8);
-
- // Delete the emitter
- emitterManager.deleteEmitter(emitterId);
- LOG.debug("pipesClientId={}: deleted emitter '{}'", pipesClientId,
emitterId);
-
- // Send success response
- output.writeByte(0); // success
- String msg = "Emitter deleted successfully";
- byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
- output.writeInt(msgBytes.length);
- output.write(msgBytes);
- output.flush();
-
- } catch (Exception e) {
- LOG.error("pipesClientId={}: error deleting emitter",
pipesClientId, e);
- try {
- output.writeByte(1); // error
- String msg = ExceptionUtils.getStackTrace(e);
- byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
- output.writeInt(msgBytes.length);
- output.write(msgBytes);
- output.flush();
- } catch (IOException ioe) {
- LOG.error("pipesClientId={}: failed to send error response",
pipesClientId, ioe);
- exit(1);
- }
- }
- }
-
- private void handleListEmitters() {
- try {
- // Get list of emitter IDs
- Set<String> emitterIds = emitterManager.getSupported();
- LOG.debug("pipesClientId={}: listing {} emitters", pipesClientId,
emitterIds.size());
-
- // Send response
- output.writeInt(emitterIds.size());
- for (String id : emitterIds) {
- byte[] idBytes = id.getBytes(StandardCharsets.UTF_8);
- output.writeInt(idBytes.length);
- output.write(idBytes);
- }
- output.flush();
-
- } catch (IOException e) {
- LOG.error("pipesClientId={}: error listing emitters",
pipesClientId, e);
- exit(1);
- }
- }
-
- private void handleGetEmitter() {
- try {
- // Read emitter ID
- int len = input.readInt();
- byte[] bytes = new byte[len];
- input.readFully(bytes);
- String emitterId = new String(bytes, StandardCharsets.UTF_8);
-
- // Get emitter config
- ExtensionConfig config = emitterManager.getConfig(emitterId);
-
- if (config == null) {
- output.writeByte(0); // not found
- output.flush();
- } else {
- output.writeByte(1); // found
-
- // Serialize config
- UnsynchronizedByteArrayOutputStream bos =
UnsynchronizedByteArrayOutputStream.builder().get();
- try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
- oos.writeObject(config);
- }
- byte[] configBytes = bos.toByteArray();
- output.writeInt(configBytes.length);
- output.write(configBytes);
- output.flush();
- }
- LOG.debug("pipesClientId={}: get emitter '{}' = {}",
pipesClientId, emitterId, (config != null ? "found" : "not found"));
-
- } catch (IOException e) {
- LOG.error("pipesClientId={}: error getting emitter",
pipesClientId, e);
- exit(1);
- }
- }
-
- // ========== PipesIterator Command Handlers ==========
- // Note: PipesIterators are primarily used on the client side to generate
FetchEmitTuples.
- // Unlike Fetchers and Emitters, they are not component managers in
PipesServer.
- // These handlers provide basic ConfigStore operations for consistency.
-
- private void handleSavePipesIterator() {
- try {
- // Read ExtensionConfig
- int len = input.readInt();
- byte[] bytes = new byte[len];
- input.readFully(bytes);
-
- ExtensionConfig config;
- try (ObjectInputStream ois = new ObjectInputStream(new
UnsynchronizedByteArrayInputStream(bytes))) {
- config = (ExtensionConfig) ois.readObject();
- } catch (ClassNotFoundException e) {
- throw new IOException("Failed to deserialize ExtensionConfig",
e);
- }
-
- // Save to ConfigStore
- configStore.put(PIPES_ITERATOR_PREFIX + config.id(), config);
-
- // Send success response
- output.writeByte(0); // success
- byte[] msgBytes = "OK".getBytes(StandardCharsets.UTF_8);
- output.writeInt(msgBytes.length);
- output.write(msgBytes);
- output.flush();
-
- LOG.debug("pipesClientId={}: saved pipes iterator '{}'",
pipesClientId, config.id());
-
- } catch (Exception e) {
- LOG.error("pipesClientId={}: error saving pipes iterator",
pipesClientId, e);
- try {
- output.writeByte(1); // error
- byte[] msgBytes =
e.getMessage().getBytes(StandardCharsets.UTF_8);
- output.writeInt(msgBytes.length);
- output.write(msgBytes);
- output.flush();
- } catch (IOException ioException) {
- LOG.error("pipesClientId={}: error sending error response",
pipesClientId, ioException);
- }
- }
- }
-
- private void handleDeletePipesIterator() {
- try {
- // Read iterator ID
- int len = input.readInt();
- byte[] bytes = new byte[len];
- input.readFully(bytes);
- String iteratorId = new String(bytes, StandardCharsets.UTF_8);
-
- // Delete from ConfigStore
- configStore.remove(PIPES_ITERATOR_PREFIX + iteratorId);
-
- // Send success response
- output.writeByte(0); // success
- byte[] msgBytes = "OK".getBytes(StandardCharsets.UTF_8);
- output.writeInt(msgBytes.length);
- output.write(msgBytes);
- output.flush();
-
- LOG.debug("pipesClientId={}: deleted pipes iterator '{}'",
pipesClientId, iteratorId);
-
- } catch (Exception e) {
- LOG.error("pipesClientId={}: error deleting pipes iterator",
pipesClientId, e);
- try {
- output.writeByte(1); // error
- byte[] msgBytes =
e.getMessage().getBytes(StandardCharsets.UTF_8);
- output.writeInt(msgBytes.length);
- output.write(msgBytes);
- output.flush();
- } catch (IOException ioException) {
- LOG.error("pipesClientId={}: error sending error response",
pipesClientId, ioException);
- }
- }
- }
-
- private void handleListPipesIterators() {
- try {
- // This is a placeholder - list operation not fully implemented
- // Would need to iterate ConfigStore keys with
PIPES_ITERATOR_PREFIX
- output.writeByte(0); // success
- byte[] msgBytes = "[]".getBytes(StandardCharsets.UTF_8);
- output.writeInt(msgBytes.length);
- output.write(msgBytes);
- output.flush();
- LOG.debug("pipesClientId={}: list pipes iterators (placeholder)",
pipesClientId);
- } catch (IOException e) {
- LOG.error("pipesClientId={}: error listing pipes iterators",
pipesClientId, e);
- exit(1);
- }
- }
-
- private void handleGetPipesIterator() {
- try {
- // Read iterator ID
- int len = input.readInt();
- byte[] bytes = new byte[len];
- input.readFully(bytes);
- String iteratorId = new String(bytes, StandardCharsets.UTF_8);
-
- // Get from ConfigStore
- ExtensionConfig config = configStore.get(PIPES_ITERATOR_PREFIX +
iteratorId);
-
- if (config == null) {
- output.writeByte(0); // not found
- output.flush();
- } else {
- output.writeByte(1); // found
-
- // Serialize config
- UnsynchronizedByteArrayOutputStream bos =
UnsynchronizedByteArrayOutputStream.builder().get();
- try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
- oos.writeObject(config);
- }
- byte[] configBytes = bos.toByteArray();
- output.writeInt(configBytes.length);
- output.write(configBytes);
- output.flush();
- }
- LOG.debug("pipesClientId={}: get pipes iterator '{}' = {}",
pipesClientId, iteratorId, (config != null ? "found" : "not found"));
-
- } catch (IOException e) {
- LOG.error("pipesClientId={}: error getting pipes iterator",
pipesClientId, e);
- exit(1);
- }
- }
-
- private static final String PIPES_ITERATOR_PREFIX = "pipesIterator:";
-
}