This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new fb9717f06 TIKA-4561 -- remove tikaconfigpath from pipesconfig (#2430)
fb9717f06 is described below
commit fb9717f066c57340c8d8beddb461a842e0e6e48d
Author: Tim Allison <[email protected]>
AuthorDate: Tue Dec 9 14:01:44 2025 -0500
TIKA-4561 -- remove tikaconfigpath from pipesconfig (#2430)
---
.../apache/tika/pipes/grpc/TikaGrpcServerImpl.java | 2 +-
.../tika/async/cli/TikaConfigAsyncWriterTest.java | 2 +-
.../org/apache/tika/pipes/core/PipesClient.java | 6 ++-
.../org/apache/tika/pipes/core/PipesConfig.java | 19 +-------
.../org/apache/tika/pipes/core/PipesParser.java | 7 ++-
.../tika/pipes/core/async/AsyncProcessor.java | 17 ++++---
.../apache/tika/pipes/core/server/PipesServer.java | 2 +-
.../apache/tika/pipes/core/PassbackFilterTest.java | 4 +-
.../apache/tika/pipes/core/PipesClientTest.java | 52 +++++++++++-----------
.../apache/tika/pipes/core/PipesServerTest.java | 2 +-
.../tika/server/core/resource/PipesResource.java | 4 +-
11 files changed, 52 insertions(+), 65 deletions(-)
diff --git
a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
index f5f1eb1f2..39f57b943 100644
--- a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
+++ b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
@@ -115,7 +115,7 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
tikaConfigPath = tikaConfigFile.getAbsolutePath();
}
pipesConfig =
TikaLoader.load(tikaConfigFile.toPath()).configs().load("pipes",
PipesConfig.class);
- pipesClient = new PipesClient(pipesConfig);
+ pipesClient = new PipesClient(pipesConfig, tikaConfigFile.toPath());
expiringFetcherStore = new
ExpiringFetcherStore(pipesConfig.getStaleFetcherTimeoutSeconds(),
pipesConfig.getStaleFetcherDelaySeconds());
diff --git
a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaConfigAsyncWriterTest.java
b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaConfigAsyncWriterTest.java
index de22996ff..225bf031e 100644
---
a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaConfigAsyncWriterTest.java
+++
b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaConfigAsyncWriterTest.java
@@ -45,7 +45,7 @@ public class TikaConfigAsyncWriterTest {
Path tmp = Files.createTempFile(dir, "plugins-",".json");
pluginsWriter.write(tmp);
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tmp);
- PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tmp);
+ PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
assertEquals("-Xmx1g", pipesConfig.getForkedJvmArgs().get(0));
}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index fb89ade96..d7312338c 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -94,12 +94,14 @@ public class PipesClient implements Closeable {
private final PipesConfig pipesConfig;
+ private final Path tikaConfigPath;
private final int pipesClientId;
private ServerTuple serverTuple;
private int filesProcessed = 0;
- public PipesClient(PipesConfig pipesConfig) {
+ public PipesClient(PipesConfig pipesConfig, Path tikaConfigPath) {
this.pipesConfig = pipesConfig;
+ this.tikaConfigPath = tikaConfigPath;
this.pipesClientId = CLIENT_COUNTER.getAndIncrement();
}
@@ -572,7 +574,7 @@ public class PipesClient implements Closeable {
commandLine.add("org.apache.tika.pipes.core.server.PipesServer");
commandLine.add(Integer.toString(port));
- commandLine.add(pipesConfig.getTikaConfigPath());
+ commandLine.add(tikaConfigPath.toAbsolutePath().toString());
LOG.debug("pipesClientId={}: commandline: {}", pipesClientId,
commandLine);
return commandLine.toArray(new String[0]);
}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java
index 51f572e0a..af9e0bb0e 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java
@@ -17,11 +17,8 @@
package org.apache.tika.pipes.core;
import java.io.IOException;
-import java.nio.file.Path;
import java.util.ArrayList;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
import org.apache.tika.config.loader.TikaJsonConfig;
import org.apache.tika.exception.TikaConfigException;
@@ -75,9 +72,6 @@ public class PipesConfig {
public static final int DEFAULT_QUEUE_SIZE = 10000;
public static final int DEFAULT_NUM_EMITTERS = 1;
- @JsonIgnore
- private volatile String tikaConfigPath = null;
-
private long emitWithinMillis = DEFAULT_EMIT_WITHIN_MILLIS;
private long emitMaxEstimatedBytes = DEFAULT_EMIT_MAX_ESTIMATED_BYTES;
private int queueSize = DEFAULT_QUEUE_SIZE;
@@ -103,25 +97,14 @@ public class PipesConfig {
* @throws IOException if deserialization fails
* @throws TikaConfigException if configuration is invalid
*/
- public static PipesConfig load(TikaJsonConfig tikaJsonConfig, Path
tikaConfigPath) throws IOException, TikaConfigException {
+ public static PipesConfig load(TikaJsonConfig tikaJsonConfig) throws
IOException, TikaConfigException {
PipesConfig config = tikaJsonConfig.deserialize("pipes",
PipesConfig.class);
if (config == null) {
config = new PipesConfig();
}
- config.setTikaConfigPath(tikaConfigPath.toAbsolutePath().toString());
return config;
}
- @JsonIgnore
- public String getTikaConfigPath() {
- return tikaConfigPath;
- }
-
- void setTikaConfigPath(String tikaConfigPath) {
- this.tikaConfigPath = tikaConfigPath;
- }
-
-
public long getTimeoutMillis() {
return timeoutMillis;
}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesParser.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesParser.java
index 567f985d9..42bf6b596 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesParser.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesParser.java
@@ -18,6 +18,7 @@ package org.apache.tika.pipes.core;
import java.io.Closeable;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
@@ -30,15 +31,17 @@ public class PipesParser implements Closeable {
private final PipesConfig pipesConfig;
+ private final Path tikaConfigPath;
private final List<PipesClient> clients = new ArrayList<>();
private final ArrayBlockingQueue<PipesClient> clientQueue ;
- public PipesParser(PipesConfig pipesConfig) {
+ public PipesParser(PipesConfig pipesConfig, Path tikaConfigPath) {
this.pipesConfig = pipesConfig;
+ this.tikaConfigPath = tikaConfigPath;
this.clientQueue = new
ArrayBlockingQueue<>(pipesConfig.getNumClients());
for (int i = 0; i < pipesConfig.getNumClients(); i++) {
- PipesClient client = new PipesClient(pipesConfig);
+ PipesClient client = new PipesClient(pipesConfig, tikaConfigPath);
clientQueue.offer(client);
clients.add(client);
}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
index abb9915b6..7fe7fc52e 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
@@ -68,6 +68,7 @@ public class AsyncProcessor implements Closeable {
private final ExecutorCompletionService<Integer> executorCompletionService;
private final ExecutorService executorService;
private final PipesConfig asyncConfig;
+ private final Path tikaConfigPath;
private final PipesReporter pipesReporter;
private final AtomicLong totalProcessed = new AtomicLong(0);
private static long MAX_OFFER_WAIT_MS = 120000;
@@ -84,7 +85,8 @@ public class AsyncProcessor implements Closeable {
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
TikaPluginManager tikaPluginManager =
TikaPluginManager.load(tikaJsonConfig);
MetadataFilter metadataFilter =
TikaLoader.load(tikaConfigPath).loadMetadataFilters();
- this.asyncConfig = PipesConfig.load(tikaJsonConfig, tikaConfigPath);
+ this.asyncConfig = PipesConfig.load(tikaJsonConfig);
+ this.tikaConfigPath = tikaConfigPath;
this.pipesReporter = ReporterManager.load(tikaPluginManager,
tikaJsonConfig);
LOG.debug("loaded reporter {}", pipesReporter.getClass());
this.fetchEmitTuples = new
ArrayBlockingQueue<>(asyncConfig.getQueueSize());
@@ -95,12 +97,6 @@ public class AsyncProcessor implements Closeable {
this.executorCompletionService =
new ExecutorCompletionService<>(executorService);
try {
- if (asyncConfig.getTikaConfigPath() != null &&
!tikaConfigPath.toAbsolutePath().equals(asyncConfig.getTikaConfigPath())) {
- LOG.warn("TikaConfig for AsyncProcessor ({}) is different " +
- "from TikaConfig for workers ({}). If this is
intended," +
- " please ignore this warning.",
tikaConfigPath.toAbsolutePath(),
- asyncConfig.getTikaConfigPath());
- }
this.executorCompletionService.submit(() -> {
while (true) {
try {
@@ -119,7 +115,7 @@ public class AsyncProcessor implements Closeable {
for (int i = 0; i < asyncConfig.getNumClients(); i++) {
executorCompletionService.submit(
- new FetchEmitWorker(asyncConfig, fetchEmitTuples,
emitDatumTuples));
+ new FetchEmitWorker(asyncConfig, tikaConfigPath,
fetchEmitTuples, emitDatumTuples));
}
EmitterManager emitterManager =
EmitterManager.load(tikaPluginManager, tikaJsonConfig);
@@ -272,13 +268,16 @@ public class AsyncProcessor implements Closeable {
private class FetchEmitWorker implements Callable<Integer> {
private final PipesConfig asyncConfig;
+ private final Path tikaConfigPath;
private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
private final ArrayBlockingQueue<EmitDataPair> emitDataTupleQueue;
private FetchEmitWorker(PipesConfig asyncConfig,
+ Path tikaConfigPath,
ArrayBlockingQueue<FetchEmitTuple>
fetchEmitTuples,
ArrayBlockingQueue<EmitDataPair>
emitDataTupleQueue) {
this.asyncConfig = asyncConfig;
+ this.tikaConfigPath = tikaConfigPath;
this.fetchEmitTuples = fetchEmitTuples;
this.emitDataTupleQueue = emitDataTupleQueue;
}
@@ -286,7 +285,7 @@ public class AsyncProcessor implements Closeable {
@Override
public Integer call() throws Exception {
- try (PipesClient pipesClient = new PipesClient(asyncConfig)) {
+ try (PipesClient pipesClient = new PipesClient(asyncConfig,
tikaConfigPath)) {
while (true) {
FetchEmitTuple t = fetchEmitTuples.poll(1,
TimeUnit.SECONDS);
if (t == null) {
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
index 949dcfea4..48c689955 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
@@ -162,7 +162,7 @@ public class PipesServer implements AutoCloseable {
try {
TikaLoader tikaLoader = TikaLoader.load(tikaConfigPath);
TikaJsonConfig tikaJsonConfig = tikaLoader.getConfig();
- PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig,
tikaConfigPath);
+ PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
// Set socket timeout from config after loading PipesConfig
socket.setSoTimeout((int) pipesConfig.getSocketTimeoutMs());
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
index dad47a170..c00989a1a 100644
---
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
@@ -53,10 +53,10 @@ public class PassbackFilterTest {
public void init(Path tmpDir) throws Exception {
Path pipesConfigPath =
PluginsTestHelper.getFileSystemFetcherConfig("tika-config-passback.json",
tmpDir);
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(pipesConfigPath);
- PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig,
pipesConfigPath);
+ PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
PluginsTestHelper.copyTestFilesToTmpInput(tmpDir, testPdfFile);
- pipesClient = new PipesClient(pipesConfig);
+ pipesClient = new PipesClient(pipesConfig, pipesConfigPath);
}
@Test
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
index e821e1c60..a5bd079aa 100644
---
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
@@ -53,8 +53,8 @@ public class PipesClientTest {
PluginsTestHelper.copyTestFilesToTmpInput(tmp, testFileName);
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
- PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig,
tikaConfigPath);
- return new PipesClient(pipesConfig);
+ PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
+ return new PipesClient(pipesConfig, tikaConfigPath);
}
@Test
@@ -137,9 +137,9 @@ public class PipesClientTest {
Path tikaConfigPath =
PluginsTestHelper.getFileSystemFetcherConfig(tmp, inputDir,
tmp.resolve("output"));
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
- PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig,
tikaConfigPath);
+ PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
- try (PipesClient pipesClient = new PipesClient(pipesConfig)) {
+ try (PipesClient pipesClient = new PipesClient(pipesConfig,
tikaConfigPath)) {
// First test: Short timeout (1 second) - should timeout
ParseContext shortTimeoutContext = new ParseContext();
shortTimeoutContext.set(TikaTaskTimeout.class, new
TikaTaskTimeout(1000));
@@ -176,9 +176,9 @@ public class PipesClientTest {
"tika-config-bad-class.json", tmp);
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
- PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig,
tikaConfigPath);
+ PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
- try (PipesClient pipesClient = new PipesClient(pipesConfig)) {
+ try (PipesClient pipesClient = new PipesClient(pipesConfig,
tikaConfigPath)) {
FetchEmitTuple tuple = new FetchEmitTuple(testDoc,
new FetchKey("bad-fetcher", testDoc),
new EmitKey(), new Metadata(), new ParseContext(),
@@ -205,9 +205,9 @@ public class PipesClientTest {
PluginsTestHelper.copyTestFilesToTmpInput(tmp, testDoc);
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
- PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig,
tikaConfigPath);
+ PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
- try (PipesClient pipesClient = new PipesClient(pipesConfig)) {
+ try (PipesClient pipesClient = new PipesClient(pipesConfig,
tikaConfigPath)) {
FetchEmitTuple tuple = new FetchEmitTuple(testDoc,
new FetchKey("fsf", testDoc),
new EmitKey(), new Metadata(), new ParseContext(),
@@ -233,9 +233,9 @@ public class PipesClientTest {
PluginsTestHelper.copyTestFilesToTmpInput(tmp, testDoc);
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
- PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig,
tikaConfigPath);
+ PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
- try (PipesClient pipesClient = new PipesClient(pipesConfig)) {
+ try (PipesClient pipesClient = new PipesClient(pipesConfig,
tikaConfigPath)) {
FetchEmitTuple tuple = new FetchEmitTuple(testDoc,
new FetchKey("fsf", testDoc),
new EmitKey(), new Metadata(), new ParseContext(),
@@ -259,9 +259,9 @@ public class PipesClientTest {
PluginsTestHelper.copyTestFilesToTmpInput(tmp, testDoc);
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
- PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig,
tikaConfigPath);
+ PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
- try (PipesClient pipesClient = new PipesClient(pipesConfig)) {
+ try (PipesClient pipesClient = new PipesClient(pipesConfig,
tikaConfigPath)) {
FetchEmitTuple tuple = new FetchEmitTuple(testDoc,
new FetchKey("fsf", testDoc),
new EmitKey(), new Metadata(), new ParseContext(),
@@ -309,7 +309,7 @@ public class PipesClientTest {
"tika-config-timeout-lt-heartbeat.json", tmp);
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
- PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig,
tikaConfigPath);
+ PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
// Verify the misconfiguration that triggers socket timeout
assertEquals(3000, pipesConfig.getSocketTimeoutMs(), "Socket timeout
should be 3 seconds");
@@ -319,7 +319,7 @@ public class PipesClientTest {
// The config file includes -Dtika.pipes.allowInvalidHeartbeat=true in
forkedJvmArgs
// to allow this invalid configuration for testing only
- try (PipesClient pipesClient = new PipesClient(pipesConfig)) {
+ try (PipesClient pipesClient = new PipesClient(pipesConfig,
tikaConfigPath)) {
FetchEmitTuple tuple = new FetchEmitTuple(testFile,
new FetchKey("fsf", testFile),
new EmitKey(), new Metadata(), new ParseContext(),
@@ -362,9 +362,9 @@ public class PipesClientTest {
Path tikaConfigPath =
PluginsTestHelper.getFileSystemFetcherConfig(tmp, inputDir,
tmp.resolve("output"));
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
- PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig,
tikaConfigPath);
+ PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
- try (PipesClient pipesClient = new PipesClient(pipesConfig)) {
+ try (PipesClient pipesClient = new PipesClient(pipesConfig,
tikaConfigPath)) {
FetchEmitTuple tuple = new FetchEmitTuple(testFile,
new FetchKey(fetcherName, testFile),
new EmitKey(emitterName, ""), new Metadata(), new
ParseContext(),
@@ -396,9 +396,9 @@ public class PipesClientTest {
Path tikaConfigPath =
PluginsTestHelper.getFileSystemFetcherConfig(tmp, inputDir,
tmp.resolve("output"));
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
- PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig,
tikaConfigPath);
+ PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
- try (PipesClient pipesClient = new PipesClient(pipesConfig)) {
+ try (PipesClient pipesClient = new PipesClient(pipesConfig,
tikaConfigPath)) {
// Request a file that doesn't exist
String nonExistentFile = "does-not-exist.pdf";
FetchEmitTuple tuple = new FetchEmitTuple(nonExistentFile,
@@ -450,9 +450,9 @@ public class PipesClientTest {
// Config has onExists=EXCEPTION which will trigger
FileAlreadyExistsException
Path tikaConfigPath =
PluginsTestHelper.getFileSystemFetcherConfig("tika-config-emit-all.json", tmp,
inputDir, outputDir, false);
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
- PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig,
tikaConfigPath);
+ PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
- try (PipesClient pipesClient = new PipesClient(pipesConfig)) {
+ try (PipesClient pipesClient = new PipesClient(pipesConfig,
tikaConfigPath)) {
FetchEmitTuple tuple = new FetchEmitTuple(testFile,
new FetchKey(fetcherName, testFile),
new EmitKey(emitterName, ""), new Metadata(), new
ParseContext(),
@@ -479,9 +479,9 @@ public class PipesClientTest {
Path tikaConfigPath =
PluginsTestHelper.getFileSystemFetcherConfig(tmp, inputDir,
tmp.resolve("output"));
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
- PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig,
tikaConfigPath);
+ PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
- try (PipesClient pipesClient = new PipesClient(pipesConfig)) {
+ try (PipesClient pipesClient = new PipesClient(pipesConfig,
tikaConfigPath)) {
// Use invalid fetcher name
FetchEmitTuple tuple = new FetchEmitTuple("test.pdf",
new FetchKey("non-existent-fetcher", "test.pdf"),
@@ -524,9 +524,9 @@ public class PipesClientTest {
// Use config with directEmitThresholdBytes=0 to force server-side
emission
Path tikaConfigPath =
PluginsTestHelper.getFileSystemFetcherConfig("tika-config-emit-all.json", tmp,
inputDir, tmp.resolve("output"), false);
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
- PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig,
tikaConfigPath);
+ PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
- try (PipesClient pipesClient = new PipesClient(pipesConfig)) {
+ try (PipesClient pipesClient = new PipesClient(pipesConfig,
tikaConfigPath)) {
// Use invalid emitter name
FetchEmitTuple tuple = new FetchEmitTuple(testFile,
new FetchKey(fetcherName, testFile),
@@ -583,9 +583,9 @@ public class PipesClientTest {
Files.writeString(tikaConfigPath, configContent,
StandardCharsets.UTF_8);
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfigPath);
- PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig,
tikaConfigPath);
+ PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
- try (PipesClient pipesClient = new PipesClient(pipesConfig)) {
+ try (PipesClient pipesClient = new PipesClient(pipesConfig,
tikaConfigPath)) {
// Process file - should complete successfully despite multiple
heartbeats
PipesResult pipesResult = pipesClient.process(
new FetchEmitTuple(testFile, new FetchKey(fetcherName,
testFile),
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesServerTest.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesServerTest.java
index 854246c62..7c137084c 100644
---
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesServerTest.java
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesServerTest.java
@@ -38,7 +38,7 @@ public class PipesServerTest extends TikaTest {
PluginsTestHelper.copyTestFilesToTmpInput(tmp, testDoc);
TikaLoader tikaLoader = TikaLoader.load(tikaConfig);
- PipesConfig pipesConfig = PipesConfig.load(tikaLoader.getConfig(),
tikaConfig);
+ PipesConfig pipesConfig = PipesConfig.load(tikaLoader.getConfig());
PipesServer pipesServer = PipesServer.load(40, tikaConfig);
FetchEmitTuple fetchEmitTuple = new FetchEmitTuple("id",
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
index 8c1530511..9ffa943e9 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
@@ -54,7 +54,7 @@ public class PipesResource {
public PipesResource(java.nio.file.Path tikaConfig) throws
TikaConfigException, IOException {
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfig);
- PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tikaConfig);
+ PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig);
//this has to be zero. everything must be emitted through the
PipesServer
long maxEmit = pipesConfig.getDirectEmitThresholdBytes();
if (maxEmit != 0) {
@@ -63,7 +63,7 @@ public class PipesResource {
LOG.warn("resetting max for emit batch to 0");
}
}
- this.pipesParser = new PipesParser(pipesConfig);
+ this.pipesParser = new PipesParser(pipesConfig, tikaConfig);
}