This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch TIKA-4561-remove-tika-config in repository https://gitbox.apache.org/repos/asf/tika.git
commit cdf45147b764e12723a29dd01702a5489f132af5 Author: tallison <[email protected]> AuthorDate: Tue Dec 9 13:13:07 2025 -0500 TIKA-4561 -- remove tikaconfigpath from pipesconfig --- .../apache/tika/pipes/grpc/TikaGrpcServerImpl.java | 2 +- .../tika/async/cli/TikaConfigAsyncWriterTest.java | 2 +- .../org/apache/tika/pipes/core/PipesClient.java | 6 ++- .../org/apache/tika/pipes/core/PipesConfig.java | 16 +------ .../org/apache/tika/pipes/core/PipesParser.java | 7 ++- .../tika/pipes/core/async/AsyncProcessor.java | 17 ++++--- .../apache/tika/pipes/core/server/PipesServer.java | 2 +- .../apache/tika/pipes/core/PassbackFilterTest.java | 4 +- .../apache/tika/pipes/core/PipesClientTest.java | 52 +++++++++++----------- .../apache/tika/pipes/core/PipesServerTest.java | 2 +- .../tika/server/core/resource/PipesResource.java | 4 +- 11 files changed, 52 insertions(+), 62 deletions(-) diff --git a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java index f5f1eb1f2..39f57b943 100644 --- a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java +++ b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java @@ -115,7 +115,7 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase { tikaConfigPath = tikaConfigFile.getAbsolutePath(); } pipesConfig = TikaLoader.load(tikaConfigFile.toPath()).configs().load("pipes", PipesConfig.class); - pipesClient = new PipesClient(pipesConfig); + pipesClient = new PipesClient(pipesConfig, tikaConfigFile.toPath()); expiringFetcherStore = new ExpiringFetcherStore(pipesConfig.getStaleFetcherTimeoutSeconds(), pipesConfig.getStaleFetcherDelaySeconds()); diff --git a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaConfigAsyncWriterTest.java b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaConfigAsyncWriterTest.java index de22996ff..225bf031e 100644 --- a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaConfigAsyncWriterTest.java +++ b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaConfigAsyncWriterTest.java @@ -45,7 +45,7 @@ public class TikaConfigAsyncWriterTest { Path tmp = Files.createTempFile(dir, "plugins-",".json"); pluginsWriter.write(tmp); TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tmp); - PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tmp); + PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig); assertEquals("-Xmx1g", pipesConfig.getForkedJvmArgs().get(0)); } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java index fb89ade96..d7312338c 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java @@ -94,12 +94,14 @@ public class PipesClient implements Closeable { private final PipesConfig pipesConfig; + private final Path tikaConfigPath; private final int pipesClientId; private ServerTuple serverTuple; private int filesProcessed = 0; - public PipesClient(PipesConfig pipesConfig) { + public PipesClient(PipesConfig pipesConfig, Path tikaConfigPath) { this.pipesConfig = pipesConfig; + this.tikaConfigPath = tikaConfigPath; this.pipesClientId = CLIENT_COUNTER.getAndIncrement(); } @@ -572,7 +574,7 @@ public class PipesClient implements Closeable { commandLine.add("org.apache.tika.pipes.core.server.PipesServer"); commandLine.add(Integer.toString(port)); - commandLine.add(pipesConfig.getTikaConfigPath()); + commandLine.add(tikaConfigPath.toAbsolutePath().toString()); LOG.debug("pipesClientId={}: commandline: {}", pipesClientId, commandLine); return commandLine.toArray(new String[0]); } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java index 51f572e0a..1153a3aa3 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java @@ -75,9 +75,6 @@ public class PipesConfig { public static final int DEFAULT_QUEUE_SIZE = 10000; public static final int DEFAULT_NUM_EMITTERS = 1; - @JsonIgnore - private volatile String tikaConfigPath = null; - private long emitWithinMillis = DEFAULT_EMIT_WITHIN_MILLIS; private long emitMaxEstimatedBytes = DEFAULT_EMIT_MAX_ESTIMATED_BYTES; private int queueSize = DEFAULT_QUEUE_SIZE; @@ -103,25 +100,14 @@ public class PipesConfig { * @throws IOException if deserialization fails * @throws TikaConfigException if configuration is invalid */ - public static PipesConfig load(TikaJsonConfig tikaJsonConfig, Path tikaConfigPath) throws IOException, TikaConfigException { + public static PipesConfig load(TikaJsonConfig tikaJsonConfig) throws IOException, TikaConfigException { PipesConfig config = tikaJsonConfig.deserialize("pipes", PipesConfig.class); if (config == null) { config = new PipesConfig(); } - config.setTikaConfigPath(tikaConfigPath.toAbsolutePath().toString()); return config; } - @JsonIgnore - public String getTikaConfigPath() { - return tikaConfigPath; - } - - void setTikaConfigPath(String tikaConfigPath) { - this.tikaConfigPath = tikaConfigPath; - } - - public long getTimeoutMillis() { return timeoutMillis; } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesParser.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesParser.java index 567f985d9..42bf6b596 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesParser.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesParser.java @@ -18,6 +18,7 @@ package org.apache.tika.pipes.core; import java.io.Closeable; import java.io.IOException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; @@ -30,15 +31,17 @@ public class PipesParser implements Closeable { private final PipesConfig pipesConfig; + private final Path tikaConfigPath; private final List<PipesClient> clients = new ArrayList<>(); private final ArrayBlockingQueue<PipesClient> clientQueue ; - public PipesParser(PipesConfig pipesConfig) { + public PipesParser(PipesConfig pipesConfig, Path tikaConfigPath) { this.pipesConfig = pipesConfig; + this.tikaConfigPath = tikaConfigPath; this.clientQueue = new ArrayBlockingQueue<>(pipesConfig.getNumClients()); for (int i = 0; i < pipesConfig.getNumClients(); i++) { - PipesClient client = new PipesClient(pipesConfig); + PipesClient client = new PipesClient(pipesConfig, tikaConfigPath); clientQueue.offer(client); clients.add(client); } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java index abb9915b6..7fe7fc52e 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java @@ -68,6 +68,7 @@ public class AsyncProcessor implements Closeable { private final ExecutorCompletionService<Integer> executorCompletionService; private final ExecutorService executorService; private final PipesConfig asyncConfig; + private final Path tikaConfigPath; private final PipesReporter pipesReporter; private final AtomicLong totalProcessed = new AtomicLong(0); private static long MAX_OFFER_WAIT_MS = 120000; @@ -84,7 +85,8 @@ public class AsyncProcessor implements Closeable { TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath); TikaPluginManager tikaPluginManager = TikaPluginManager.load(tikaJsonConfig); MetadataFilter metadataFilter = TikaLoader.load(tikaConfigPath).loadMetadataFilters(); - this.asyncConfig = PipesConfig.load(tikaJsonConfig, tikaConfigPath); + this.asyncConfig = PipesConfig.load(tikaJsonConfig); + this.tikaConfigPath = tikaConfigPath; this.pipesReporter = ReporterManager.load(tikaPluginManager, tikaJsonConfig); LOG.debug("loaded reporter {}", pipesReporter.getClass()); this.fetchEmitTuples = new ArrayBlockingQueue<>(asyncConfig.getQueueSize()); @@ -95,12 +97,6 @@ public class AsyncProcessor implements Closeable { this.executorCompletionService = new ExecutorCompletionService<>(executorService); try { - if (asyncConfig.getTikaConfigPath() != null && !tikaConfigPath.toAbsolutePath().equals(asyncConfig.getTikaConfigPath())) { - LOG.warn("TikaConfig for AsyncProcessor ({}) is different " + - "from TikaConfig for workers ({}). If this is intended," + - " please ignore this warning.", tikaConfigPath.toAbsolutePath(), - asyncConfig.getTikaConfigPath()); - } this.executorCompletionService.submit(() -> { while (true) { try { @@ -119,7 +115,7 @@ public class AsyncProcessor implements Closeable { for (int i = 0; i < asyncConfig.getNumClients(); i++) { executorCompletionService.submit( - new FetchEmitWorker(asyncConfig, fetchEmitTuples, emitDatumTuples)); + new FetchEmitWorker(asyncConfig, tikaConfigPath, fetchEmitTuples, emitDatumTuples)); } EmitterManager emitterManager = EmitterManager.load(tikaPluginManager, tikaJsonConfig); @@ -272,13 +268,16 @@ public class AsyncProcessor implements Closeable { private class FetchEmitWorker implements Callable<Integer> { private final PipesConfig asyncConfig; + private final Path tikaConfigPath; private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples; private final ArrayBlockingQueue<EmitDataPair> emitDataTupleQueue; private FetchEmitWorker(PipesConfig asyncConfig, + Path tikaConfigPath, ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples, ArrayBlockingQueue<EmitDataPair> emitDataTupleQueue) { this.asyncConfig = asyncConfig; + this.tikaConfigPath = tikaConfigPath; this.fetchEmitTuples = fetchEmitTuples; this.emitDataTupleQueue = emitDataTupleQueue; } @@ -286,7 +285,7 @@ public class AsyncProcessor implements Closeable { @Override public Integer call() throws Exception { - try (PipesClient pipesClient = new PipesClient(asyncConfig)) { + try (PipesClient pipesClient = new PipesClient(asyncConfig, tikaConfigPath)) { while (true) { FetchEmitTuple t = fetchEmitTuples.poll(1, TimeUnit.SECONDS); if (t == null) { diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java index 949dcfea4..48c689955 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java @@ -162,7 +162,7 @@ public class PipesServer implements AutoCloseable { try { TikaLoader tikaLoader = TikaLoader.load(tikaConfigPath); TikaJsonConfig tikaJsonConfig = tikaLoader.getConfig(); - PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tikaConfigPath); + PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig); // Set socket timeout from config after loading PipesConfig socket.setSoTimeout((int) pipesConfig.getSocketTimeoutMs()); diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java index dad47a170..c00989a1a 100644 --- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java +++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java @@ -53,10 +53,10 @@ public class PassbackFilterTest { public void init(Path tmpDir) throws Exception { Path pipesConfigPath = PluginsTestHelper.getFileSystemFetcherConfig("tika-config-passback.json", tmpDir); TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(pipesConfigPath); - PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, pipesConfigPath); + PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig); PluginsTestHelper.copyTestFilesToTmpInput(tmpDir, testPdfFile); - pipesClient = new PipesClient(pipesConfig); + pipesClient = new PipesClient(pipesConfig, pipesConfigPath); } @Test diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java index e821e1c60..a5bd079aa 100644 --- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java +++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java @@ -53,8 +53,8 @@ public class PipesClientTest { PluginsTestHelper.copyTestFilesToTmpInput(tmp, testFileName); TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath); - PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tikaConfigPath); - return new PipesClient(pipesConfig); + PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig); + return new PipesClient(pipesConfig, tikaConfigPath); } @Test @@ -137,9 +137,9 @@ public class PipesClientTest { Path tikaConfigPath = PluginsTestHelper.getFileSystemFetcherConfig(tmp, inputDir, tmp.resolve("output")); TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath); - PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tikaConfigPath); + PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig); - try (PipesClient pipesClient = new PipesClient(pipesConfig)) { + try (PipesClient pipesClient = new PipesClient(pipesConfig, tikaConfigPath)) { // First test: Short timeout (1 second) - should timeout ParseContext shortTimeoutContext = new ParseContext(); shortTimeoutContext.set(TikaTaskTimeout.class, new TikaTaskTimeout(1000)); @@ -176,9 +176,9 @@ public class PipesClientTest { "tika-config-bad-class.json", tmp); TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath); - PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tikaConfigPath); + PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig); - try (PipesClient pipesClient = new PipesClient(pipesConfig)) { + try (PipesClient pipesClient = new PipesClient(pipesConfig, tikaConfigPath)) { FetchEmitTuple tuple = new FetchEmitTuple(testDoc, new FetchKey("bad-fetcher", testDoc), new EmitKey(), new Metadata(), new ParseContext(), @@ -205,9 +205,9 @@ public class PipesClientTest { PluginsTestHelper.copyTestFilesToTmpInput(tmp, testDoc); TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath); - PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tikaConfigPath); + PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig); - try (PipesClient pipesClient = new PipesClient(pipesConfig)) { + try (PipesClient pipesClient = new PipesClient(pipesConfig, tikaConfigPath)) { FetchEmitTuple tuple = new FetchEmitTuple(testDoc, new FetchKey("fsf", testDoc), new EmitKey(), new Metadata(), new ParseContext(), @@ -233,9 +233,9 @@ public class PipesClientTest { PluginsTestHelper.copyTestFilesToTmpInput(tmp, testDoc); TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath); - PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tikaConfigPath); + PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig); - try (PipesClient pipesClient = new PipesClient(pipesConfig)) { + try (PipesClient pipesClient = new PipesClient(pipesConfig, tikaConfigPath)) { FetchEmitTuple tuple = new FetchEmitTuple(testDoc, new FetchKey("fsf", testDoc), new EmitKey(), new Metadata(), new ParseContext(), @@ -259,9 +259,9 @@ public class PipesClientTest { PluginsTestHelper.copyTestFilesToTmpInput(tmp, testDoc); TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath); - PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tikaConfigPath); + PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig); - try (PipesClient pipesClient = new PipesClient(pipesConfig)) { + try (PipesClient pipesClient = new PipesClient(pipesConfig, tikaConfigPath)) { FetchEmitTuple tuple = new FetchEmitTuple(testDoc, new FetchKey("fsf", testDoc), new EmitKey(), new Metadata(), new ParseContext(), @@ -309,7 +309,7 @@ public class PipesClientTest { "tika-config-timeout-lt-heartbeat.json", tmp); TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath); - PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tikaConfigPath); + PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig); // Verify the misconfiguration that triggers socket timeout assertEquals(3000, pipesConfig.getSocketTimeoutMs(), "Socket timeout should be 3 seconds"); @@ -319,7 +319,7 @@ public class PipesClientTest { // The config file includes -Dtika.pipes.allowInvalidHeartbeat=true in forkedJvmArgs // to allow this invalid configuration for testing only - try (PipesClient pipesClient = new PipesClient(pipesConfig)) { + try (PipesClient pipesClient = new PipesClient(pipesConfig, tikaConfigPath)) { FetchEmitTuple tuple = new FetchEmitTuple(testFile, new FetchKey("fsf", testFile), new EmitKey(), new Metadata(), new ParseContext(), @@ -362,9 +362,9 @@ public class PipesClientTest { Path tikaConfigPath = PluginsTestHelper.getFileSystemFetcherConfig(tmp, inputDir, tmp.resolve("output")); TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath); - PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tikaConfigPath); + PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig); - try (PipesClient pipesClient = new PipesClient(pipesConfig)) { + try (PipesClient pipesClient = new PipesClient(pipesConfig, tikaConfigPath)) { FetchEmitTuple tuple = new FetchEmitTuple(testFile, new FetchKey(fetcherName, testFile), new EmitKey(emitterName, ""), new Metadata(), new ParseContext(), @@ -396,9 +396,9 @@ public class PipesClientTest { Path tikaConfigPath = PluginsTestHelper.getFileSystemFetcherConfig(tmp, inputDir, tmp.resolve("output")); TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath); - PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tikaConfigPath); + PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig); - try (PipesClient pipesClient = new PipesClient(pipesConfig)) { + try (PipesClient pipesClient = new PipesClient(pipesConfig, tikaConfigPath)) { // Request a file that doesn't exist String nonExistentFile = "does-not-exist.pdf"; FetchEmitTuple tuple = new FetchEmitTuple(nonExistentFile, @@ -450,9 +450,9 @@ public class PipesClientTest { // Config has onExists=EXCEPTION which will trigger FileAlreadyExistsException Path tikaConfigPath = PluginsTestHelper.getFileSystemFetcherConfig("tika-config-emit-all.json", tmp, inputDir, outputDir, false); TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath); - PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tikaConfigPath); + PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig); - try (PipesClient pipesClient = new PipesClient(pipesConfig)) { + try (PipesClient pipesClient = new PipesClient(pipesConfig, tikaConfigPath)) { FetchEmitTuple tuple = new FetchEmitTuple(testFile, new FetchKey(fetcherName, testFile), new EmitKey(emitterName, ""), new Metadata(), new ParseContext(), @@ -479,9 +479,9 @@ public class PipesClientTest { Path tikaConfigPath = PluginsTestHelper.getFileSystemFetcherConfig(tmp, inputDir, tmp.resolve("output")); TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath); - PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tikaConfigPath); + PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig); - try (PipesClient pipesClient = new PipesClient(pipesConfig)) { + try (PipesClient pipesClient = new PipesClient(pipesConfig, tikaConfigPath)) { // Use invalid fetcher name FetchEmitTuple tuple = new FetchEmitTuple("test.pdf", new FetchKey("non-existent-fetcher", "test.pdf"), @@ -524,9 +524,9 @@ public class PipesClientTest { // Use config with directEmitThresholdBytes=0 to force server-side emission Path tikaConfigPath = PluginsTestHelper.getFileSystemFetcherConfig("tika-config-emit-all.json", tmp, inputDir, tmp.resolve("output"), false); TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath); - PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tikaConfigPath); + PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig); - try (PipesClient pipesClient = new PipesClient(pipesConfig)) { + try (PipesClient pipesClient = new PipesClient(pipesConfig, tikaConfigPath)) { // Use invalid emitter name FetchEmitTuple tuple = new FetchEmitTuple(testFile, new FetchKey(fetcherName, testFile), @@ -583,9 +583,9 @@ public class PipesClientTest { Files.writeString(tikaConfigPath, configContent, StandardCharsets.UTF_8); TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath); - PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tikaConfigPath); + PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig); - try (PipesClient pipesClient = new PipesClient(pipesConfig)) { + try (PipesClient pipesClient = new PipesClient(pipesConfig, tikaConfigPath)) { // Process file - should complete successfully despite multiple heartbeats PipesResult pipesResult = pipesClient.process( new FetchEmitTuple(testFile, new FetchKey(fetcherName, testFile), diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesServerTest.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesServerTest.java index 854246c62..7c137084c 100644 --- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesServerTest.java +++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesServerTest.java @@ -38,7 +38,7 @@ public class PipesServerTest extends TikaTest { PluginsTestHelper.copyTestFilesToTmpInput(tmp, testDoc); TikaLoader tikaLoader = TikaLoader.load(tikaConfig); - PipesConfig pipesConfig = PipesConfig.load(tikaLoader.getConfig(), tikaConfig); + PipesConfig pipesConfig = PipesConfig.load(tikaLoader.getConfig()); PipesServer pipesServer = PipesServer.load(40, tikaConfig); FetchEmitTuple fetchEmitTuple = new FetchEmitTuple("id", diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java index 8c1530511..9ffa943e9 100644 --- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java @@ -54,7 +54,7 @@ public class PipesResource { public PipesResource(java.nio.file.Path tikaConfig) throws TikaConfigException, IOException { TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfig); - PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tikaConfig); + PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig); //this has to be zero. everything must be emitted through the PipesServer long maxEmit = pipesConfig.getDirectEmitThresholdBytes(); if (maxEmit != 0) { @@ -63,7 +63,7 @@ public class PipesResource { LOG.warn("resetting max for emit batch to 0"); } } - this.pipesParser = new PipesParser(pipesConfig); + this.pipesParser = new PipesParser(pipesConfig, tikaConfig); }
