This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch TIKA-4519
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/TIKA-4519 by this push:
new d73027b5b TIKA-4519 -- tika-server now works
d73027b5b is described below
commit d73027b5b1c053878c44e597ea72c217510c59c5
Author: tallison <[email protected]>
AuthorDate: Wed Nov 5 10:49:36 2025 -0500
TIKA-4519 -- tika-server now works
---
.../tika/pipes/core/async/AsyncProcessor.java | 8 +-
.../apache/tika/plugins/TikaPluginsManager.java | 8 ++
.../resources/tika-config-simple-fs-emitter.xml | 2 +-
tika-server/tika-server-core/pom.xml | 37 +++++++-
.../tika/server/core/FetcherStreamFactory.java | 19 +++--
.../org/apache/tika/server/core/TikaServerCli.java | 3 +-
.../apache/tika/server/core/TikaServerConfig.java | 98 +++++++---------------
.../apache/tika/server/core/TikaServerProcess.java | 5 +-
.../tika/server/core/resource/AsyncResource.java | 14 ++--
.../tika/server/core/resource/PipesResource.java | 2 +-
.../org/apache/tika/server/core/CXFTestBase.java | 85 ++++++++++++++++---
.../org/apache/tika/server/core/TikaPipesTest.java | 30 ++++---
.../tika/server/core/TikaResourceFetcherTest.java | 9 +-
.../core/TikaServerAsyncIntegrationTest.java | 23 +++--
.../tika/server/core/TikaServerConfigTest.java | 14 ++--
.../core/TikaServerPipesIntegrationTest.java | 18 +---
.../tika-config-server-fetchers-emitters.xml | 6 --
.../test/resources/configs/tika-pipes-config.json | 18 +++-
tika-server/tika-server-standard/pom.xml | 7 ++
.../apache/tika/server/standard/FetcherTest.java | 2 +-
.../apache/tika/server/standard/TikaPipesTest.java | 85 ++++++++-----------
21 files changed, 271 insertions(+), 222 deletions(-)
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
index 0cc4889fc..5b387acfc 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
@@ -68,12 +68,12 @@ public class AsyncProcessor implements Closeable {
private boolean addedEmitterSemaphores = false;
boolean isShuttingDown = false;
- public AsyncProcessor(Path tikaConfigPath, Path pipesPluginsConfigPath)
throws TikaException, IOException {
- this(tikaConfigPath, pipesPluginsConfigPath, null);
+ public AsyncProcessor(Path tikaConfigPath, Path pluginsConfigPath) throws
TikaException, IOException {
+ this(tikaConfigPath, pluginsConfigPath, null);
}
- public AsyncProcessor(Path tikaConfigPath, Path pipesPluginsConfigPath,
PipesIterator pipesIterator) throws TikaException, IOException {
- this.asyncConfig = AsyncConfig.load(tikaConfigPath,
pipesPluginsConfigPath);
+ public AsyncProcessor(Path tikaConfigPath, Path pluginsConfigPath,
PipesIterator pipesIterator) throws TikaException, IOException {
+ this.asyncConfig = AsyncConfig.load(tikaConfigPath, pluginsConfigPath);
this.fetchEmitTuples = new
ArrayBlockingQueue<>(asyncConfig.getQueueSize());
this.emitDatumTuples = new ArrayBlockingQueue<>(100);
//+1 is the watcher thread
diff --git
a/tika-plugins-core/src/main/java/org/apache/tika/plugins/TikaPluginsManager.java
b/tika-plugins-core/src/main/java/org/apache/tika/plugins/TikaPluginsManager.java
index f28a916cf..9ddab175c 100644
---
a/tika-plugins-core/src/main/java/org/apache/tika/plugins/TikaPluginsManager.java
+++
b/tika-plugins-core/src/main/java/org/apache/tika/plugins/TikaPluginsManager.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -43,6 +44,13 @@ public class TikaPluginsManager {
EMITTERS
}
+ public static JsonNode loadRoot(Path p) throws IOException,
TikaConfigException {
+ try (InputStream is = Files.newInputStream(p)) {
+ return loadRoot(is);
+ }
+ }
+
+
public static JsonNode loadRoot(InputStream is) throws IOException,
TikaConfigException {
return new ObjectMapper().readTree(new BufferedReader(new
InputStreamReader(is, StandardCharsets.UTF_8)));
}
diff --git
a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
index 8bea9b464..d4dc04613 100644
---
a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
+++
b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
@@ -26,7 +26,7 @@
</parsers>
<pipesIterator
class="org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator">
<params>
- <fetcherPluginId>fs</fetcherPluginId>
+ <fetcherId>fs</fetcherId>
<emitterName>fs</emitterName>
<basePath>UPDATE</basePath>
</params>
diff --git a/tika-server/tika-server-core/pom.xml
b/tika-server/tika-server-core/pom.xml
index 794d313a0..b7f4668a6 100644
--- a/tika-server/tika-server-core/pom.xml
+++ b/tika-server/tika-server-core/pom.xml
@@ -15,7 +15,8 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>tika-server</artifactId>
<groupId>org.apache.tika</groupId>
@@ -142,6 +143,38 @@
</systemPropertyVariables>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-plugins</id>
+ <phase>generate-test-resources</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <configuration>
+
<outputDirectory>${project.build.directory}/plugins</outputDirectory>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-fetcher-file-system</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ <overWrite>true</overWrite>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-emitter-file-system</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ <overWrite>true</overWrite>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
@@ -215,7 +248,7 @@
</restFramework>
</restModel>
<output>
- <html />
+ <html/>
</output>
<!-- Free Miredot license key, valid until Jan 31st(?), 2025
https://issues.apache.org/jira/browse/TIKA-2253
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/FetcherStreamFactory.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/FetcherStreamFactory.java
index f999a95d6..58f79e08d 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/FetcherStreamFactory.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/FetcherStreamFactory.java
@@ -38,12 +38,12 @@ import org.apache.tika.pipes.core.fetcher.FetcherManager;
import org.apache.tika.server.core.resource.TikaResource;
/**
- * This class looks for "fetcherPluginId" in the http header. If it
is not null
+ * This class looks for "fetcherId" in the http header. If it is
not null
* and not empty, this will return a new TikaInputStream from the fetch key
* and the base path as set in the definition of the named fetcher.
* As of Tika > 2.5.0, the "fetchKey" is URL decoded.
* <p>
- * Users may also specify the "fetcherPluginId"e; and
"fetchKey" in
+ * Users may also specify the "fetcherId" and "fetchKey"
in
* query parameters within the request.
* <p>
* <em>WARNING:</em> Unless you carefully lock down access to the server,
@@ -81,7 +81,7 @@ public class FetcherStreamFactory implements
InputStreamFactory {
@Override
public InputStream getInputStream(InputStream is, Metadata metadata,
HttpHeaders httpHeaders, UriInfo uriInfo) throws IOException {
MultivaluedMap params = (uriInfo == null) ? null :
uriInfo.getQueryParameters();
- String fetcherPluginId = getParam("fetcherPluginId", httpHeaders,
params);
+ String fetcherId = getParam("fetcherId", httpHeaders, params);
String fetchKey = getParam("fetchKey", httpHeaders, params);
fetchKey = urlDecode(fetchKey);
if (StringUtils.isBlank(fetchKey)) {
@@ -91,8 +91,8 @@ public class FetcherStreamFactory implements
InputStreamFactory {
TikaResource.fillParseContext(httpHeaders.getRequestHeaders(),
metadata, parseContext);
long fetchRangeStart = getLong(getParam("fetchRangeStart",
httpHeaders, params));
long fetchRangeEnd = getLong(getParam("fetchRangeEnd", httpHeaders,
params));
- if (StringUtils.isBlank(fetcherPluginId) !=
StringUtils.isBlank(fetchKey)) {
- throw new IOException("Must specify both a 'fetcherPluginId' and a
'fetchKey'. I see: " + " fetcherPluginId:" + fetcherPluginId + " and fetchKey:"
+ fetchKey);
+ if (StringUtils.isBlank(fetcherId) != StringUtils.isBlank(fetchKey)) {
+ throw new IOException("Must specify both a 'fetcherId' and a
'fetchKey'. I see: " + " fetcherId:" + fetcherId + " and fetchKey:" + fetchKey);
}
if (fetchRangeStart < 0 && fetchRangeEnd > -1) {
throw new IllegalArgumentException("fetchRangeStart must be > -1
if a fetchRangeEnd " + "is specified");
@@ -102,13 +102,14 @@ public class FetcherStreamFactory implements
InputStreamFactory {
throw new IllegalArgumentException("fetchRangeEnd must be > -1 if
a fetchRangeStart " + "is specified");
}
- if (!StringUtils.isBlank(fetcherPluginId)) {
+ if (!StringUtils.isBlank(fetcherId)) {
try {
- LOG.debug("going to fetch '{}' from fetcher: {}", fetchKey,
fetcherPluginId);
- Fetcher fetcher = fetcherManager.getFetcher(fetcherPluginId);
+ LOG.debug("going to fetch '{}' from fetcher: {}", fetchKey,
fetcherId);
+ Fetcher fetcher = fetcherManager.getFetcher(fetcherId);
if (fetchRangeStart > -1 && fetchRangeEnd > -1 && !(fetcher
instanceof RangeFetcher)) {
throw new IllegalArgumentException(
- "Can't call a fetch with a range on a fetcher
that" + " is not a RangeFetcher: name=" + fetcher.getPluginId() + " class=" +
fetcher.getClass());
+ "Can't call a fetch with a range on a fetcher
that" + " is not a RangeFetcher: id="
+ + fetcher.getPluginConfig().id() + "
class=" + fetcher.getClass());
}
return fetcher.fetch(fetchKey, metadata, parseContext);
} catch (TikaException e) {
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
index d3d24a730..3f428c700 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
@@ -44,7 +44,7 @@ public class TikaServerCli {
"listen port (default = 9998)\n");
options.addOption("?", "help", false, "this help message");
options.addOption("c", "config", true, "tika-config file");
- options.addOption("a", "pipesConfig", true, "tike pipes config");
+ options.addOption("a", "pluginsConfig", true, "tike pipes config");
options.addOption("i", "id", true, "id to use for server in" + " the
server status endpoint and logging");
return options;
@@ -60,7 +60,6 @@ public class TikaServerCli {
if (line.hasOption("help")) {
usage(options);
}
- TikaServerConfig tikaServerConfig = TikaServerConfig.load(line);
TikaServerProcess.main(args);
} catch (Exception e) {
LOG.error("Can't start: ", e);
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java
index 878482618..c1344a971 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java
@@ -24,23 +24,22 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
+import com.fasterxml.jackson.databind.JsonNode;
import org.apache.commons.cli.CommandLine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-import org.xml.sax.SAXException;
import org.apache.tika.config.ConfigBase;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
-import org.apache.tika.utils.XMLReaderUtils;
+import org.apache.tika.plugins.TikaPluginsManager;
public class TikaServerConfig extends ConfigBase {
@@ -112,7 +111,7 @@ private long forkedProcessShutdownMillis =
DEFAULT_FORKED_PROCESS_SHUTDOWN_MILLI
//debug or info only
private String logLevel = "";
private Path configPath;
- private Path pipesConfigPath;
+ private Path pluginsConfigPath;
private List<String> endpoints = new ArrayList<>();
private boolean preventStopMethod = false;
@@ -130,12 +129,12 @@ private long forkedProcessShutdownMillis =
DEFAULT_FORKED_PROCESS_SHUTDOWN_MILLI
TikaServerConfig config = null;
Set<String> settings = new HashSet<>();
- Path pipesConfig = null;
+ Path pluginsConfig = null;
if (commandLine.hasOption('a')) {
- pipesConfig = Paths.get(commandLine.getOptionValue('a'));
+ pluginsConfig = Paths.get(commandLine.getOptionValue('a'));
}
if (commandLine.hasOption("c")) {
- config = load(Paths.get(commandLine.getOptionValue("c")),
pipesConfig, commandLine, settings);
+ config = load(Paths.get(commandLine.getOptionValue("c")),
pluginsConfig, commandLine, settings);
} else {
config = new TikaServerConfig();
}
@@ -160,86 +159,45 @@ private long forkedProcessShutdownMillis =
DEFAULT_FORKED_PROCESS_SHUTDOWN_MILLI
return config;
}
- static TikaServerConfig load(Path p, Path pipesConfigPath, CommandLine
commandLine, Set<String> settings) throws IOException, TikaException {
+ static TikaServerConfig load(Path p, Path pluginsConfigPath, CommandLine
commandLine, Set<String> settings) throws IOException, TikaException {
try (InputStream is = Files.newInputStream(p)) {
- TikaServerConfig config = TikaServerConfig.load(is,
pipesConfigPath, commandLine, settings);
+ TikaServerConfig config = TikaServerConfig.load(is,
pluginsConfigPath, commandLine, settings);
if (config.getConfigPath() == null) {
config.setConfigPath(p
.toAbsolutePath()
.toString());
}
- config.setPipesConfigPath(pipesConfigPath);
- loadSupportedFetchersEmitters(config);
+ config.setPipesConfigPath(pluginsConfigPath);
+ loadSupportedFetchersEmitters(config, pluginsConfigPath);
return config;
}
}
- private static TikaServerConfig load(InputStream is, Path pipesConfigPath,
CommandLine commandLine, Set<String> settings) throws IOException,
TikaException {
+ private static TikaServerConfig load(InputStream is, Path
pluginsConfigPath, CommandLine commandLine, Set<String> settings) throws
IOException, TikaException {
TikaServerConfig tikaServerConfig = new TikaServerConfig();
Set<String> configSettings = tikaServerConfig.configure("server", is);
settings.addAll(configSettings);
- tikaServerConfig.setPipesConfigPath(pipesConfigPath);
+ tikaServerConfig.setPipesConfigPath(pluginsConfigPath);
return tikaServerConfig;
}
- private static void loadSupportedFetchersEmitters(TikaServerConfig
tikaServerConfig) throws IOException, TikaConfigException {
- //this is an abomination... clean up this double read
- try (InputStream is =
Files.newInputStream(tikaServerConfig.getConfigPath())) {
- Node properties = null;
- try {
- properties = XMLReaderUtils
- .buildDOM(is)
- .getDocumentElement();
- } catch (SAXException e) {
- throw new IOException(e);
- } catch (TikaException e) {
- throw new TikaConfigException("problem loading xml to dom", e);
- }
- if (!properties
- .getLocalName()
- .equals("properties")) {
- throw new TikaConfigException("expect properties as root
node");
- }
- NodeList children = properties.getChildNodes();
- for (int i = 0; i < children.getLength(); i++) {
- Node child = children.item(i);
- if ("emitters".equals(child.getLocalName())) {
- loadSupported(child, "emitter",
tikaServerConfig.supportedEmitters);
- }
- }
+ private static void loadSupportedFetchersEmitters(TikaServerConfig
tikaServerConfig, Path pluginsConfigPath) throws IOException,
TikaConfigException {
+ if (pluginsConfigPath == null) {
+ return;
}
- }
-
- private static void loadSupported(Node compound, String itemName,
Set<String> supported) {
- NodeList children = compound.getChildNodes();
- for (int i = 0; i < children.getLength(); i++) {
- Node child = children.item(i);
- if (itemName.equals(child.getLocalName())) {
- String name = getName(child);
- if (name != null) {
- supported.add(name);
- }
- }
+ JsonNode root = TikaPluginsManager.loadRoot(pluginsConfigPath);
+ JsonNode plugins = root.get("plugins");
+ JsonNode fetchers = plugins.get("fetchers");
+ Iterator<String> fieldNames = fetchers.fieldNames();
+ while (fieldNames.hasNext()) {
+ tikaServerConfig.supportedFetchers.add(fieldNames.next());
}
- }
- private static String getName(Node fetcherOrEmitter) {
- NodeList children = fetcherOrEmitter.getChildNodes();
- for (int i = 0; i < children.getLength(); i++) {
- Node child = children.item(i);
- if ("params".equals(child.getLocalName())) {
- NodeList params = child.getChildNodes();
- for (int j = 0; j < params.getLength(); j++) {
- Node param = params.item(j);
- if ("name".equals(param.getLocalName())) {
- return param.getTextContent();
- }
- }
- } else if ("name".equals(child.getLocalName())) {
- return child.getTextContent();
- }
+ JsonNode emitters = plugins.get("emitters");
+ fieldNames = emitters.fieldNames();
+ while (fieldNames.hasNext()) {
+ tikaServerConfig.supportedEmitters.add(fieldNames.next());
}
- return null;
}
public int getPort() {
@@ -354,11 +312,11 @@ private long forkedProcessShutdownMillis =
DEFAULT_FORKED_PROCESS_SHUTDOWN_MILLI
}
public void setPipesConfigPath(Path path) {
- this.pipesConfigPath = path;
+ this.pluginsConfigPath = path;
}
public Optional<Path> getPipesConfigPath() {
- return Optional.ofNullable(pipesConfigPath);
+ return Optional.ofNullable(pluginsConfigPath);
}
public int getDigestMarkLimit() {
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
index aa12a1cba..eda994655 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
@@ -102,7 +102,7 @@ public class TikaServerProcess {
options.addOption("h", "host", true, "host name, use * for all)");
options.addOption("p", "port", true, "listen port");
options.addOption("c", "config", true, "Tika Configuration xml file to
override default config with.");
- options.addOption("a", "pipesConfig", true, "Tika Configuration json
for pipes components");
+ options.addOption("a", "pluginsConfig", true, "Tika Configuration json
for pluginscomponents");
options.addOption("i", "id", true, "id to use for server in server
status endpoint");
options.addOption("?", "help", false, "this help message");
options.addOption("noFork", "noFork", false, "if launched in no fork
mode");
@@ -192,7 +192,8 @@ public class TikaServerProcess {
//REALLY NEED TODO THIS
FetcherManager fetcherManager = null;
InputStreamFactory inputStreamFactory = null;
- if (tikaServerConfig.isEnableUnsecureFeatures()) {
+ if (tikaServerConfig.isEnableUnsecureFeatures() &&
+ tikaServerConfig.getPipesConfigPath().isPresent()) {
fetcherManager =
FetcherManager.load(tikaServerConfig.getPipesConfigPath().get());
inputStreamFactory = new FetcherStreamFactory(fetcherManager);
} else {
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
index 1ce5ffe05..1213d0e49 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
@@ -62,10 +62,10 @@ public class AsyncResource {
long maxQueuePauseMs = 60000;
private ArrayBlockingQueue<FetchEmitTuple> queue;
- public AsyncResource(java.nio.file.Path tikaConfigPath, java.nio.file.Path
asyncConfig, Set<String> supportedFetchers) throws TikaException, IOException,
SAXException {
- this.asyncProcessor = new AsyncProcessor(tikaConfigPath, asyncConfig);
+ public AsyncResource(java.nio.file.Path tikaConfigPath, java.nio.file.Path
pluginsConfig, Set<String> supportedFetchers) throws TikaException,
IOException, SAXException {
+ this.asyncProcessor = new AsyncProcessor(tikaConfigPath,
pluginsConfig);
this.supportedFetchers = supportedFetchers;
- this.emitterManager = EmitterManager.load(tikaConfigPath);
+ this.emitterManager = EmitterManager.load(pluginsConfig);
}
public ArrayBlockingQueue<FetchEmitTuple> getFetchEmitQueue(int queueSize)
{
@@ -105,17 +105,17 @@ public class AsyncResource {
for (FetchEmitTuple t : request.getTuples()) {
if (!supportedFetchers.contains(t
.getFetchKey()
- .getFetcherPluginId())) {
+ .getFetcherId())) {
return badFetcher(t.getFetchKey());
}
if (!emitterManager
.getSupported()
.contains(t
.getEmitKey()
- .getEmitterPluginId())) {
+ .getEmitterId())) {
return badEmitter(t
.getEmitKey()
- .getEmitterPluginId());
+ .getEmitterId());
}
ParseContext parseContext = t.getParseContext();
EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig =
parseContext.get(EmbeddedDocumentBytesConfig.class);
@@ -177,7 +177,7 @@ public class AsyncResource {
}
private Map<String, Object> badFetcher(FetchKey fetchKey) {
- throw new BadRequestException("can't find fetcher for " +
fetchKey.getFetcherPluginId());
+ throw new BadRequestException("can't find fetcher for " +
fetchKey.getFetcherId());
}
private AsyncRequest deserializeASyncRequest(InputStream is) throws
IOException {
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
index abbd4d8b4..8ebf5e192 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
@@ -120,7 +120,7 @@ public class PipesResource {
case NO_EMITTER_FOUND: {
throw new IllegalArgumentException("Couldn't find emitter that
matched: " + fetchEmitTuple
.getEmitKey()
- .getEmitterPluginId());
+ .getEmitterId());
}
default:
throw new IllegalArgumentException("I'm sorry, I don't yet
handle a status of " + "this type: " + pipesResult.getStatus());
diff --git
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/CXFTestBase.java
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/CXFTestBase.java
index 705563c48..f9024bca2 100644
---
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/CXFTestBase.java
+++
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/CXFTestBase.java
@@ -27,8 +27,10 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
import java.util.Enumeration;
@@ -51,6 +53,8 @@ import org.apache.cxf.transport.common.gzip.GZIPInInterceptor;
import org.apache.cxf.transport.common.gzip.GZIPOutInterceptor;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.parser.digestutils.CommonsDigester;
@@ -59,20 +63,41 @@ import
org.apache.tika.server.core.resource.UnpackerResource;
public abstract class CXFTestBase {
- final static String FETCHER_PLUGIN_ID = "file-system-fetcher";
+ private static final Logger LOG =
LoggerFactory.getLogger(CXFTestBase.class);
- //TODO -- add back in: ,
- // "pf4j.pluginsDir": "PLUGINS_DIR"
- final static String JSON_TEMPLATE = """
+ public final static String FETCHER_ID = "fsf";
+ public final static String EMITTER_JSON_ID = "fse-json";
+ public final static String EMITTER_BYTES_ID = "fse-bytes";
+
+ public final static String JSON_TEMPLATE = """
{
- "pipesPluginsConfig" : {
+ "plugins" : {
"fetchers": {
- "file-system-fetcher": {
- "basePath": "BASE_PATH",
- "extractFileSystemMetadata": false
+ "fsf":{
+ "file-system-fetcher": {
+ "basePath": "FETCHER_BASE_PATH",
+ "extractFileSystemMetadata": false
+ }
+ }
+ },
+ "emitters": {
+ "fse-json": {
+ "file-system-emitter": {
+ "basePath": "JSON_EMITTER_BASE_PATH",
+ "fileExtension": "json",
+ "onExists":"EXCEPTION"
+ }
+ },
+ "fse-bytes": {
+ "file-system-emitter": {
+ "basePath": "BYTES_EMITTER_BASE_PATH",
+ "fileExtension": "",
+ "onExists":"EXCEPTION"
+ }
}
}
- }
+ },
+ "pluginsPaths": "PLUGINS_PATHS"
}
""";
@@ -81,6 +106,22 @@ public abstract class CXFTestBase {
protected Server server;
protected TikaConfig tika;
+ public static void createPluginsConfig(Path configPath, Path inputDir,
Path jsonOutputDir, Path bytesOutputDir) throws IOException {
+
+ Path pluginsDir = Paths.get("target/plugins");
+ if (! Files.isDirectory(pluginsDir)) {
+ LOG.warn("CAN'T FIND PLUGINS DIR. pwd={}",
Paths.get("").toAbsolutePath().toString());
+ }
+ String json = CXFTestBase.JSON_TEMPLATE.replace("FETCHER_BASE_PATH",
inputDir.toAbsolutePath().toString())
+ .replace("JSON_EMITTER_BASE_PATH",
jsonOutputDir.toAbsolutePath().toString())
+ .replace("PLUGINS_PATHS",
pluginsDir.toAbsolutePath().toString());
+ if (bytesOutputDir != null) {
+ json = json.replace("BYTES_EMITTER_BASE_PATH",
bytesOutputDir.toAbsolutePath().toString());
+ }
+
+ Files.writeString(configPath, json, StandardCharsets.UTF_8);
+ }
+
public static void assertContains(String needle, String haystack) {
assertTrue(haystack.contains(needle), needle + " not found in:\n" +
haystack);
}
@@ -182,11 +223,29 @@ public abstract class CXFTestBase {
}
protected InputStream getPipesConfigInputStream() throws IOException {
- if (getPipesInputPath() != null) {
- String json = JSON_TEMPLATE.replace("BASE_PATH",
getPipesInputPath());
- return
UnsynchronizedByteArrayInputStream.builder().setByteArray(json.getBytes(UTF_8)).get();
+ if (getPipesInputPath() == null) {
+ return null;
}
- return null;
+
+ Path pluginsDir = Paths.get("target/plugins");
+ if (!Files.isDirectory(pluginsDir)) {
+ LOG.warn("CAN'T FIND PLUGINS DIR. pwd={}", Paths
+ .get("")
+ .toAbsolutePath()
+ .toString());
+ }
+ String json = CXFTestBase.JSON_TEMPLATE
+ .replace("FETCHER_BASE_PATH", getPipesInputPath())
+ .replace("PLUGINS_PATHS", pluginsDir
+ .toAbsolutePath()
+ .toString());
+
+
+ return UnsynchronizedByteArrayInputStream
+ .builder()
+ .setByteArray(json.getBytes(UTF_8))
+ .get();
+
}
protected String getPipesInputPath() {
diff --git
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
index 7170a8ed3..0ac2a5ffc 100644
---
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
+++
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
@@ -39,7 +39,6 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.ws.rs.core.Response;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.jaxrs.lifecycle.ResourceProvider;
@@ -106,10 +105,7 @@ public class TikaPipesTest extends CXFTestBase {
TIKA_CONFIG_PATH = Files.createTempFile(TMP_DIR, "tika-pipes-",
".xml");
TIKA_CONFIG_XML = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
"<properties>" +
- "<emitters>" +
- "<emitter
class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" +
"<name>fse</name>"
- + "<basePath>" + TMP_OUTPUT_DIR.toAbsolutePath() +
"</basePath>" +
- "</emitter>" + "</emitters>" + "<pipes><tikaConfig>" +
ProcessUtils.escapeCommandLine(TIKA_CONFIG_PATH
+ "<pipes><tikaConfig>" +
ProcessUtils.escapeCommandLine(TIKA_CONFIG_PATH
.toAbsolutePath()
.toString()) + "</tikaConfig><numClients>10</numClients>" +
"<forkedJvmArgs>" + "<arg>-Xmx256m</arg>" +
"<arg>-Dlog4j.configurationFile=file:" +
ProcessUtils.escapeCommandLine(TIKA_PIPES_LOG4j2_PATH
@@ -118,10 +114,12 @@ public class TikaPipesTest extends CXFTestBase {
Files.write(TIKA_CONFIG_PATH,
TIKA_CONFIG_XML.getBytes(StandardCharsets.UTF_8));
TIKA_PIPES_CONFIG_PATH = Files.createTempFile(TMP_DIR,
"tika-pipes-config-", ".json");
- String json = JSON_TEMPLATE.replace("BASE_PATH",
inputDir.toAbsolutePath().toString());
- Files.writeString(TIKA_PIPES_CONFIG_PATH, json,
StandardCharsets.UTF_8);
- FETCHER_MANAGER = FetcherManager.load(
-
UnsynchronizedByteArrayInputStream.builder().setByteArray(json.getBytes(StandardCharsets.UTF_8)).get());
+
+ CXFTestBase.createPluginsConfig(TIKA_PIPES_CONFIG_PATH, inputDir,
TMP_OUTPUT_DIR, null);
+
+ try (InputStream is = Files.newInputStream(TIKA_PIPES_CONFIG_PATH)) {
+ FETCHER_MANAGER = FetcherManager.load(is);
+ }
}
@AfterAll
@@ -180,7 +178,7 @@ public class TikaPipesTest extends CXFTestBase {
}
FetchEmitTuple t = new FetchEmitTuple("myId",
- new FetchKey(FETCHER_PLUGIN_ID, "hello_world.xml"), new
EmitKey("fse", ""), userMetadata);
+ new FetchKey(FETCHER_ID, "hello_world.xml"), new
EmitKey(EMITTER_JSON_ID, ""), userMetadata);
StringWriter writer = new StringWriter();
JsonFetchEmitTuple.toJson(t, writer);
@@ -219,8 +217,8 @@ public class TikaPipesTest extends CXFTestBase {
HandlerConfig handlerConfig = new
HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.XML,
HandlerConfig.PARSE_MODE.RMETA, -1, -1, true);
parseContext.set(HandlerConfig.class, handlerConfig);
FetchEmitTuple t =
- new FetchEmitTuple("myId", new FetchKey(FETCHER_PLUGIN_ID,
"hello_world.xml"),
- new EmitKey("fse", ""), userMetadata, parseContext,
FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT);
+ new FetchEmitTuple("myId", new FetchKey(FETCHER_ID,
"hello_world.xml"),
+ new EmitKey(EMITTER_JSON_ID, ""), userMetadata,
parseContext, FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT);
StringWriter writer = new StringWriter();
JsonFetchEmitTuple.toJson(t, writer);
@@ -250,8 +248,8 @@ public class TikaPipesTest extends CXFTestBase {
userMetadata.add("my-key-multi", s);
}
- FetchEmitTuple t = new FetchEmitTuple("myId", new
FetchKey(FETCHER_PLUGIN_ID,
- "null_pointer.xml"), new EmitKey("fse", ""), userMetadata);
+ FetchEmitTuple t = new FetchEmitTuple("myId", new FetchKey(FETCHER_ID,
+ "null_pointer.xml"), new EmitKey(EMITTER_JSON_ID, ""),
userMetadata);
StringWriter writer = new StringWriter();
JsonFetchEmitTuple.toJson(t, writer);
@@ -288,8 +286,8 @@ public class TikaPipesTest extends CXFTestBase {
@Test
public void testPostNPENoEmit() throws Exception {
- FetchEmitTuple t = new FetchEmitTuple("myId", new
FetchKey(FETCHER_PLUGIN_ID,
- "null_pointer.xml"), new EmitKey("fse", ""), new Metadata(),
new ParseContext(),
+ FetchEmitTuple t = new FetchEmitTuple("myId", new FetchKey(FETCHER_ID,
+ "null_pointer.xml"), new EmitKey(EMITTER_JSON_ID, ""), new
Metadata(), new ParseContext(),
FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP);
StringWriter writer = new StringWriter();
JsonFetchEmitTuple.toJson(t, writer);
diff --git
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaResourceFetcherTest.java
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaResourceFetcherTest.java
index 399cbd1e9..0c642076e 100644
---
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaResourceFetcherTest.java
+++
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaResourceFetcherTest.java
@@ -43,6 +43,7 @@ import
org.apache.tika.server.core.writer.JSONMessageBodyWriter;
public class TikaResourceFetcherTest extends CXFTestBase {
+
private static final String TIKA_PATH = "/tika";
@Override
@@ -100,7 +101,7 @@ public class TikaResourceFetcherTest extends CXFTestBase {
@Test
public void testHeader() throws Exception {
MultivaluedMap<String, String> map = new MultivaluedHashMap<>();
- map.putSingle("fetcherPluginId", FETCHER_PLUGIN_ID);
+ map.putSingle("fetcherId", FETCHER_ID);
map.putSingle("fetchKey", "mock/hello_world.xml");
Response response = WebClient
.create(endPoint + TIKA_PATH)
@@ -115,7 +116,7 @@ public class TikaResourceFetcherTest extends CXFTestBase {
public void testQueryPart() throws Exception {
Response response = WebClient
.create(endPoint + TIKA_PATH)
- .query("fetcherPluginId", FETCHER_PLUGIN_ID)
+ .query("fetcherId", FETCHER_ID)
.query("fetchKey", "mock/hello_world.xml")
.accept("text/xml")
.put(null);
@@ -128,7 +129,7 @@ public class TikaResourceFetcherTest extends CXFTestBase {
public void testNonAsciiInQueryParameters() throws Exception {
Response response = WebClient
.create(endPoint + TIKA_PATH)
- .query("fetcherPluginId", FETCHER_PLUGIN_ID)
+ .query("fetcherId", FETCHER_ID)
.query("fetchKey", "mock/中文.xml")
.accept("text/xml")
.put(null);
@@ -141,7 +142,7 @@ public class TikaResourceFetcherTest extends CXFTestBase {
public void testNonAsciiUrlEncodedInQueryParameters() throws Exception {
Response response = WebClient
.create(endPoint + TIKA_PATH)
- .query("fetcherPlugId", FETCHER_PLUGIN_ID)
+ .query("fetcherPlugId", FETCHER_ID)
.query("fetchKey", "mock/%E4%B8%AD%E6%96%87.xml")
.accept("text/xml")
.put(null);
diff --git
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
index 586b33c96..2984ff9f4 100644
---
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
+++
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
@@ -18,6 +18,8 @@ package org.apache.tika.server.core;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.tika.server.core.CXFTestBase.EMITTER_JSON_ID;
+import static org.apache.tika.server.core.CXFTestBase.FETCHER_ID;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.fail;
@@ -58,8 +60,6 @@ public class TikaServerAsyncIntegrationTest extends
IntegrationTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(TikaServerAsyncIntegrationTest.class);
private static final int NUM_FILES = 100;
- private static final String EMITTER_NAME = "fse";
- private static final String FETCHER_NAME = "fsf";
private static FetchEmitTuple.ON_PARSE_EXCEPTION ON_PARSE_EXCEPTION =
FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT;
@TempDir
@@ -67,6 +67,7 @@ public class TikaServerAsyncIntegrationTest extends
IntegrationTestBase {
private static Path TMP_OUTPUT_DIR;
private static String TIKA_CONFIG_XML;
private static Path TIKA_CONFIG;
+ private static Path PLUGINS_CONFIG;
private static List<String> FILE_LIST = new ArrayList<>();
private static String[] FILES = new String[]{"hello_world.xml",
"null_pointer.xml",
// "heavy_hang_30000.xml", "real_oom.xml",
@@ -96,16 +97,16 @@ public class TikaServerAsyncIntegrationTest extends
IntegrationTestBase {
TIKA_CONFIG = TMP_DIR.resolve("tika-config.xml");
TIKA_CONFIG_XML =
- "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<properties>"
+ "<fetchers>" + "<fetcher
class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" + "<name>" +
- FETCHER_NAME + "</name>" + "<basePath>" +
inputDir.toAbsolutePath() + "</basePath>" + "</fetcher>" + "</fetchers>" +
"<emitters>" +
- "<emitter
class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" + "<name>" +
EMITTER_NAME + "</name>" + "<basePath>" +
- TMP_OUTPUT_DIR.toAbsolutePath() + "</basePath>" +
"</emitter>" + "</emitters>" +
"<server><endpoints><endpoint>async</endpoint></endpoints>" +
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<properties>"
+ "<server><endpoints><endpoint>async</endpoint></endpoints>" +
"<enableUnsecureFeatures>true</enableUnsecureFeatures></server>" +
"<async><tikaConfig>" + ProcessUtils.escapeCommandLine(TIKA_CONFIG
.toAbsolutePath()
.toString()) +
"</tikaConfig><numClients>10</numClients><forkedJvmArgs><arg>-Xmx256m" +
"</arg></forkedJvmArgs><timeoutMillis>5000</timeoutMillis>" +
"</async>" + "</properties>";
FileUtils.write(TIKA_CONFIG.toFile(), TIKA_CONFIG_XML, UTF_8);
+
+ PLUGINS_CONFIG = TMP_DIR.resolve("plugins-config.json");
+ CXFTestBase.createPluginsConfig(PLUGINS_CONFIG, inputDir,
TMP_OUTPUT_DIR, null);
}
@@ -128,16 +129,19 @@ public class TikaServerAsyncIntegrationTest extends
IntegrationTestBase {
Thread serverThread = new Thread(() -> TikaServerCli.main(new String[]{
//for debugging/development, use no fork; otherwise go with
the default
//"-noFork",
- "-p", INTEGRATION_TEST_PORT, "-config",
TIKA_CONFIG.toAbsolutePath().toString()}));
+ "-p", INTEGRATION_TEST_PORT, "-config",
TIKA_CONFIG.toAbsolutePath().toString(),
+ "-a", PLUGINS_CONFIG.toAbsolutePath().toString() }));
serverThread.start();
try {
long start = System.currentTimeMillis();
JsonNode response = sendAsync(FILE_LIST);
+
String status = response
.get("status")
.asText();
+
if (!"ok".equals(status)) {
fail("bad status: '" + status + "' -> " +
response.toPrettyString());
}
@@ -171,6 +175,9 @@ public class TikaServerAsyncIntegrationTest extends
IntegrationTestBase {
.create(endPoint + "/async")
.accept("application/json")
.post(json);
+ if (response.getStatus() != 200) {
+ throw new IllegalArgumentException("status must be 200, but I got:
" + response.getStatus());
+ }
Reader reader = new InputStreamReader((InputStream)
response.getEntity(), UTF_8);
return new ObjectMapper().readTree(reader);
}
@@ -179,6 +186,6 @@ public class TikaServerAsyncIntegrationTest extends
IntegrationTestBase {
ParseContext parseContext = new ParseContext();
parseContext.set(HandlerConfig.class,
HandlerConfig.DEFAULT_HANDLER_CONFIG);
- return new FetchEmitTuple(fileName, new FetchKey(FETCHER_NAME,
fileName), new EmitKey(EMITTER_NAME, ""), new Metadata(), parseContext,
ON_PARSE_EXCEPTION);
+ return new FetchEmitTuple(fileName, new FetchKey(FETCHER_ID,
fileName), new EmitKey(EMITTER_JSON_ID, ""), new Metadata(), parseContext,
ON_PARSE_EXCEPTION);
}
}
diff --git
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java
index 4e8823389..c277aafe5 100644
---
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java
+++
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java
@@ -45,10 +45,10 @@ public class TikaServerConfigTest {
Path path = Paths.get(TikaConfigTest.class
.getResource("/configs/tika-config-server.xml")
.toURI());
- Path pipesConfig = Paths.get(TikaConfigTest.class
+ Path pluginsConfig = Paths.get(TikaConfigTest.class
.getResource("/configs/tika-pipes-config.json")
.toURI());
- TikaServerConfig config = TikaServerConfig.load(path, pipesConfig,
emptyCommandLine, settings);
+ TikaServerConfig config = TikaServerConfig.load(path, pluginsConfig,
emptyCommandLine, settings);
assertEquals(54321, config.getTaskTimeoutMillis());
assertEquals(true, config.isEnableUnsecureFeatures());
@@ -64,10 +64,10 @@ public class TikaServerConfigTest {
Path path = Paths.get(TikaConfigTest.class
.getResource("/configs/tika-config-server-fetchers-emitters.xml")
.toURI());
- Path pipesConfig = Paths.get(TikaConfigTest.class
+ Path pluginsConfig = Paths.get(TikaConfigTest.class
.getResource("/configs/tika-pipes-config.json")
.toURI());
- TikaServerConfig config = TikaServerConfig.load(path, pipesConfig,
emptyCommandLine, settings);
+ TikaServerConfig config = TikaServerConfig.load(path, pluginsConfig,
emptyCommandLine, settings);
assertEquals(54321, config.getTaskTimeoutMillis());
assertEquals(true, config.isEnableUnsecureFeatures());
@@ -76,7 +76,7 @@ public class TikaServerConfigTest {
.size());
assertTrue(config
.getSupportedEmitters()
- .contains("fse"));
+ .contains(CXFTestBase.EMITTER_JSON_ID));
}
@Test
@@ -109,11 +109,11 @@ public class TikaServerConfigTest {
Path path = Paths.get(TikaConfigTest.class
.getResource("/configs/tika-config-server-tls.xml")
.toURI());
- Path pipesConfig = Paths.get(TikaConfigTest.class
+ Path pluginsConfig = Paths.get(TikaConfigTest.class
.getResource("/configs/tika-pipes-config.json")
.toURI());
- TikaServerConfig config = TikaServerConfig.load(path, pipesConfig,
emptyCommandLine, settings);
+ TikaServerConfig config = TikaServerConfig.load(path, pluginsConfig,
emptyCommandLine, settings);
TlsConfig tlsConfig = config.getTlsConfig();
assertTrue(tlsConfig.isActive());
assertFalse(tlsConfig.isClientAuthenticationWanted());
diff --git
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerPipesIntegrationTest.java
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerPipesIntegrationTest.java
index e0d333bde..c3415ef41 100644
---
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerPipesIntegrationTest.java
+++
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerPipesIntegrationTest.java
@@ -39,8 +39,6 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
@@ -53,9 +51,6 @@ import org.apache.tika.utils.ProcessUtils;
public class TikaServerPipesIntegrationTest extends IntegrationTestBase {
- private static final Logger LOG =
LoggerFactory.getLogger(TikaServerPipesIntegrationTest.class);
- private static final String EMITTER_NAME = "fse";
-
private static Path TEMP_OUTPUT_DIR;
private static Path TIKA_CONFIG;
private static Path TIKA_CONFIG_TIMEOUT;
@@ -77,10 +72,7 @@ public class TikaServerPipesIntegrationTest extends
IntegrationTestBase {
TIKA_PIPES_CONFIG = TEMP_OUTPUT_DIR.resolve("tika-pipes.json");
//TODO -- clean this up so that port is sufficient and we don't need
portString
String xml1 =
- "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<properties>"
+ "<emitters>" +
- "<emitter
class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" + "<name>" +
EMITTER_NAME +
- "</name>" + "<basePath>" +
- TEMP_OUTPUT_DIR.toAbsolutePath() + "</basePath>" +
"</emitter>" + "</emitters>" + "<server>" +
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<properties>"
+ "<server>" +
"<enableUnsecureFeatures>true</enableUnsecureFeatures>" +
"<port>9999</port>" + "<endpoints>" +
"<endpoint>pipes</endpoint>" + "<endpoint>status</endpoint>" + "</endpoints>";
String xml2 = "</server>" + "<pipes><tikaConfig>" +
ProcessUtils.escapeCommandLine(TIKA_CONFIG
@@ -94,10 +86,8 @@ public class TikaServerPipesIntegrationTest extends
IntegrationTestBase {
String tikaConfigTimeoutXML = xml1 +
"<taskPulseMillis>100</taskPulseMillis>" +
"<taskTimeoutMillis>10000</taskTimeoutMillis>" + xml2;
FileUtils.write(TIKA_CONFIG_TIMEOUT.toFile(), tikaConfigTimeoutXML,
UTF_8);
+ CXFTestBase.createPluginsConfig(TIKA_PIPES_CONFIG, inputDir,
TEMP_OUTPUT_DIR, null);
-
- String json = CXFTestBase.JSON_TEMPLATE.replace("BASE_PATH",
inputDir.toAbsolutePath().toString());
- Files.writeString(TIKA_PIPES_CONFIG, json, UTF_8);
}
@AfterEach
@@ -249,8 +239,8 @@ public class TikaServerPipesIntegrationTest extends
IntegrationTestBase {
private String getJsonString(String fileName,
FetchEmitTuple.ON_PARSE_EXCEPTION onParseException) throws IOException {
ParseContext parseContext = new ParseContext();
parseContext.set(HandlerConfig.class,
HandlerConfig.DEFAULT_HANDLER_CONFIG);
- FetchEmitTuple t = new FetchEmitTuple(fileName, new
FetchKey(CXFTestBase.FETCHER_PLUGIN_ID, fileName),
- new EmitKey(EMITTER_NAME, ""), new Metadata(), parseContext,
onParseException);
+ FetchEmitTuple t = new FetchEmitTuple(fileName, new
FetchKey(CXFTestBase.FETCHER_ID, fileName),
+ new EmitKey(CXFTestBase.EMITTER_JSON_ID, ""), new Metadata(),
parseContext, onParseException);
return JsonFetchEmitTuple.toJson(t);
}
}
diff --git
a/tika-server/tika-server-core/src/test/resources/configs/tika-config-server-fetchers-emitters.xml
b/tika-server/tika-server-core/src/test/resources/configs/tika-config-server-fetchers-emitters.xml
index 7c543f991..54ac567aa 100644
---
a/tika-server/tika-server-core/src/test/resources/configs/tika-config-server-fetchers-emitters.xml
+++
b/tika-server/tika-server-core/src/test/resources/configs/tika-config-server-fetchers-emitters.xml
@@ -16,12 +16,6 @@
limitations under the License.
-->
<properties>
- <emitters>
- <emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter">
- <name>fse</name>
- <basePath>/path/or/other/extracts</basePath>
- </emitter>
- </emitters>
<server>
<port>9999</port>
<taskTimeoutMillis>54321</taskTimeoutMillis>
diff --git
a/tika-server/tika-server-core/src/test/resources/configs/tika-pipes-config.json
b/tika-server/tika-server-core/src/test/resources/configs/tika-pipes-config.json
index 5a46c0444..f82f186c1 100644
---
a/tika-server/tika-server-core/src/test/resources/configs/tika-pipes-config.json
+++
b/tika-server/tika-server-core/src/test/resources/configs/tika-pipes-config.json
@@ -1,9 +1,19 @@
{
- "pipesPluginsConfig" : {
+ "plugins" : {
"fetchers": {
- "file-system-fetcher": {
- "basePath": "BASE_PATH",
- "extractFileSystemMetadata": false
+ "fsf": {
+ "file-system-fetcher": {
+ "basePath": "BASE_PATH",
+ "extractFileSystemMetadata": false
+ }
+ }
+ },
+ "emitters": {
+ "fse-json": {
+ "file-system-emitter": {
+ "basePath": "BASE_PATH",
+ "onExists": "EXCEPTION"
+ }
}
}
}
diff --git a/tika-server/tika-server-standard/pom.xml
b/tika-server/tika-server-standard/pom.xml
index 7a98a8a7e..8100fe89d 100644
--- a/tika-server/tika-server-standard/pom.xml
+++ b/tika-server/tika-server-standard/pom.xml
@@ -141,6 +141,13 @@
<type>jar</type>
<overWrite>true</overWrite>
</artifactItem>
+ <artifactItem>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-emitter-file-system</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ <overWrite>true</overWrite>
+ </artifactItem>
</artifactItems>
</configuration>
</execution>
diff --git
a/tika-server/tika-server-standard/src/test/java/org/apache/tika/server/standard/FetcherTest.java
b/tika-server/tika-server-standard/src/test/java/org/apache/tika/server/standard/FetcherTest.java
index 460f9b472..ed1f4e94d 100644
---
a/tika-server/tika-server-standard/src/test/java/org/apache/tika/server/standard/FetcherTest.java
+++
b/tika-server/tika-server-standard/src/test/java/org/apache/tika/server/standard/FetcherTest.java
@@ -89,7 +89,7 @@ public class FetcherTest extends CXFTestBase {
.create(endPoint + META_PATH)
.accept("application/json")
.acceptEncoding("gzip")
- .header("fetcherPluginId", "url-fetcher")
+ .header("fetcherId", "url-fetcher")
.header("fetchKey", "https://tika.apache.org")
.put("");
diff --git
a/tika-server/tika-server-standard/src/test/java/org/apache/tika/server/standard/TikaPipesTest.java
b/tika-server/tika-server-standard/src/test/java/org/apache/tika/server/standard/TikaPipesTest.java
index ca5b673be..faf00b6f4 100644
---
a/tika-server/tika-server-standard/src/test/java/org/apache/tika/server/standard/TikaPipesTest.java
+++
b/tika-server/tika-server-standard/src/test/java/org/apache/tika/server/standard/TikaPipesTest.java
@@ -39,7 +39,6 @@ import java.util.Map;
import jakarta.ws.rs.core.Response;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.jaxrs.lifecycle.ResourceProvider;
@@ -48,6 +47,8 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
@@ -77,20 +78,7 @@ import org.apache.tika.utils.ProcessUtils;
*/
public class TikaPipesTest extends CXFTestBase {
- final static String JSON_TEMPLATE = """
- {
- "pipesPluginsConfig" : {
- "fetchers": {
- "file-system-fetcher": {
- "basePath": "BASE_PATH",
- "extractFileSystemMetadata": false
- }
- }
- }
- }
- """;
- final static String FETCHER_PLUGIN_ID = "file-system-fetcher";
-
+ private static final Logger LOG =
LoggerFactory.getLogger(TikaPipesTest.class);
private static final String PIPES_PATH = "/pipes";
private static final String TEST_RECURSIVE_DOC =
"test_recursive_embedded.docx";
@@ -98,22 +86,22 @@ public class TikaPipesTest extends CXFTestBase {
@TempDir
private static Path TMP_WORKING_DIR;
- private static Path TMP_OUTPUT_DIR;
- private static Path TMP_BYTES_DIR;
+ private static Path OUTPUT_JSON_DIR;
+ private static Path OUTPUT_BYTES_DIR;
private static Path TIKA_PIPES_LOG4j2_PATH;
private static Path TIKA_CONFIG_PATH;
- private static Path TIKA_PIPES_CONFIG_PATH;
+ private static Path PLUGINS_CONFIG_PATH;
private static String TIKA_CONFIG_XML;
private static FetcherManager FETCHER_MANAGER;
@BeforeAll
public static void setUpBeforeClass() throws Exception {
Path inputDir = TMP_WORKING_DIR.resolve("input");
- TMP_OUTPUT_DIR = TMP_WORKING_DIR.resolve("output");
- TMP_BYTES_DIR = TMP_WORKING_DIR.resolve("bytes");
+ OUTPUT_JSON_DIR = TMP_WORKING_DIR.resolve("output");
+ OUTPUT_BYTES_DIR = TMP_WORKING_DIR.resolve("bytes");
Files.createDirectories(inputDir);
- Files.createDirectories(TMP_OUTPUT_DIR);
+ Files.createDirectories(OUTPUT_JSON_DIR);
Files.copy(TikaPipesTest.class.getResourceAsStream("/test-documents/"
+ TEST_RECURSIVE_DOC), inputDir.resolve("test_recursive_embedded.docx"),
StandardCopyOption.REPLACE_EXISTING);
Files.copy(TikaPipesTest.class.getResourceAsStream("/test-documents/"
+ TEST_TWO_BOXES_PDF), inputDir.resolve(TEST_TWO_BOXES_PDF),
@@ -123,12 +111,7 @@ public class TikaPipesTest extends CXFTestBase {
Files.copy(TikaPipesTest.class.getResourceAsStream("/log4j2.xml"),
TIKA_PIPES_LOG4j2_PATH, StandardCopyOption.REPLACE_EXISTING);
//TODO: templatify this config
- TIKA_CONFIG_XML = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
"<properties>" + "<emitters>" +
- "<emitter
class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" + "<params>" +
"<name>fse</name>" +
- "<basePath>" + TMP_OUTPUT_DIR.toAbsolutePath() +
- "</basePath>" + "</params>" + "</emitter>" + "<emitter
class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" +
- "<params>" + "<name>bytes</name>" +
- "<basePath>" + TMP_BYTES_DIR.toAbsolutePath() + "</basePath>"
+ "</params>" + "</emitter>" + "</emitters>" +
+ TIKA_CONFIG_XML = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
"<properties>" +
"<pipes><params><tikaConfig>" +
ProcessUtils.escapeCommandLine(TIKA_CONFIG_PATH
.toAbsolutePath()
@@ -139,27 +122,27 @@ public class TikaPipesTest extends CXFTestBase {
.toString()) + "</arg>" + "</forkedJvmArgs>" +
"</params></pipes>" + "</properties>";
Files.write(TIKA_CONFIG_PATH,
TIKA_CONFIG_XML.getBytes(StandardCharsets.UTF_8));
- TIKA_PIPES_CONFIG_PATH = Files.createTempFile(TMP_WORKING_DIR,
"tika-pipes-config-", ".json");
- TIKA_PIPES_CONFIG_PATH = Files.createTempFile(TMP_WORKING_DIR,
"tika-pipes-config-", ".json");
- String json = JSON_TEMPLATE.replace("BASE_PATH",
inputDir.toAbsolutePath().toString());
- Files.writeString(TIKA_PIPES_CONFIG_PATH, json,
StandardCharsets.UTF_8);
- FETCHER_MANAGER =
FetcherManager.load(UnsynchronizedByteArrayInputStream
-
.builder().setByteArray(json.getBytes(StandardCharsets.UTF_8)).get());
+ PLUGINS_CONFIG_PATH = Files.createTempFile(TMP_WORKING_DIR,
"tika-pipes-config-", ".json");
+ CXFTestBase.createPluginsConfig(PLUGINS_CONFIG_PATH, inputDir,
OUTPUT_JSON_DIR, OUTPUT_BYTES_DIR);
+
+ try (InputStream is = Files.newInputStream(PLUGINS_CONFIG_PATH)) {
+ FETCHER_MANAGER = FetcherManager.load(is);
+ }
}
@BeforeEach
public void setUpEachTest() throws Exception {
- FileUtils.deleteDirectory(TMP_OUTPUT_DIR.toFile());
- assertFalse(Files.isDirectory(TMP_OUTPUT_DIR));
+ FileUtils.deleteDirectory(OUTPUT_JSON_DIR.toFile());
+ assertFalse(Files.isDirectory(OUTPUT_JSON_DIR));
}
@Override
protected void setUpResources(JAXRSServerFactoryBean sf) {
List<ResourceProvider> rCoreProviders = new ArrayList<>();
try {
- rCoreProviders.add(new SingletonResourceProvider(new
PipesResource(TIKA_CONFIG_PATH, TIKA_PIPES_CONFIG_PATH)));
+ rCoreProviders.add(new SingletonResourceProvider(new
PipesResource(TIKA_CONFIG_PATH, PLUGINS_CONFIG_PATH)));
} catch (IOException | TikaConfigException e) {
throw new RuntimeException(e);
}
@@ -189,8 +172,8 @@ public class TikaPipesTest extends CXFTestBase {
@Test
public void testBasic() throws Exception {
- FetchEmitTuple t = new FetchEmitTuple("myId", new
FetchKey(FETCHER_PLUGIN_ID, "test_recursive_embedded.docx"),
- new EmitKey("fse", ""));
+ FetchEmitTuple t = new FetchEmitTuple("myId", new FetchKey(FETCHER_ID,
"test_recursive_embedded.docx"),
+ new EmitKey(EMITTER_JSON_ID, ""));
StringWriter writer = new StringWriter();
JsonFetchEmitTuple.toJson(t, writer);
@@ -202,7 +185,7 @@ public class TikaPipesTest extends CXFTestBase {
assertEquals(200, response.getStatus());
List<Metadata> metadataList = null;
- try (Reader reader =
Files.newBufferedReader(TMP_OUTPUT_DIR.resolve(TEST_RECURSIVE_DOC + ".json"))) {
+ try (Reader reader =
Files.newBufferedReader(OUTPUT_JSON_DIR.resolve(TEST_RECURSIVE_DOC + ".json")))
{
metadataList = JsonMetadataList.fromJson(reader);
}
assertEquals(12, metadataList.size());
@@ -217,8 +200,8 @@ public class TikaPipesTest extends CXFTestBase {
HandlerConfig handlerConfig = new
HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.TEXT,
HandlerConfig.PARSE_MODE.CONCATENATE, -1, -1, true);
parseContext.set(HandlerConfig.class, handlerConfig);
- FetchEmitTuple t = new FetchEmitTuple("myId", new
FetchKey(FETCHER_PLUGIN_ID, "test_recursive_embedded.docx"),
- new EmitKey("fse", ""), new Metadata(), parseContext,
+ FetchEmitTuple t = new FetchEmitTuple("myId", new FetchKey(FETCHER_ID,
"test_recursive_embedded.docx"),
+ new EmitKey(EMITTER_JSON_ID, ""), new Metadata(), parseContext,
FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT);
StringWriter writer = new StringWriter();
JsonFetchEmitTuple.toJson(t, writer);
@@ -234,7 +217,7 @@ public class TikaPipesTest extends CXFTestBase {
assertEquals(200, response.getStatus());
List<Metadata> metadataList = null;
- try (Reader reader =
Files.newBufferedReader(TMP_OUTPUT_DIR.resolve(TEST_RECURSIVE_DOC + ".json"))) {
+ try (Reader reader =
Files.newBufferedReader(OUTPUT_JSON_DIR.resolve(TEST_RECURSIVE_DOC + ".json")))
{
metadataList = JsonMetadataList.fromJson(reader);
}
assertEquals(1, metadataList.size());
@@ -251,8 +234,8 @@ public class TikaPipesTest extends CXFTestBase {
pdfParserConfig.setSortByPosition(true);
parseContext.set(PDFParserConfig.class, pdfParserConfig);
- FetchEmitTuple t = new FetchEmitTuple("myId", new
FetchKey(FETCHER_PLUGIN_ID, TEST_TWO_BOXES_PDF),
- new EmitKey("fse", ""), metadata, parseContext);
+ FetchEmitTuple t = new FetchEmitTuple("myId", new FetchKey(FETCHER_ID,
TEST_TWO_BOXES_PDF),
+ new EmitKey(EMITTER_JSON_ID, ""), metadata, parseContext);
StringWriter writer = new StringWriter();
JsonFetchEmitTuple.toJson(t, writer);
String getUrl = endPoint + PIPES_PATH;
@@ -263,7 +246,7 @@ public class TikaPipesTest extends CXFTestBase {
assertEquals(200, response.getStatus());
List<Metadata> metadataList = null;
- Path outputFile = TMP_OUTPUT_DIR.resolve(TEST_TWO_BOXES_PDF + ".json");
+ Path outputFile = OUTPUT_JSON_DIR.resolve(TEST_TWO_BOXES_PDF +
".json");
try (Reader reader = Files.newBufferedReader(outputFile)) {
metadataList = JsonMetadataList.fromJson(reader);
}
@@ -278,7 +261,7 @@ public class TikaPipesTest extends CXFTestBase {
@Test
public void testBytes() throws Exception {
EmbeddedDocumentBytesConfig config = new
EmbeddedDocumentBytesConfig(true);
- config.setEmitter("bytes");
+ config.setEmitter(EMITTER_BYTES_ID);
config.setIncludeOriginal(true);
config.setEmbeddedIdPrefix("-");
config.setZeroPadName(10);
@@ -287,8 +270,8 @@ public class TikaPipesTest extends CXFTestBase {
parseContext.set(HandlerConfig.class,
HandlerConfig.DEFAULT_HANDLER_CONFIG);
parseContext.set(EmbeddedDocumentBytesConfig.class, config);
FetchEmitTuple t =
- new FetchEmitTuple("myId", new FetchKey(FETCHER_PLUGIN_ID,
"test_recursive_embedded.docx"),
- new EmitKey("fse", "test_recursive_embedded.docx"),
new Metadata(), parseContext,
+ new FetchEmitTuple("myId", new FetchKey(FETCHER_ID,
"test_recursive_embedded.docx"),
+ new EmitKey(EMITTER_JSON_ID,
"test_recursive_embedded.docx"), new Metadata(), parseContext,
FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT);
StringWriter writer = new StringWriter();
JsonFetchEmitTuple.toJson(t, writer);
@@ -303,7 +286,7 @@ public class TikaPipesTest extends CXFTestBase {
assertEquals(200, response.getStatus());
List<Metadata> metadataList = null;
- try (Reader reader =
Files.newBufferedReader(TMP_OUTPUT_DIR.resolve(TEST_RECURSIVE_DOC + ".json"))) {
+ try (Reader reader =
Files.newBufferedReader(OUTPUT_JSON_DIR.resolve(TEST_RECURSIVE_DOC + ".json")))
{
metadataList = JsonMetadataList.fromJson(reader);
}
assertEquals(12, metadataList.size());
@@ -311,7 +294,7 @@ public class TikaPipesTest extends CXFTestBase {
.get(6)
.get(TikaCoreProperties.TIKA_CONTENT));
Map<String, Long> expected = loadExpected();
- Map<String, Long> byteFileNames = getFileNames(TMP_BYTES_DIR);
+ Map<String, Long> byteFileNames = getFileNames(OUTPUT_BYTES_DIR);
assertEquals(expected, byteFileNames);
}
@@ -334,7 +317,7 @@ public class TikaPipesTest extends CXFTestBase {
private Map<String, Long> getFileNames(Path p) throws Exception {
final Map<String, Long> ret = new HashMap<>();
- Files.walkFileTree(TMP_BYTES_DIR, new FileVisitor<Path>() {
+ Files.walkFileTree(OUTPUT_BYTES_DIR, new FileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir,
BasicFileAttributes attrs) throws IOException {
return FileVisitResult.CONTINUE;