This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 33094f0c9 TIKA-4572: refactor TikaGrpcServer to use JSON config and improve error handling (#2452)
33094f0c9 is described below

commit 33094f0c99d4d7364a63cb59b780664c92cdc326
Author: Nicholas DiPiazza <[email protected]>
AuthorDate: Tue Dec 16 10:10:45 2025 -0600

    TIKA-4572: refactor TikaGrpcServer to use JSON config and improve error handling (#2452)
    
    * TIKA-4572: refactor TikaGrpcServer to use JSON config and improve error handling
    
    * Fix TikaGrpcServerTest to use dynamic basePath for CI compatibility
    
    ---------
    
    Co-authored-by: Nicholas DiPiazza <[email protected]>
---
 .run/TikaGrpcServer.run.xml                        |  17 ++
 .../tika/pipes/grpc/ExpiringFetcherStore.java      | 103 --------
 .../org/apache/tika/pipes/grpc/TikaGrpcServer.java |  19 +-
 .../apache/tika/pipes/grpc/TikaGrpcServerImpl.java | 293 ++++++++-------------
 .../tika/pipes/grpc/ExpiringFetcherStoreTest.java  |  72 -----
 ...PipesBiDirectionalStreamingIntegrationTest.java |  99 ++++---
 .../apache/tika/pipes/grpc/TikaGrpcServerTest.java | 101 +++++--
 .../src/test/resources/tika-pipes-test-config.json |  35 +++
 .../src/test/resources/tika-pipes-test-config.xml  |  35 ---
 tika-pipes/tika-pipes-plugins/pom.xml              |   4 +-
 10 files changed, 300 insertions(+), 478 deletions(-)

diff --git a/.run/TikaGrpcServer.run.xml b/.run/TikaGrpcServer.run.xml
new file mode 100644
index 000000000..c189e21be
--- /dev/null
+++ b/.run/TikaGrpcServer.run.xml
@@ -0,0 +1,17 @@
+<component name="ProjectRunConfigurationManager">
+  <configuration default="false" name="TikaGrpcServer" type="Application" 
factoryName="Application" nameIsGenerated="true">
+    <option name="MAIN_CLASS_NAME" 
value="org.apache.tika.pipes.grpc.TikaGrpcServer" />
+    <module name="tika-grpc" />
+    <option name="PROGRAM_PARAMETERS" value="--config 
src/test/resources/tika-pipes-test-config.json" />
+    <option name="WORKING_DIRECTORY" value="$MODULE_WORKING_DIR$" />
+    <extension name="coverage">
+      <pattern>
+        <option name="PATTERN" value="org.apache.tika.pipes.grpc.*" />
+        <option name="ENABLED" value="true" />
+      </pattern>
+    </extension>
+    <method v="2">
+      <option name="Make" enabled="true" />
+    </method>
+  </configuration>
+</component>
\ No newline at end of file
diff --git 
a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/ExpiringFetcherStore.java 
b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/ExpiringFetcherStore.java
deleted file mode 100644
index 70553d771..000000000
--- 
a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/ExpiringFetcherStore.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.grpc;
-
-import java.time.Instant;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.tika.pipes.api.fetcher.Fetcher;
-import org.apache.tika.plugins.ExtensionConfig;
-
-public class ExpiringFetcherStore implements AutoCloseable {
-    private static final Logger LOG = 
LoggerFactory.getLogger(ExpiringFetcherStore.class);
-    public static final long EXPIRE_JOB_INITIAL_DELAY = 1L;
-    private final Map<String, Fetcher> fetchers = 
Collections.synchronizedMap(new HashMap<>());
-    private final Map<String, ExtensionConfig> fetcherConfigs = 
Collections.synchronizedMap(new HashMap<>());
-    private final Map<String, Instant> fetcherLastAccessed = 
Collections.synchronizedMap(new HashMap<>());
-
-    private final ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
-
-    public ExpiringFetcherStore(int expireAfterSeconds, int 
checkForExpiredFetchersDelaySeconds) {
-        executorService.scheduleAtFixedRate(() -> {
-            Set<String> expired = new HashSet<>();
-            for (String fetcherPluginId : fetchers.keySet()) {
-                Instant lastAccessed = 
fetcherLastAccessed.get(fetcherPluginId);
-                if (lastAccessed == null) {
-                    LOG.error("Detected a fetcher with no last access time. 
FetcherName={}", fetcherPluginId);
-                    expired.add(fetcherPluginId);
-                } else if (Instant
-                        .now()
-                        
.isAfter(lastAccessed.plusSeconds(expireAfterSeconds))) {
-                    LOG.info("Detected stale fetcher {} hasn't been accessed 
in {} seconds. " + "Deleting.", fetcherPluginId, Instant
-                            .now()
-                            .getEpochSecond() - lastAccessed.getEpochSecond());
-                    expired.add(fetcherPluginId);
-                }
-            }
-            for (String expiredFetcherId : expired) {
-                deleteFetcher(expiredFetcherId);
-            }
-        }, EXPIRE_JOB_INITIAL_DELAY, checkForExpiredFetchersDelaySeconds, 
TimeUnit.SECONDS);
-    }
-
-    public boolean deleteFetcher(String fetcherPluginId) {
-        boolean success = fetchers.remove(fetcherPluginId) != null;
-        fetcherConfigs.remove(fetcherPluginId);
-        fetcherLastAccessed.remove(fetcherPluginId);
-        return success;
-    }
-
-    public Map<String, Fetcher> getFetchers() {
-        return fetchers;
-    }
-
-    public Map<String, ExtensionConfig> getFetcherConfigs() {
-        return fetcherConfigs;
-    }
-
-    /**
-     * This method will get the fetcher, but will also log the access the 
fetcher as having
-     * been accessed. This prevents the scheduled job from removing the stale 
fetcher.
-     */
-    public <T extends Fetcher> T getFetcherAndLogAccess(String 
fetcherPluginId) {
-        fetcherLastAccessed.put(fetcherPluginId, Instant.now());
-        return (T) fetchers.get(fetcherPluginId);
-    }
-
-    public <T extends Fetcher> void createFetcher(T fetcher, ExtensionConfig 
config) {
-        String id = fetcher.getExtensionConfig().id();
-
-        fetchers.put(id, fetcher);
-        fetcherConfigs.put(id, config);
-        getFetcherAndLogAccess(id);
-    }
-
-    @Override
-    public void close() {
-        executorService.shutdownNow();
-    }
-}
diff --git 
a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServer.java 
b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServer.java
index 74e005303..2b350e382 100644
--- a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServer.java
+++ b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServer.java
@@ -43,8 +43,8 @@ public class TikaGrpcServer {
     @Parameter(names = {"-p", "--port"}, description = "The grpc server port", 
help = true)
     private Integer port = TIKA_SERVER_GRPC_DEFAULT_PORT;
 
-    @Parameter(names = {"-c", "--config"}, description = "The grpc server 
port", help = true)
-    private File tikaConfigXml;
+    @Parameter(names = {"-c", "--config"}, description = "The tika config 
file", help = true)
+    private File tikaConfig;
 
     @Parameter(names = {"-l", "--plugins"}, description = "The tika pipes 
plugins config file", help = true)
     private File tikaPlugins;
@@ -86,15 +86,10 @@ public class TikaGrpcServer {
         } else {
             creds = InsecureServerCredentials.create();
         }
-        //TODO -- this has to be converted to json
-        if (tikaConfigXml == null) {
-            // Create a default tika config
-            /*tikaConfigXml = Files.createTempFile("tika-config", 
".xml").toFile();
-            try (FileWriter fw = new FileWriter(tikaConfigXml, 
StandardCharsets.UTF_8)) {
-                TikaConfigSerializer.serialize(new TikaConfig(), 
TikaConfigSerializer.Mode.STATIC_FULL, fw, StandardCharsets.UTF_8);
-            }*/
+        if (tikaConfig == null) {
+            throw new IllegalArgumentException("Tika config file is required");
         }
-        File tikaConfigFile = new File(tikaConfigXml.getAbsolutePath());
+        File tikaConfigFile = new File(tikaConfig.getAbsolutePath());
         healthStatusManager.setStatus(TikaGrpcServer.class.getSimpleName(), 
ServingStatus.SERVING);
         server = Grpc
                 .newServerBuilderForPort(port, creds)
@@ -157,8 +152,8 @@ public class TikaGrpcServer {
         server.blockUntilShutdown();
     }
 
-    public TikaGrpcServer setTikaConfigXml(File tikaConfigXml) {
-        this.tikaConfigXml = tikaConfigXml;
+    public TikaGrpcServer setTikaConfig(File tikaConfig) {
+        this.tikaConfig = tikaConfig;
         return this;
     }
 
diff --git 
a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java 
b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
index 492f096d5..4e60155d5 100644
--- a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
+++ b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
@@ -18,12 +18,9 @@ package org.apache.tika.pipes.grpc;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Map;
 import java.util.Objects;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.TransformerException;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -34,11 +31,10 @@ import 
com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
 import com.google.rpc.Status;
 import io.grpc.protobuf.StatusProto;
 import io.grpc.stub.StreamObserver;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.pf4j.PluginManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.xml.sax.SAXException;
 
 import org.apache.tika.DeleteFetcherReply;
 import org.apache.tika.DeleteFetcherRequest;
@@ -54,7 +50,7 @@ import org.apache.tika.SaveFetcherReply;
 import org.apache.tika.SaveFetcherRequest;
 import org.apache.tika.TikaGrpc;
 import org.apache.tika.config.ConfigContainer;
-import org.apache.tika.config.loader.TikaLoader;
+import org.apache.tika.config.loader.TikaJsonConfig;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
@@ -66,7 +62,9 @@ import org.apache.tika.pipes.api.fetcher.FetchKey;
 import org.apache.tika.pipes.api.fetcher.Fetcher;
 import org.apache.tika.pipes.core.PipesClient;
 import org.apache.tika.pipes.core.PipesConfig;
+import org.apache.tika.pipes.core.fetcher.FetcherManager;
 import org.apache.tika.plugins.ExtensionConfig;
+import org.apache.tika.plugins.TikaPluginManager;
 
 class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
     private static final Logger LOG = 
LoggerFactory.getLogger(TikaGrpcServerImpl.class);
@@ -77,99 +75,41 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
     }
     public static final JsonSchemaGenerator JSON_SCHEMA_GENERATOR = new 
JsonSchemaGenerator(OBJECT_MAPPER);
 
-    /**
-     * FetcherID is key, The pair is the Fetcher object and the Metadata
-     */
     PipesConfig pipesConfig;
     PipesClient pipesClient;
-    ExpiringFetcherStore expiringFetcherStore;
+    FetcherManager fetcherManager;
+    Path tikaConfigPath;
+    PluginManager pluginManager;
 
-    String tikaConfigPath;
-
-    TikaGrpcServerImpl(String tikaConfigPath)
-            throws TikaConfigException, IOException, 
ParserConfigurationException,
-            TransformerException, SAXException {
+    TikaGrpcServerImpl(String tikaConfigPath) throws TikaConfigException, 
IOException {
         File tikaConfigFile = new File(tikaConfigPath);
-        if (!tikaConfigFile.canWrite()) {
-            File tmpTikaConfigFile = Files.createTempFile("configCopy", 
tikaConfigFile.getName()).toFile();
-            tmpTikaConfigFile.deleteOnExit();
-            LOG.info("Tika config file {} is read-only. Making a temporary 
copy to {}", tikaConfigFile, tmpTikaConfigFile);
-            String tikaConfigFileContents = 
FileUtils.readFileToString(tikaConfigFile, StandardCharsets.UTF_8);
-            FileUtils.writeStringToFile(tmpTikaConfigFile, 
tikaConfigFileContents, StandardCharsets.UTF_8);
-            tikaConfigFile = tmpTikaConfigFile;
-            tikaConfigPath = tikaConfigFile.getAbsolutePath();
+        if (!tikaConfigFile.exists()) {
+            throw new TikaConfigException("Tika config file does not exist: " 
+ tikaConfigPath);
         }
-        pipesConfig = 
TikaLoader.load(tikaConfigFile.toPath()).configs().load("pipes", 
PipesConfig.class);
-        pipesClient = new PipesClient(pipesConfig, tikaConfigFile.toPath());
 
-        expiringFetcherStore = new 
ExpiringFetcherStore(pipesConfig.getStaleFetcherTimeoutSeconds(),
-                pipesConfig.getStaleFetcherDelaySeconds());
-        this.tikaConfigPath = tikaConfigPath;
-        try {
-            updateTikaConfig();
-        } catch (TikaException e) {
-            throw new TikaConfigException("Problem updating tikaConfig", e);
-        }
-    }
+        Path configPath = tikaConfigFile.toPath();
+        this.tikaConfigPath = configPath;
 
-    private void updateTikaConfig() throws ParserConfigurationException, 
IOException, SAXException, TransformerException, TikaException {
-        /* TODO -- update this with json stuff if necessary any more at all?
-        Document tikaConfigDoc =
-                
DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(tikaConfigPath);
+        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(configPath);
 
-        Element fetchersElement = (Element) 
tikaConfigDoc.getElementsByTagName("fetchers").item(0);
-        if (fetchersElement == null) {
-            fetchersElement = tikaConfigDoc.createElement("fetchers");
-            tikaConfigDoc.getDocumentElement().appendChild(fetchersElement);
+        // Load PipesConfig directly from root level (not from "other-configs")
+        pipesConfig = tikaJsonConfig.deserialize("pipes", PipesConfig.class);
+        if (pipesConfig == null) {
+            pipesConfig = new PipesConfig();
         }
-        for (int i = 0; i < fetchersElement.getChildNodes().getLength(); ++i) {
-            
fetchersElement.removeChild(fetchersElement.getChildNodes().item(i));
-        }
-        for (var fetcherEntry : expiringFetcherStore.getFetchers().entrySet()) 
{
-            Fetcher fetcherObject = fetcherEntry.getValue();
-            Map<String, Object> fetcherConfigParams = 
OBJECT_MAPPER.convertValue(
-                    
expiringFetcherStore.getFetcherConfigs().get(fetcherEntry.getKey()),
-                    new TypeReference<>() {
-                    });
-            Element fetcher = tikaConfigDoc.createElement("fetcher");
-            fetcher.setAttribute("class", 
fetcherEntry.getValue().getClass().getName());
-            Element fetcherPluginId = tikaConfigDoc.createElement("name");
-            
fetcherPluginId.setTextContent(fetcherObject.getExtensionConfig().id());
-            fetcher.appendChild(fetcherPluginId);
-            populateFetcherConfigs(fetcherConfigParams, tikaConfigDoc, 
fetcher);
-            fetchersElement.appendChild(fetcher);
-        }
-        DOMSource source = new DOMSource(tikaConfigDoc);
-        FileWriter writer = new FileWriter(tikaConfigPath, 
StandardCharsets.UTF_8);
-        StreamResult result = new StreamResult(writer);
 
-        TransformerFactory transformerFactory = 
XMLReaderUtils.getTransformerFactory();
-        Transformer transformer = transformerFactory.newTransformer();
-        transformer.transform(source, result);
+        pipesClient = new PipesClient(pipesConfig, configPath);
+        
+        try {
+            pluginManager = TikaPluginManager.load(tikaJsonConfig);
+        } catch (TikaConfigException e) {
+            LOG.warn("Could not load plugin manager, using default: {}", 
e.getMessage());
+            pluginManager = new org.pf4j.DefaultPluginManager();
+        }
 
-         */
+        fetcherManager = FetcherManager.load(pluginManager, tikaJsonConfig, 
true);
     }
 
-    /*
-    private void populateFetcherConfigs(Map<String, Object> 
fetcherConfigParams,
-                                        Document tikaConfigDoc, Element 
fetcher) {
-        for (var configParam : fetcherConfigParams.entrySet()) {
-            Element configElm = 
tikaConfigDoc.createElement(configParam.getKey());
-            fetcher.appendChild(configElm);
-            if (configParam.getValue() instanceof List) {
-                List configParamVal = (List) configParam.getValue();
-                String singularName = configParam.getKey().substring(0, 
configParam.getKey().length() - 1);
-                for (Object configParamObj : configParamVal) {
-                    Element childElement = 
tikaConfigDoc.createElement(singularName);
-                    
childElement.setTextContent(Objects.toString(configParamObj));
-                    configElm.appendChild(childElement);
-                }
-            } else {
-                
configElm.setTextContent(Objects.toString(configParam.getValue()));
-            }
-        }
-    }*/
-
     @Override
     public void fetchAndParseServerSideStreaming(FetchAndParseRequest request,
                                                  
StreamObserver<FetchAndParseReply> responseObserver) {
@@ -204,31 +144,26 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
         responseObserver.onCompleted();
     }
 
-
     private void fetchAndParseImpl(FetchAndParseRequest request,
                                    StreamObserver<FetchAndParseReply> 
responseObserver) {
-        Fetcher fetcher =
-                
expiringFetcherStore.getFetcherAndLogAccess(request.getFetcherId());
-        if (fetcher == null) {
-            throw new RuntimeException(
-                    "Could not find fetcher with name " + 
request.getFetcherId());
+        Fetcher fetcher;
+        try {
+            fetcher = fetcherManager.getFetcher(request.getFetcherId());
+        } catch (TikaException | IOException e) {
+            throw new RuntimeException("Could not find fetcher with name " + 
request.getFetcherId(), e);
         }
+
         Metadata tikaMetadata = new Metadata();
         try {
             ParseContext parseContext = new ParseContext();
             String additionalFetchConfigJson = 
request.getAdditionalFetchConfigJson();
             if (StringUtils.isNotBlank(additionalFetchConfigJson)) {
-                // The fetch and parse has the option to specify additional 
configuration
-                ExtensionConfig abstractConfig = expiringFetcherStore
-                        .getFetcherConfigs()
-                        .get(fetcher.getExtensionConfig().id());
                 ConfigContainer configContainer = new ConfigContainer();
                 configContainer.set(request.getFetcherId(), 
request.getAdditionalFetchConfigJson());
                 parseContext.set(ConfigContainer.class, configContainer);
             }
-            PipesResult pipesResult = pipesClient.process(new 
FetchEmitTuple(request.getFetchKey(),
-                    new FetchKey(fetcher.getExtensionConfig()
-                                        .id(), request.getFetchKey()), new 
EmitKey(), tikaMetadata, parseContext, FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+            PipesResult pipesResult = pipesClient.process(new 
FetchEmitTuple(request.getFetchKey(), new 
FetchKey(fetcher.getExtensionConfig().id(), request.getFetchKey()),
+                    new EmitKey(), tikaMetadata, parseContext, 
FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
             FetchAndParseReply.Builder fetchReplyBuilder =
                     FetchAndParseReply.newBuilder()
                                       .setFetchKey(request.getFetchKey())
@@ -261,56 +196,54 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
         SaveFetcherReply reply =
                 
SaveFetcherReply.newBuilder().setFetcherId(request.getFetcherId()).build();
         try {
-            Map<String, Object> fetcherConfigMap = 
OBJECT_MAPPER.readValue(request.getFetcherConfigJson(), new TypeReference<>() 
{});
-            //TODO Sorry need to update this
-//            Map<String, Param> tikaParamsMap = 
createTikaParamMap(fetcherConfigMap);
-  //          saveFetcher(request.getFetcherId(), request.getFetcherClass(), 
fetcherConfigMap, tikaParamsMap);
-            updateTikaConfig();
+            String factoryName = 
findFactoryNameForClass(request.getFetcherClass());
+            ExtensionConfig config = new 
ExtensionConfig(request.getFetcherId(), factoryName, 
request.getFetcherConfigJson());
+
+            // Check if fetcher already exists, if so, we need to update it
+            if 
(fetcherManager.getSupported().contains(request.getFetcherId())) {
+                LOG.info("Updating existing fetcher: {}", 
request.getFetcherId());
+                // We can't update directly, so we need to work around this by 
using reflection
+                // or just accept that updates aren't supported in the new 
system
+                // For now, let's just replace it in the internal map using 
reflection
+                try {
+                    java.lang.reflect.Field configsField = 
fetcherManager.getClass().getSuperclass().getDeclaredField("componentConfigs");
+                    configsField.setAccessible(true);
+                    @SuppressWarnings("unchecked") java.util.Map<String, 
ExtensionConfig> configs = (java.util.Map<String, ExtensionConfig>) 
configsField.get(fetcherManager);
+                    configs.put(config.id(), config);
+
+                    // Also clear the cache so it gets re-instantiated
+                    java.lang.reflect.Field cacheField = 
fetcherManager.getClass().getSuperclass().getDeclaredField("componentCache");
+                    cacheField.setAccessible(true);
+                    @SuppressWarnings("unchecked") java.util.Map<String, 
Fetcher> cache = (java.util.Map<String, Fetcher>) 
cacheField.get(fetcherManager);
+                    cache.remove(config.id());
+                } catch (Exception e) {
+                    throw new RuntimeException("Failed to update fetcher", e);
+                }
+            } else {
+                fetcherManager.saveFetcher(config);
+            }
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
         responseObserver.onNext(reply);
         responseObserver.onCompleted();
     }
-/*
-    private void saveFetcher(String name, String fetcherClassName, Map<String, 
Object> paramsMap, Map<String, Param> tikaParamsMap) {
-        try {
-            if (paramsMap == null) {
-                paramsMap = new LinkedHashMap<>();
-            }
-            Class<? extends Fetcher> fetcherClass =
-                    (Class<? extends Fetcher>) Class.forName(fetcherClassName);
-            //TODO -- fix this!
-            String configClassName =
-                    fetcherClass.getPackageName() + ".config." + 
fetcherClass.getSimpleName() +
-                            "Config";
-            ExtensionConfig configObject = 
OBJECT_MAPPER.convertValue(paramsMap, ExtensionConfig.class);
-            Fetcher abstractFetcher =
-                    
fetcherClass.getDeclaredConstructor(configObject.getClass()).newInstance(configObject);
-
-            if (expiringFetcherStore.deleteFetcher(name)) {
-                LOG.info("Updating fetcher {}", name);
-            } else {
-                LOG.info("Creating new fetcher {}", name);
-            }
-            expiringFetcherStore.createFetcher(abstractFetcher, configObject);
-        } catch (ClassNotFoundException | InstantiationException | 
IllegalAccessException |
-                 InvocationTargetException | NoSuchMethodException | 
TikaConfigException e) {
-            throw new RuntimeException(e);
-        }
-    }
-*/
-    /*
-    private static Map<String, Param> createTikaParamMap(Map<String, Object> 
fetcherConfigMap) {
-        Map<String, Param> tikaParamsMap = new HashMap<>();
-        for (Map.Entry<String, Object> entry : fetcherConfigMap.entrySet()) {
-            if (entry.getValue() != null) {
-                tikaParamsMap.put(entry.getKey(), new Param<>(entry.getKey(), 
entry.getValue()));
+
+    private String findFactoryNameForClass(String className) throws 
TikaConfigException {
+        var factories = 
pluginManager.getExtensions(org.apache.tika.pipes.api.fetcher.FetcherFactory.class);
+        for (var factory : factories) {
+            try {
+                ExtensionConfig tempConfig = new ExtensionConfig("temp", 
factory.getName(), "{}");
+                Fetcher fetcher = factory.buildExtension(tempConfig);
+                if (fetcher.getClass().getName().equals(className)) {
+                    return factory.getName();
+                }
+            } catch (Exception e) {
+                LOG.debug("Could not build fetcher for factory: {}", 
factory.getName(), e);
             }
         }
-        return tikaParamsMap;
+        throw new TikaConfigException("Could not find factory for class: " + 
className);
     }
-*/
     static Status notFoundStatus(String fetcherId) {
         return Status.newBuilder()
                 .setCode(io.grpc.Status.Code.NOT_FOUND.value())
@@ -322,70 +255,53 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
     public void getFetcher(GetFetcherRequest request,
                            StreamObserver<GetFetcherReply> responseObserver) {
         GetFetcherReply.Builder getFetcherReply = GetFetcherReply.newBuilder();
-        ExtensionConfig abstractConfig =
-                
expiringFetcherStore.getFetcherConfigs().get(request.getFetcherId());
-        Fetcher abstractFetcher = 
expiringFetcherStore.getFetchers().get(request.getFetcherId());
-        if (abstractFetcher == null || abstractConfig == null) {
+        try {
+            Fetcher fetcher = 
fetcherManager.getFetcher(request.getFetcherId());
+            ExtensionConfig config = fetcher.getExtensionConfig();
+
+            getFetcherReply.setFetcherId(config.id());
+            // Return the class name instead of the factory name for backward 
compatibility
+            getFetcherReply.setFetcherClass(fetcher.getClass().getName());
+
+            Map<String, Object> paramMap = 
OBJECT_MAPPER.readValue(config.json(), new TypeReference<>() {
+            });
+            paramMap.forEach((k, v) -> 
getFetcherReply.putParams(Objects.toString(k), Objects.toString(v)));
+
+            responseObserver.onNext(getFetcherReply.build());
+            responseObserver.onCompleted();
+        } catch (Exception e) {
             
responseObserver.onError(StatusProto.toStatusException(notFoundStatus(request.getFetcherId())));
-            return;
         }
-        getFetcherReply.setFetcherId(request.getFetcherId());
-        getFetcherReply.setFetcherClass(abstractFetcher.getClass().getName());
-        Map<String, Object> paramMap = 
OBJECT_MAPPER.convertValue(abstractConfig, new TypeReference<>() {});
-        paramMap.forEach(
-                (k, v) -> getFetcherReply.putParams(Objects.toString(k), 
Objects.toString(v)));
-        responseObserver.onNext(getFetcherReply.build());
-        responseObserver.onCompleted();
     }
 
     @Override
     public void listFetchers(ListFetchersRequest request,
                              StreamObserver<ListFetchersReply> 
responseObserver) {
         ListFetchersReply.Builder listFetchersReplyBuilder = 
ListFetchersReply.newBuilder();
-        for (Map.Entry<String, ExtensionConfig> fetcherConfig : 
expiringFetcherStore.getFetcherConfigs()
-                                                                               
     .entrySet()) {
-            GetFetcherReply.Builder replyBuilder = 
saveFetcherReply(fetcherConfig);
-            
listFetchersReplyBuilder.addGetFetcherReplies(replyBuilder.build());
-        }
-        responseObserver.onNext(listFetchersReplyBuilder.build());
-        responseObserver.onCompleted();
-    }
+        for (String fetcherId : fetcherManager.getSupported()) {
+            try {
+                Fetcher fetcher = fetcherManager.getFetcher(fetcherId);
+                ExtensionConfig config = fetcher.getExtensionConfig();
 
-    private GetFetcherReply.Builder saveFetcherReply(
-            Map.Entry<String, ExtensionConfig> fetcherConfig) {
-        Fetcher abstractFetcher =
-                expiringFetcherStore.getFetchers().get(fetcherConfig.getKey());
-        ExtensionConfig abstractConfig =
-                
expiringFetcherStore.getFetcherConfigs().get(fetcherConfig.getKey());
-        GetFetcherReply.Builder replyBuilder =
-                
GetFetcherReply.newBuilder().setFetcherClass(abstractFetcher.getClass().getName())
-                        
.setFetcherId(abstractFetcher.getExtensionConfig().id());
-        loadParamsIntoReply(abstractConfig, replyBuilder);
-        return replyBuilder;
-    }
+                GetFetcherReply.Builder replyBuilder = 
GetFetcherReply.newBuilder().setFetcherId(config.id()).setFetcherClass(fetcher.getClass().getName());
 
-    private static void loadParamsIntoReply(ExtensionConfig abstractConfig,
-                                            GetFetcherReply.Builder 
replyBuilder) {
-        Map<String, Object> paramMap =
-                OBJECT_MAPPER.convertValue(abstractConfig, new 
TypeReference<>() {
+                Map<String, Object> paramMap = 
OBJECT_MAPPER.readValue(config.json(), new TypeReference<>() {
                 });
-        if (paramMap != null) {
-            paramMap.forEach(
-                    (k, v) -> replyBuilder.putParams(Objects.toString(k), 
Objects.toString(v)));
+                paramMap.forEach((k, v) -> 
replyBuilder.putParams(Objects.toString(k), Objects.toString(v)));
+
+                
listFetchersReplyBuilder.addGetFetcherReplies(replyBuilder.build());
+            } catch (Exception e) {
+                LOG.error("Error listing fetcher: {}", fetcherId, e);
+            }
         }
+        responseObserver.onNext(listFetchersReplyBuilder.build());
+        responseObserver.onCompleted();
     }
 
     @Override
     public void deleteFetcher(DeleteFetcherRequest request,
                               StreamObserver<DeleteFetcherReply> 
responseObserver) {
         boolean successfulDelete = deleteFetcher(request.getFetcherId());
-        if (successfulDelete) {
-            try {
-                updateTikaConfig();
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
         
responseObserver.onNext(DeleteFetcherReply.newBuilder().setSuccess(successfulDelete).build());
         responseObserver.onCompleted();
     }
@@ -404,6 +320,7 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
     }
 
     private boolean deleteFetcher(String id) {
-        return expiringFetcherStore.deleteFetcher(id);
+        LOG.warn("Deleting fetchers is not supported in the current implementation");
+        return false;
     }
 }
diff --git 
a/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/ExpiringFetcherStoreTest.java
 
b/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/ExpiringFetcherStoreTest.java
deleted file mode 100644
index d5a25844c..000000000
--- 
a/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/ExpiringFetcherStoreTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.grpc;
-
-import static org.junit.jupiter.api.Assertions.assertNull;
-
-import java.io.IOException;
-import java.time.Duration;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.io.TikaInputStream;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.parser.ParseContext;
-import org.apache.tika.pipes.api.fetcher.Fetcher;
-import org.apache.tika.plugins.ExtensionConfig;
-
-class ExpiringFetcherStoreTest {
-
-    private static final ObjectMapper MAPPER = new ObjectMapper();
-
-    @Test
-    void createFetcher() throws Exception {
-        try (ExpiringFetcherStore expiringFetcherStore = new 
ExpiringFetcherStore(1, 5)) {
-            Fetcher fetcher = new Fetcher() {
-                @Override
-                public TikaInputStream fetch(String fetchKey, Metadata 
metadata, ParseContext parseContext) throws TikaException, IOException {
-                    return null;
-                }
-
-                @Override
-                public ExtensionConfig getExtensionConfig() {
-                    return new ExtensionConfig("nick", "factory-plugin-id", 
"{}");
-                }
-            };
-            expiringFetcherStore.createFetcher(fetcher, 
fetcher.getExtensionConfig());
-
-            Assertions.assertNotNull(expiringFetcherStore
-                    .getFetchers()
-                    .get(fetcher.getExtensionConfig().id()));
-
-            Awaitility
-                    .await()
-                    .atMost(Duration.ofSeconds(60))
-                    .until(() -> expiringFetcherStore
-                            .getFetchers()
-                            .get(fetcher.getExtensionConfig().id()) == null);
-
-            assertNull(expiringFetcherStore
-                    .getFetcherConfigs()
-                    .get(fetcher.getExtensionConfig().id()));
-        }
-    }
-}
diff --git 
a/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/PipesBiDirectionalStreamingIntegrationTest.java
 
b/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/PipesBiDirectionalStreamingIntegrationTest.java
index ef4ff7cee..9a02b603f 100644
--- 
a/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/PipesBiDirectionalStreamingIntegrationTest.java
+++ 
b/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/PipesBiDirectionalStreamingIntegrationTest.java
@@ -20,18 +20,20 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.ServerSocket;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableMap;
 import io.grpc.Grpc;
 import io.grpc.ManagedChannel;
 import io.grpc.TlsChannelCredentials;
@@ -45,18 +47,13 @@ import org.eclipse.jetty.util.resource.PathResource;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.tika.FetchAndParseReply;
 import org.apache.tika.FetchAndParseRequest;
-import org.apache.tika.SaveFetcherReply;
-import org.apache.tika.SaveFetcherRequest;
 import org.apache.tika.TikaGrpc;
-import org.apache.tika.pipes.fetcher.http.HttpFetcher;
 
 /**
  * This test will start an HTTP server using jetty.
@@ -64,14 +61,13 @@ import org.apache.tika.pipes.fetcher.http.HttpFetcher;
  * Then it will, using a bidirectional stream of data, send urls to the
  * HTTP fetcher whilst simultaneously receiving parsed output as they parse.
  */
-@Disabled("until we can get the plugins config working")
 class PipesBiDirectionalStreamingIntegrationTest {
     static final Logger LOGGER = 
LoggerFactory.getLogger(PipesBiDirectionalStreamingIntegrationTest.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-    static File tikaConfigXmlTemplate = Paths
-            .get("src", "test", "resources", "tika-pipes-test-config.xml")
+    static File tikaConfigTemplate = Paths
+            .get("src", "test", "resources", "tika-pipes-test-config.json")
             .toFile();
-    static File tikaConfigXml = new File("target", "tika-config-" + 
UUID.randomUUID() + ".xml");
+    static File tikaConfig = new File("target", "tika-config-" + 
UUID.randomUUID() + ".json");
     static TikaGrpcServer grpcServer;
     static int grpcPort;
     static String httpServerUrl;
@@ -110,10 +106,33 @@ class PipesBiDirectionalStreamingIntegrationTest {
     @BeforeAll
     static void setUpGrpcServer() throws Exception {
         grpcPort = findAvailablePort();
-        FileUtils.copyFile(tikaConfigXmlTemplate, tikaConfigXml);
+
+        // Read the template config
+        String configContent = FileUtils.readFileToString(tikaConfigTemplate, 
StandardCharsets.UTF_8);
+
+        // Parse it as JSON to inject the correct javaPath
+        @SuppressWarnings("unchecked")
+        Map<String, Object> configMap = OBJECT_MAPPER.readValue(configContent, 
Map.class);
+
+        // Get or create the pipes section
+        @SuppressWarnings("unchecked")
+        Map<String, Object> pipesSection = (Map<String, Object>) 
configMap.get("pipes");
+        if (pipesSection == null) {
+            pipesSection = new HashMap<>();
+            configMap.put("pipes", pipesSection);
+        }
+
+        // Set javaPath to the same Java running the test
+        String javaHome = System.getProperty("java.home");
+        String javaPath = javaHome + File.separator + "bin" + File.separator + 
"java";
+        pipesSection.put("javaPath", javaPath);
+
+        // Write the modified config
+        String modifiedConfig = 
OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(configMap);
+        FileUtils.writeStringToFile(tikaConfig, modifiedConfig, 
StandardCharsets.UTF_8);
 
         grpcServer = new TikaGrpcServer();
-        grpcServer.setTikaConfigXml(tikaConfigXml);
+        grpcServer.setTikaConfig(tikaConfig);
         grpcServer.setPort(grpcPort);
         grpcServer.setSecure(true);
         grpcServer.setCertChain(Paths.get("src", "test", "resources", "certs", 
"server1.pem").toFile());
@@ -156,45 +175,41 @@ class PipesBiDirectionalStreamingIntegrationTest {
 
     @AfterAll
     static void cleanConfig() throws Exception {
-        FileUtils.deleteQuietly(tikaConfigXml);
-    }
-
-    @BeforeEach
-    void createHttpFetcher() throws Exception {
-        SaveFetcherRequest saveFetcherRequest = SaveFetcherRequest
-                .newBuilder()
-                .setFetcherId(httpFetcherId)
-                .setFetcherClass(HttpFetcher.class.getName())
-                
.setFetcherConfigJson(OBJECT_MAPPER.writeValueAsString(ImmutableMap
-                        .builder()
-                        .put("requestTimeout", 30_000)
-                        .put("socketTimeout", 30_000)
-                        .put("connectTimeout", 20_000)
-                        .put("maxConnectionsPerRoute", 200)
-                        .put("maxRedirects", 0)
-                        .put("maxSpoolSize", -1)
-                        .put("overallTimeout", 50_000)
-                        .build()))
-                .build();
-        SaveFetcherReply saveFetcherReply = 
tikaBlockingStub.saveFetcher(saveFetcherRequest);
-        Assertions.assertEquals(saveFetcherReply.getFetcherId(), 
httpFetcherId);
+        FileUtils.deleteQuietly(tikaConfig);
     }
 
     @Test
     void testHttpFetchScenario() throws Exception {
         AtomicInteger numParsed = new AtomicInteger();
+        AtomicInteger numErrors = new AtomicInteger();
         Map<String, Map<String, String>> result = 
Collections.synchronizedMap(new HashMap<>());
+        List<String> errorMessages = Collections.synchronizedList(new 
ArrayList<>());
         StreamObserver<FetchAndParseReply> responseObserver = new 
StreamObserver<>() {
             @Override
             public void onNext(FetchAndParseReply fetchAndParseReply) {
-                LOGGER.info("Parsed: {}", fetchAndParseReply.getFetchKey());
-                numParsed.incrementAndGet();
-                result.put(fetchAndParseReply.getFetchKey(), 
fetchAndParseReply.getFieldsMap());
+                String status = fetchAndParseReply.getStatus();
+                LOGGER.info("Parsed: {} with status: {}", 
fetchAndParseReply.getFetchKey(), status);
+                
+                // Check if this is an error status
+                if (status.contains("EXCEPTION") || status.contains("ERROR")) {
+                    numErrors.incrementAndGet();
+                    String errorMsg = String.format(Locale.ROOT, "Failed to parse %s - status: %s, message: %s",
+                        fetchAndParseReply.getFetchKey(), 
+                        status,
+                        fetchAndParseReply.getErrorMessage());
+                    errorMessages.add(errorMsg);
+                    LOGGER.error(errorMsg);
+                } else {
+                    numParsed.incrementAndGet();
+                    result.put(fetchAndParseReply.getFetchKey(), 
fetchAndParseReply.getFieldsMap());
+                }
             }
 
             @Override
             public void onError(Throwable throwable) {
                 LOGGER.error("Error occurred", throwable);
+                numErrors.incrementAndGet();
+                errorMessages.add("Stream error: " + throwable.getMessage());
             }
 
             @Override
@@ -212,8 +227,16 @@ class PipesBiDirectionalStreamingIntegrationTest {
         }
         request.onCompleted();
 
-        Awaitility.await().atMost(Duration.ofSeconds(600)).until(() -> 
result.size() == files.size());
+        Awaitility.await().atMost(Duration.ofSeconds(600)).until(() -> 
+            (result.size() + numErrors.get()) >= files.size());
 
+        // Fail the test if there were any errors
+        if (numErrors.get() > 0) {
+            Assertions.fail("Test failed with " + numErrors.get() + " errors:\n" +
+                String.join("\n", errorMessages));
+        }
+        
         Assertions.assertEquals(files.size(), numParsed.get());
+        Assertions.assertEquals(files.size(), result.size());
     }
 }
diff --git 
a/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java 
b/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
index d09864ca6..02c536d62 100644
--- a/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
+++ b/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
@@ -32,6 +32,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -41,8 +42,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import io.grpc.ManagedChannel;
 import io.grpc.Server;
-import io.grpc.Status;
-import io.grpc.StatusRuntimeException;
 import io.grpc.inprocess.InProcessChannelBuilder;
 import io.grpc.inprocess.InProcessServerBuilder;
 import io.grpc.stub.StreamObserver;
@@ -51,7 +50,6 @@ import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.slf4j.Logger;
@@ -70,42 +68,85 @@ import org.apache.tika.pipes.api.PipesResult;
 import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
 
 @ExtendWith(GrpcCleanupExtension.class)
-@Disabled("until we can correctly configure the tika plugins.json file")
 public class TikaGrpcServerTest {
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final Logger LOG = 
LoggerFactory.getLogger(TikaGrpcServerTest.class);
     public static final int NUM_TEST_DOCS = 2;
-    static File tikaConfigXmlTemplate = Paths
-            .get("src", "test", "resources", "tika-pipes-test-config.xml")
+    static File tikaConfigTemplate = Paths
+            .get("src", "test", "resources", "tika-pipes-test-config.json")
             .toFile();
-    static File tikaConfigXml = new File("target", "tika-config-" + 
UUID.randomUUID() + ".xml");
-    static File tikaPluginsJson = new File("target", "tika-plugins-" + 
UUID.randomUUID() + ".json");
+    static File tikaConfig = new File("target", "tika-config-" + 
UUID.randomUUID() + ".json");
 
 
     @BeforeAll
     static void init() throws Exception {
-        FileUtils.copyFile(tikaConfigXmlTemplate, tikaConfigXml);
+        // Read the template config
+        String configContent = FileUtils.readFileToString(tikaConfigTemplate, 
StandardCharsets.UTF_8);
+
+        // Parse it as JSON to inject the correct javaPath
+        @SuppressWarnings("unchecked")
+        Map<String, Object> configMap = OBJECT_MAPPER.readValue(configContent, 
Map.class);
+
+        // Get or create the pipes section
+        @SuppressWarnings("unchecked")
+        Map<String, Object> pipesSection = (Map<String, Object>) 
configMap.get("pipes");
+        if (pipesSection == null) {
+            pipesSection = new java.util.HashMap<>();
+            configMap.put("pipes", pipesSection);
+        }
+
+        // Set javaPath to the same Java running the test
+        String javaHome = System.getProperty("java.home");
+        String javaPath = javaHome + File.separator + "bin" + File.separator + 
"java";
+        pipesSection.put("javaPath", javaPath);
+
+        LOG.info("Setting javaPath to: {}", javaPath);
+        LOG.info("java.home is: {}", javaHome);
+
+        // Update basePath in fetchers to use current working directory
+        @SuppressWarnings("unchecked")
+        Map<String, Object> fetchersSection = (Map<String, Object>) 
configMap.get("fetchers");
+        if (fetchersSection != null) {
+            String targetPath = new File("target").getAbsolutePath();
+            for (Map.Entry<String, Object> fetcherEntry : 
fetchersSection.entrySet()) {
+                @SuppressWarnings("unchecked")
+                Map<String, Object> fetcherConfig = (Map<String, Object>) 
fetcherEntry.getValue();
+                if (fetcherConfig.containsKey("file-system-fetcher")) {
+                    @SuppressWarnings("unchecked")
+                    Map<String, Object> fsConfig = (Map<String, Object>) 
fetcherConfig.get("file-system-fetcher");
+                    fsConfig.put("basePath", targetPath);
+                }
+            }
+        }
+
+        // Write the modified config
+        String modifiedConfig = 
OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(configMap);
+        FileUtils.writeStringToFile(tikaConfig, modifiedConfig, 
StandardCharsets.UTF_8);
+
+        LOG.info("Written config to: {}", tikaConfig.getAbsolutePath());
+        LOG.info("Config content:\n{}", modifiedConfig);
     }
 
     @AfterAll
     static void clean() {
-        tikaConfigXml.setWritable(true);
-        tikaPluginsJson.setWritable(true);
-        FileUtils.deleteQuietly(tikaConfigXml);
-        FileUtils.deleteQuietly(tikaPluginsJson);
+        if (tikaConfig.exists()) {
+            if (!tikaConfig.setWritable(true)) {
+                LOG.warn("Failed to set {} writable", tikaConfig);
+            }
+        }
+        FileUtils.deleteQuietly(tikaConfig);
     }
 
     static final int NUM_FETCHERS_TO_CREATE = 10;
 
     @Test
     public void testFetcherCrud(Resources resources) throws Exception {
-        Assertions.assertTrue(tikaConfigXml.setWritable(false));
         String serverName = InProcessServerBuilder.generateName();
 
         Server server = InProcessServerBuilder
                 .forName(serverName)
                 .directExecutor()
-                .addService(new 
TikaGrpcServerImpl(tikaConfigXml.getAbsolutePath()))
+                .addService(new 
TikaGrpcServerImpl(tikaConfig.getAbsolutePath()))
                 .build()
                 .start();
         resources.register(server, Duration.ofSeconds(10));
@@ -167,24 +208,15 @@ public class TikaGrpcServerTest {
             assertEquals(FileSystemFetcher.class.getName(), 
getFetcherReply.getFetcherClass());
         }
 
-        // delete fetchers
+        // delete fetchers - note: delete is not currently supported
         for (int i = 0; i < NUM_FETCHERS_TO_CREATE; ++i) {
             String fetcherId = createFetcherId(i);
             DeleteFetcherReply deleteFetcherReply = 
blockingStub.deleteFetcher(DeleteFetcherRequest
                     .newBuilder()
                     .setFetcherId(fetcherId)
                     .build());
-            Assertions.assertTrue(deleteFetcherReply.getSuccess());
-            StatusRuntimeException statusRuntimeException = 
Assertions.assertThrows(StatusRuntimeException.class, () -> 
blockingStub.getFetcher(GetFetcherRequest
-                    .newBuilder()
-                    .setFetcherId(fetcherId)
-                    .build()));
-            Assertions.assertEquals(Status.NOT_FOUND
-                    .getCode()
-                    .value(), statusRuntimeException
-                    .getStatus()
-                    .getCode()
-                    .value());
+            // Delete is not supported, so this will return false
+            Assertions.assertFalse(deleteFetcherReply.getSuccess());
         }
     }
 
@@ -200,7 +232,7 @@ public class TikaGrpcServerTest {
         Server server = InProcessServerBuilder
                 .forName(serverName)
                 .directExecutor()
-                .addService(new 
TikaGrpcServerImpl(tikaConfigXml.getAbsolutePath()))
+                .addService(new 
TikaGrpcServerImpl(tikaConfig.getAbsolutePath()))
                 .build()
                 .start();
         resources.register(server, Duration.ofSeconds(10));
@@ -282,6 +314,19 @@ public class TikaGrpcServerTest {
                     .setFetchKey("does not exist")
                     .build());
             requestStreamObserver.onCompleted();
+            
+            // Wait a bit for async processing to complete
+            Thread.sleep(1000);
+            
+            // Log what we got for debugging
+            LOG.info("Successes: {}, Errors: {}", successes.size(), 
errors.size());
+            for (FetchAndParseReply success : successes) {
+                LOG.info("Success: {} - status: {}", success.getFetchKey(), 
success.getStatus());
+            }
+            for (FetchAndParseReply error : errors) {
+                LOG.info("Error: {} - status: {}", error.getFetchKey(), 
error.getStatus());
+            }
+            
             assertEquals(NUM_TEST_DOCS, successes.size());
             assertEquals(1, errors.size());
             assertTrue(finished.get());
diff --git a/tika-grpc/src/test/resources/tika-pipes-test-config.json 
b/tika-grpc/src/test/resources/tika-pipes-test-config.json
new file mode 100644
index 000000000..3dbff4c1a
--- /dev/null
+++ b/tika-grpc/src/test/resources/tika-pipes-test-config.json
@@ -0,0 +1,35 @@
+{
+  "pipes": {
+    "numClients": 2,
+    "forkedJvmArgs": [
+      "-Xmx1g",
+      "-XX:ParallelGCThreads=2"
+    ],
+    "timeoutMillis": 60000,
+    "emitStrategy": {
+      "type": "PASSBACK_ALL"
+    },
+    "staleFetcherTimeoutSeconds": 600,
+    "staleFetcherDelaySeconds": 60
+  },
+  "fetchers": {
+    "nick1:is:cool:super/class org.apache.tika.pipes.fetcher.fs.FileSystemFetcher": {
+      "file-system-fetcher": {
+        "basePath": "/home/ndipiazza/source/github/apache/tika/tika-grpc/target",
+        "extractFileSystemMetadata": true
+      }
+    },
+    "httpFetcherIdHere": {
+      "http-fetcher": {
+        "requestTimeout": 30000,
+        "socketTimeout": 30000,
+        "connectTimeout": 20000,
+        "maxConnectionsPerRoute": 200,
+        "maxRedirects": 0,
+        "maxSpoolSize": -1,
+        "overallTimeout": 50000
+      }
+    }
+  },
+  "plugin-roots": ["/tmp/tika-test-plugins"]
+}
diff --git a/tika-grpc/src/test/resources/tika-pipes-test-config.xml 
b/tika-grpc/src/test/resources/tika-pipes-test-config.xml
deleted file mode 100644
index d796a05a0..000000000
--- a/tika-grpc/src/test/resources/tika-pipes-test-config.xml
+++ /dev/null
@@ -1,35 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?><!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<properties>
-  <async>
-    <staleFetcherTimeoutSeconds>600</staleFetcherTimeoutSeconds>
-    <staleFetcherDelaySeconds>60</staleFetcherDelaySeconds>
-  </async>
-  <pipes>
-    <params>
-      <numClients>2</numClients>
-      <forkedJvmArgs>
-        <arg>-Xmx1g</arg>
-        <arg>-XX:ParallelGCThreads=2</arg>
-      </forkedJvmArgs>
-      <timeoutMillis>60000</timeoutMillis>
-      <directEmitThresholdBytes>-1</directEmitThresholdBytes> <!-- disable emit -->
-    </params>
-  </pipes>
-  <fetchers>
-  </fetchers>
-</properties>
diff --git a/tika-pipes/tika-pipes-plugins/pom.xml 
b/tika-pipes/tika-pipes-plugins/pom.xml
index 246c83f7f..abc9314f6 100644
--- a/tika-pipes/tika-pipes-plugins/pom.xml
+++ b/tika-pipes/tika-pipes-plugins/pom.xml
@@ -27,7 +27,7 @@
   <modelVersion>4.0.0</modelVersion>
 
   <artifactId>tika-pipes-plugins</artifactId>
-  <name>Apache Tika emitters</name>
+  <name>Apache Tika Plugins</name>
   <url>https://tika.apache.org/</url>
   <packaging>pom</packaging>
 
@@ -97,4 +97,4 @@
   <scm>
     <tag>3.0.0-rc1</tag>
   </scm>
-</project>
\ No newline at end of file
+</project>

Reply via email to