This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new b370d4f TIKA-3370 -- add pipes processor and swap out /emit for /pipes
b370d4f is described below
commit b370d4f4f09ff45a6ee4b619223c468236c77761
Author: tallison <[email protected]>
AuthorDate: Mon May 10 17:32:32 2021 -0400
TIKA-3370 -- add pipes processor and swap out /emit for /pipes
---
.../java/org/apache/tika/config/ConfigBase.java | 2 +-
.../java/org/apache/tika/pipes/FetchEmitTuple.java | 13 +-
.../java/org/apache/tika/pipes/PipesClient.java | 292 ++++++++++++++++++++
.../java/org/apache/tika/pipes/PipesConfig.java | 54 ++++
.../org/apache/tika/pipes/PipesConfigBase.java | 95 +++++++
...ncRuntimeException.java => PipesException.java} | 6 +-
.../java/org/apache/tika/pipes/PipesParser.java | 75 +++++
.../java/org/apache/tika/pipes/PipesResult.java | 72 +++++
.../{async/AsyncServer.java => PipesServer.java} | 291 ++++++++++++++------
.../org/apache/tika/pipes/async/AsyncClient.java | 158 -----------
.../apache/tika/pipes/async/AsyncClientConfig.java | 3 -
.../org/apache/tika/pipes/async/AsyncConfig.java | 85 +++++-
.../org/apache/tika/pipes/async/AsyncEmitter.java | 4 +-
.../tika/pipes/async/AsyncEmitterConfig.java | 27 --
.../apache/tika/pipes/async/AsyncProcessor.java | 43 +--
.../org/apache/tika/pipes/async/AsyncResult.java | 53 ----
.../apache/tika/pipes/emitter/AbstractEmitter.java | 15 -
.../org/apache/tika/pipes/emitter/EmitData.java | 16 ++
.../tika/pipes/fetchiterator/FetchIterator.java | 2 +-
.../fetchiterator/FileSystemFetchIterator.java | 2 +-
.../tika/pipes/async/AsyncProcessorTest.java | 11 +-
.../pipes/fetchiterator/csv/CSVFetchIterator.java | 40 ++-
.../fetchiterator/jdbc/JDBCFetchIterator.java | 30 +-
.../fetchiterator/jdbc/TestJDBCFetchIterator.java | 3 +-
.../pipes/fetchiterator/s3/S3FetchIterator.java | 4 +-
.../apache/tika/server/core/TikaServerProcess.java | 124 +++++----
.../tika/server/core/resource/AsyncResource.java | 1 -
.../tika/server/core/resource/EmitterResource.java | 305 ---------------------
.../tika/server/core/resource/PipesResource.java | 155 +++++++++++
.../{TikaEmitterTest.java => TikaPipesTest.java} | 165 +++++------
.../core/TikaServerAsyncIntegrationTest.java | 33 ++-
...st.java => TikaServerPipesIntegrationTest.java} | 37 ++-
32 files changed, 1340 insertions(+), 876 deletions(-)
diff --git a/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
b/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
index c8e560e..f8b6791 100644
--- a/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
+++ b/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
@@ -43,7 +43,7 @@ import org.apache.tika.utils.XMLReaderUtils;
public abstract class ConfigBase {
/**
- * Use this to build a list of componentes for a composite item (e.g.
+ * Use this to build a list of components for a composite item (e.g.
* CompositeMetadataFilter, FetcherManager), each with their own
configurations
*
* @param compositeElementName
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
index 3e4f773..092c323 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
@@ -37,19 +37,20 @@ public class FetchEmitTuple implements Serializable {
private final ON_PARSE_EXCEPTION onParseException;
private HandlerConfig handlerConfig;
- public FetchEmitTuple(FetchKey fetchKey, EmitKey emitKey, Metadata
metadata) {
- this(fetchKey, emitKey, metadata, HandlerConfig.DEFAULT_HANDLER_CONFIG,
+
+ public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey) {
+ this(id, fetchKey, emitKey, new Metadata(),
HandlerConfig.DEFAULT_HANDLER_CONFIG,
DEFAULT_ON_PARSE_EXCEPTION);
}
+ public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey,
ON_PARSE_EXCEPTION onParseException) {
+ this(id, fetchKey, emitKey, new Metadata(),
HandlerConfig.DEFAULT_HANDLER_CONFIG,
+ onParseException);
+ }
public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey,
Metadata metadata) {
this(id, fetchKey, emitKey, metadata,
HandlerConfig.DEFAULT_HANDLER_CONFIG,
DEFAULT_ON_PARSE_EXCEPTION);
}
- public FetchEmitTuple(FetchKey fetchKey, EmitKey emitKey, Metadata
metadata,
- HandlerConfig handlerConfig, ON_PARSE_EXCEPTION
onParseException) {
- this(fetchKey.getFetchKey(), fetchKey, emitKey, metadata,
handlerConfig, onParseException);
- }
public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey,
Metadata metadata,
HandlerConfig handlerConfig, ON_PARSE_EXCEPTION
onParseException) {
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
new file mode 100644
index 0000000..45a3565
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.pipes.async.AsyncConfig;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.utils.ProcessUtils;
+
+/**
+ * Client side of the pipes protocol: forks a {@link PipesServer} JVM,
+ * sends it {@link FetchEmitTuple}s over the process's stdin and reads the
+ * status/result bytes back from its stdout. The server's stderr is relayed
+ * to this JVM's log by a {@link LogGobbler}.
+ */
+public class PipesClient implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PipesClient.class);
+
+    private Process process;
+    private LogGobbler logGobbler;
+    private Thread logGobblerThread;
+    private final PipesConfigBase pipesConfig;
+    private DataOutputStream output;
+    private DataInputStream input;
+
+    /**
+     * Number of requests sent to the current server process; reset on every
+     * (re)start so that maxFilesProcessed can trigger a periodic restart.
+     */
+    private int filesProcessed = 0;
+
+    public PipesClient(PipesConfigBase pipesConfig) {
+        this.pipesConfig = pipesConfig;
+    }
+
+    /**
+     * @return the number of requests sent to the current server process
+     * since it was last (re)started
+     */
+    public int getFilesProcessed() {
+        return filesProcessed;
+    }
+
+    /**
+     * Round-trips a PING byte to the forked server.
+     *
+     * @return true if the process is alive and echoed PING; false if it was
+     * never started, has died, or the exchange failed
+     */
+    private boolean ping() {
+        if (process == null || !process.isAlive()) {
+            return false;
+        }
+        try {
+            output.write(PipesServer.PING);
+            output.flush();
+            return input.read() == PipesServer.PING;
+        } catch (IOException e) {
+            return false;
+        }
+    }
+
+    /**
+     * Forcibly kills the forked server (if any) and stops the stderr relay.
+     */
+    @Override
+    public void close() throws IOException {
+        if (process != null) {
+            process.destroyForcibly();
+        }
+        if (logGobblerThread != null) {
+            logGobblerThread.interrupt();
+        }
+    }
+
+    /**
+     * Sends one tuple to the forked server and waits for its result.
+     * <p>
+     * The server is (re)started if it is not running or does not answer a
+     * ping, and after maxFilesProcessed requests when that value is &gt; 0.
+     *
+     * @param t tuple to fetch, parse and possibly emit
+     * @return the server's result, or {@link PipesResult#TIMEOUT} /
+     * {@link PipesResult#UNSPECIFIED_CRASH} if the server died mid-request
+     * @throws IOException if the server cannot be started or the request
+     * cannot be written to it
+     */
+    public PipesResult process(FetchEmitTuple t) throws IOException {
+        if (!ping()) {
+            restart();
+        }
+        if (pipesConfig.getMaxFilesProcessed() > 0 &&
+                filesProcessed >= pipesConfig.getMaxFilesProcessed()) {
+            LOG.info("restarting server after hitting max files: {}", filesProcessed);
+            restart();
+        }
+        //TODO consider adding a timer here too
+        // this could block forever if the watchdog thread in the server fails
+        // or is starved
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) {
+            objectOutputStream.writeObject(t);
+        }
+        byte[] bytes = bos.toByteArray();
+        output.write(PipesServer.CALL);
+        output.writeInt(bytes.length);
+        output.write(bytes);
+        output.flush();
+        //count as soon as the request is sent so that files which crash the
+        //server also count towards maxFilesProcessed
+        filesProcessed++;
+
+        try {
+            return readResults(t);
+        } catch (IOException e) {
+            //the server most likely died mid-request
+            return handleCrash(t);
+        }
+    }
+
+    /**
+     * Called when reading the response failed. Gives the process a short
+     * grace period to exit so that its exit code can distinguish a watchdog
+     * timeout from other crashes, then kills it.
+     */
+    private PipesResult handleCrash(FetchEmitTuple t) {
+        try {
+            process.waitFor(200, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            //preserve the interrupt for our caller
+            Thread.currentThread().interrupt();
+        } finally {
+            process.destroyForcibly();
+        }
+        if (!process.isAlive() && PipesServer.TIMEOUT_EXIT_CODE == process.exitValue()) {
+            LOG.warn("{} timed out", t.getId());
+            return PipesResult.TIMEOUT;
+        }
+        return PipesResult.UNSPECIFIED_CRASH;
+    }
+
+    /**
+     * Reads the one-byte status the server writes after a CALL and, for
+     * statuses that carry one, the payload that follows.
+     *
+     * @throws IOException on EOF (read returns -1) or an unknown status
+     */
+    private PipesResult readResults(FetchEmitTuple t) throws IOException {
+        int status = input.read();
+        switch (status) {
+            case PipesServer.OOM:
+                LOG.warn("oom: {}", t.getId());
+                return PipesResult.OOM;
+            case PipesServer.TIMEOUT:
+                LOG.warn("timeout: {}", t.getId());
+                return PipesResult.TIMEOUT;
+            case PipesServer.EMIT_EXCEPTION:
+                LOG.warn("emit exception: {}", t.getId());
+                return readMessage(PipesResult.STATUS.EMIT_EXCEPTION);
+            case PipesServer.NO_EMITTER_FOUND:
+                LOG.warn("no emitter found: {}", t.getId());
+                return PipesResult.NO_EMITTER_FOUND;
+            case PipesServer.PARSE_SUCCESS:
+                LOG.info("parse success: {}", t.getId());
+                return deserializeEmitData();
+            case PipesServer.PARSE_EXCEPTION_EMIT:
+                //NOTE(review): PipesResult(EmitData) always reports PARSE_SUCCESS,
+                //so callers cannot yet tell this case apart from a clean parse
+                LOG.info("parse exception, returning metadata anyway: {}", t.getId());
+                return deserializeEmitData();
+            case PipesServer.PARSE_EXCEPTION_NO_EMIT:
+                return readMessage(PipesResult.STATUS.PARSE_EXCEPTION_NO_EMIT);
+            case PipesServer.EMIT_SUCCESS:
+                LOG.info("emit success: {}", t.getId());
+                return PipesResult.EMIT_SUCCESS;
+            case PipesServer.EMIT_SUCCESS_PARSE_EXCEPTION:
+                return readMessage(PipesResult.STATUS.EMIT_SUCCESS_PARSE_EXCEPTION);
+            default:
+                throw new IOException("problem reading response from server " + status);
+        }
+    }
+
+    /**
+     * Reads a length-prefixed UTF-8 message (e.g. a stack trace) and wraps it
+     * in a result with the given status.
+     */
+    private PipesResult readMessage(PipesResult.STATUS status) throws IOException {
+        String msg = new String(readLengthPrefixedBytes(), StandardCharsets.UTF_8);
+        return new PipesResult(status, msg);
+    }
+
+    /**
+     * Reads a length-prefixed, Java-serialized {@link EmitData}.
+     */
+    private PipesResult deserializeEmitData() throws IOException {
+        byte[] bytes = readLengthPrefixedBytes();
+        try (ObjectInputStream objectInputStream =
+                     new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+            return new PipesResult((EmitData) objectInputStream.readObject());
+        } catch (ClassNotFoundException e) {
+            LOG.error("class not found exception deserializing data", e);
+            //this should be catastrophic
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Reads an int length (DataInputStream, big-endian) followed by that many
+     * bytes.
+     *
+     * @throws IOException on EOF or a negative (corrupt) length, so that the
+     * caller's crash handling applies instead of a NegativeArraySizeException
+     */
+    private byte[] readLengthPrefixedBytes() throws IOException {
+        int length = input.readInt();
+        if (length < 0) {
+            throw new IOException("negative length from server: " + length);
+        }
+        byte[] bytes = new byte[length];
+        input.readFully(bytes);
+        return bytes;
+    }
+
+    /**
+     * Kills the current server (if any) and forks a new one, wiring its
+     * stdin/stdout to the protocol streams and its stderr to a LogGobbler.
+     */
+    private void restart() throws IOException {
+        if (process != null) {
+            process.destroyForcibly();
+            LOG.info("restarting process");
+        } else {
+            LOG.info("starting process");
+        }
+        if (logGobblerThread != null) {
+            logGobblerThread.interrupt();
+        }
+        ProcessBuilder pb = new ProcessBuilder(getCommandline());
+        process = pb.start();
+        //the counter is per server process
+        filesProcessed = 0;
+        logGobbler = new LogGobbler(process.getErrorStream());
+        logGobblerThread = new Thread(logGobbler);
+        logGobblerThread.setDaemon(true);
+        logGobblerThread.start();
+        input = new DataInputStream(process.getInputStream());
+        output = new DataOutputStream(process.getOutputStream());
+    }
+
+    /**
+     * Builds the forked server's command line:
+     * {@code java [-cp <this JVM's classpath>] [-Djava.awt.headless=true]
+     * <forkedJvmArgs> org.apache.tika.pipes.PipesServer <tikaConfig>
+     * <maxForEmitBatchBytes> <timeoutMillis> <shutdownClientAfterMillis>}.
+     * The classpath and headless flags are only added when the configured
+     * JVM args do not already set them.
+     */
+    private String[] getCommandline() {
+        List<String> configArgs = pipesConfig.getForkedJvmArgs();
+        boolean hasClassPath = false;
+        boolean hasHeadless = false;
+        boolean hasExitOnOOM = false;
+
+        for (String arg : configArgs) {
+            if (arg.startsWith("-Djava.awt.headless")) {
+                hasHeadless = true;
+            }
+            //the java launcher accepts -cp, -classpath and --class-path;
+            //--classpath is kept for backwards compatibility with older configs
+            if (arg.equals("-cp") || arg.equals("-classpath") ||
+                    arg.equals("--class-path") || arg.equals("--classpath")) {
+                hasClassPath = true;
+            }
+            if (arg.equals("-XX:+ExitOnOutOfMemoryError") ||
+                    arg.equals("-XX:+CrashOnOutOfMemoryError")) {
+                hasExitOnOOM = true;
+            }
+        }
+
+        List<String> commandLine = new ArrayList<>();
+        String javaPath = pipesConfig.getJavaPath();
+        commandLine.add(ProcessUtils.escapeCommandLine(javaPath));
+        if (!hasClassPath) {
+            commandLine.add("-cp");
+            commandLine.add(System.getProperty("java.class.path"));
+        }
+        if (!hasHeadless) {
+            commandLine.add("-Djava.awt.headless=true");
+        }
+        if (!hasExitOnOOM) {
+            LOG.warn("forked jvm args include neither -XX:+ExitOnOutOfMemoryError nor " +
+                    "-XX:+CrashOnOutOfMemoryError; an OutOfMemoryError may not " +
+                    "terminate the forked server");
+        }
+        commandLine.addAll(configArgs);
+        commandLine.add("org.apache.tika.pipes.PipesServer");
+        commandLine.add(ProcessUtils.escapeCommandLine(
+                pipesConfig.getTikaConfig().toAbsolutePath().toString()));
+
+        //turn off emit batching unless this is an AsyncConfig
+        String maxForEmitBatchBytes = "0";
+        if (pipesConfig instanceof AsyncConfig) {
+            maxForEmitBatchBytes =
+                    Long.toString(((AsyncConfig) pipesConfig).getMaxForEmitBatchBytes());
+        }
+        commandLine.add(maxForEmitBatchBytes);
+        commandLine.add(Long.toString(pipesConfig.getTimeoutMillis()));
+        commandLine.add(Long.toString(pipesConfig.getShutdownClientAfterMillis()));
+
+        return commandLine.toArray(new String[0]);
+    }
+
+    /**
+     * Relays the forked server's stderr to this JVM's log. Lines prefixed
+     * with "debug ", "info ", "warn ", "error " or "err " are logged at the
+     * matching level with the prefix stripped; any other line (e.g. raw
+     * printStackTrace output) is logged at debug so that it is not lost.
+     */
+    public static class LogGobbler implements Runnable {
+        private static final Logger SERVER_LOG = LoggerFactory.getLogger(LogGobbler.class);
+
+        private final BufferedReader reader;
+
+        public LogGobbler(InputStream is) {
+            reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
+        }
+
+        @Override
+        public void run() {
+            try {
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    log(line);
+                }
+            } catch (IOException e) {
+                //presumably the forked process went away; nothing left to relay
+            }
+        }
+
+        private static void log(String line) {
+            if (line.startsWith("debug ")) {
+                SERVER_LOG.debug(line.substring(6));
+            } else if (line.startsWith("info ")) {
+                SERVER_LOG.info(line.substring(5));
+            } else if (line.startsWith("warn ")) {
+                SERVER_LOG.warn(line.substring(5));
+            } else if (line.startsWith("error ")) {
+                SERVER_LOG.error(line.substring(6));
+            } else if (line.startsWith("err ")) {
+                //short form of "error "; also accepted
+                SERVER_LOG.error(line.substring(4));
+            } else {
+                SERVER_LOG.debug(line);
+            }
+        }
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
new file mode 100644
index 0000000..94ae135
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Set;
+
+import org.apache.tika.exception.TikaConfigException;
+
+/**
+ * Configuration for {@link PipesParser}, read from the "pipes" element of a
+ * tika-config XML file.
+ */
+public class PipesConfig extends PipesConfigBase {
+
+    //how long PipesParser waits for a free client before giving up
+    private long maxWaitForClientMillis = 60000;
+
+    /**
+     * Loads the "pipes" settings from a tika-config XML file.
+     *
+     * @param tikaConfig path to the tika-config XML file
+     * @return the populated config
+     * @throws IOException if the file cannot be read
+     * @throws TikaConfigException if no &lt;tikaConfig&gt; parameter was set
+     * (or if the underlying configure call rejects the file)
+     */
+    public static PipesConfig load(Path tikaConfig) throws IOException, TikaConfigException {
+        PipesConfig pipesConfig = new PipesConfig();
+        try (InputStream is = Files.newInputStream(tikaConfig)) {
+            pipesConfig.configure("pipes", is);
+        }
+        if (pipesConfig.getTikaConfig() == null) {
+            throw new TikaConfigException("must specify at least a <tikaConfig> element in the " +
+                    "<params> of <pipes>");
+        }
+        return pipesConfig;
+    }
+
+    private PipesConfig() {
+
+    }
+
+    public long getMaxWaitForClientMillis() {
+        return maxWaitForClientMillis;
+    }
+
+    public void setMaxWaitForClientMillis(long maxWaitForClientMillis) {
+        this.maxWaitForClientMillis = maxWaitForClientMillis;
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
new file mode 100644
index 0000000..8da4f9d
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.tika.config.ConfigBase;
+
+/**
+ * Settings shared by pipes configurations: how to fork the server JVMs and
+ * when to time out or recycle them. See {@link PipesConfig}.
+ */
+public class PipesConfigBase extends ConfigBase {
+
+    //per-file parse timeout, passed to the forked server's watchdog
+    private long timeoutMillis = 30000;
+    //the forked server exits after being idle this long
+    private long shutdownClientAfterMillis = 1200000;
+    //number of PipesClients (and hence forked servers) in a PipesParser
+    private int numClients = 10;
+    //extra arguments for the forked JVM
+    private List<String> forkedJvmArgs = new ArrayList<>();
+    //files a forked server should handle before it is restarted; <= 0 disables
+    private int maxFilesProcessed = 10000;
+    //tika-config passed to the forked server
+    private Path tikaConfig;
+    private String javaPath = "java";
+
+    public long getTimeoutMillis() {
+        return timeoutMillis;
+    }
+
+    public void setTimeoutMillis(long timeoutMillis) {
+        this.timeoutMillis = timeoutMillis;
+    }
+
+    public long getShutdownClientAfterMillis() {
+        return shutdownClientAfterMillis;
+    }
+
+    public void setShutdownClientAfterMillis(long shutdownClientAfterMillis) {
+        this.shutdownClientAfterMillis = shutdownClientAfterMillis;
+    }
+
+    public int getNumClients() {
+        return numClients;
+    }
+
+    public void setNumClients(int numClients) {
+        this.numClients = numClients;
+    }
+
+    public List<String> getForkedJvmArgs() {
+        return forkedJvmArgs;
+    }
+
+    /**
+     * Sets the extra forked-JVM arguments. The list is copied, so later changes
+     * to the caller's list do not leak in; {@code null} means "no extra args".
+     */
+    public void setForkedJvmArgs(List<String> jvmArgs) {
+        this.forkedJvmArgs = (jvmArgs == null) ? new ArrayList<>() : new ArrayList<>(jvmArgs);
+    }
+
+    public int getMaxFilesProcessed() {
+        return maxFilesProcessed;
+    }
+
+    public void setMaxFilesProcessed(int maxFilesProcessed) {
+        this.maxFilesProcessed = maxFilesProcessed;
+    }
+
+    public Path getTikaConfig() {
+        return tikaConfig;
+    }
+
+    public void setTikaConfig(Path tikaConfig) {
+        this.tikaConfig = tikaConfig;
+    }
+
+    public void setTikaConfig(String tikaConfig) {
+        setTikaConfig(Paths.get(tikaConfig));
+    }
+
+    public String getJavaPath() {
+        return javaPath;
+    }
+
+    public void setJavaPath(String javaPath) {
+        this.javaPath = javaPath;
+    }
+}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
b/tika-core/src/main/java/org/apache/tika/pipes/PipesException.java
similarity index 86%
rename from
tika-core/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
rename to tika-core/src/main/java/org/apache/tika/pipes/PipesException.java
index 875cc90..ee9545f 100644
---
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesException.java
@@ -14,14 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.pipes.async;
+package org.apache.tika.pipes;
/**
* Fatal exception that means that something went seriously wrong.
*/
-public class AsyncRuntimeException extends RuntimeException {
+public class PipesException extends Exception {
- public AsyncRuntimeException(Throwable t) {
+ public PipesException(Throwable t) {
super(t);
}
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesParser.java
b/tika-core/src/main/java/org/apache/tika/pipes/PipesParser.java
new file mode 100644
index 0000000..59bf633
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesParser.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Parses {@link FetchEmitTuple}s through a fixed pool of {@link PipesClient}s.
+ * Each call borrows one client from a blocking queue and returns it afterwards,
+ * so at most numClients tuples are processed concurrently.
+ */
+public class PipesParser implements Closeable {
+
+    private final PipesConfig pipesConfig;
+    private final List<PipesClient> clients = new ArrayList<>();
+    private final ArrayBlockingQueue<PipesClient> clientQueue;
+
+    public PipesParser(PipesConfig pipesConfig) {
+        this.pipesConfig = pipesConfig;
+        this.clientQueue = new ArrayBlockingQueue<>(pipesConfig.getNumClients());
+        for (int i = 0; i < pipesConfig.getNumClients(); i++) {
+            PipesClient client = new PipesClient(pipesConfig);
+            clientQueue.offer(client);
+            clients.add(client);
+        }
+    }
+
+    /**
+     * Processes one tuple on a pooled client.
+     *
+     * @return the client's result, or
+     * {@link PipesResult#CLIENT_UNAVAILABLE_WITHIN_MS} if no client became free
+     * within maxWaitForClientMillis
+     * @throws PipesException if interrupted while waiting for a client (the
+     * thread's interrupt flag is restored)
+     * @throws IOException if the client fails to talk to its server
+     */
+    public PipesResult parse(FetchEmitTuple t) throws PipesException, IOException {
+        PipesClient client = null;
+        try {
+            client = clientQueue.poll(pipesConfig.getMaxWaitForClientMillis(),
+                    TimeUnit.MILLISECONDS);
+            if (client == null) {
+                return PipesResult.CLIENT_UNAVAILABLE_WITHIN_MS;
+            }
+            return client.process(t);
+        } catch (InterruptedException e) {
+            //restore the flag so callers further up can still see the interrupt
+            Thread.currentThread().interrupt();
+            throw new PipesException(e);
+        } finally {
+            if (client != null) {
+                clientQueue.offer(client);
+            }
+        }
+    }
+
+    /**
+     * Closes every client, even if some fail. The first failure is rethrown
+     * with any later ones attached as suppressed exceptions.
+     */
+    @Override
+    public void close() throws IOException {
+        IOException first = null;
+        for (PipesClient pipesClient : clients) {
+            try {
+                pipesClient.close();
+            } catch (IOException e) {
+                if (first == null) {
+                    first = e;
+                } else {
+                    first.addSuppressed(e);
+                }
+            }
+        }
+        if (first != null) {
+            throw first;
+        }
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
new file mode 100644
index 0000000..0805b03
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+import org.apache.tika.pipes.emitter.EmitData;
+
+/**
+ * Result of processing a single {@link FetchEmitTuple}: a status, plus either
+ * the {@link EmitData} (via {@link #PipesResult(EmitData)}, reported as
+ * PARSE_SUCCESS) or a message such as a stack trace
+ * (via {@link #PipesResult(STATUS, String)}). All fields are final.
+ */
+public class PipesResult {
+
+    public enum STATUS {
+        CLIENT_UNAVAILABLE_WITHIN_MS,
+        PARSE_EXCEPTION_NO_EMIT,
+        PARSE_EXCEPTION_EMIT, PARSE_SUCCESS,
+        OOM, TIMEOUT, UNSPECIFIED_CRASH,
+        NO_EMITTER_FOUND,
+        EMIT_SUCCESS, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION
+    }
+
+    //shared payload-free results; final so that no caller can replace them
+    public static final PipesResult CLIENT_UNAVAILABLE_WITHIN_MS =
+            new PipesResult(STATUS.CLIENT_UNAVAILABLE_WITHIN_MS);
+    public static final PipesResult TIMEOUT = new PipesResult(STATUS.TIMEOUT);
+    public static final PipesResult OOM = new PipesResult(STATUS.OOM);
+    public static final PipesResult UNSPECIFIED_CRASH = new PipesResult(STATUS.UNSPECIFIED_CRASH);
+    public static final PipesResult EMIT_SUCCESS = new PipesResult(STATUS.EMIT_SUCCESS);
+    public static final PipesResult NO_EMITTER_FOUND = new PipesResult(STATUS.NO_EMITTER_FOUND);
+
+    private final STATUS status;
+    private final EmitData emitData;
+    private final String message;
+
+    private PipesResult(STATUS status, EmitData emitData, String message) {
+        this.status = status;
+        this.emitData = emitData;
+        this.message = message;
+    }
+
+    public PipesResult(STATUS status) {
+        this(status, null, null);
+    }
+
+    public PipesResult(STATUS status, String message) {
+        this(status, null, message);
+    }
+
+    public PipesResult(EmitData emitData) {
+        this(STATUS.PARSE_SUCCESS, emitData, null);
+    }
+
+    public STATUS getStatus() {
+        return status;
+    }
+
+    /**
+     * @return the parsed data, or null if this result carries none
+     */
+    public EmitData getEmitData() {
+        return emitData;
+    }
+
+    /**
+     * @return the message (e.g. a stack trace), or null if there is none
+     */
+    public String getMessage() {
+        return message;
+    }
+}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncServer.java
b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
similarity index 57%
rename from tika-core/src/main/java/org/apache/tika/pipes/async/AsyncServer.java
rename to tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index e6950dc..1749837 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.pipes.async;
+package org.apache.tika.pipes;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -25,6 +25,7 @@ import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
@@ -35,61 +36,86 @@ import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.EncryptedDocumentException;
import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.RecursiveParserWrapper;
-import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
import org.apache.tika.pipes.fetcher.FetchKey;
import org.apache.tika.pipes.fetcher.Fetcher;
import org.apache.tika.pipes.fetcher.FetcherManager;
import org.apache.tika.sax.BasicContentHandlerFactory;
import org.apache.tika.sax.RecursiveParserWrapperHandler;
+import org.apache.tika.utils.ExceptionUtils;
import org.apache.tika.utils.StringUtils;
-public class AsyncServer implements Runnable {
+public class PipesServer implements Runnable {
- public static final byte ERROR = -1;
-
- public static final byte DONE = 0;
+ public static final int TIMEOUT_EXIT_CODE = 3;
public static final byte CALL = 1;
public static final byte PING = 2;
- public static final byte RESOURCE = 3;
+ public static final byte FAILED_TO_START = 3;
+
+ public static final byte PARSE_SUCCESS = 4;
+
+ /**
+ * This will return the parse exception stack trace
+ */
+ public static final byte PARSE_EXCEPTION_NO_EMIT = 5;
+
+ /**
+ * This will return the metadata list
+ */
+ public static final byte PARSE_EXCEPTION_EMIT = 6;
+
+ public static final byte EMIT_SUCCESS = 7;
+
+ public static final byte EMIT_SUCCESS_PARSE_EXCEPTION = 8;
+
+ public static final byte EMIT_EXCEPTION = 9;
- public static final byte READY = 4;
+ public static final byte NO_EMITTER_FOUND = 10;
- public static final byte FAILED_TO_START = 5;
+ public static final byte OOM = 11;
- public static final byte OOM = 6;
+ public static final byte TIMEOUT = 12;
+
+ public static final byte EMPTY_OUTPUT = 13;
- public static final byte TIMEOUT = 7;
private final Object[] lock = new Object[0];
private final Path tikaConfigPath;
private final DataInputStream input;
private final DataOutputStream output;
+ private final long maxExtractSizeToReturn;
private final long serverParseTimeoutMillis;
private final long serverWaitTimeoutMillis;
private Parser parser;
private TikaConfig tikaConfig;
private FetcherManager fetcherManager;
+ private EmitterManager emitterManager;
private volatile boolean parsing;
private volatile long since;
- //logging is fussy...the logging frameworks grab stderr/stdout
+ //logging is fussy...the logging frameworks grab stderr and stdout
//before we can redirect. slf4j complains on stderr, log4j2 unconfigured
writes to stdout
//We can add logging later but it has to be done carefully...
- public AsyncServer(Path tikaConfigPath, InputStream in, PrintStream out,
+ public PipesServer(Path tikaConfigPath, InputStream in, PrintStream out,
+ long maxExtractSizeToReturn,
long serverParseTimeoutMillis, long
serverWaitTimeoutMillis)
throws IOException, TikaException, SAXException {
this.tikaConfigPath = tikaConfigPath;
this.input = new DataInputStream(in);
this.output = new DataOutputStream(out);
+ this.maxExtractSizeToReturn = maxExtractSizeToReturn;
this.serverParseTimeoutMillis = serverParseTimeoutMillis;
this.serverWaitTimeoutMillis = serverWaitTimeoutMillis;
this.parsing = false;
@@ -99,11 +125,13 @@ public class AsyncServer implements Runnable {
public static void main(String[] args) throws Exception {
Path tikaConfig = Paths.get(args[0]);
- long serverParseTimeoutMillis = Long.parseLong(args[1]);
- long serverWaitTimeoutMillis = Long.parseLong(args[2]);
+ long maxForEmitBatchBytes = Long.parseLong(args[1]);
+ long serverParseTimeoutMillis = Long.parseLong(args[2]);
+ long serverWaitTimeoutMillis = Long.parseLong(args[3]);
- AsyncServer server =
- new AsyncServer(tikaConfig, System.in, System.out,
serverParseTimeoutMillis,
+ PipesServer server =
+ new PipesServer(tikaConfig, System.in, System.out,
+ maxForEmitBatchBytes, serverParseTimeoutMillis,
serverWaitTimeoutMillis);
System.setIn(new ByteArrayInputStream(new byte[0]));
System.setOut(System.err);
@@ -121,44 +149,47 @@ public class AsyncServer implements Runnable {
synchronized (lock) {
long elapsed = System.currentTimeMillis() - since;
if (parsing && elapsed > serverParseTimeoutMillis) {
- err("server timeout");
- System.exit(1);
+ System.exit(TIMEOUT_EXIT_CODE);
} else if (!parsing && serverWaitTimeoutMillis > 0 &&
elapsed > serverWaitTimeoutMillis) {
- err("closing down from inactivity");
+ debug("closing down from inactivity");
System.exit(0);
}
}
- Thread.sleep(250);
+ Thread.sleep(100);
}
} catch (InterruptedException e) {
//swallow
}
}
- private void err(String msg) {
- System.err.println(msg);
+ private void debug(String msg) {
+ System.err.println("debug " + msg.replaceAll("[\r\n]", " "));
+ System.err.flush();
+ }
+
+ private void warn(Throwable t) {
+ System.err.println("warn " +
ExceptionUtils.getStackTrace(t).replaceAll("[\r\n]", " "));
System.err.flush();
}
private void err(Throwable t) {
- t.printStackTrace();
+        //PipesClient.LogGobbler logs lines starting with "error " at ERROR level
+        System.err.println("error " + ExceptionUtils.getStackTrace(t).replaceAll("[\r\n]", " "));
System.err.flush();
}
+
public void processRequests() {
//initialize
try {
initializeParser();
} catch (Throwable t) {
- t.printStackTrace();
- System.err.flush();
+ err(t);
try {
output.writeByte(FAILED_TO_START);
output.flush();
} catch (IOException e) {
- e.printStackTrace();
- System.err.flush();
+ warn(e);
}
return;
}
@@ -172,8 +203,7 @@ public class AsyncServer implements Runnable {
output.writeByte(PING);
output.flush();
} else if (request == CALL) {
- EmitData emitData = parseOne();
- write(emitData);
+ parseOne();
} else {
throw new IllegalStateException("Unexpected request");
}
@@ -181,75 +211,120 @@ public class AsyncServer implements Runnable {
}
} catch (Throwable t) {
t.printStackTrace();
+ err(t);
+ System.err.println("exiting");
+ exit(1);
}
System.err.flush();
}
- private void write(EmitData emitData) {
+ private boolean metadataIsEmpty(List<Metadata> metadataList) {
+ return metadataList == null || metadataList.size() == 0;
+ }
+
+ /**
+ * returns stack trace if there was a container exception or empty string
+ * if there was no stacktrace
+ * @param t
+ * @param metadataList
+ * @return
+ */
+ private String getContainerStacktrace(FetchEmitTuple t, List<Metadata>
metadataList) {
+ if (metadataList == null || metadataList.size() < 1) {
+ return "";
+ }
+ String stack =
metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
+ return (stack != null) ? stack : "";
+ }
+
+
+ private void emit(EmitData emitData, String parseExceptionStack) {
+ Emitter emitter =
emitterManager.getEmitter(emitData.getEmitKey().getEmitterName());
+ if (emitter == null) {
+ write(NO_EMITTER_FOUND, new byte[0]);
+ return;
+ }
try {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- try (ObjectOutputStream objectOutputStream = new
ObjectOutputStream(bos)) {
- objectOutputStream.writeObject(emitData);
- }
- byte[] bytes = bos.toByteArray();
- int len = bytes.length;
- output.write(READY);
- output.writeInt(len);
- output.write(bytes);
- output.flush();
- } catch (IOException e) {
- err(e);
- //LOG.error("problem writing emit data", e);
- exit(1);
+ emitter.emit(emitData.getEmitKey().getEmitKey(),
emitData.getMetadataList());
+ } catch (IOException | TikaEmitterException e) {
+ String msg = ExceptionUtils.getStackTrace(e);
+ byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
+ //for now, we're hiding the parse exception if there was also an
emit exception
+ write(EMIT_EXCEPTION, bytes);
+ return;
+ }
+ if (StringUtils.isBlank(parseExceptionStack)) {
+ write(EMIT_SUCCESS);
+ } else {
+ write(EMIT_SUCCESS_PARSE_EXCEPTION,
parseExceptionStack.getBytes(StandardCharsets.UTF_8));
}
}
- private EmitData parseOne() throws FetchException {
+
+ private void parseOne() throws FetchException {
synchronized (lock) {
parsing = true;
since = System.currentTimeMillis();
}
try {
- FetchEmitTuple t = readFetchEmitTuple();
- Metadata userMetadata = t.getMetadata();
- Metadata metadata = new Metadata();
- String fetcherName = t.getFetchKey().getFetcherName();
- String fetchKey = t.getFetchKey().getFetchKey();
- List<Metadata> metadataList = null;
- Fetcher fetcher = null;
- try {
- fetcher = fetcherManager.getFetcher(fetcherName);
- } catch (TikaException | IOException e) {
- err(e);
- //LOG.error("can't get fetcher", e);
- throw new FetchException(e);
+ actuallyParse();
+ } finally {
+ synchronized (lock) {
+ parsing = false;
+ since = System.currentTimeMillis();
}
+ }
+ }
- try (InputStream stream = fetcher.fetch(fetchKey, metadata)) {
- metadataList = parseMetadata(t, stream, metadata);
- } catch (SecurityException e) {
- throw e;
- } catch (TikaException | IOException e) {
- err(e);
- //LOG.error("problem reading from fetcher", e);
- throw new FetchException(e);
- } catch (OutOfMemoryError e) {
- //LOG.error("oom", e);
- handleOOM(e);
- }
+ public void actuallyParse() throws FetchException {
+ FetchEmitTuple t = readFetchEmitTuple();
+ List<Metadata> metadataList = null;
- injectUserMetadata(userMetadata, metadataList);
+ Fetcher fetcher = getFetcher(t.getFetchKey().getFetcherName());
+
+ Metadata metadata = new Metadata();
+ try (InputStream stream = fetcher.fetch(t.getFetchKey().getFetchKey(),
metadata)) {
+ metadataList = parseMetadata(t, stream, metadata);
+ } catch (SecurityException e) {
+ throw e;
+ } catch (TikaException | IOException e) {
+ warn(e);
+ throw new FetchException(e);
+ } catch (OutOfMemoryError e) {
+ handleOOM(e);
+ }
+ if (metadataIsEmpty(metadataList)) {
+ write(EMPTY_OUTPUT);
+ return;
+ }
+
+ String stack = getContainerStacktrace(t, metadataList);
+ if (StringUtils.isBlank(stack) || t.getOnParseException() ==
FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT) {
+ injectUserMetadata(t.getMetadata(), metadataList);
EmitKey emitKey = t.getEmitKey();
if (StringUtils.isBlank(emitKey.getEmitKey())) {
- emitKey = new EmitKey(emitKey.getEmitterName(), fetchKey);
+ emitKey = new EmitKey(emitKey.getEmitterName(),
t.getFetchKey().getFetchKey());
t.setEmitKey(emitKey);
}
- return new EmitData(t.getEmitKey(), metadataList);
- } finally {
- synchronized (lock) {
- parsing = false;
- since = System.currentTimeMillis();
+ EmitData emitData = new EmitData(t.getEmitKey(), metadataList);
+ if (emitData.getEstimatedSizeBytes() >= maxExtractSizeToReturn) {
+ emit(emitData, stack);
+ } else {
+ write(emitData, stack);
}
+ } else {
+ write(PARSE_EXCEPTION_NO_EMIT,
stack.getBytes(StandardCharsets.UTF_8));
+ }
+
+ }
+
+ private Fetcher getFetcher(String fetcherName) throws FetchException {
+ try {
+ return fetcherManager.getFetcher(fetcherName);
+ } catch (TikaException | IOException e) {
+ warn(e);
+ //LOG.error("can't get fetcher", e);
+ throw new FetchException(e);
}
}
@@ -266,31 +341,28 @@ public class AsyncServer implements Runnable {
private List<Metadata> parseMetadata(FetchEmitTuple fetchEmitTuple,
InputStream stream,
Metadata metadata) {
- //make these configurable
- BasicContentHandlerFactory.HANDLER_TYPE type =
BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
-
-
+ HandlerConfig handlerConfig = fetchEmitTuple.getHandlerConfig();
RecursiveParserWrapperHandler handler = new
RecursiveParserWrapperHandler(
- new BasicContentHandlerFactory(type,
- fetchEmitTuple.getHandlerConfig().getWriteLimit()),
- fetchEmitTuple.getHandlerConfig().getMaxEmbeddedResources(),
+ new BasicContentHandlerFactory(handlerConfig.getType(),
+ handlerConfig.getWriteLimit()),
+ handlerConfig.getMaxEmbeddedResources(),
tikaConfig.getMetadataFilter());
ParseContext parseContext = new ParseContext();
FetchKey fetchKey = fetchEmitTuple.getFetchKey();
try {
parser.parse(stream, handler, metadata, parseContext);
} catch (SAXException e) {
- err(e);
+ warn(e);
//LOG.warn("problem:" + fetchKey.getFetchKey(), e);
} catch (EncryptedDocumentException e) {
- err(e);
+ warn(e);
//LOG.warn("encrypted:" + fetchKey.getFetchKey(), e);
} catch (SecurityException e) {
- err(e);
+ warn(e);
//LOG.warn("security exception: " + fetchKey.getFetchKey());
throw e;
} catch (Exception e) {
- err(e);
+ warn(e);
//LOG.warn("exception: " + fetchKey.getFetchKey());
} catch (OutOfMemoryError e) {
//TODO, maybe return file type gathered so far and then crash?
@@ -312,6 +384,8 @@ public class AsyncServer implements Runnable {
private void exit(int exitCode) {
+ System.err.println("exiting: " + exitCode);
+ System.err.flush();
System.exit(exitCode);
}
@@ -342,6 +416,7 @@ public class AsyncServer implements Runnable {
//TODO allowed named configurations in tika config
this.tikaConfig = new TikaConfig(tikaConfigPath);
this.fetcherManager = FetcherManager.load(tikaConfigPath);
+ this.emitterManager = EmitterManager.load(tikaConfigPath);
Parser autoDetectParser = new AutoDetectParser(this.tikaConfig);
this.parser = new RecursiveParserWrapper(autoDetectParser);
@@ -352,4 +427,42 @@ public class AsyncServer implements Runnable {
super(t);
}
}
+
+ private void write(EmitData emitData, String stack) {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try (ObjectOutputStream objectOutputStream = new
ObjectOutputStream(bos)) {
+ objectOutputStream.writeObject(emitData);
+ }
+ write(PARSE_SUCCESS, bos.toByteArray());
+ } catch (IOException e) {
+ err(e);
+ //LOG.error("problem writing emit data", e);
+ exit(1);
+ }
+ }
+
+ private void write(byte status, byte[] bytes) {
+ try {
+ int len = bytes.length;
+ output.write(status);
+ output.writeInt(len);
+ output.write(bytes);
+ output.flush();
+ } catch (IOException e) {
+ err(e);
+ exit(1);
+ }
+ }
+
+ private void write(byte status) {
+ try {
+ output.write(status);
+ output.flush();
+ } catch (IOException e) {
+ err(e);
+ exit(1);
+ }
+ }
+
}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClient.java
b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClient.java
deleted file mode 100644
index b01eb93..0000000
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClient.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.nio.file.Path;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.utils.ProcessUtils;
-
-public class AsyncClient implements Closeable {
-
- private static final Logger LOG =
LoggerFactory.getLogger(AsyncProcessor.class);
-
- //TODO: make these configurable
- private long parseTimeoutMillis = 30000;
- private long waitTimeoutMillis = 500000;
-
- private Process process;
- private final Path tikaConfigPath;
- private DataOutputStream output;
- private DataInputStream input;
-
- public AsyncClient(Path tikaConfigPath) {
- this.tikaConfigPath = tikaConfigPath;
- }
-
- private int filesProcessed = 0;
-
- public int getFilesProcessed() {
- return filesProcessed;
- }
-
- private boolean ping() {
- if (process == null || ! process.isAlive()) {
- return false;
- }
- try {
- output.write(AsyncServer.PING);
- output.flush();
- int ping = input.read();
- if (ping == AsyncServer.PING) {
- return true;
- }
- } catch (IOException e) {
- return false;
- }
- return false;
- }
-
- @Override
- public void close() {
- process.destroyForcibly();
- }
-
- public AsyncResult process(FetchEmitTuple t) throws IOException {
- if (! ping()) {
- restart();
- }
- //TODO consider adding a timer here too
- // this could block forever if the watchdog thread in the server fails
- // or is starved
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- try (ObjectOutputStream objectOutputStream = new
ObjectOutputStream(bos)) {
- objectOutputStream.writeObject(t);
- }
- byte[] bytes = bos.toByteArray();
- output.write(AsyncServer.CALL);
- output.writeInt(bytes.length);
- output.write(bytes);
- output.flush();
-
- long start = System.currentTimeMillis();
- try {
- return readResults(t);
- } catch (IOException e) {
- long elapsed = System.currentTimeMillis() - start;
- if (elapsed > parseTimeoutMillis) {
- LOG.warn("{} timed out", t.getId());
- return AsyncResult.TIMEOUT;
- }
- return AsyncResult.UNSPECIFIED_CRASH;
- }
- }
-
- private AsyncResult readResults(FetchEmitTuple t) throws IOException {
- int status = input.read();
- //TODO clean this up, never return null
- if (status == AsyncServer.OOM) {
- LOG.warn(t.getId() + " oom");
- return AsyncResult.OOM;
- } else if (status == AsyncServer.READY) {
- } else {
- throw new IOException("problem reading response from server " +
status);
- }
- int length = input.readInt();
- byte[] bytes = new byte[length];
- input.readFully(bytes);
- try (ObjectInputStream objectInputStream =
- new ObjectInputStream(new ByteArrayInputStream(bytes))) {
- return new AsyncResult((EmitData)objectInputStream.readObject());
- } catch (ClassNotFoundException e) {
- //this should be catastrophic
- throw new RuntimeException(e);
- }
-
- }
-
- private void restart() throws IOException {
- if (process != null) {
- process.destroyForcibly();
- }
- LOG.debug("restarting process");
- ProcessBuilder pb = new ProcessBuilder(getCommandline());
- pb.redirectError(ProcessBuilder.Redirect.INHERIT);
- process = pb.start();
- input = new DataInputStream(process.getInputStream());
- output = new DataOutputStream(process.getOutputStream());
- }
-
- private String[] getCommandline() {
- //TODO: make this all configurable
- return new String[]{
- "java",
- "-cp",
- System.getProperty("java.class.path"),
- "org.apache.tika.pipes.async.AsyncServer",
-
ProcessUtils.escapeCommandLine(tikaConfigPath.toAbsolutePath().toString()),
- Long.toString(parseTimeoutMillis),
- Long.toString(waitTimeoutMillis),
- };
- }
-}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java
b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java
index c8b8ccb..e12b7c1 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java
@@ -22,9 +22,6 @@ import java.nio.file.Path;
class AsyncClientConfig {
private int fetchQueueSize = 20000;
- private int numWorkers = 10;
- private String[] workerJVMArgs;
- private long parseTimeoutMs;
private long waitTimeoutMs;
private long maxFilesProcessed;
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index d16dd16..e939e85 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -17,20 +17,93 @@
package org.apache.tika.pipes.async;
import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
import java.nio.file.Path;
-class AsyncConfig {
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.pipes.PipesConfigBase;
+public class AsyncConfig extends PipesConfigBase {
- private AsyncClientConfig asyncClientConfig;
- private AsyncClientConfig asyncRetryClientConfig;
- private AsyncEmitterConfig asyncEmitterConfig;
+ private long emitWithinMillis = 10000;
+ private long emitMaxEstimatedBytes = 100000;
- public static AsyncConfig load(Path p) throws IOException {
- AsyncConfig asyncConfig = new AsyncConfig();
+ private long maxForEmitBatchBytes = 0;
+ private int queueSize = 10000;
+ private final int numEmitters = 1;
+ public static AsyncConfig load(Path p) throws IOException,
TikaConfigException {
+ AsyncConfig asyncConfig = new AsyncConfig();
+ try (InputStream is = Files.newInputStream(p)) {
+ asyncConfig.configure("async", is);
+ }
return asyncConfig;
}
+ public long getEmitWithinMillis() {
+ return emitWithinMillis;
+ }
+
+ /**
+ * If nothing has been emitted in this amount of time
+ * and the {@link #getEmitMaxEstimatedBytes()} has not been reached yet,
+ * emit what's in the emit queue.
+ *
+ * @param emitWithinMillis
+ */
+ public void setEmitWithinMillis(long emitWithinMillis) {
+ this.emitWithinMillis = emitWithinMillis;
+ }
+
+ /**
+ * When the emit queue hits this estimated size (sum of
+ * estimated extract sizes), emit the batch.
+ * @return
+ */
+ public long getEmitMaxEstimatedBytes() {
+ return emitMaxEstimatedBytes;
+ }
+
+ public void setEmitMaxEstimatedBytes(long emitMaxEstimatedBytes) {
+ this.emitMaxEstimatedBytes = emitMaxEstimatedBytes;
+ }
+
+ /**
+ * What is the maximum bytes size per extract that
+ * will be allowed in the emit queue. If an extract is too
+ * big, skip the emit queue and forward it directly from the processor.
If
+ * set to <code>0</code>, this will never send a an extract back for
batch emitting,
+ * but will emit the extract directly from the processor.
+ * @return
+ */
+ public long getMaxForEmitBatchBytes() {
+ return maxForEmitBatchBytes;
+ }
+
+ public void setMaxForEmitBatchBytes(long maxForEmitBatchBytes) {
+ this.maxForEmitBatchBytes = maxForEmitBatchBytes;
+ }
+
+ /**
+ * FetchEmitTuple queue size
+ * @return
+ */
+ public int getQueueSize() {
+ return queueSize;
+ }
+
+ public void setQueueSize(int queueSize) {
+ this.queueSize = queueSize;
+ }
+
+ /**
+ * Number of emitters
+ *
+ * @return
+ */
+ public int getNumEmitters() {
+ return numEmitters;
+ }
}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
index fa53764..9d40b47 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
@@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.tika.pipes.emitter.AbstractEmitter;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.Emitter;
import org.apache.tika.pipes.emitter.EmitterManager;
@@ -106,8 +105,7 @@ public class AsyncEmitter implements Callable<Integer> {
void add(EmitData data) {
size++;
- long sz = AbstractEmitter
- .estimateSizeInBytes(data.getEmitKey().getEmitKey(),
data.getMetadataList());
+ long sz = data.getEstimatedSizeBytes();
if (estimatedSize + sz > maxBytes) {
LOG.debug("estimated size ({}) > maxBytes({}), going to
emitAll",
(estimatedSize + sz), maxBytes);
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitterConfig.java
b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitterConfig.java
deleted file mode 100644
index ed6973c..0000000
---
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitterConfig.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-class AsyncEmitterConfig {
-
- private long emitWithinMs;
- private long emitMaxEstimatedBytes;
- private final int numEmitters = 1;
-
-
-
-}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index cd08d4c..0d66074 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -33,6 +33,9 @@ import org.xml.sax.SAXException;
import org.apache.tika.exception.TikaException;
import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.PipesClient;
+import org.apache.tika.pipes.PipesException;
+import org.apache.tika.pipes.PipesResult;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.EmitterManager;
import org.apache.tika.pipes.fetchiterator.FetchIterator;
@@ -45,12 +48,11 @@ import org.apache.tika.pipes.fetchiterator.FetchIterator;
public class AsyncProcessor implements Closeable {
static final int PARSER_FUTURE_CODE = 1;
- private final Path tikaConfigPath;
private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
private final ArrayBlockingQueue<EmitData> emitData;
private final ExecutorCompletionService<Integer> executorCompletionService;
private final ExecutorService executorService;
- private final int fetchEmitTupleQSize = 1000;
+ private final int fetchEmitTupleQSize = 10000;
private int numParserThreads = 10;
private int numEmitterThreads = 2;
private int numParserThreadsFinished = 0;
@@ -59,15 +61,15 @@ public class AsyncProcessor implements Closeable {
boolean isShuttingDown = false;
public AsyncProcessor(Path tikaConfigPath) throws TikaException,
IOException, SAXException {
- this.tikaConfigPath = tikaConfigPath;
this.fetchEmitTuples = new ArrayBlockingQueue<>(fetchEmitTupleQSize);
this.emitData = new ArrayBlockingQueue<>(100);
this.executorService = Executors.newFixedThreadPool(numParserThreads +
numEmitterThreads);
this.executorCompletionService =
new ExecutorCompletionService<>(executorService);
+ AsyncConfig asyncConfig = AsyncConfig.load(tikaConfigPath);
for (int i = 0; i < numParserThreads; i++) {
- executorCompletionService.submit(new
FetchEmitWorker(tikaConfigPath, fetchEmitTuples,
+ executorCompletionService.submit(new FetchEmitWorker(asyncConfig,
fetchEmitTuples,
emitData));
}
@@ -78,7 +80,7 @@ public class AsyncProcessor implements Closeable {
}
public synchronized boolean offer(List<FetchEmitTuple> newFetchEmitTuples,
long offerMs)
- throws AsyncRuntimeException, InterruptedException {
+ throws PipesException, InterruptedException {
if (isShuttingDown) {
throw new IllegalStateException(
"Can't call offer after calling close() or " +
"shutdownNow()");
@@ -111,7 +113,7 @@ public class AsyncProcessor implements Closeable {
}
public synchronized boolean offer(FetchEmitTuple t, long offerMs)
- throws AsyncRuntimeException, InterruptedException {
+ throws PipesException, InterruptedException {
if (fetchEmitTuples == null) {
throw new IllegalStateException("queue hasn't been initialized
yet.");
} else if (isShuttingDown) {
@@ -153,21 +155,21 @@ public class AsyncProcessor implements Closeable {
private class FetchEmitWorker implements Callable<Integer> {
- private final Path tikaConfigPath;
+ private final AsyncConfig asyncConfig;
private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
private final ArrayBlockingQueue<EmitData> emitDataQueue;
- private FetchEmitWorker(Path tikaConfigPath,
+ private FetchEmitWorker(AsyncConfig asyncConfig,
ArrayBlockingQueue<FetchEmitTuple>
fetchEmitTuples,
ArrayBlockingQueue<EmitData> emitDataQueue) {
- this.tikaConfigPath = tikaConfigPath;
+ this.asyncConfig = asyncConfig;
this.fetchEmitTuples = fetchEmitTuples;
this.emitDataQueue = emitDataQueue;
}
@Override
public Integer call() throws Exception {
- try (AsyncClient asyncClient = new AsyncClient(tikaConfigPath)) {
+ try (PipesClient pipesClient = new PipesClient(asyncConfig)) {
while (true) {
FetchEmitTuple t = fetchEmitTuples.poll(1,
TimeUnit.SECONDS);
if (t == null) {
@@ -175,15 +177,24 @@ public class AsyncProcessor implements Closeable {
} else if (t == FetchIterator.COMPLETED_SEMAPHORE) {
return PARSER_FUTURE_CODE;
} else {
- AsyncResult result = null;
+ PipesResult result = null;
try {
- result = asyncClient.process(t);
+ result = pipesClient.process(t);
} catch (IOException e) {
- result = AsyncResult.UNSPECIFIED_CRASH;
+ result = PipesResult.UNSPECIFIED_CRASH;
}
- if (result.getStatus() == AsyncResult.STATUS.OK) {
- //TODO -- add timeout, this currently hangs forever
- emitDataQueue.offer(result.getEmitData());
+ switch (result.getStatus()) {
+ case PARSE_SUCCESS:
+ //TODO -- add timeout, this currently hangs
forever
+ emitDataQueue.offer(result.getEmitData());
+ break;
+ case EMIT_SUCCESS:
+ break;
+ case EMIT_EXCEPTION:
+ case UNSPECIFIED_CRASH:
+ case OOM:
+ case TIMEOUT:
+ break;
}
}
checkActive();
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncResult.java
b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncResult.java
deleted file mode 100644
index 115e344..0000000
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncResult.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import org.apache.tika.pipes.emitter.EmitData;
-
-public class AsyncResult {
-
- enum STATUS {
- OK, OOM, TIMEOUT, UNSPECIFIED_CRASH
- }
- public static AsyncResult TIMEOUT = new AsyncResult(STATUS.TIMEOUT);
- public static AsyncResult OOM = new AsyncResult(STATUS.OOM);
- public static AsyncResult UNSPECIFIED_CRASH = new
AsyncResult(STATUS.UNSPECIFIED_CRASH);
-
- private final STATUS status;
- private final EmitData emitData;
-
- private AsyncResult(STATUS status, EmitData emitData) {
- this.status = status;
- this.emitData = emitData;
- }
-
- public AsyncResult(STATUS status) {
- this(status, null);
- }
-
- public AsyncResult(EmitData emitData) {
- this(STATUS.OK, emitData);
- }
-
- public STATUS getStatus() {
- return status;
- }
-
- public EmitData getEmitData() {
- return emitData;
- }
-}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
index f7520ad..648e094 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
@@ -19,25 +19,10 @@ package org.apache.tika.pipes.emitter;
import java.io.IOException;
import java.util.List;
-import org.apache.tika.metadata.Metadata;
-
public abstract class AbstractEmitter implements Emitter {
private String name;
- public static long estimateSizeInBytes(String id, List<Metadata>
metadataList) {
- long sz = 36 + id.length() * 2;
- for (Metadata m : metadataList) {
- for (String n : m.names()) {
- sz += 36 + n.length() * 2;
- for (String v : m.getValues(n)) {
- sz += 36 + v.length() * 2;
- }
- }
- }
- return sz;
- }
-
@Override
public String getName() {
return name;
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
index 797de58..c744140 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
@@ -43,6 +43,22 @@ public class EmitData implements Serializable {
return metadataList;
}
+ public long getEstimatedSizeBytes() {
+ return estimateSizeInBytes(getEmitKey().getEmitKey(),
getMetadataList());
+ }
+
+ private static long estimateSizeInBytes(String id, List<Metadata>
metadataList) {
+ long sz = 36 + id.length() * 2;
+ for (Metadata m : metadataList) {
+ for (String n : m.names()) {
+ sz += 36 + n.length() * 2;
+ for (String v : m.getValues(n)) {
+ sz += 36 + v.length() * 2;
+ }
+ }
+ }
+ return sz;
+ }
@Override
public String toString() {
return "EmitData{" + "emitKey=" + emitKey + ", metadataList=" +
metadataList + '}';
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
index f493ffb..490a77b 100644
---
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
+++
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
@@ -53,7 +53,7 @@ public abstract class FetchIterator
public static final int DEFAULT_QUEUE_SIZE = 1000;
public static final FetchEmitTuple COMPLETED_SEMAPHORE =
- new FetchEmitTuple(null, null, null, null);
+ new FetchEmitTuple(null,null, null, null, null, null);
private static final Logger LOGGER =
LoggerFactory.getLogger(FetchIterator.class);
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
index 2f8224e..aef095d 100644
---
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
+++
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
@@ -102,7 +102,7 @@ public class FileSystemFetchIterator extends FetchIterator
implements Initializa
String relPath = basePath.relativize(file).toString();
try {
- tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName, relPath),
+ tryToAdd(new FetchEmitTuple(relPath, new FetchKey(fetcherName,
relPath),
new EmitKey(emitterName, relPath), new Metadata()));
} catch (TimeoutException e) {
throw new IOException(e);
diff --git
a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
index 79bf911..6a82c6f 100644
---
a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
+++
b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
@@ -65,7 +65,13 @@ public class AsyncProcessorTest {
" <fetcher
class=\"org.apache.tika.pipes.fetcher.FileSystemFetcher\">" +
" <params><name>mock</name>\n" + " <basePath>" +
ProcessUtils.escapeCommandLine(inputDir.toAbsolutePath().toString()) +
- "</basePath></params>\n" + " </fetcher>" + " </fetchers>"
+ "</properties>";
+ "</basePath></params>\n" + " </fetcher>" + " </fetchers>" +
+ "<async><params><tikaConfig>" +
+
ProcessUtils.escapeCommandLine(tikaConfigPath.toAbsolutePath().toString()) +
+ "</tikaConfig><forkedJvmArgs><arg>-Xmx256m</arg" +
+
"></forkedJvmArgs><maxForEmitBatchBytes>1000000</maxForEmitBatchBytes>" +
+ "</params></async>" +
+ "</properties>";
Files.write(tikaConfigPath, xml.getBytes(StandardCharsets.UTF_8));
Random r = new Random();
for (int i = 0; i < totalFiles; i++) {
@@ -89,7 +95,8 @@ public class AsyncProcessorTest {
public void testBasic() throws Exception {
AsyncProcessor processor = new AsyncProcessor(tikaConfigPath);
for (int i = 0; i < totalFiles; i++) {
- FetchEmitTuple t = new FetchEmitTuple(new FetchKey("mock", i +
".xml"),
+ FetchEmitTuple t = new FetchEmitTuple("myId",
+ new FetchKey("mock", i + ".xml"),
new EmitKey("mock", "emit-" + i), new Metadata());
processor.offer(t, 1000);
}
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
index 6f65140..7799b05 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
+++
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
@@ -53,6 +53,13 @@ import org.apache.tika.utils.StringUtils;
* to the metadata object.
* <p>
* <ul>
+ * <li>If an 'idColumn' is specified, this will use that
+ * column's value as the id.</li>
+ * <li>If no 'idColumn' is specified, but a 'fetchKeyColumn' is specified,
+ * the string in the 'fetchKeyColumn' will be used as the 'id'.</li>
+ * <li>The 'idColumn' value is not added to the metadata.</li>
+ * </ul>
+ * <ul>
* <li>If a 'fetchKeyColumn' is specified, this will use that
* column's value as the fetchKey.</li>
* <li>If no 'fetchKeyColumn' is specified, this will send the
@@ -76,7 +83,7 @@ public class CSVFetchIterator extends FetchIterator
implements Initializable {
private Path csvPath;
private String fetchKeyColumn;
private String emitKeyColumn;
-
+ private String idColumn;
@Field
public void setCsvPath(String csvPath) {
@@ -94,6 +101,11 @@ public class CSVFetchIterator extends FetchIterator
implements Initializable {
}
@Field
+ public void setIdColumn(String idColumn) {
+ this.idColumn = idColumn;
+ }
+
+ @Field
public void setCsvPath(Path csvPath) {
this.csvPath = csvPath;
}
@@ -114,6 +126,7 @@ public class CSVFetchIterator extends FetchIterator
implements Initializable {
checkFetchEmitValidity(fetcherName, emitterName,
fetchEmitKeyIndices, headers);
HandlerConfig handlerConfig = getHandlerConfig();
for (CSVRecord record : records) {
+ String id = getId(fetchEmitKeyIndices, record);
String fetchKey = getFetchKey(fetchEmitKeyIndices, record);
String emitKey = getEmitKey(fetchEmitKeyIndices, record);
if (StringUtils.isBlank(fetchKey) &&
!StringUtils.isBlank(fetcherName)) {
@@ -123,8 +136,11 @@ public class CSVFetchIterator extends FetchIterator
implements Initializable {
if (StringUtils.isBlank(emitKey)) {
throw new IOException("emitKey must not be blank in :" +
record);
}
+ if (StringUtils.isBlank(id) && !
StringUtils.isBlank(fetchKey)) {
+ id = fetchKey;
+ }
Metadata metadata = loadMetadata(fetchEmitKeyIndices, headers,
record);
- tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName,
fetchKey),
+ tryToAdd(new FetchEmitTuple(id, new FetchKey(fetcherName,
fetchKey),
new EmitKey(emitterName, emitKey), metadata,
handlerConfig,
getOnParseException()));
}
@@ -169,6 +185,14 @@ public class CSVFetchIterator extends FetchIterator
implements Initializable {
}
}
+ private String getId(FetchEmitKeyIndices fetchEmitKeyIndices, CSVRecord
record) {
+ if (fetchEmitKeyIndices.idIndex > -1) {
+ return record.get(fetchEmitKeyIndices.idIndex);
+ }
+ return StringUtils.EMPTY;
+ }
+
+
private String getFetchKey(FetchEmitKeyIndices fetchEmitKeyIndices,
CSVRecord record) {
if (fetchEmitKeyIndices.fetchKeyIndex > -1) {
return record.get(fetchEmitKeyIndices.fetchKeyIndex);
@@ -200,7 +224,7 @@ public class CSVFetchIterator extends FetchIterator
implements Initializable {
throws IOException {
int fetchKeyColumnIndex = -1;
int emitKeyColumnIndex = -1;
-
+ int idIndex = -1;
for (int col = 0; col < record.size(); col++) {
String header = record.get(col);
if (StringUtils.isBlank(header)) {
@@ -212,9 +236,11 @@ public class CSVFetchIterator extends FetchIterator
implements Initializable {
fetchKeyColumnIndex = col;
} else if (header.equals(emitKeyColumn)) {
emitKeyColumnIndex = col;
+ } else if (header.equals(idColumn)) {
+ idIndex = col;
}
}
- return new FetchEmitKeyIndices(fetchKeyColumnIndex,
emitKeyColumnIndex);
+ return new FetchEmitKeyIndices(idIndex, fetchKeyColumnIndex,
emitKeyColumnIndex);
}
@Override
@@ -225,16 +251,18 @@ public class CSVFetchIterator extends FetchIterator
implements Initializable {
}
private static class FetchEmitKeyIndices {
+ private final int idIndex;
private final int fetchKeyIndex;
private final int emitKeyIndex;
- public FetchEmitKeyIndices(int fetchKeyIndex, int emitKeyIndex) {
+ public FetchEmitKeyIndices(int idIndex, int fetchKeyIndex, int
emitKeyIndex) {
+ this.idIndex = idIndex;
this.fetchKeyIndex = fetchKeyIndex;
this.emitKeyIndex = emitKeyIndex;
}
public boolean shouldSkip(int index) {
- return fetchKeyIndex == index || emitKeyIndex == index;
+ return idIndex == index || fetchKeyIndex == index || emitKeyIndex
== index;
}
}
}
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
index a12cd06..ac8f2a2 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
+++
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
@@ -69,6 +69,7 @@ public class JDBCFetchIterator extends FetchIterator
implements Initializable {
private static final Logger LOGGER =
LoggerFactory.getLogger(JDBCFetchIterator.class);
+ private String idColumn;
private String fetchKeyColumn;
private String emitKeyColumn;
private String connection;
@@ -77,6 +78,11 @@ public class JDBCFetchIterator extends FetchIterator
implements Initializable {
private Connection db;
@Field
+ public void setIdColumn(String idColumn) {
+ this.idColumn = idColumn;
+ }
+
+ @Field
public void setFetchKeyColumn(String fetchKeyColumn) {
this.fetchKeyColumn = fetchKeyColumn;
}
@@ -163,6 +169,7 @@ public class JDBCFetchIterator extends FetchIterator
implements Initializable {
Metadata metadata = new Metadata();
String fetchKey = "";
String emitKey = "";
+ String id = "";
for (int i = 1; i <= rs.getMetaData().getColumnCount(); i++) {
if (i == fetchEmitKeyIndices.fetchKeyIndex) {
fetchKey = getString(i, rs);
@@ -180,13 +187,21 @@ public class JDBCFetchIterator extends FetchIterator
implements Initializable {
emitKey = (emitKey == null) ? "" : emitKey;
continue;
}
+ if (i == fetchEmitKeyIndices.idIndex) {
+ id = getString(i, rs);
+ if (id == null) {
+ LOGGER.warn("id is empty for record " + toString(rs));
+ }
+ id = (id == null) ? "" : id;
+ continue;
+ }
String val = getString(i, rs);
if (val != null) {
metadata.set(headers.get(i - 1), val);
}
}
- tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName, fetchKey),
+ tryToAdd(new FetchEmitTuple(id, new FetchKey(fetcherName, fetchKey),
new EmitKey(emitterName, emitKey), metadata, handlerConfig,
getOnParseException()));
}
@@ -213,6 +228,7 @@ public class JDBCFetchIterator extends FetchIterator
implements Initializable {
private FetchEmitKeyIndices loadHeaders(ResultSetMetaData metaData,
List<String> headers)
throws SQLException {
+ int idIndex = -1;
int fetchKeyIndex = -1;
int emitKeyIndex = -1;
for (int i = 1; i <= metaData.getColumnCount(); i++) {
@@ -222,9 +238,13 @@ public class JDBCFetchIterator extends FetchIterator
implements Initializable {
if (metaData.getColumnLabel(i).equalsIgnoreCase(emitKeyColumn)) {
emitKeyIndex = i;
}
+ if (metaData.getColumnLabel(i).equalsIgnoreCase(idColumn)) {
+ idIndex = i;
+ }
+
headers.add(metaData.getColumnLabel(i));
}
- return new FetchEmitKeyIndices(fetchKeyIndex, emitKeyIndex);
+ return new FetchEmitKeyIndices(idIndex, fetchKeyIndex, emitKeyIndex);
}
@Override
@@ -257,16 +277,18 @@ public class JDBCFetchIterator extends FetchIterator
implements Initializable {
}
private static class FetchEmitKeyIndices {
+ private final int idIndex;
private final int fetchKeyIndex;
private final int emitKeyIndex;
- public FetchEmitKeyIndices(int fetchKeyIndex, int emitKeyIndex) {
+ public FetchEmitKeyIndices(int idIndex, int fetchKeyIndex, int
emitKeyIndex) {
+ this.idIndex = idIndex;
this.fetchKeyIndex = fetchKeyIndex;
this.emitKeyIndex = emitKeyIndex;
}
public boolean shouldSkip(int index) {
- return fetchKeyIndex == index || emitKeyIndex == index;
+ return idIndex == index || fetchKeyIndex == index || emitKeyIndex
== index;
}
}
}
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
index bfe918a..9de5010 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
+++
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
@@ -132,7 +132,7 @@ public class TestJDBCFetchIterator {
fail("failed to find key pattern: " + k);
}
String aOrB = Integer.parseInt(num) % 2 == 0 ? "a" : "b";
- assertEquals("id" + num, p.getMetadata().get("MY_ID"));
+ assertEquals("id" + num, p.getId());
assertEquals("project" + aOrB,
p.getMetadata().get("MY_PROJECT"));
assertNull(p.getMetadata().get("fetchKey"));
assertNull(p.getMetadata().get("MY_FETCHKEY"));
@@ -151,6 +151,7 @@ public class TestJDBCFetchIterator {
" <fetcherName>s3f</fetcherName>\n" +
" <emitterName>s3e</emitterName>\n" +
" <queueSize>57</queueSize>\n" +
+ " <idColumn>my_id</idColumn>\n" +
"
<fetchKeyColumn>my_fetchkey</fetchKeyColumn>\n" +
" <emitKeyColumn>my_fetchkey</emitKeyColumn>\n"
+
" <select>" +
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
index c8f8663..11a7dd0 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
+++
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
@@ -132,7 +132,9 @@ public class S3FetchIterator extends FetchIterator
implements Initializable {
long elapsed = System.currentTimeMillis() - start;
LOGGER.debug("adding ({}) {} in {} ms", count, summary.getKey(),
elapsed);
- tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName,
summary.getKey()),
+ //TODO -- allow user specified metadata as the "id"?
+ tryToAdd(new FetchEmitTuple(summary.getKey(), new
FetchKey(fetcherName,
+ summary.getKey()),
new EmitKey(emitterName, summary.getKey()), new
Metadata(), handlerConfig,
getOnParseException()));
count++;
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
index 870b5e9..342a097 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
@@ -45,11 +45,12 @@ import
org.apache.cxf.transport.common.gzip.GZIPInInterceptor;
import org.apache.cxf.transport.common.gzip.GZIPOutInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
import org.apache.tika.Tika;
import org.apache.tika.config.ServiceLoader;
import org.apache.tika.config.TikaConfig;
-import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.exception.TikaException;
import org.apache.tika.parser.DigestingParser;
import org.apache.tika.parser.digestutils.BouncyCastleDigester;
import org.apache.tika.parser.digestutils.CommonsDigester;
@@ -57,9 +58,9 @@ import org.apache.tika.pipes.emitter.EmitterManager;
import org.apache.tika.pipes.fetcher.FetcherManager;
import org.apache.tika.server.core.resource.AsyncResource;
import org.apache.tika.server.core.resource.DetectorResource;
-import org.apache.tika.server.core.resource.EmitterResource;
import org.apache.tika.server.core.resource.LanguageResource;
import org.apache.tika.server.core.resource.MetadataResource;
+import org.apache.tika.server.core.resource.PipesResource;
import org.apache.tika.server.core.resource.RecursiveMetadataResource;
import org.apache.tika.server.core.resource.TikaDetectors;
import org.apache.tika.server.core.resource.TikaMimeTypes;
@@ -231,7 +232,7 @@ public class TikaServerProcess {
List<ResourceProvider> resourceProviders = new ArrayList<>();
List<Object> providers = new ArrayList<>();
- loadAllProviders(tikaServerConfig, asyncResource, fetcherManager,
+ loadAllProviders(tikaServerConfig,
serverStatus,
resourceProviders,
providers);
@@ -258,14 +259,12 @@ public class TikaServerProcess {
}
private static void loadAllProviders(TikaServerConfig tikaServerConfig,
- AsyncResource asyncResource,
- FetcherManager fetcherManager,
ServerStatus serverStatus,
List<ResourceProvider>
resourceProviders,
List<Object> writers)
- throws TikaConfigException, IOException {
+ throws TikaException, SAXException, IOException {
List<ResourceProvider> tmpCoreProviders =
- loadCoreProviders(tikaServerConfig, asyncResource,
fetcherManager, serverStatus);
+ loadCoreProviders(tikaServerConfig, serverStatus);
resourceProviders.addAll(tmpCoreProviders);
resourceProviders.add(new SingletonResourceProvider(new
TikaWelcome(tmpCoreProviders)));
@@ -309,11 +308,11 @@ public class TikaServerProcess {
}
private static List<ResourceProvider> loadCoreProviders(TikaServerConfig
tikaServerConfig,
- AsyncResource
asyncResource,
- FetcherManager
fetcherManager,
ServerStatus
serverStatus)
- throws TikaConfigException, IOException {
+ throws TikaException, IOException, SAXException {
List<ResourceProvider> resourceProviders = new ArrayList<>();
+ boolean addAsyncResource = false;
+ boolean addPipesResource = false;
if (tikaServerConfig.getEndpoints().size() == 0) {
resourceProviders.add(new SingletonResourceProvider(new
MetadataResource()));
resourceProviders.add(new SingletonResourceProvider(new
RecursiveMetadataResource()));
@@ -329,54 +328,73 @@ public class TikaServerProcess {
resourceProviders.add(new SingletonResourceProvider(new
TikaParsers()));
resourceProviders.add(new SingletonResourceProvider(new
TikaVersion()));
if (tikaServerConfig.isEnableUnsecureFeatures()) {
- resourceProviders.add(new SingletonResourceProvider(new
EmitterResource(
- fetcherManager,
EmitterManager.load(tikaServerConfig.getConfigPath())
- )));
- resourceProviders.add(new
SingletonResourceProvider(asyncResource));
+ addAsyncResource = true;
+ addPipesResource = true;
resourceProviders
.add(new SingletonResourceProvider(new
TikaServerStatus(serverStatus)));
}
- resourceProviders.addAll(loadResourceServices());
- return resourceProviders;
- }
- for (String endPoint : tikaServerConfig.getEndpoints()) {
- if ("meta".equals(endPoint)) {
- resourceProviders.add(new SingletonResourceProvider(new
MetadataResource()));
- } else if ("rmeta".equals(endPoint)) {
- resourceProviders
- .add(new SingletonResourceProvider(new
RecursiveMetadataResource()));
- } else if ("detect".equals(endPoint)) {
- resourceProviders
- .add(new SingletonResourceProvider(new
DetectorResource(serverStatus)));
- } else if ("language".equals(endPoint)) {
- resourceProviders.add(new SingletonResourceProvider(new
LanguageResource()));
- } else if ("translate".equals(endPoint)) {
- resourceProviders
- .add(new SingletonResourceProvider(new
TranslateResource(serverStatus)));
- } else if ("tika".equals(endPoint)) {
- resourceProviders.add(new SingletonResourceProvider(new
TikaResource()));
- } else if ("unpack".equals(endPoint)) {
- resourceProviders.add(new SingletonResourceProvider(new
UnpackerResource()));
- } else if ("mime".equals(endPoint)) {
- resourceProviders.add(new SingletonResourceProvider(new
TikaMimeTypes()));
- } else if ("detectors".equals(endPoint)) {
- resourceProviders.add(new SingletonResourceProvider(new
TikaDetectors()));
- } else if ("parsers".equals(endPoint)) {
- resourceProviders.add(new SingletonResourceProvider(new
TikaParsers()));
- } else if ("version".equals(endPoint)) {
- resourceProviders.add(new SingletonResourceProvider(new
TikaVersion()));
- } else if ("emit".equals(endPoint)) {
- resourceProviders.add(new SingletonResourceProvider(
- new EmitterResource(fetcherManager,
-
EmitterManager.load(tikaServerConfig.getConfigPath()))));
- } else if ("async".equals(endPoint)) {
- resourceProviders.add(new
SingletonResourceProvider(asyncResource));
- } else if ("status".equals(endPoint)) {
- resourceProviders
- .add(new SingletonResourceProvider(new
TikaServerStatus(serverStatus)));
+ } else {
+ for (String endPoint : tikaServerConfig.getEndpoints()) {
+ if ("meta".equals(endPoint)) {
+ resourceProviders.add(new SingletonResourceProvider(new
MetadataResource()));
+ } else if ("rmeta".equals(endPoint)) {
+ resourceProviders.add(new SingletonResourceProvider(new
RecursiveMetadataResource()));
+ } else if ("detect".equals(endPoint)) {
+ resourceProviders.add(new SingletonResourceProvider(new
DetectorResource(serverStatus)));
+ } else if ("language".equals(endPoint)) {
+ resourceProviders.add(new SingletonResourceProvider(new
LanguageResource()));
+ } else if ("translate".equals(endPoint)) {
+ resourceProviders.add(new SingletonResourceProvider(new
TranslateResource(serverStatus)));
+ } else if ("tika".equals(endPoint)) {
+ resourceProviders.add(new SingletonResourceProvider(new
TikaResource()));
+ } else if ("unpack".equals(endPoint)) {
+ resourceProviders.add(new SingletonResourceProvider(new
UnpackerResource()));
+ } else if ("mime".equals(endPoint)) {
+ resourceProviders.add(new SingletonResourceProvider(new
TikaMimeTypes()));
+ } else if ("detectors".equals(endPoint)) {
+ resourceProviders.add(new SingletonResourceProvider(new
TikaDetectors()));
+ } else if ("parsers".equals(endPoint)) {
+ resourceProviders.add(new SingletonResourceProvider(new
TikaParsers()));
+ } else if ("version".equals(endPoint)) {
+ resourceProviders.add(new SingletonResourceProvider(new
TikaVersion()));
+ } else if ("pipes".equals(endPoint)) {
+ addPipesResource = true;
+ } else if ("async".equals(endPoint)) {
+ addAsyncResource = true;
+ } else if ("status".equals(endPoint)) {
+ resourceProviders.add(new SingletonResourceProvider(new
TikaServerStatus(serverStatus)));
+ }
}
- resourceProviders.addAll(loadResourceServices());
}
+
+ if (addAsyncResource) {
+ final AsyncResource localAsyncResource = new
AsyncResource(tikaServerConfig.getConfigPath());
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ try {
+ localAsyncResource.shutdownNow();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ resourceProviders.add(new
SingletonResourceProvider(localAsyncResource));
+ }
+ if (addPipesResource) {
+ final PipesResource localPipesResource =
+ new PipesResource(tikaServerConfig.getConfigPath());
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ try {
+ localPipesResource.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ resourceProviders.add(new
SingletonResourceProvider(localPipesResource));
+ }
+ resourceProviders.addAll(loadResourceServices());
return resourceProviders;
}
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
index 1c7fe1d..1ee5600 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
@@ -55,7 +55,6 @@ public class AsyncResource {
private static final Logger LOG =
LoggerFactory.getLogger(AsyncResource.class);
- private static final int DEFAULT_FETCH_EMIT_QUEUE_SIZE = 10000;
long maxQueuePauseMs = 60000;
private final AsyncProcessor asyncProcessor;
private final FetcherManager fetcherManager;
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
deleted file mode 100644
index 425279c..0000000
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tika.server.core.resource;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.UriInfo;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.HandlerConfig;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.EmitterManager;
-import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.pipes.fetcher.Fetcher;
-import org.apache.tika.pipes.fetcher.FetcherManager;
-import org.apache.tika.utils.ExceptionUtils;
-import org.apache.tika.utils.StringUtils;
-
-@Path("/emit")
-public class EmitterResource {
-
- /**
- * key that is safe to pass through http header.
- * The user _must_ specify this for the fsemitter if calling 'put'
- */
- public static final String EMIT_KEY_FOR_HTTP_HEADER = "emit-key";
- private static final String EMITTER_PARAM = "emitter";
- private static final String HANDLER_PARAM = "type";
- private static final String FETCHER_NAME_ABBREV = "fn";
- private static final String FETCH_KEY_ABBREV = "fk";
- private static final String EMIT_KEY_ABBREV = "ek";
-
- private static final Logger LOG =
LoggerFactory.getLogger(EmitterResource.class);
-
- private final FetcherManager fetcherManager;
- private final EmitterManager emitterManager;
-
- public EmitterResource(FetcherManager fetcherManager, EmitterManager
emitterManager) {
- this.fetcherManager = fetcherManager;
- this.emitterManager = emitterManager;
- }
-
- static EmitKey calcEmitKey(FetchEmitTuple t) {
- //use fetch key if emitter key is not specified
- //TODO: clean this up?
- EmitKey emitKey = t.getEmitKey();
- if (StringUtils.isBlank(emitKey.getEmitKey())) {
- emitKey = new EmitKey(emitKey.getEmitterName(),
t.getFetchKey().getFetchKey());
- }
- return emitKey;
- }
-
- /**
- * @param is input stream is ignored in 'get'
- * @param httpHeaders
- * @param info
- * @param emitterName
- * @param fetcherName specify the fetcherName in the url's query
section
- * @param fetchKey specify the fetch key in the url's query section
- * @param handlerTypeName text, html, xml, body, ignore; default is text
- * @return
- * @throws Exception
- */
- @GET
- @Produces("application/json")
- @Path("{" + EMITTER_PARAM + " : (\\w+)?}")
- public Map<String, String> getRmeta(InputStream is, @Context HttpHeaders
httpHeaders,
- @Context UriInfo info,
- @PathParam(EMITTER_PARAM) String
emitterName,
- @QueryParam(FETCHER_NAME_ABBREV)
String fetcherName,
- @QueryParam(FETCH_KEY_ABBREV) String
fetchKey,
- @QueryParam(EMIT_KEY_ABBREV) String
emitKey,
- @QueryParam(HANDLER_PARAM) String
handlerTypeName)
- throws Exception {
- Metadata metadata = new Metadata();
- Fetcher fetcher = fetcherManager.getFetcher(fetcherName);
- List<Metadata> metadataList;
- try (InputStream fetchedIs = fetcher.fetch(fetchKey, metadata)) {
- HandlerConfig handlerConfig = RecursiveMetadataResource
- .buildHandlerConfig(httpHeaders.getRequestHeaders(),
handlerTypeName);
- metadataList = RecursiveMetadataResource
- .parseMetadata(fetchedIs, metadata,
httpHeaders.getRequestHeaders(), info,
- handlerConfig);
- }
- emitKey = StringUtils.isBlank(emitKey) ? fetchKey : emitKey;
- return emit(new EmitKey(emitterName, emitKey), metadataList);
- }
-
-
- /**
- * The user puts the raw bytes of the file and specifies the emitter
- * as elsewhere. This will not trigger a fetcher. If you want a
- * fetcher, use the get or post options.
- * <p>
- * The extracted text content is stored with the key
- * {@link TikaCoreProperties#TIKA_CONTENT}
- * <p>
- * Must specify an emitter in the path, e.g. /emit/solr
- * <p>
- * Optionally, may specify handler, e.g. /emit/solr/xml
- *
- * @param info uri info
- * @param fullParam which emitter to use; emitters must be configured in
- * the TikaConfig file.
- * @return InputStream that can be deserialized as a list of {@link
Metadata} objects
- * @throws Exception
- */
- @PUT
- @Produces("application/json")
- @Path("{" + EMITTER_PARAM + " : (\\w+(/(text|body|xml|ignore))?)}")
- public Map<String, String> putRmeta(InputStream is, @Context HttpHeaders
httpHeaders,
- @Context UriInfo info,
- @PathParam(EMITTER_PARAM) String
fullParam)
- throws Exception {
-
- Matcher m = Pattern.compile("(\\w+)(?:/(\\w+))?").matcher(fullParam);
- String emitterName = fullParam;
- String handlerTypeName = "text";
- if (m.find()) {
- emitterName = m.group(1);
- if (m.groupCount() > 1) {
- handlerTypeName = m.group(2);
- }
- }
- Metadata metadata = new Metadata();
- String emitKey = httpHeaders.getHeaderString(EMIT_KEY_FOR_HTTP_HEADER);
- HandlerConfig handlerConfig = RecursiveMetadataResource
- .buildHandlerConfig(httpHeaders.getRequestHeaders(),
handlerTypeName);
- List<Metadata> metadataList = RecursiveMetadataResource
- .parseMetadata(is, metadata, httpHeaders.getRequestHeaders(),
info, handlerConfig);
- return emit(new EmitKey(emitterName, emitKey), metadataList);
- }
-
- /**
- * The client posts a json request. At a minimum, this must be a
- * json object that contains an emitter and a fetcherString key with
- * the key to fetch the inputStream. Optionally, it may contain a metadata
- * object that will be used to populate the metadata key for pass
- * through of metadata from the client. It may also include a handler
config.
- * <p>
- * The extracted text content is stored with the key
- * {@link TikaCoreProperties#TIKA_CONTENT}
- * <p>
- * Must specify a fetcherString and an emitter in the posted json.
- *
- * @param info uri info
- * @return InputStream that can be deserialized as a list of {@link
Metadata} objects
- * @throws Exception
- */
- @POST
- @Produces("application/json")
- public Map<String, String> postRmeta(InputStream is, @Context HttpHeaders
httpHeaders,
- @Context UriInfo info) throws
Exception {
- FetchEmitTuple t = null;
- try (Reader reader = new InputStreamReader(is,
StandardCharsets.UTF_8)) {
- t = JsonFetchEmitTuple.fromJson(reader);
- }
- Metadata metadata = new Metadata();
-
- List<Metadata> metadataList = null;
- try (InputStream stream = fetcherManager
- .getFetcher(t.getFetchKey().getFetcherName())
- .fetch(t.getFetchKey().getFetchKey(), metadata)) {
-
- metadataList = RecursiveMetadataResource
- .parseMetadata(stream, metadata,
httpHeaders.getRequestHeaders(), info,
- t.getHandlerConfig());
- } catch (Error error) {
- return returnError(t.getEmitKey().getEmitterName(), error);
- }
-
- boolean shouldEmit = checkParseException(t, metadataList);
- if (!shouldEmit) {
- return skip(t, metadataList);
- }
-
- injectUserMetadata(t.getMetadata(), metadataList.get(0));
-
- for (String n : metadataList.get(0).names()) {
- LOG.debug("post parse/pre emit metadata {}: {}", n,
metadataList.get(0).get(n));
- }
- return emit(calcEmitKey(t), metadataList);
- }
-
- private Map<String, String> skip(FetchEmitTuple t, List<Metadata>
metadataList) {
- Map<String, String> statusMap = new HashMap<>();
- statusMap.put("status", "ok");
- statusMap.put("emitter", t.getEmitKey().getEmitterName());
- statusMap.put("emitKey", t.getEmitKey().getEmitKey());
- String msg =
metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
- statusMap.put("parse_exception", msg);
- return statusMap;
- }
-
- private boolean checkParseException(FetchEmitTuple t, List<Metadata>
metadataList) {
- if (metadataList == null || metadataList.size() < 1) {
- return false;
- }
- boolean shouldEmit = true;
- String stack =
metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
- if (stack != null) {
- if (t.getOnParseException() ==
FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
- shouldEmit = false;
- }
- LOG.warn("fetchKey ({}) caught container parse exception ({})",
- t.getFetchKey().getFetchKey(), stack);
- }
-
- for (int i = 1; i < metadataList.size(); i++) {
- Metadata m = metadataList.get(i);
- String embeddedStack =
m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
- if (embeddedStack != null) {
- LOG.warn("fetchKey ({}) caught embedded parse exception ({})",
- t.getFetchKey().getFetchKey(), embeddedStack);
- }
- }
-
- return shouldEmit;
- }
-
-
- private void injectUserMetadata(Metadata userMetadata, Metadata metadata) {
- for (String n : userMetadata.names()) {
- metadata.set(n, null);
- for (String v : userMetadata.getValues(n)) {
- metadata.add(n, v);
- }
- }
- }
-
- private Map<String, String> returnError(String emitterName, Error error) {
- Map<String, String> statusMap = new HashMap<>();
- statusMap.put("status", "parse_error");
- statusMap.put("emitter", emitterName);
- String msg = ExceptionUtils.getStackTrace(error);
- statusMap.put("parse_error", msg);
- return statusMap;
- }
-
- private Map<String, String> emit(EmitKey emitKey, List<Metadata>
metadataList)
- throws TikaException {
- Emitter emitter = emitterManager.getEmitter(emitKey.getEmitterName());
- String status = "ok";
- String exceptionMsg = "";
- try {
- emitter.emit(emitKey.getEmitKey(), metadataList);
- } catch (IOException | TikaEmitterException e) {
- LOG.warn("problem emitting (" + emitKey.getEmitterName() + ")", e);
- status = "emitter_exception";
- exceptionMsg = ExceptionUtils.getStackTrace(e);
- }
- Map<String, String> statusMap = new HashMap<>();
- statusMap.put("status", status);
- statusMap.put("emitter", emitKey.getEmitterName());
- if (exceptionMsg.length() > 0) {
- statusMap.put("emitter_exception", exceptionMsg);
- }
- String parseStackTrace =
metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
- if (parseStackTrace != null) {
- statusMap.put("parse_exception", parseStackTrace);
- }
- return statusMap;
- }
-
-}
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
new file mode 100644
index 0000000..daf4da3
--- /dev/null
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.server.core.resource;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.UriInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.PipesConfig;
+import org.apache.tika.pipes.PipesException;
+import org.apache.tika.pipes.PipesParser;
+import org.apache.tika.pipes.PipesResult;
+
+/**
+ * Endpoint that accepts a single json-serialized {@link FetchEmitTuple},
+ * hands it to a {@link PipesParser} to be fetched, parsed and emitted, and
+ * reports the outcome to the client as a small status map.
+ */
+@Path("/pipes")
+public class PipesResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PipesResource.class);
+
+    private final PipesParser pipesParser;
+
+    /**
+     * Loads the pipes configuration and creates the {@link PipesParser}
+     * that serves every request to this resource.
+     *
+     * @param tikaConfig path to the tika config xml that holds the pipes configuration
+     * @throws TikaConfigException propagated from loading the pipes configuration
+     * @throws IOException         propagated from loading the configuration or
+     *                             creating the parser
+     */
+    public PipesResource(java.nio.file.Path tikaConfig)
+            throws TikaConfigException, IOException {
+        PipesConfig pipesConfig = PipesConfig.load(tikaConfig);
+        this.pipesParser = new PipesParser(pipesConfig);
+    }
+
+    /**
+     * The client posts a json-serialized {@link FetchEmitTuple}. At a minimum,
+     * this must name a fetcher plus the key to fetch the input stream with, and
+     * an emitter plus the key to emit the results to. Optionally, it may contain
+     * a metadata object whose values are passed through into the emitted
+     * {@link Metadata}, a handler config and an on-parse-exception policy.
+     * <p>
+     * The extracted text content is stored in the emitted metadata with the key
+     * {@link TikaCoreProperties#TIKA_CONTENT}. The parse results themselves are
+     * sent to the emitter; only a status report is returned to the client.
+     *
+     * @param is          the json request body, read as UTF-8
+     * @param httpHeaders request headers (not currently used)
+     * @param info        uri info (not currently used)
+     * @return a status map that always contains "status" and, depending on the
+     * outcome, "parse_exception" and "emitted", "message", or "parse_error"
+     * @throws Exception if the request can't be read or deserialized, or if the
+     *                   pipes parser reports a status this endpoint treats as an error
+     */
+    @POST
+    @Produces("application/json")
+    public Map<String, String> postRmeta(InputStream is, @Context HttpHeaders httpHeaders,
+            @Context UriInfo info) throws Exception {
+        FetchEmitTuple t;
+        try (Reader reader = new InputStreamReader(is, StandardCharsets.UTF_8)) {
+            t = JsonFetchEmitTuple.fromJson(reader);
+        }
+        return processTuple(t);
+    }
+
+    /**
+     * Runs the tuple through the {@link PipesParser} and maps the status of the
+     * resulting {@link PipesResult} onto the response for the client.
+     *
+     * @param fetchEmitTuple the deserialized request
+     * @return the status map for the client
+     * @throws PipesException           propagated from {@link PipesParser#parse}
+     * @throws IOException              propagated from {@link PipesParser#parse}
+     * @throws IllegalStateException    if no pipes client became available in time
+     * @throws IllegalArgumentException if the requested emitter can't be found, or
+     *                                  if the status is not one this endpoint expects
+     */
+    private Map<String, String> processTuple(FetchEmitTuple fetchEmitTuple)
+            throws PipesException, IOException {
+
+        PipesResult pipesResult = pipesParser.parse(fetchEmitTuple);
+        switch (pipesResult.getStatus()) {
+            case CLIENT_UNAVAILABLE_WITHIN_MS:
+                throw new IllegalStateException("client not available within " +
+                        "allotted amount of time");
+            case EMIT_EXCEPTION:
+                return returnEmitException(pipesResult.getMessage());
+            case PARSE_SUCCESS:
+                //emitting is expected to happen in the forked process,
+                //so a parse without an emit status is unexpected here
+                throw new IllegalArgumentException("Should have emitted in forked process?!");
+            case EMIT_SUCCESS:
+                return returnSuccess();
+            case EMIT_SUCCESS_PARSE_EXCEPTION:
+                return parseException(pipesResult.getMessage(), true);
+            case PARSE_EXCEPTION_EMIT:
+                //as above: the forked process should have attempted the emit itself
+                throw new IllegalArgumentException("Should have tried to emit in forked " +
+                        "process?!");
+            case PARSE_EXCEPTION_NO_EMIT:
+                return parseException(pipesResult.getMessage(), false);
+            case TIMEOUT:
+                return returnError("timeout");
+            case OOM:
+                return returnError("oom");
+            case UNSPECIFIED_CRASH:
+                return returnError("unknown_crash");
+            case NO_EMITTER_FOUND:
+                throw new IllegalArgumentException("Couldn't find emitter that matched: " +
+                        fetchEmitTuple.getEmitKey().getEmitterName());
+            default:
+                throw new IllegalArgumentException("I'm sorry, I don't yet handle a status of " +
+                        "this type: " + pipesResult.getStatus());
+        }
+    }
+
+    /**
+     * Builds the response for a parse that threw an exception.
+     *
+     * @param msg     the message reported with the pipes result
+     * @param emitted whether the results were emitted despite the parse exception
+     * @return a map with "status" = "ok", "parse_exception" and "emitted"
+     */
+    private Map<String, String> parseException(String msg, boolean emitted) {
+        Map<String, String> statusMap = new HashMap<>();
+        statusMap.put("status", "ok");
+        statusMap.put("parse_exception", msg);
+        statusMap.put("emitted", Boolean.toString(emitted));
+        return statusMap;
+    }
+
+    /**
+     * Builds the response for a failure while emitting.
+     *
+     * @param msg the message reported with the pipes result
+     * @return a map with "status" = "emit_exception" and "message"
+     */
+    private Map<String, String> returnEmitException(String msg) {
+        Map<String, String> statusMap = new HashMap<>();
+        statusMap.put("status", "emit_exception");
+        statusMap.put("message", msg);
+        return statusMap;
+    }
+
+    /**
+     * Builds the response for a successful parse and emit.
+     *
+     * @return a map with "status" = "ok"
+     */
+    private Map<String, String> returnSuccess() {
+        Map<String, String> statusMap = new HashMap<>();
+        statusMap.put("status", "ok");
+        return statusMap;
+    }
+
+    /**
+     * Builds the response for a parse that did not complete, e.g. because of a
+     * timeout, an out-of-memory error or a crash.
+     *
+     * @param type short description of the failure, e.g. "timeout" or "oom"
+     * @return a map with "status" = "parse_error" and "parse_error" = type
+     */
+    private Map<String, String> returnError(String type) {
+        Map<String, String> statusMap = new HashMap<>();
+        statusMap.put("status", "parse_error");
+        statusMap.put("parse_error", type);
+        return statusMap;
+    }
+
+    /**
+     * Closes the underlying {@link PipesParser}.
+     *
+     * @throws IOException if closing the parser fails
+     */
+    public void close() throws IOException {
+        pipesParser.close();
+    }
+}
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
similarity index 66%
rename from tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
rename to tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
index 69b57c5..58e37f5 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
@@ -46,6 +47,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
@@ -53,46 +55,50 @@ import
org.apache.tika.metadata.serialization.JsonMetadataList;
import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.HandlerConfig;
import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.emitter.EmitterManager;
import org.apache.tika.pipes.fetcher.FetchKey;
import org.apache.tika.pipes.fetcher.FetcherManager;
import org.apache.tika.sax.BasicContentHandlerFactory;
-import org.apache.tika.server.core.resource.EmitterResource;
+import org.apache.tika.server.core.resource.PipesResource;
import org.apache.tika.server.core.writer.JSONObjWriter;
+import org.apache.tika.utils.ProcessUtils;
/**
* This offers basic integration tests with fetchers and emitters.
* We use file system fetchers and emitters.
*/
-public class TikaEmitterTest extends CXFTestBase {
+public class TikaPipesTest extends CXFTestBase {
- private static final String EMITTER_PATH = "/emit";
- private static final String EMITTER_PATH_AND_FS = "/emit/fse";
+ private static final String PIPES_PATH = "/pipes";
private static Path TMP_DIR;
private static Path TMP_OUTPUT_DIR;
private static Path TMP_OUTPUT_FILE;
+ private static Path TMP_NPE_OUTPUT_FILE;
+ private static Path TIKA_CONFIG_PATH;
private static String TIKA_CONFIG_XML;
private static FetcherManager FETCHER_MANAGER;
- private static EmitterManager EMITTER_MANAGER;
private static String HELLO_WORLD = "hello_world.xml";
private static String HELLO_WORLD_JSON = "hello_world.xml.json";
+ private static String NPE_JSON = "null_pointer.xml.json";
private static String[] VALUE_ARRAY = new String[]{"my-value-1",
"my-value-2", "my-value-3"};
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- TMP_DIR = Files.createTempDirectory("tika-emitter-test-");
+ TMP_DIR = Files.createTempDirectory("tika-pipes-test-");
Path inputDir = TMP_DIR.resolve("input");
TMP_OUTPUT_DIR = TMP_DIR.resolve("output");
TMP_OUTPUT_FILE = TMP_OUTPUT_DIR.resolve(HELLO_WORLD_JSON);
+ TMP_NPE_OUTPUT_FILE = TMP_OUTPUT_DIR.resolve("null_pointer.xml.json");
+
Files.createDirectories(inputDir);
Files.createDirectories(TMP_OUTPUT_DIR);
for (String mockFile : new String[]{"hello_world.xml",
"null_pointer.xml"}) {
Files.copy(
-
TikaEmitterTest.class.getResourceAsStream("/test-documents/mock/" + mockFile),
+
TikaPipesTest.class.getResourceAsStream("/test-documents/mock/" + mockFile),
inputDir.resolve(mockFile));
}
+ TIKA_CONFIG_PATH = Files.createTempFile(TMP_DIR, "tika-pipes-",
".xml");
TIKA_CONFIG_XML =
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<properties>"
+ "<fetchers>" +
@@ -105,11 +111,12 @@ public class TikaEmitterTest extends CXFTestBase {
"<basePath>" +
TMP_OUTPUT_DIR.toAbsolutePath() + "</basePath>" +
"</params>" +
"</emitter>" +
- "</emitters>" + "</properties>";
- Path tmp = Files.createTempFile("tika-emitter-", ".xml");
- Files.write(tmp, TIKA_CONFIG_XML.getBytes(StandardCharsets.UTF_8));
- FETCHER_MANAGER = FetcherManager.load(tmp);
- EMITTER_MANAGER = EmitterManager.load(tmp);
+ "</emitters>" + "<pipes><params><tikaConfig>" +
+
ProcessUtils.escapeCommandLine(TIKA_CONFIG_PATH.toAbsolutePath().toString()) +
+
"</tikaConfig><numClients>10</numClients><forkedJvmArgs><arg>-Xmx256m" +
+ "</arg></forkedJvmArgs>" +
+ "</params></pipes>" + "</properties>";
+ Files.write(TIKA_CONFIG_PATH,
TIKA_CONFIG_XML.getBytes(StandardCharsets.UTF_8));
}
@AfterClass
@@ -122,13 +129,20 @@ public class TikaEmitterTest extends CXFTestBase {
if (Files.exists(TMP_OUTPUT_FILE)) {
Files.delete(TMP_OUTPUT_FILE);
}
+ if (Files.exists(TMP_NPE_OUTPUT_FILE)) {
+ Files.delete(TMP_NPE_OUTPUT_FILE);
+ }
assertFalse(Files.isRegularFile(TMP_OUTPUT_FILE));
}
@Override
protected void setUpResources(JAXRSServerFactoryBean sf) {
List<ResourceProvider> rCoreProviders = new
ArrayList<ResourceProvider>();
- rCoreProviders.add(new SingletonResourceProvider(new
EmitterResource(FETCHER_MANAGER, EMITTER_MANAGER)));
+ try {
+ rCoreProviders.add(new SingletonResourceProvider(new
PipesResource(TIKA_CONFIG_PATH)));
+ } catch (IOException | TikaConfigException e) {
+ throw new RuntimeException(e);
+ }
sf.setResourceProviders(rCoreProviders);
}
@@ -150,41 +164,6 @@ public class TikaEmitterTest extends CXFTestBase {
return new FetcherStreamFactory(FETCHER_MANAGER);
}
- @Test
- public void testGet() throws Exception {
-
- String q = "?fn=fsf&fk=hello_world.xml&type=text";
- String getUrl = endPoint + EMITTER_PATH_AND_FS + q;
- Response response =
WebClient.create(getUrl).accept("application/json").get();
- assertEquals(200, response.getStatus());
- List<Metadata> metadataList = null;
- try (Reader reader = Files.newBufferedReader(TMP_OUTPUT_FILE)) {
- metadataList = JsonMetadataList.fromJson(reader);
- }
- assertEquals(1, metadataList.size());
- Metadata metadata = metadataList.get(0);
- assertEquals("hello world",
metadata.get(TikaCoreProperties.TIKA_CONTENT).trim());
- assertEquals("Nikolai Lobachevsky", metadata.get("author"));
- assertEquals("你好,世界", metadata.get("title"));
- assertEquals("application/mock+xml",
metadata.get(Metadata.CONTENT_TYPE));
- }
-
- @Test
- public void testGetXML() throws Exception {
-
- String q = "?fn=fsf&fk=hello_world.xml&type=xml";
- String getUrl = endPoint + EMITTER_PATH_AND_FS + q;
- Response response =
WebClient.create(getUrl).accept("application/json").get();
- assertEquals(200, response.getStatus());
- List<Metadata> metadataList = null;
- try (Reader reader = Files.newBufferedReader(TMP_OUTPUT_FILE)) {
- metadataList = JsonMetadataList.fromJson(reader);
- }
- assertEquals(1, metadataList.size());
- Metadata metadata = metadataList.get(0);
- String xml = metadata.get(TikaCoreProperties.TIKA_CONTENT);
- assertContains("<p>hello world</p>", xml);
- }
@Test
public void testPost() throws Exception {
@@ -196,12 +175,13 @@ public class TikaEmitterTest extends CXFTestBase {
}
FetchEmitTuple t =
- new FetchEmitTuple(new FetchKey("fsf", "hello_world.xml"), new
EmitKey("fse", ""),
+ new FetchEmitTuple("myId",
+ new FetchKey("fsf", "hello_world.xml"), new
EmitKey("fse", ""),
userMetadata);
StringWriter writer = new StringWriter();
JsonFetchEmitTuple.toJson(t, writer);
- String getUrl = endPoint + EMITTER_PATH;
+ String getUrl = endPoint + PIPES_PATH;
Response response =
WebClient.create(getUrl).accept("application/json").post(writer.toString());
assertEquals(200, response.getStatus());
@@ -230,14 +210,16 @@ public class TikaEmitterTest extends CXFTestBase {
}
FetchEmitTuple t =
- new FetchEmitTuple(new FetchKey("fsf", "hello_world.xml"), new
EmitKey("fse", ""),
+ new FetchEmitTuple("myId",
+ new FetchKey("fsf", "hello_world.xml"),
+ new EmitKey("fse", ""),
userMetadata,
new
HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.XML, -1, -1),
FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT);
StringWriter writer = new StringWriter();
JsonFetchEmitTuple.toJson(t, writer);
- String getUrl = endPoint + EMITTER_PATH;
+ String getUrl = endPoint + PIPES_PATH;
Response response =
WebClient.create(getUrl).accept("application/json").post(writer.toString());
assertEquals(200, response.getStatus());
@@ -252,47 +234,6 @@ public class TikaEmitterTest extends CXFTestBase {
}
@Test
- public void testPut() throws Exception {
-
- String getUrl = endPoint + EMITTER_PATH_AND_FS + "/text";
- String metaPathKey = EmitterResource.EMIT_KEY_FOR_HTTP_HEADER;
-
- Response response = WebClient.create(getUrl).accept("application/json")
- .header(metaPathKey, "hello_world.xml")
-
.put(ClassLoader.getSystemResourceAsStream("test-documents/mock/hello_world.xml"));
- assertEquals(200, response.getStatus());
- List<Metadata> metadataList = null;
- try (Reader reader = Files.newBufferedReader(TMP_OUTPUT_FILE)) {
- metadataList = JsonMetadataList.fromJson(reader);
- }
- assertEquals(1, metadataList.size());
- Metadata metadata = metadataList.get(0);
- assertEquals("hello world",
metadata.get(TikaCoreProperties.TIKA_CONTENT).trim());
- assertEquals("Nikolai Lobachevsky", metadata.get("author"));
- assertEquals("你好,世界", metadata.get("title"));
- assertEquals("application/mock+xml",
metadata.get(Metadata.CONTENT_TYPE));
- }
-
- @Test
- public void testPutXML() throws Exception {
-
- String putUrl = endPoint + EMITTER_PATH_AND_FS + "/xml";
- String metaPathKey = EmitterResource.EMIT_KEY_FOR_HTTP_HEADER;
-
- Response response = WebClient.create(putUrl).accept("application/json")
- .header(metaPathKey, "hello_world.xml")
-
.put(ClassLoader.getSystemResourceAsStream("test-documents/mock/hello_world.xml"));
- assertEquals(200, response.getStatus());
- List<Metadata> metadataList = null;
- try (Reader reader = Files.newBufferedReader(TMP_OUTPUT_FILE)) {
- metadataList = JsonMetadataList.fromJson(reader);
- }
- assertEquals(1, metadataList.size());
- Metadata metadata = metadataList.get(0);
- assertContains("<p>hello world</p>",
metadata.get(TikaCoreProperties.TIKA_CONTENT));
- }
-
- @Test
public void testPostNPE() throws Exception {
Metadata userMetadata = new Metadata();
userMetadata.set("my-key", "my-value");
@@ -301,12 +242,14 @@ public class TikaEmitterTest extends CXFTestBase {
}
FetchEmitTuple t =
- new FetchEmitTuple(new FetchKey("fsf", "null_pointer.xml"),
new EmitKey("fse", ""),
+ new FetchEmitTuple("myId",
+ new FetchKey("fsf", "null_pointer.xml"),
+ new EmitKey("fse", ""),
userMetadata);
StringWriter writer = new StringWriter();
JsonFetchEmitTuple.toJson(t, writer);
- String getUrl = endPoint + EMITTER_PATH;
+ String getUrl = endPoint + PIPES_PATH;
Response response =
WebClient.create(getUrl).accept("application/json").post(writer.toString());
assertEquals(200, response.getStatus());
@@ -320,7 +263,7 @@ public class TikaEmitterTest extends CXFTestBase {
String parseException = jsonResponse.get("parse_exception").asText();
assertNotNull(parseException);
assertContains("NullPointerException", parseException);
-
+ assertEquals(true, jsonResponse.get("emitted").asBoolean());
List<Metadata> metadataList = null;
try (Reader reader = Files
.newBufferedReader(TMP_OUTPUT_DIR.resolve("null_pointer.xml.json"))) {
@@ -335,5 +278,31 @@ public class TikaEmitterTest extends CXFTestBase {
metadata.get(TikaCoreProperties.CONTAINER_EXCEPTION));
}
- //can't test system_exit here because server is in same process
+    @Test
+    public void testPostNPENoEmit() throws Exception {
+        FetchEmitTuple t =
+                new FetchEmitTuple("myId",
+                        new FetchKey("fsf", "null_pointer.xml"),
+                        new EmitKey("fse", ""),
+                        FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP);
+        StringWriter writer = new StringWriter();
+        JsonFetchEmitTuple.toJson(t, writer);
+
+        String getUrl = endPoint + PIPES_PATH;
+        Response response =
+                WebClient.create(getUrl).accept("application/json").post(writer.toString());
+        assertEquals(200, response.getStatus());
+
+        JsonNode jsonResponse;
+        try (Reader reader = new InputStreamReader((InputStream) response.getEntity(),
+                StandardCharsets.UTF_8)) {
+            jsonResponse = new ObjectMapper().readTree(reader);
+        }
+
+        JsonNode parseException = jsonResponse.get("parse_exception");
+        assertNotNull(parseException);
+        assertContains("NullPointerException", parseException.asText());
+        assertFalse(jsonResponse.get("emitted").asBoolean());
+        assertFalse(Files.isRegularFile(TMP_NPE_OUTPUT_FILE));
+    }
}
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
index d5ee1a5..b86e1ee 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
@@ -29,6 +29,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
import javax.ws.rs.core.Response;
import com.fasterxml.jackson.databind.JsonNode;
@@ -49,12 +50,13 @@ import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.HandlerConfig;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.utils.ProcessUtils;
@Ignore("useful for development...need to turn it into a real unit test")
public class TikaServerAsyncIntegrationTest extends IntegrationTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(TikaServerAsyncIntegrationTest.class);
- private static final int NUM_FILES = 450;
+ private static final int NUM_FILES = 100;
private static final String EMITTER_NAME = "fse";
private static final String FETCHER_NAME = "fsf";
private static FetchEmitTuple.ON_PARSE_EXCEPTION ON_PARSE_EXCEPTION =
@@ -66,8 +68,9 @@ public class TikaServerAsyncIntegrationTest extends
IntegrationTestBase {
private static List<String> FILE_LIST = new ArrayList<>();
private static String[] FILES = new String[]{
"hello_world.xml",
- "null_pointer.xml"
- // "heavy_hang_30000.xml", "real_oom.xml", "system_exit.xml"
+ "null_pointer.xml",
+ // "heavy_hang_30000.xml", "real_oom.xml",
+ "system_exit.xml"
};
@BeforeClass
@@ -77,13 +80,18 @@ public class TikaServerAsyncIntegrationTest extends
IntegrationTestBase {
TMP_OUTPUT_DIR = TMP_DIR.resolve("output");
Files.createDirectories(inputDir);
Files.createDirectories(TMP_OUTPUT_DIR);
-
+ Random rand = new Random();
for (int i = 0; i < NUM_FILES; i++) {
for (String mockFile : FILES) {
+ if (mockFile.equals("system_exit.xml")) {
+ if (rand.nextFloat() > 0.1) {
+ continue;
+ }
+ }
String targetName = i + "-" + mockFile;
Path target = inputDir.resolve(targetName);
FILE_LIST.add(targetName);
- Files.copy(TikaEmitterTest.class
+ Files.copy(TikaPipesTest.class
.getResourceAsStream("/test-documents/mock/" +
mockFile), target);
}
@@ -105,8 +113,13 @@ public class TikaServerAsyncIntegrationTest extends
IntegrationTestBase {
TMP_OUTPUT_DIR.toAbsolutePath() + "</basePath>" +
"</params>" +
"</emitter>" +
"</emitters>" +
- "<server><params>" +
+
"<server><params><endpoints><endpoint>async</endpoint></endpoints>" +
"<enableUnsecureFeatures>true</enableUnsecureFeatures></params></server>" +
+ "<async><params><tikaConfig>" +
+
ProcessUtils.escapeCommandLine(TIKA_CONFIG.toAbsolutePath().toString()) +
+
"</tikaConfig><numClients>10</numClients><forkedJvmArgs><arg>-Xmx256m" +
+
"</arg></forkedJvmArgs><timeoutMillis>5000</timeoutMillis>" +
+ "</params></async>" +
"</properties>";
FileUtils.write(TIKA_CONFIG.toFile(), TIKA_CONFIG_XML, UTF_8);
@@ -154,13 +167,13 @@ public class TikaServerAsyncIntegrationTest extends
IntegrationTestBase {
fail("bad status: '" + status + "' -> " +
response.toPrettyString());
}
int expected = (ON_PARSE_EXCEPTION ==
FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT) ?
- FILE_LIST.size() : FILE_LIST.size() / 2;
+ FILE_LIST.size() : FILE_LIST.size() / 3;
int targets = 0;
- while (targets < FILE_LIST.size()) {
+ while (targets < NUM_FILES * 2) {
targets = countTargets();
Thread.sleep(100);
}
- System.out.println("elapsed : " + (System.currentTimeMillis() -
start));
+// System.out.println("elapsed : " + (System.currentTimeMillis() -
start));
} finally {
serverThread.interrupt();
}
@@ -185,7 +198,7 @@ public class TikaServerAsyncIntegrationTest extends
IntegrationTestBase {
}
private FetchEmitTuple getFetchEmitTuple(String fileName) throws
IOException {
- return new FetchEmitTuple(new FetchKey(FETCHER_NAME, fileName),
+ return new FetchEmitTuple(fileName, new FetchKey(FETCHER_NAME,
fileName),
new EmitKey(EMITTER_NAME, ""), new Metadata(),
HandlerConfig.DEFAULT_HANDLER_CONFIG,
ON_PARSE_EXCEPTION);
}
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerPipesIntegrationTest.java
similarity index 86%
rename from tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
rename to tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerPipesIntegrationTest.java
index 2677bf1..5ff0f11 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerPipesIntegrationTest.java
@@ -52,10 +52,10 @@ import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetcher.FetchKey;
import org.apache.tika.utils.ProcessUtils;
-public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
+public class TikaServerPipesIntegrationTest extends IntegrationTestBase {
private static final Logger LOG =
- LoggerFactory.getLogger(TikaServerEmitterIntegrationTest.class);
+ LoggerFactory.getLogger(TikaServerPipesIntegrationTest.class);
private static final String EMITTER_NAME = "fse";
private static final String FETCHER_NAME = "fsf";
private static Path TMP_DIR;
@@ -76,7 +76,7 @@ public class TikaServerEmitterIntegrationTest extends
IntegrationTestBase {
for (String mockFile : FILES) {
Files.copy(
-
TikaEmitterTest.class.getResourceAsStream("/test-documents/mock/" + mockFile),
+
TikaPipesTest.class.getResourceAsStream("/test-documents/mock/" + mockFile),
inputDir.resolve(mockFile));
}
TIKA_CONFIG = TMP_DIR.resolve("tika-config.xml");
@@ -92,9 +92,14 @@ public class TikaServerEmitterIntegrationTest extends
IntegrationTestBase {
"<basePath>" + TMP_OUTPUT_DIR.toAbsolutePath() +
"</basePath>" + "</params>" + "</emitter>" + "</emitters>" +
"<server><params>" +
"<enableUnsecureFeatures>true</enableUnsecureFeatures>" +
"<port>9999</port>" +
- "<endpoints>" + "<endpoint>emit</endpoint>" +
"<endpoint>status</endpoint>" +
+ "<endpoints>" + "<endpoint>pipes</endpoint>" +
"<endpoint>status</endpoint>" +
"</endpoints>";
- String xml2 = "</params></server>" + "</properties>";
+ String xml2 = "</params></server>" +
+ "<pipes><params><tikaConfig>" +
+
ProcessUtils.escapeCommandLine(TIKA_CONFIG.toAbsolutePath().toString()) +
+
"</tikaConfig><numClients>10</numClients><forkedJvmArgs><arg>-Xmx256m" +
+ "</arg></forkedJvmArgs><timeoutMillis>5000</timeoutMillis>" +
+ "</params></pipes>" + "</properties>";
String tikaConfigXML = xml1 + xml2;
@@ -138,6 +143,7 @@ public class TikaServerEmitterIntegrationTest extends
IntegrationTestBase {
JsonNode node = testOne("hello_world.xml", true);
assertEquals("ok", node.get("status").asText());
} catch (Exception e) {
+ e.printStackTrace();
fail("shouldn't have an exception" + e.getMessage());
} finally {
if (p != null) {
@@ -185,13 +191,15 @@ public class TikaServerEmitterIntegrationTest extends
IntegrationTestBase {
}
}
- @Test(expected = ProcessingException.class)
+ @Test
public void testSystemExit() throws Exception {
Process p = null;
try {
p = startProcess(new String[]{"-config",
ProcessUtils.escapeCommandLine(TIKA_CONFIG.toAbsolutePath().toString())});
- testOne("system_exit.xml", false);
+ JsonNode node = testOne("system_exit.xml", false);
+ assertEquals("parse_error", node.get("status").asText());
+ assertContains("unknown_crash", node.get("parse_error").asText());
} finally {
if (p != null) {
p.destroyForcibly();
@@ -206,8 +214,9 @@ public class TikaServerEmitterIntegrationTest extends
IntegrationTestBase {
try {
p = startProcess(new String[]{"-config",
ProcessUtils.escapeCommandLine(TIKA_CONFIG.toAbsolutePath().toString())});
- JsonNode response = testOne("fake_oom.xml", false);
- assertContains("oom message",
response.get("parse_error").asText());
+ JsonNode node = testOne("fake_oom.xml", false);
+ assertEquals("parse_error", node.get("status").asText());
+ assertContains("oom", node.get("parse_error").asText());
} catch (ProcessingException e) {
//depending on timing, there may be a connection exception --
// TODO add more of a delay to server shutdown to ensure message
is sent
@@ -219,13 +228,15 @@ public class TikaServerEmitterIntegrationTest extends
IntegrationTestBase {
}
}
- @Test(expected = ProcessingException.class)
+ @Test
public void testTimeout() throws Exception {
Process p = null;
try {
p = startProcess(new String[]{"-config",
ProcessUtils.escapeCommandLine(
TIKA_CONFIG_TIMEOUT.toAbsolutePath().toString())});
- JsonNode response = testOne("heavy_hang_30000.xml", false);
+ JsonNode node = testOne("heavy_hang_30000.xml", false);
+ assertEquals("parse_error", node.get("status").asText());
+ assertContains("timeout", node.get("parse_error").asText());
} finally {
if (p != null) {
p.destroyForcibly();
@@ -243,7 +254,7 @@ public class TikaServerEmitterIntegrationTest extends
IntegrationTestBase {
awaitServerStartup();
Response response = WebClient
- .create(endPoint + "/emit")
+ .create(endPoint + "/pipes")
.accept("application/json")
.post(getJsonString(fileName, onParseException));
if (response.getStatus() == 200) {
@@ -262,7 +273,7 @@ public class TikaServerEmitterIntegrationTest extends
IntegrationTestBase {
private String getJsonString(String fileName,
FetchEmitTuple.ON_PARSE_EXCEPTION
onParseException)
throws IOException {
- FetchEmitTuple t = new FetchEmitTuple(new FetchKey(FETCHER_NAME,
fileName),
+ FetchEmitTuple t = new FetchEmitTuple(fileName, new
FetchKey(FETCHER_NAME, fileName),
new EmitKey(EMITTER_NAME, ""), new Metadata(),
HandlerConfig.DEFAULT_HANDLER_CONFIG,
onParseException);
return JsonFetchEmitTuple.toJson(t);