This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new b370d4f  TIKA-3370 -- add pipes processor and swap out /emit for /pipes
b370d4f is described below

commit b370d4f4f09ff45a6ee4b619223c468236c77761
Author: tallison <[email protected]>
AuthorDate: Mon May 10 17:32:32 2021 -0400

    TIKA-3370 -- add pipes processor and swap out /emit for /pipes
---
 .../java/org/apache/tika/config/ConfigBase.java    |   2 +-
 .../java/org/apache/tika/pipes/FetchEmitTuple.java |  13 +-
 .../java/org/apache/tika/pipes/PipesClient.java    | 292 ++++++++++++++++++++
 .../java/org/apache/tika/pipes/PipesConfig.java    |  54 ++++
 .../org/apache/tika/pipes/PipesConfigBase.java     |  95 +++++++
 ...ncRuntimeException.java => PipesException.java} |   6 +-
 .../java/org/apache/tika/pipes/PipesParser.java    |  75 +++++
 .../java/org/apache/tika/pipes/PipesResult.java    |  72 +++++
 .../{async/AsyncServer.java => PipesServer.java}   | 291 ++++++++++++++------
 .../org/apache/tika/pipes/async/AsyncClient.java   | 158 -----------
 .../apache/tika/pipes/async/AsyncClientConfig.java |   3 -
 .../org/apache/tika/pipes/async/AsyncConfig.java   |  85 +++++-
 .../org/apache/tika/pipes/async/AsyncEmitter.java  |   4 +-
 .../tika/pipes/async/AsyncEmitterConfig.java       |  27 --
 .../apache/tika/pipes/async/AsyncProcessor.java    |  43 +--
 .../org/apache/tika/pipes/async/AsyncResult.java   |  53 ----
 .../apache/tika/pipes/emitter/AbstractEmitter.java |  15 -
 .../org/apache/tika/pipes/emitter/EmitData.java    |  16 ++
 .../tika/pipes/fetchiterator/FetchIterator.java    |   2 +-
 .../fetchiterator/FileSystemFetchIterator.java     |   2 +-
 .../tika/pipes/async/AsyncProcessorTest.java       |  11 +-
 .../pipes/fetchiterator/csv/CSVFetchIterator.java  |  40 ++-
 .../fetchiterator/jdbc/JDBCFetchIterator.java      |  30 +-
 .../fetchiterator/jdbc/TestJDBCFetchIterator.java  |   3 +-
 .../pipes/fetchiterator/s3/S3FetchIterator.java    |   4 +-
 .../apache/tika/server/core/TikaServerProcess.java | 124 +++++----
 .../tika/server/core/resource/AsyncResource.java   |   1 -
 .../tika/server/core/resource/EmitterResource.java | 305 ---------------------
 .../tika/server/core/resource/PipesResource.java   | 155 +++++++++++
 .../{TikaEmitterTest.java => TikaPipesTest.java}   | 165 +++++------
 .../core/TikaServerAsyncIntegrationTest.java       |  33 ++-
 ...st.java => TikaServerPipesIntegrationTest.java} |  37 ++-
 32 files changed, 1340 insertions(+), 876 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java 
b/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
index c8e560e..f8b6791 100644
--- a/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
+++ b/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
@@ -43,7 +43,7 @@ import org.apache.tika.utils.XMLReaderUtils;
 public abstract class ConfigBase {
 
     /**
-     * Use this to build a list of componentes for a composite item (e.g.
+     * Use this to build a list of components for a composite item (e.g.
      * CompositeMetadataFilter, FetcherManager), each with their own 
configurations
      *
      * @param compositeElementName
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java 
b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
index 3e4f773..092c323 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
@@ -37,19 +37,20 @@ public class FetchEmitTuple implements Serializable {
     private final ON_PARSE_EXCEPTION onParseException;
     private HandlerConfig handlerConfig;
 
-    public FetchEmitTuple(FetchKey fetchKey, EmitKey emitKey, Metadata 
metadata) {
-        this(fetchKey, emitKey, metadata, HandlerConfig.DEFAULT_HANDLER_CONFIG,
+
+    public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey) {
+        this(id, fetchKey, emitKey, new Metadata(), 
HandlerConfig.DEFAULT_HANDLER_CONFIG,
                 DEFAULT_ON_PARSE_EXCEPTION);
     }
+    public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey, 
ON_PARSE_EXCEPTION onParseException) {
+        this(id, fetchKey, emitKey, new Metadata(), 
HandlerConfig.DEFAULT_HANDLER_CONFIG,
+                onParseException);
+    }
 
     public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey, 
Metadata metadata) {
         this(id, fetchKey, emitKey, metadata, 
HandlerConfig.DEFAULT_HANDLER_CONFIG,
                 DEFAULT_ON_PARSE_EXCEPTION);
     }
-    public FetchEmitTuple(FetchKey fetchKey, EmitKey emitKey, Metadata 
metadata,
-                          HandlerConfig handlerConfig, ON_PARSE_EXCEPTION 
onParseException) {
-        this(fetchKey.getFetchKey(), fetchKey, emitKey, metadata, 
handlerConfig, onParseException);
-    }
 
     public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey, 
Metadata metadata,
                           HandlerConfig handlerConfig, ON_PARSE_EXCEPTION 
onParseException) {
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java 
b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
new file mode 100644
index 0000000..45a3565
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.pipes.async.AsyncConfig;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.utils.ProcessUtils;
+
+/**
+ * Client that forks a {@link PipesServer} in a separate JVM and exchanges
+ * requests and results with it over the child process's stdin/stdout.
+ * <p>
+ * The forked process is started on the first call to
+ * {@link #process(FetchEmitTuple)} and is restarted when it stops answering
+ * pings or after it has been handed the configured maximum number of files.
+ * <p>
+ * NOTE(review): nothing in this class synchronizes access to the process or
+ * its streams, so an instance is presumably meant to be used by one thread
+ * at a time -- confirm against callers.
+ */
+public class PipesClient implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PipesClient.class);
+
+    private final PipesConfigBase pipesConfig;
+    private Process process;
+    private LogGobbler logGobbler;
+    private Thread logGobblerThread;
+    private DataOutputStream output;
+    private DataInputStream input;
+    //requests handed to the forked process since it was last (re)started
+    private int filesProcessed = 0;
+
+    public PipesClient(PipesConfigBase pipesConfig) {
+        this.pipesConfig = pipesConfig;
+    }
+
+    /**
+     * @return the number of requests sent to the forked server since it was
+     * last (re)started
+     */
+    public int getFilesProcessed() {
+        return filesProcessed;
+    }
+
+    /**
+     * Sends a PING byte to the forked process and waits for the echo.
+     *
+     * @return true only if the process is alive and echoed the PING byte
+     */
+    private boolean ping() {
+        if (process == null || ! process.isAlive()) {
+            return false;
+        }
+        try {
+            output.write(PipesServer.PING);
+            output.flush();
+            //read() yields -1 at end of stream, which is not PING
+            return input.read() == PipesServer.PING;
+        } catch (IOException e) {
+            return false;
+        }
+    }
+
+    /**
+     * Forcibly destroys the forked process if one was started.
+     */
+    @Override
+    public void close() throws IOException {
+        if (process != null) {
+            process.destroyForcibly();
+        }
+    }
+
+    /**
+     * Serializes the tuple, sends it to the forked server and blocks until
+     * the server answers or its streams fail.
+     *
+     * @param t the fetch/emit request to hand to the server
+     * @return the server's result; {@link PipesResult#TIMEOUT} or
+     * {@link PipesResult#UNSPECIFIED_CRASH} if the response could not be read
+     * @throws IOException if the server cannot be (re)started or the request
+     *                     cannot be written to it
+     */
+    public PipesResult process(FetchEmitTuple t) throws IOException {
+        if (! ping()) {
+            restart();
+        }
+
+        //restart() zeroes the counter, so a restart triggered by the failed
+        //ping above is not followed by a second one here
+        if (pipesConfig.getMaxFilesProcessed() > 0 &&
+                filesProcessed >= pipesConfig.getMaxFilesProcessed()) {
+            LOG.info("restarting server after hitting max files: {}", filesProcessed);
+            restart();
+        }
+        //TODO consider adding a timer here too
+        // this could block forever if the watchdog thread in the server fails
+        // or is starved
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) {
+            objectOutputStream.writeObject(t);
+        }
+        byte[] bytes = bos.toByteArray();
+        output.write(PipesServer.CALL);
+        output.writeInt(bytes.length);
+        output.write(bytes);
+        output.flush();
+        //count the request as soon as it has been handed to the server so
+        //that the max-files restart above can actually trigger
+        filesProcessed++;
+
+        try {
+            return readResults(t);
+        } catch (IOException e) {
+            try {
+                //wait just a little bit to let process end to get exit value
+                process.waitFor(200, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException interruptedException) {
+                //keep the interrupt visible to the caller
+                Thread.currentThread().interrupt();
+            } finally {
+                process.destroyForcibly();
+            }
+            if (! process.isAlive() &&
+                    PipesServer.TIMEOUT_EXIT_CODE == process.exitValue()) {
+                LOG.warn("{} timed out", t.getId());
+                return PipesResult.TIMEOUT;
+            }
+            LOG.warn("{} crashed", t.getId(), e);
+            return PipesResult.UNSPECIFIED_CRASH;
+        }
+    }
+
+    /**
+     * Reads the one-byte status written by the server and, where the status
+     * carries a payload, the payload that follows it.
+     *
+     * @throws IOException on a read failure or an unrecognized status byte
+     *                     (including -1 at end of stream)
+     */
+    private PipesResult readResults(FetchEmitTuple t) throws IOException {
+        int status = input.read();
+        switch (status) {
+            case PipesServer.OOM:
+                LOG.warn("oom: {}", t.getId());
+                return PipesResult.OOM;
+            case PipesServer.TIMEOUT:
+                LOG.warn("timeout: {}", t.getId());
+                return PipesResult.TIMEOUT;
+            case PipesServer.EMIT_EXCEPTION:
+                LOG.warn("emit exception: {}", t.getId());
+                return readMessage(PipesResult.STATUS.EMIT_EXCEPTION);
+            case PipesServer.NO_EMITTER_FOUND:
+                LOG.warn("no emitter found: {}", t.getId());
+                return PipesResult.NO_EMITTER_FOUND;
+            case PipesServer.PARSE_SUCCESS:
+            case PipesServer.PARSE_EXCEPTION_EMIT:
+                //NOTE(review): PARSE_EXCEPTION_EMIT is logged and returned as
+                //a parse success here -- confirm that this is intended
+                LOG.info("parse success: {}", t.getId());
+                return deserializeEmitData();
+            case PipesServer.PARSE_EXCEPTION_NO_EMIT:
+                return readMessage(PipesResult.STATUS.PARSE_EXCEPTION_NO_EMIT);
+            case PipesServer.EMIT_SUCCESS:
+                LOG.info("emit success: {}", t.getId());
+                return PipesResult.EMIT_SUCCESS;
+            case PipesServer.EMIT_SUCCESS_PARSE_EXCEPTION:
+                return readMessage(PipesResult.STATUS.EMIT_SUCCESS_PARSE_EXCEPTION);
+            default :
+                throw new IOException("problem reading response from server " + status);
+        }
+    }
+
+    /**
+     * Reads a length-prefixed byte payload from the server.
+     *
+     * @throws IOException if the stream fails or the length is negative
+     */
+    private byte[] readPayload() throws IOException {
+        int length = input.readInt();
+        if (length < 0) {
+            //surface as IOException so that process() treats it as a crash
+            throw new IOException("negative payload length from server: " + length);
+        }
+        byte[] bytes = new byte[length];
+        input.readFully(bytes);
+        return bytes;
+    }
+
+    /**
+     * Reads a length-prefixed UTF-8 message and pairs it with the status.
+     */
+    private PipesResult readMessage(PipesResult.STATUS status) throws IOException {
+        String msg = new String(readPayload(), StandardCharsets.UTF_8);
+        return new PipesResult(status, msg);
+    }
+
+    /**
+     * Reads a length-prefixed, Java-serialized {@link EmitData} written by
+     * the forked server.
+     */
+    private PipesResult deserializeEmitData() throws IOException {
+        byte[] bytes = readPayload();
+        try (ObjectInputStream objectInputStream =
+                     new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+            return new PipesResult((EmitData)objectInputStream.readObject());
+        } catch (ClassNotFoundException e) {
+            LOG.error("class not found exception deserializing data", e);
+            //this should be catastrophic
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Destroys the current process, if any, and forks a fresh server with
+     * new streams and a new stderr gobbler thread.
+     */
+    private void restart() throws IOException {
+        if (process != null) {
+            process.destroyForcibly();
+            LOG.info("restarting process");
+        } else {
+            LOG.info("starting process");
+        }
+        if (logGobblerThread != null) {
+            logGobblerThread.interrupt();
+        }
+        ProcessBuilder pb = new ProcessBuilder(getCommandline());
+
+        process = pb.start();
+        //the new process has not been handed any files yet
+        filesProcessed = 0;
+        logGobbler = new LogGobbler(process.getErrorStream());
+        logGobblerThread = new Thread(logGobbler);
+        logGobblerThread.setDaemon(true);
+        logGobblerThread.start();
+        input = new DataInputStream(process.getInputStream());
+        output = new DataOutputStream(process.getOutputStream());
+    }
+
+    /**
+     * Builds the command line for the forked JVM: java binary, default
+     * classpath/headless options unless the configured args already supply
+     * them, the configured JVM args, the server main class and its four
+     * positional arguments.
+     */
+    private String[] getCommandline() {
+        List<String> configArgs = pipesConfig.getForkedJvmArgs();
+        boolean hasClassPath = false;
+        boolean hasHeadless = false;
+        boolean hasExitOnOOM = false;
+
+        for (String arg : configArgs) {
+            if (arg.startsWith("-Djava.awt.headless")) {
+                hasHeadless = true;
+            }
+            //-cp, -classpath and --class-path are the launcher's spellings;
+            //--classpath is kept because the original check accepted it
+            if (arg.equals("-cp") || arg.equals("-classpath") ||
+                    arg.equals("--class-path") || arg.startsWith("--class-path=") ||
+                    arg.equals("--classpath")) {
+                hasClassPath = true;
+            }
+            if (arg.equals("-XX:+ExitOnOutOfMemoryError") ||
+                    arg.equals("-XX:+CrashOnOutOfMemoryError")) {
+                hasExitOnOOM = true;
+            }
+        }
+
+        List<String> commandLine = new ArrayList<>();
+        String javaPath = pipesConfig.getJavaPath();
+        commandLine.add(ProcessUtils.escapeCommandLine(javaPath));
+        if (! hasClassPath) {
+            commandLine.add("-cp");
+            commandLine.add(System.getProperty("java.class.path"));
+        }
+        if (! hasHeadless) {
+            commandLine.add("-Djava.awt.headless=true");
+        }
+        if (! hasExitOnOOM) {
+            //TODO: warn that neither OOM flag was passed to the forked jvm
+        }
+        commandLine.addAll(configArgs);
+        commandLine.add("org.apache.tika.pipes.PipesServer");
+        commandLine.add(ProcessUtils.escapeCommandLine(
+                pipesConfig.getTikaConfig().toAbsolutePath().toString()));
+
+        //turn off emit batching
+        String maxForEmitBatchBytes = "0";
+        if (pipesConfig instanceof AsyncConfig) {
+            maxForEmitBatchBytes =
+                    Long.toString(((AsyncConfig)pipesConfig).getMaxForEmitBatchBytes());
+        }
+        commandLine.add(maxForEmitBatchBytes);
+        commandLine.add(Long.toString(pipesConfig.getTimeoutMillis()));
+        commandLine.add(Long.toString(pipesConfig.getShutdownClientAfterMillis()));
+
+        return commandLine.toArray(new String[0]);
+    }
+
+    /**
+     * Reads the forked process's stderr line by line and forwards lines that
+     * start with "debug ", "info ", "warn " or "error " to the matching
+     * logger level with the prefix stripped. Other lines are dropped.
+     */
+    public static class LogGobbler implements Runnable {
+        private static final Logger SERVER_LOG = LoggerFactory.getLogger(LogGobbler.class);
+
+        private final BufferedReader reader;
+
+        public LogGobbler(InputStream is) {
+            reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
+        }
+
+        @Override
+        public void run() {
+            try {
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    if (line.startsWith("debug ")) {
+                        SERVER_LOG.debug(line.substring(6));
+                    } else if (line.startsWith("info ")) {
+                        SERVER_LOG.info(line.substring(5));
+                    } else if (line.startsWith("warn ")) {
+                        SERVER_LOG.warn(line.substring(5));
+                    } else if (line.startsWith("error ")) {
+                        SERVER_LOG.error(line.substring(6));
+                    }
+                }
+            } catch (IOException e) {
+                //stream closed or failed; nothing more to forward
+            }
+        }
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java 
b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
new file mode 100644
index 0000000..94ae135
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Set;
+
+import org.apache.tika.exception.TikaConfigException;
+
+/**
+ * Configuration for {@link PipesParser}, read from the "pipes" element of a
+ * tika config file. Adds the maximum time a caller waits for a free client
+ * to the settings inherited from {@link PipesConfigBase}.
+ */
+public class PipesConfig extends PipesConfigBase {
+
+    private long maxWaitForClientMillis = 60000;
+
+    //instances are created only through load(Path)
+    private PipesConfig() {
+
+    }
+
+    /**
+     * Reads the "pipes" section of the given config file.
+     *
+     * @param tikaConfig path to the config file to read
+     * @return the populated configuration
+     * @throws IOException if the file cannot be opened or read
+     * @throws TikaConfigException if no tikaConfig value was set by the file
+     */
+    public static PipesConfig load(Path tikaConfig) throws IOException, TikaConfigException {
+        PipesConfig config = new PipesConfig();
+        try (InputStream stream = Files.newInputStream(tikaConfig)) {
+            //the returned set of settings is not used at present
+            Set<String> settings = config.configure("pipes", stream);
+        }
+        if (config.getTikaConfig() != null) {
+            return config;
+        }
+        throw new TikaConfigException("must specify at least a <tikaConfig> element in the " +
+                "<params> of <pipes>");
+    }
+
+    /**
+     * @return milliseconds to wait for a free client (default 60000)
+     */
+    public long getMaxWaitForClientMillis() {
+        return this.maxWaitForClientMillis;
+    }
+
+    /**
+     * @param maxWaitForClientMillis milliseconds to wait for a free client
+     */
+    public void setMaxWaitForClientMillis(long maxWaitForClientMillis) {
+        this.maxWaitForClientMillis = maxWaitForClientMillis;
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java 
b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
new file mode 100644
index 0000000..8da4f9d
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.tika.config.ConfigBase;
+
+/**
+ * Settings shared by the pipes configurations: how the server JVM is forked
+ * (java binary, JVM args, tika config path), how many clients to create, and
+ * the timeout and restart limits handed to each forked server.
+ */
+public class PipesConfigBase extends ConfigBase {
+
+    //limits
+    private long timeoutMillis = 30000;
+    private long shutdownClientAfterMillis = 1200000;
+    private int maxFilesProcessed = 10000;
+    private int numClients = 10;
+
+    //how to fork the server
+    private String javaPath = "java";
+    private List<String> forkedJvmArgs = new ArrayList<>();
+    private Path tikaConfig;
+
+    /**
+     * @return timeout in milliseconds (default 30000)
+     */
+    public long getTimeoutMillis() {
+        return this.timeoutMillis;
+    }
+
+    /**
+     * @param timeoutMillis timeout in milliseconds
+     */
+    public void setTimeoutMillis(long timeoutMillis) {
+        this.timeoutMillis = timeoutMillis;
+    }
+
+    /**
+     * @return the shutdown-client-after value in milliseconds (default 1200000)
+     */
+    public long getShutdownClientAfterMillis() {
+        return this.shutdownClientAfterMillis;
+    }
+
+    /**
+     * @param shutdownClientAfterMillis the shutdown-client-after value in milliseconds
+     */
+    public void setShutdownClientAfterMillis(long shutdownClientAfterMillis) {
+        this.shutdownClientAfterMillis = shutdownClientAfterMillis;
+    }
+
+    /**
+     * @return number of clients (default 10)
+     */
+    public int getNumClients() {
+        return this.numClients;
+    }
+
+    /**
+     * @param numClients number of clients
+     */
+    public void setNumClients(int numClients) {
+        this.numClients = numClients;
+    }
+
+    /**
+     * @return the list of JVM args for the forked process; this is the
+     * internal list, not a copy
+     */
+    public List<String> getForkedJvmArgs() {
+        return this.forkedJvmArgs;
+    }
+
+    /**
+     * @param jvmArgs JVM args for the forked process; the list is stored as is
+     */
+    public void setForkedJvmArgs(List<String> jvmArgs) {
+        this.forkedJvmArgs = jvmArgs;
+    }
+
+    /**
+     * @return maximum number of files processed (default 10000)
+     */
+    public int getMaxFilesProcessed() {
+        return this.maxFilesProcessed;
+    }
+
+    /**
+     * @param maxFilesProcessed maximum number of files processed
+     */
+    public void setMaxFilesProcessed(int maxFilesProcessed) {
+        this.maxFilesProcessed = maxFilesProcessed;
+    }
+
+    /**
+     * @return path to the tika config file, or null if none has been set
+     */
+    public Path getTikaConfig() {
+        return this.tikaConfig;
+    }
+
+    /**
+     * @param tikaConfig path to the tika config file
+     */
+    public void setTikaConfig(Path tikaConfig) {
+        this.tikaConfig = tikaConfig;
+    }
+
+    /**
+     * String variant of {@link #setTikaConfig(Path)}.
+     *
+     * @param tikaConfig path to the tika config file, as a string
+     */
+    public void setTikaConfig(String tikaConfig) {
+        Path asPath = Paths.get(tikaConfig);
+        setTikaConfig(asPath);
+    }
+
+    /**
+     * @return the java command or path (default "java")
+     */
+    public String getJavaPath() {
+        return this.javaPath;
+    }
+
+    /**
+     * @param javaPath the java command or path
+     */
+    public void setJavaPath(String javaPath) {
+        this.javaPath = javaPath;
+    }
+}
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
 b/tika-core/src/main/java/org/apache/tika/pipes/PipesException.java
similarity index 86%
rename from 
tika-core/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
rename to tika-core/src/main/java/org/apache/tika/pipes/PipesException.java
index 875cc90..ee9545f 100644
--- 
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesException.java
@@ -14,14 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.async;
+package org.apache.tika.pipes;
 
 /**
  * Fatal exception that means that something went seriously wrong.
  */
-public class AsyncRuntimeException extends RuntimeException {
+public class PipesException extends Exception {
 
-    public AsyncRuntimeException(Throwable t) {
+    public PipesException(Throwable t) {
         super(t);
     }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesParser.java 
b/tika-core/src/main/java/org/apache/tika/pipes/PipesParser.java
new file mode 100644
index 0000000..59bf633
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesParser.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Hands {@link FetchEmitTuple}s to a fixed pool of {@link PipesClient}s.
+ * A client is borrowed from a blocking queue for the duration of one
+ * {@link #parse(FetchEmitTuple)} call and returned to the queue afterwards.
+ */
+public class PipesParser implements Closeable {
+
+    private final PipesConfig pipesConfig;
+    //every client ever created, so that close() reaches borrowed ones too
+    private final List<PipesClient> clients = new ArrayList<>();
+    //clients that are currently free
+    private final ArrayBlockingQueue<PipesClient> clientQueue;
+
+    /**
+     * Creates {@code pipesConfig.getNumClients()} clients, all initially free.
+     *
+     * @param pipesConfig configuration shared by all clients
+     */
+    public PipesParser(PipesConfig pipesConfig) {
+        this.pipesConfig = pipesConfig;
+        this.clientQueue = new ArrayBlockingQueue<>(pipesConfig.getNumClients());
+        for (int i = 0; i < pipesConfig.getNumClients(); i++) {
+            PipesClient client = new PipesClient(pipesConfig);
+            clientQueue.offer(client);
+            clients.add(client);
+        }
+    }
+
+    /**
+     * Waits up to the configured number of milliseconds for a free client
+     * and processes the tuple with it.
+     *
+     * @param t the request to process
+     * @return the client's result, or
+     * {@link PipesResult#CLIENT_UNAVAILABLE_WITHIN_MS} if no client became free in time
+     * @throws PipesException if the thread is interrupted while waiting for a
+     *                        client; the interrupt flag is restored first
+     * @throws IOException if thrown by the client
+     */
+    public PipesResult parse(FetchEmitTuple t) throws PipesException, IOException {
+        PipesClient client = null;
+        try {
+            client = clientQueue.poll(pipesConfig.getMaxWaitForClientMillis(),
+                    TimeUnit.MILLISECONDS);
+            if (client == null) {
+                return PipesResult.CLIENT_UNAVAILABLE_WITHIN_MS;
+            }
+            return client.process(t);
+        } catch (InterruptedException e) {
+            //wrapping the exception must not hide the interrupt from callers
+            Thread.currentThread().interrupt();
+            throw new PipesException(e);
+        } finally {
+            if (client != null) {
+                clientQueue.offer(client);
+            }
+        }
+    }
+
+    /**
+     * Closes every client. All clients are attempted even if some fail.
+     *
+     * @throws IOException the first failure, with any later failures
+     *                     attached as suppressed exceptions
+     */
+    @Override
+    public void close() throws IOException {
+        IOException first = null;
+        for (PipesClient pipesClient : clients) {
+            try {
+                pipesClient.close();
+            } catch (IOException e) {
+                if (first == null) {
+                    first = e;
+                } else {
+                    first.addSuppressed(e);
+                }
+            }
+        }
+        if (first != null) {
+            throw first;
+        }
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java 
b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
new file mode 100644
index 0000000..0805b03
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+import org.apache.tika.pipes.emitter.EmitData;
+
+/**
+ * Outcome of one request handled through the pipes framework: a status
+ * plus, depending on the status, the emit data or a message.
+ */
+public class PipesResult {
+
+    public enum STATUS {
+        CLIENT_UNAVAILABLE_WITHIN_MS,
+        PARSE_EXCEPTION_NO_EMIT,
+        PARSE_EXCEPTION_EMIT, PARSE_SUCCESS,
+        OOM, TIMEOUT, UNSPECIFIED_CRASH,
+        NO_EMITTER_FOUND,
+        EMIT_SUCCESS, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION
+    }
+
+    //shared instances for statuses that carry neither data nor a message;
+    //final so that no caller can replace them
+    public static final PipesResult CLIENT_UNAVAILABLE_WITHIN_MS =
+            new PipesResult(STATUS.CLIENT_UNAVAILABLE_WITHIN_MS);
+    public static final PipesResult TIMEOUT = new PipesResult(STATUS.TIMEOUT);
+    public static final PipesResult OOM = new PipesResult(STATUS.OOM);
+    public static final PipesResult UNSPECIFIED_CRASH =
+            new PipesResult(STATUS.UNSPECIFIED_CRASH);
+    public static final PipesResult EMIT_SUCCESS = new PipesResult(STATUS.EMIT_SUCCESS);
+    public static final PipesResult NO_EMITTER_FOUND =
+            new PipesResult(STATUS.NO_EMITTER_FOUND);
+
+    private final STATUS status;
+    private final EmitData emitData;
+    private final String message;
+
+    private PipesResult(STATUS status, EmitData emitData, String message) {
+        this.status = status;
+        this.emitData = emitData;
+        this.message = message;
+    }
+
+    /**
+     * Result with a status only; emit data and message are null.
+     */
+    public PipesResult(STATUS status) {
+        this(status, null, null);
+    }
+
+    /**
+     * Result with a status and a message; emit data is null.
+     */
+    public PipesResult(STATUS status, String message) {
+        this(status, null, message);
+    }
+
+    /**
+     * Result carrying emit data; the status is always
+     * {@link STATUS#PARSE_SUCCESS} and the message is null.
+     */
+    public PipesResult(EmitData emitData) {
+        this(STATUS.PARSE_SUCCESS, emitData, null);
+    }
+
+    public STATUS getStatus() {
+        return status;
+    }
+
+    /**
+     * @return the emit data, or null if this result carries none
+     */
+    public EmitData getEmitData() {
+        return emitData;
+    }
+
+    /**
+     * @return the message, or null if this result carries none
+     */
+    public String getMessage() {
+        return message;
+    }
+}
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncServer.java 
b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
similarity index 57%
rename from tika-core/src/main/java/org/apache/tika/pipes/async/AsyncServer.java
rename to tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index e6950dc..1749837 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.async;
+package org.apache.tika.pipes;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -25,6 +25,7 @@ import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.List;
@@ -35,61 +36,86 @@ import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.EncryptedDocumentException;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.parser.AutoDetectParser;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.parser.Parser;
 import org.apache.tika.parser.RecursiveParserWrapper;
-import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
 import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.pipes.fetcher.Fetcher;
 import org.apache.tika.pipes.fetcher.FetcherManager;
 import org.apache.tika.sax.BasicContentHandlerFactory;
 import org.apache.tika.sax.RecursiveParserWrapperHandler;
+import org.apache.tika.utils.ExceptionUtils;
 import org.apache.tika.utils.StringUtils;
 
-public class AsyncServer implements Runnable {
+public class PipesServer implements Runnable {
 
-    public static final byte ERROR = -1;
-
-    public static final byte DONE = 0;
+    public static final int TIMEOUT_EXIT_CODE = 3;
 
     public static final byte CALL = 1;
 
     public static final byte PING = 2;
 
-    public static final byte RESOURCE = 3;
+    public static final byte FAILED_TO_START = 3;
+
+    public static final byte PARSE_SUCCESS = 4;
+
+    /**
+     * This will return the parse exception stack trace
+     */
+    public static final byte PARSE_EXCEPTION_NO_EMIT = 5;
+
+    /**
+     * This will return the metadata list
+     */
+    public static final byte PARSE_EXCEPTION_EMIT = 6;
+
+    public static final byte EMIT_SUCCESS = 7;
+
+    public static final byte EMIT_SUCCESS_PARSE_EXCEPTION = 8;
+
+    public static final byte EMIT_EXCEPTION = 9;
 
-    public static final byte READY = 4;
+    public static final byte NO_EMITTER_FOUND = 10;
 
-    public static final byte FAILED_TO_START = 5;
+    public static final byte OOM = 11;
 
-    public static final byte OOM = 6;
+    public static final byte TIMEOUT = 12;
+
+    public static final byte EMPTY_OUTPUT = 13;
 
-    public static final byte TIMEOUT = 7;
 
     private final Object[] lock = new Object[0];
     private final Path tikaConfigPath;
     private final DataInputStream input;
     private final DataOutputStream output;
+    private final long maxExtractSizeToReturn;
     private final long serverParseTimeoutMillis;
     private final long serverWaitTimeoutMillis;
     private Parser parser;
     private TikaConfig tikaConfig;
     private FetcherManager fetcherManager;
+    private EmitterManager emitterManager;
     private volatile boolean parsing;
     private volatile long since;
 
-    //logging is fussy...the logging frameworks grab stderr/stdout
+    //logging is fussy...the logging frameworks grab stderr and stdout
     //before we can redirect.  slf4j complains on stderr, log4j2 unconfigured 
writes to stdout
     //We can add logging later but it has to be done carefully...
-    public AsyncServer(Path tikaConfigPath, InputStream in, PrintStream out,
+    public PipesServer(Path tikaConfigPath, InputStream in, PrintStream out,
+                       long maxExtractSizeToReturn,
                        long serverParseTimeoutMillis, long 
serverWaitTimeoutMillis)
             throws IOException, TikaException, SAXException {
         this.tikaConfigPath = tikaConfigPath;
         this.input = new DataInputStream(in);
         this.output = new DataOutputStream(out);
+        this.maxExtractSizeToReturn = maxExtractSizeToReturn;
         this.serverParseTimeoutMillis = serverParseTimeoutMillis;
         this.serverWaitTimeoutMillis = serverWaitTimeoutMillis;
         this.parsing = false;
@@ -99,11 +125,13 @@ public class AsyncServer implements Runnable {
 
     public static void main(String[] args) throws Exception {
         Path tikaConfig = Paths.get(args[0]);
-        long serverParseTimeoutMillis = Long.parseLong(args[1]);
-        long serverWaitTimeoutMillis = Long.parseLong(args[2]);
+        long maxForEmitBatchBytes = Long.parseLong(args[1]);
+        long serverParseTimeoutMillis = Long.parseLong(args[2]);
+        long serverWaitTimeoutMillis = Long.parseLong(args[3]);
 
-        AsyncServer server =
-                new AsyncServer(tikaConfig, System.in, System.out, 
serverParseTimeoutMillis,
+        PipesServer server =
+                new PipesServer(tikaConfig, System.in, System.out,
+                        maxForEmitBatchBytes, serverParseTimeoutMillis,
                 serverWaitTimeoutMillis);
         System.setIn(new ByteArrayInputStream(new byte[0]));
         System.setOut(System.err);
@@ -121,44 +149,47 @@ public class AsyncServer implements Runnable {
                 synchronized (lock) {
                     long elapsed = System.currentTimeMillis() - since;
                     if (parsing && elapsed > serverParseTimeoutMillis) {
-                        err("server timeout");
-                        System.exit(1);
+                        System.exit(TIMEOUT_EXIT_CODE);
                     } else if (!parsing && serverWaitTimeoutMillis > 0 &&
                             elapsed > serverWaitTimeoutMillis) {
-                        err("closing down from inactivity");
+                        debug("closing down from inactivity");
                         System.exit(0);
                     }
                 }
-                Thread.sleep(250);
+                Thread.sleep(100);
             }
         } catch (InterruptedException e) {
             //swallow
         }
     }
 
-    private void err(String msg) {
-        System.err.println(msg);
+    private void debug(String msg) {
+        System.err.println("debug " + msg.replaceAll("[\r\n]", " "));
+        System.err.flush();
+    }
+
+    private void warn(Throwable t) {
+        System.err.println("warn " + 
ExceptionUtils.getStackTrace(t).replaceAll("[\r\n]", " "));
         System.err.flush();
     }
 
     private void err(Throwable t) {
-        t.printStackTrace();
+        System.err.println("err " + 
ExceptionUtils.getStackTrace(t).replaceAll("[\r\n]", " "));
         System.err.flush();
     }
 
+
     public void processRequests() {
         //initialize
         try {
             initializeParser();
         } catch (Throwable t) {
-            t.printStackTrace();
-            System.err.flush();
+            err(t);
             try {
                 output.writeByte(FAILED_TO_START);
                 output.flush();
             } catch (IOException e) {
-                e.printStackTrace();
-                System.err.flush();
+                warn(e);
             }
             return;
         }
@@ -172,8 +203,7 @@ public class AsyncServer implements Runnable {
                     output.writeByte(PING);
                     output.flush();
                 } else if (request == CALL) {
-                    EmitData emitData = parseOne();
-                    write(emitData);
+                    parseOne();
                 } else {
                     throw new IllegalStateException("Unexpected request");
                 }
@@ -181,75 +211,120 @@ public class AsyncServer implements Runnable {
             }
         } catch (Throwable t) {
             t.printStackTrace();
+            err(t);
+            System.err.println("exiting");
+            exit(1);
         }
         System.err.flush();
     }
 
-    private void write(EmitData emitData) {
+    private boolean metadataIsEmpty(List<Metadata> metadataList) {
+        return metadataList == null || metadataList.size() == 0;
+    }
+
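+    /**
+     * Returns the stack trace if there was a container exception, or the empty
+     * string if there was no stack trace.
+     * @param t the tuple being processed (currently unused by this method)
+     * @param metadataList parse results; only the first entry is consulted
+     * @return the container exception stack trace, or "" if there is none
+     */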
+    private String getContainerStacktrace(FetchEmitTuple t, List<Metadata> 
metadataList) {
+        if (metadataList == null || metadataList.size() < 1) {
+            return "";
+        }
+        String stack = 
metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
+        return (stack != null) ? stack : "";
+    }
+
+
+    private void emit(EmitData emitData, String parseExceptionStack) {
+        Emitter emitter = 
emitterManager.getEmitter(emitData.getEmitKey().getEmitterName());
+        if (emitter == null) {
+            write(NO_EMITTER_FOUND, new byte[0]);
+            return;
+        }
         try {
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            try (ObjectOutputStream objectOutputStream = new 
ObjectOutputStream(bos)) {
-                objectOutputStream.writeObject(emitData);
-            }
-            byte[] bytes = bos.toByteArray();
-            int len = bytes.length;
-            output.write(READY);
-            output.writeInt(len);
-            output.write(bytes);
-            output.flush();
-        } catch (IOException e) {
-            err(e);
-            //LOG.error("problem writing emit data", e);
-            exit(1);
+            emitter.emit(emitData.getEmitKey().getEmitKey(), 
emitData.getMetadataList());
+        } catch (IOException | TikaEmitterException e) {
+            String msg = ExceptionUtils.getStackTrace(e);
+            byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
+            //for now, we're hiding the parse exception if there was also an 
emit exception
+            write(EMIT_EXCEPTION, bytes);
+            return;
+        }
+        if (StringUtils.isBlank(parseExceptionStack)) {
+            write(EMIT_SUCCESS);
+        } else {
+            write(EMIT_SUCCESS_PARSE_EXCEPTION, 
parseExceptionStack.getBytes(StandardCharsets.UTF_8));
         }
     }
 
-    private EmitData parseOne() throws FetchException {
+
+    private void parseOne() throws FetchException {
         synchronized (lock) {
             parsing = true;
             since = System.currentTimeMillis();
         }
         try {
-            FetchEmitTuple t = readFetchEmitTuple();
-            Metadata userMetadata = t.getMetadata();
-            Metadata metadata = new Metadata();
-            String fetcherName = t.getFetchKey().getFetcherName();
-            String fetchKey = t.getFetchKey().getFetchKey();
-            List<Metadata> metadataList = null;
-            Fetcher fetcher = null;
-            try {
-                fetcher = fetcherManager.getFetcher(fetcherName);
-            } catch (TikaException | IOException e) {
-                err(e);
-                //LOG.error("can't get fetcher", e);
-                throw new FetchException(e);
+            actuallyParse();
+        } finally {
+            synchronized (lock) {
+                parsing = false;
+                since = System.currentTimeMillis();
             }
+        }
+    }
 
-            try (InputStream stream = fetcher.fetch(fetchKey, metadata)) {
-                metadataList = parseMetadata(t, stream, metadata);
-            } catch (SecurityException e) {
-                throw e;
-            } catch (TikaException | IOException e) {
-                err(e);
-                //LOG.error("problem reading from fetcher", e);
-                throw new FetchException(e);
-            } catch (OutOfMemoryError e) {
-                //LOG.error("oom", e);
-                handleOOM(e);
-            }
+    public void actuallyParse() throws FetchException {
+        FetchEmitTuple t = readFetchEmitTuple();
+        List<Metadata> metadataList = null;
 
-            injectUserMetadata(userMetadata, metadataList);
+        Fetcher fetcher = getFetcher(t.getFetchKey().getFetcherName());
+
+        Metadata metadata = new Metadata();
+        try (InputStream stream = fetcher.fetch(t.getFetchKey().getFetchKey(), 
metadata)) {
+            metadataList = parseMetadata(t, stream, metadata);
+        } catch (SecurityException e) {
+            throw e;
+        } catch (TikaException | IOException e) {
+            warn(e);
+            throw new FetchException(e);
+        } catch (OutOfMemoryError e) {
+            handleOOM(e);
+        }
+        if (metadataIsEmpty(metadataList)) {
+            write(EMPTY_OUTPUT);
+            return;
+        }
+
+        String stack = getContainerStacktrace(t, metadataList);
+        if (StringUtils.isBlank(stack) || t.getOnParseException() == 
FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT) {
+            injectUserMetadata(t.getMetadata(), metadataList);
             EmitKey emitKey = t.getEmitKey();
             if (StringUtils.isBlank(emitKey.getEmitKey())) {
-                emitKey = new EmitKey(emitKey.getEmitterName(), fetchKey);
+                emitKey = new EmitKey(emitKey.getEmitterName(), 
t.getFetchKey().getFetchKey());
                 t.setEmitKey(emitKey);
             }
-            return new EmitData(t.getEmitKey(), metadataList);
-        } finally {
-            synchronized (lock) {
-                parsing = false;
-                since = System.currentTimeMillis();
+            EmitData emitData = new EmitData(t.getEmitKey(), metadataList);
+            if (emitData.getEstimatedSizeBytes() >= maxExtractSizeToReturn) {
+                emit(emitData, stack);
+            } else {
+                write(emitData, stack);
             }
+        } else {
+            write(PARSE_EXCEPTION_NO_EMIT, 
stack.getBytes(StandardCharsets.UTF_8));
+        }
+
+    }
+
+    private Fetcher getFetcher(String fetcherName) throws FetchException {
+        try {
+            return fetcherManager.getFetcher(fetcherName);
+        } catch (TikaException | IOException e) {
+            warn(e);
+            //LOG.error("can't get fetcher", e);
+            throw new FetchException(e);
         }
     }
 
@@ -266,31 +341,28 @@ public class AsyncServer implements Runnable {
 
     private List<Metadata> parseMetadata(FetchEmitTuple fetchEmitTuple, 
InputStream stream,
                                          Metadata metadata) {
-        //make these configurable
-        BasicContentHandlerFactory.HANDLER_TYPE type = 
BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
-
-
+        HandlerConfig handlerConfig = fetchEmitTuple.getHandlerConfig();
         RecursiveParserWrapperHandler handler = new 
RecursiveParserWrapperHandler(
-                new BasicContentHandlerFactory(type,
-                    fetchEmitTuple.getHandlerConfig().getWriteLimit()),
-                fetchEmitTuple.getHandlerConfig().getMaxEmbeddedResources(),
+                new BasicContentHandlerFactory(handlerConfig.getType(),
+                    handlerConfig.getWriteLimit()),
+                handlerConfig.getMaxEmbeddedResources(),
                 tikaConfig.getMetadataFilter());
         ParseContext parseContext = new ParseContext();
         FetchKey fetchKey = fetchEmitTuple.getFetchKey();
         try {
             parser.parse(stream, handler, metadata, parseContext);
         } catch (SAXException e) {
-            err(e);
+            warn(e);
             //LOG.warn("problem:" + fetchKey.getFetchKey(), e);
         } catch (EncryptedDocumentException e) {
-            err(e);
+            warn(e);
             //LOG.warn("encrypted:" + fetchKey.getFetchKey(), e);
         } catch (SecurityException e) {
-            err(e);
+            warn(e);
             //LOG.warn("security exception: " + fetchKey.getFetchKey());
             throw e;
         } catch (Exception e) {
-            err(e);
+            warn(e);
             //LOG.warn("exception: " + fetchKey.getFetchKey());
         } catch (OutOfMemoryError e) {
             //TODO, maybe return file type gathered so far and then crash?
@@ -312,6 +384,8 @@ public class AsyncServer implements Runnable {
 
 
     private void exit(int exitCode) {
+        System.err.println("exiting: " + exitCode);
+        System.err.flush();
         System.exit(exitCode);
     }
 
@@ -342,6 +416,7 @@ public class AsyncServer implements Runnable {
         //TODO allowed named configurations in tika config
         this.tikaConfig = new TikaConfig(tikaConfigPath);
         this.fetcherManager = FetcherManager.load(tikaConfigPath);
+        this.emitterManager = EmitterManager.load(tikaConfigPath);
         Parser autoDetectParser = new AutoDetectParser(this.tikaConfig);
         this.parser = new RecursiveParserWrapper(autoDetectParser);
 
@@ -352,4 +427,42 @@ public class AsyncServer implements Runnable {
             super(t);
         }
     }
+
+    private void write(EmitData emitData, String stack) {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            try (ObjectOutputStream objectOutputStream = new 
ObjectOutputStream(bos)) {
+                objectOutputStream.writeObject(emitData);
+            }
+            write(PARSE_SUCCESS, bos.toByteArray());
+        } catch (IOException e) {
+            err(e);
+            //LOG.error("problem writing emit data", e);
+            exit(1);
+        }
+    }
+
+    private void write(byte status, byte[] bytes) {
+        try {
+            int len = bytes.length;
+            output.write(status);
+            output.writeInt(len);
+            output.write(bytes);
+            output.flush();
+        } catch (IOException e) {
+            err(e);
+            exit(1);
+        }
+    }
+
+    private void write(byte status) {
+        try {
+            output.write(status);
+            output.flush();
+        } catch (IOException e) {
+            err(e);
+            exit(1);
+        }
+    }
+
 }
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClient.java 
b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClient.java
deleted file mode 100644
index b01eb93..0000000
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClient.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.nio.file.Path;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.utils.ProcessUtils;
-
-public class AsyncClient implements Closeable {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncProcessor.class);
-
-    //TODO: make these configurable
-    private long parseTimeoutMillis = 30000;
-    private long waitTimeoutMillis = 500000;
-
-    private Process process;
-    private final Path tikaConfigPath;
-    private DataOutputStream output;
-    private DataInputStream input;
-
-    public AsyncClient(Path tikaConfigPath) {
-        this.tikaConfigPath = tikaConfigPath;
-    }
-
-    private int filesProcessed = 0;
-
-    public int getFilesProcessed() {
-        return filesProcessed;
-    }
-
-    private boolean ping() {
-        if (process == null || ! process.isAlive()) {
-            return false;
-        }
-        try {
-            output.write(AsyncServer.PING);
-            output.flush();
-            int ping = input.read();
-            if (ping == AsyncServer.PING) {
-                return true;
-            }
-        } catch (IOException e) {
-            return false;
-        }
-        return false;
-    }
-
-    @Override
-    public void close() {
-        process.destroyForcibly();
-    }
-
-    public AsyncResult process(FetchEmitTuple t) throws IOException {
-        if (! ping()) {
-            restart();
-        }
-        //TODO consider adding a timer here too
-        // this could block forever if the watchdog thread in the server fails
-        // or is starved
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        try (ObjectOutputStream objectOutputStream = new 
ObjectOutputStream(bos)) {
-            objectOutputStream.writeObject(t);
-        }
-        byte[] bytes = bos.toByteArray();
-        output.write(AsyncServer.CALL);
-        output.writeInt(bytes.length);
-        output.write(bytes);
-        output.flush();
-
-        long start = System.currentTimeMillis();
-        try {
-            return readResults(t);
-        } catch (IOException e) {
-            long elapsed = System.currentTimeMillis() - start;
-            if (elapsed > parseTimeoutMillis) {
-                LOG.warn("{} timed out", t.getId());
-                return AsyncResult.TIMEOUT;
-            }
-            return AsyncResult.UNSPECIFIED_CRASH;
-        }
-    }
-
-    private AsyncResult readResults(FetchEmitTuple t) throws IOException {
-        int status = input.read();
-        //TODO clean this up, never return null
-        if (status == AsyncServer.OOM) {
-            LOG.warn(t.getId() + " oom");
-            return AsyncResult.OOM;
-        } else if (status == AsyncServer.READY) {
-        } else {
-            throw new IOException("problem reading response from server " + 
status);
-        }
-        int length = input.readInt();
-        byte[] bytes = new byte[length];
-        input.readFully(bytes);
-        try (ObjectInputStream objectInputStream =
-                     new ObjectInputStream(new ByteArrayInputStream(bytes))) {
-            return new AsyncResult((EmitData)objectInputStream.readObject());
-        } catch (ClassNotFoundException e) {
-            //this should be catastrophic
-            throw new RuntimeException(e);
-        }
-
-    }
-
-    private void restart() throws IOException {
-        if (process != null) {
-            process.destroyForcibly();
-        }
-        LOG.debug("restarting process");
-        ProcessBuilder pb = new ProcessBuilder(getCommandline());
-        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
-        process = pb.start();
-        input = new DataInputStream(process.getInputStream());
-        output = new DataOutputStream(process.getOutputStream());
-    }
-
-    private String[] getCommandline() {
-        //TODO: make this all configurable
-        return new String[]{
-                "java",
-                "-cp",
-                System.getProperty("java.class.path"),
-                "org.apache.tika.pipes.async.AsyncServer",
-                
ProcessUtils.escapeCommandLine(tikaConfigPath.toAbsolutePath().toString()),
-                Long.toString(parseTimeoutMillis),
-                Long.toString(waitTimeoutMillis),
-        };
-    }
-}
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java 
b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java
index c8b8ccb..e12b7c1 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java
@@ -22,9 +22,6 @@ import java.nio.file.Path;
 class AsyncClientConfig {
 
     private int fetchQueueSize = 20000;
-    private int numWorkers = 10;
-    private String[] workerJVMArgs;
-    private long parseTimeoutMs;
     private long waitTimeoutMs;
     private long maxFilesProcessed;
 
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java 
b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index d16dd16..e939e85 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -17,20 +17,93 @@
 package org.apache.tika.pipes.async;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
 import java.nio.file.Path;
 
-class AsyncConfig {
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.pipes.PipesConfigBase;
 
+public class AsyncConfig extends PipesConfigBase {
 
-    private AsyncClientConfig asyncClientConfig;
-    private AsyncClientConfig asyncRetryClientConfig;
-    private AsyncEmitterConfig asyncEmitterConfig;
+    private long emitWithinMillis = 10000;
+    private long emitMaxEstimatedBytes = 100000;
 
-    public static AsyncConfig load(Path p) throws IOException {
-        AsyncConfig asyncConfig = new AsyncConfig();
+    private long maxForEmitBatchBytes = 0;
+    private int queueSize = 10000;
+    private final int numEmitters = 1;
 
+    public static AsyncConfig load(Path p) throws IOException, 
TikaConfigException {
+        AsyncConfig asyncConfig = new AsyncConfig();
+        try (InputStream is = Files.newInputStream(p)) {
+            asyncConfig.configure("async", is);
+        }
         return asyncConfig;
     }
 
+    public long getEmitWithinMillis() {
+        return emitWithinMillis;
+    }
+
+    /**
+     * If nothing has been emitted in this amount of time
+     * and the {@link #getEmitMaxEstimatedBytes()} has not been reached yet,
+     * emit what's in the emit queue.
+     *
+     * @param emitWithinMillis maximum time, in milliseconds, between emits
+     */
+    public void setEmitWithinMillis(long emitWithinMillis) {
+        this.emitWithinMillis = emitWithinMillis;
+    }
+
+    /**
+     * When the emit queue hits this estimated size (sum of
+     * estimated extract sizes), emit the batch.
+     * @return the estimated size, in bytes, at which the batch is emitted
+     */
+    public long getEmitMaxEstimatedBytes() {
+        return emitMaxEstimatedBytes;
+    }
+
+    public void setEmitMaxEstimatedBytes(long emitMaxEstimatedBytes) {
+        this.emitMaxEstimatedBytes = emitMaxEstimatedBytes;
+    }
+
 
+    /**
+     *  The maximum size in bytes per extract that
+     *  will be allowed in the emit queue.  If an extract is too
+     *  big, skip the emit queue and forward it directly from the processor.  
If
+     *  set to <code>0</code>, this will never send an extract back for 
batch emitting,
+     *  but will emit the extract directly from the processor.
+     * @return the maximum per-extract size, in bytes, allowed in the emit queue
+     */
+    public long getMaxForEmitBatchBytes() {
+        return maxForEmitBatchBytes;
+    }
+
+    public void setMaxForEmitBatchBytes(long maxForEmitBatchBytes) {
+        this.maxForEmitBatchBytes = maxForEmitBatchBytes;
+    }
+
+    /**
+     * FetchEmitTuple queue size
+     * @return
+     */
+    public int getQueueSize() {
+        return queueSize;
+    }
+
+    public void setQueueSize(int queueSize) {
+        this.queueSize = queueSize;
+    }
+
+    /**
+     * Number of emitters
+     *
+     * @return
+     */
+    public int getNumEmitters() {
+        return numEmitters;
+    }
 }
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java 
b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
index fa53764..9d40b47 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
@@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.tika.pipes.emitter.AbstractEmitter;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.Emitter;
 import org.apache.tika.pipes.emitter.EmitterManager;
@@ -106,8 +105,7 @@ public class AsyncEmitter implements Callable<Integer> {
 
         void add(EmitData data) {
             size++;
-            long sz = AbstractEmitter
-                    .estimateSizeInBytes(data.getEmitKey().getEmitKey(), 
data.getMetadataList());
+            long sz = data.getEstimatedSizeBytes();
             if (estimatedSize + sz > maxBytes) {
                 LOG.debug("estimated size ({}) > maxBytes({}), going to 
emitAll",
                         (estimatedSize + sz), maxBytes);
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitterConfig.java 
b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitterConfig.java
deleted file mode 100644
index ed6973c..0000000
--- 
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitterConfig.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-class AsyncEmitterConfig {
-
-    private long emitWithinMs;
-    private long emitMaxEstimatedBytes;
-    private final int numEmitters = 1;
-
-
-
-}
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java 
b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index cd08d4c..0d66074 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -33,6 +33,9 @@ import org.xml.sax.SAXException;
 
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.PipesClient;
+import org.apache.tika.pipes.PipesException;
+import org.apache.tika.pipes.PipesResult;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.EmitterManager;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
@@ -45,12 +48,11 @@ import org.apache.tika.pipes.fetchiterator.FetchIterator;
 public class AsyncProcessor implements Closeable {
 
     static final int PARSER_FUTURE_CODE = 1;
-    private final Path tikaConfigPath;
     private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
     private final ArrayBlockingQueue<EmitData> emitData;
     private final ExecutorCompletionService<Integer> executorCompletionService;
     private final ExecutorService executorService;
-    private final int fetchEmitTupleQSize = 1000;
+    private final int fetchEmitTupleQSize = 10000;
     private int numParserThreads = 10;
     private int numEmitterThreads = 2;
     private int numParserThreadsFinished = 0;
@@ -59,15 +61,15 @@ public class AsyncProcessor implements Closeable {
     boolean isShuttingDown = false;
 
     public AsyncProcessor(Path tikaConfigPath) throws TikaException, 
IOException, SAXException {
-        this.tikaConfigPath = tikaConfigPath;
         this.fetchEmitTuples = new ArrayBlockingQueue<>(fetchEmitTupleQSize);
         this.emitData = new ArrayBlockingQueue<>(100);
         this.executorService = Executors.newFixedThreadPool(numParserThreads + 
numEmitterThreads);
         this.executorCompletionService =
                 new ExecutorCompletionService<>(executorService);
 
+        AsyncConfig asyncConfig = AsyncConfig.load(tikaConfigPath);
         for (int i = 0; i < numParserThreads; i++) {
-            executorCompletionService.submit(new 
FetchEmitWorker(tikaConfigPath, fetchEmitTuples,
+            executorCompletionService.submit(new FetchEmitWorker(asyncConfig, 
fetchEmitTuples,
                     emitData));
         }
 
@@ -78,7 +80,7 @@ public class AsyncProcessor implements Closeable {
     }
 
     public synchronized boolean offer(List<FetchEmitTuple> newFetchEmitTuples, 
long offerMs)
-            throws AsyncRuntimeException, InterruptedException {
+            throws PipesException, InterruptedException {
         if (isShuttingDown) {
             throw new IllegalStateException(
                     "Can't call offer after calling close() or " + 
"shutdownNow()");
@@ -111,7 +113,7 @@ public class AsyncProcessor implements Closeable {
     }
 
     public synchronized boolean offer(FetchEmitTuple t, long offerMs)
-            throws AsyncRuntimeException, InterruptedException {
+            throws PipesException, InterruptedException {
         if (fetchEmitTuples == null) {
             throw new IllegalStateException("queue hasn't been initialized 
yet.");
         } else if (isShuttingDown) {
@@ -153,21 +155,21 @@ public class AsyncProcessor implements Closeable {
 
     private class FetchEmitWorker implements Callable<Integer> {
 
-        private final Path tikaConfigPath;
+        private final AsyncConfig asyncConfig;
         private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
         private final ArrayBlockingQueue<EmitData> emitDataQueue;
 
-        private FetchEmitWorker(Path tikaConfigPath,
+        private FetchEmitWorker(AsyncConfig asyncConfig,
                                 ArrayBlockingQueue<FetchEmitTuple> 
fetchEmitTuples,
                                 ArrayBlockingQueue<EmitData> emitDataQueue) {
-            this.tikaConfigPath = tikaConfigPath;
+            this.asyncConfig = asyncConfig;
             this.fetchEmitTuples = fetchEmitTuples;
             this.emitDataQueue = emitDataQueue;
         }
         @Override
         public Integer call() throws Exception {
 
-            try (AsyncClient asyncClient = new AsyncClient(tikaConfigPath)) {
+            try (PipesClient pipesClient = new PipesClient(asyncConfig)) {
                 while (true) {
                     FetchEmitTuple t = fetchEmitTuples.poll(1, 
TimeUnit.SECONDS);
                     if (t == null) {
@@ -175,15 +177,24 @@ public class AsyncProcessor implements Closeable {
                     } else if (t == FetchIterator.COMPLETED_SEMAPHORE) {
                         return PARSER_FUTURE_CODE;
                     } else {
-                        AsyncResult result = null;
+                        PipesResult result = null;
                         try {
-                            result = asyncClient.process(t);
+                            result = pipesClient.process(t);
                         } catch (IOException e) {
-                            result = AsyncResult.UNSPECIFIED_CRASH;
+                            result = PipesResult.UNSPECIFIED_CRASH;
                         }
-                        if (result.getStatus() == AsyncResult.STATUS.OK) {
-                            //TODO -- add timeout, this currently hangs forever
-                            emitDataQueue.offer(result.getEmitData());
+                        switch (result.getStatus()) {
+                            case PARSE_SUCCESS:
+                                //TODO -- unchecked offer() drops data if queue 
is full
+                                emitDataQueue.offer(result.getEmitData());
+                                break;
+                            case EMIT_SUCCESS:
+                                break;
+                            case EMIT_EXCEPTION:
+                            case UNSPECIFIED_CRASH:
+                            case OOM:
+                            case TIMEOUT:
+                                break;
                         }
                     }
                     checkActive();
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncResult.java 
b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncResult.java
deleted file mode 100644
index 115e344..0000000
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncResult.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import org.apache.tika.pipes.emitter.EmitData;
-
-public class AsyncResult {
-
-    enum STATUS {
-        OK, OOM, TIMEOUT, UNSPECIFIED_CRASH
-    }
-    public static AsyncResult TIMEOUT = new AsyncResult(STATUS.TIMEOUT);
-    public static AsyncResult OOM = new AsyncResult(STATUS.OOM);
-    public static AsyncResult UNSPECIFIED_CRASH = new 
AsyncResult(STATUS.UNSPECIFIED_CRASH);
-
-    private final STATUS status;
-    private final EmitData emitData;
-
-    private AsyncResult(STATUS status, EmitData emitData) {
-        this.status = status;
-        this.emitData = emitData;
-    }
-
-    public AsyncResult(STATUS status) {
-        this(status, null);
-    }
-
-    public AsyncResult(EmitData emitData) {
-        this(STATUS.OK, emitData);
-    }
-
-    public STATUS getStatus() {
-        return status;
-    }
-
-    public EmitData getEmitData() {
-        return emitData;
-    }
-}
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java 
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
index f7520ad..648e094 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
@@ -19,25 +19,10 @@ package org.apache.tika.pipes.emitter;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.tika.metadata.Metadata;
-
 public abstract class AbstractEmitter implements Emitter {
 
     private String name;
 
-    public static long estimateSizeInBytes(String id, List<Metadata> 
metadataList) {
-        long sz = 36 + id.length() * 2;
-        for (Metadata m : metadataList) {
-            for (String n : m.names()) {
-                sz += 36 + n.length() * 2;
-                for (String v : m.getValues(n)) {
-                    sz += 36 + v.length() * 2;
-                }
-            }
-        }
-        return sz;
-    }
-
     @Override
     public String getName() {
         return name;
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java 
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
index 797de58..c744140 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
@@ -43,6 +43,22 @@ public class EmitData implements Serializable {
         return metadataList;
     }
 
+    public long getEstimatedSizeBytes() {
+        return estimateSizeInBytes(getEmitKey().getEmitKey(), 
getMetadataList());
+    }
+
+    private static long estimateSizeInBytes(String id, List<Metadata> 
metadataList) {
+        long sz = 36 + id.length() * 2;
+        for (Metadata m : metadataList) {
+            for (String n : m.names()) {
+                sz += 36 + n.length() * 2;
+                for (String v : m.getValues(n)) {
+                    sz += 36 + v.length() * 2;
+                }
+            }
+        }
+        return sz;
+    }
     @Override
     public String toString() {
         return "EmitData{" + "emitKey=" + emitKey + ", metadataList=" + 
metadataList + '}';
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
 
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
index f493ffb..490a77b 100644
--- 
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
+++ 
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
@@ -53,7 +53,7 @@ public abstract class FetchIterator
     public static final int DEFAULT_QUEUE_SIZE = 1000;
 
     public static final FetchEmitTuple COMPLETED_SEMAPHORE =
-            new FetchEmitTuple(null, null, null, null);
+            new FetchEmitTuple(null,null, null, null, null, null);
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(FetchIterator.class);
 
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
 
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
index 2f8224e..aef095d 100644
--- 
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
+++ 
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
@@ -102,7 +102,7 @@ public class FileSystemFetchIterator extends FetchIterator 
implements Initializa
             String relPath = basePath.relativize(file).toString();
 
             try {
-                tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName, relPath),
+                tryToAdd(new FetchEmitTuple(relPath, new FetchKey(fetcherName, 
relPath),
                         new EmitKey(emitterName, relPath), new Metadata()));
             } catch (TimeoutException e) {
                 throw new IOException(e);
diff --git 
a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java 
b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
index 79bf911..6a82c6f 100644
--- 
a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
+++ 
b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
@@ -65,7 +65,13 @@ public class AsyncProcessorTest {
                 "    <fetcher 
class=\"org.apache.tika.pipes.fetcher.FileSystemFetcher\">" +
                 "      <params><name>mock</name>\n" + "      <basePath>" +
                 
ProcessUtils.escapeCommandLine(inputDir.toAbsolutePath().toString()) +
-                "</basePath></params>\n" + "    </fetcher>" + "  </fetchers>" 
+ "</properties>";
+                "</basePath></params>\n" + "    </fetcher>" + "  </fetchers>" +
+                        "<async><params><tikaConfig>" +
+                        
ProcessUtils.escapeCommandLine(tikaConfigPath.toAbsolutePath().toString()) +
+                        "</tikaConfig><forkedJvmArgs><arg>-Xmx256m</arg" +
+                        
"></forkedJvmArgs><maxForEmitBatchBytes>1000000</maxForEmitBatchBytes>" +
+                        "</params></async>" +
+                        "</properties>";
         Files.write(tikaConfigPath, xml.getBytes(StandardCharsets.UTF_8));
         Random r = new Random();
         for (int i = 0; i < totalFiles; i++) {
@@ -89,7 +95,8 @@ public class AsyncProcessorTest {
     public void testBasic() throws Exception {
         AsyncProcessor processor = new AsyncProcessor(tikaConfigPath);
         for (int i = 0; i < totalFiles; i++) {
-            FetchEmitTuple t = new FetchEmitTuple(new FetchKey("mock", i + 
".xml"),
+            FetchEmitTuple t = new FetchEmitTuple("myId",
+                    new FetchKey("mock", i + ".xml"),
                     new EmitKey("mock", "emit-" + i), new Metadata());
             processor.offer(t, 1000);
         }
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
index 6f65140..7799b05 100644
--- 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
+++ 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
@@ -53,6 +53,13 @@ import org.apache.tika.utils.StringUtils;
  * to the metadata object.
  * <p>
  *  <ul>
+ *      <li>If an 'idColumn' is specified, this will use that
+ *      column's value as the id.</li>
+ *      <li>If no 'idColumn' is specified, but a 'fetchKeyColumn' is specified,
+ *          the string in the 'fetchKeyColumn' will be used as the 'id'.</li>
+ *      <li>The 'idColumn' value is not added to the metadata.</li>
+ *  </ul>
+ *  <ul>
  *      <li>If a 'fetchKeyColumn' is specified, this will use that
  *      column's value as the fetchKey.</li>
  *      <li>If no 'fetchKeyColumn' is specified, this will send the
@@ -76,7 +83,7 @@ public class CSVFetchIterator extends FetchIterator 
implements Initializable {
     private Path csvPath;
     private String fetchKeyColumn;
     private String emitKeyColumn;
-
+    private String idColumn;
 
     @Field
     public void setCsvPath(String csvPath) {
@@ -94,6 +101,11 @@ public class CSVFetchIterator extends FetchIterator 
implements Initializable {
     }
 
     @Field
+    public void setIdColumn(String idColumn) {
+        this.idColumn = idColumn;
+    }
+
+    @Field
     public void setCsvPath(Path csvPath) {
         this.csvPath = csvPath;
     }
@@ -114,6 +126,7 @@ public class CSVFetchIterator extends FetchIterator 
implements Initializable {
             checkFetchEmitValidity(fetcherName, emitterName, 
fetchEmitKeyIndices, headers);
             HandlerConfig handlerConfig = getHandlerConfig();
             for (CSVRecord record : records) {
+                String id = getId(fetchEmitKeyIndices, record);
                 String fetchKey = getFetchKey(fetchEmitKeyIndices, record);
                 String emitKey = getEmitKey(fetchEmitKeyIndices, record);
                 if (StringUtils.isBlank(fetchKey) && 
!StringUtils.isBlank(fetcherName)) {
@@ -123,8 +136,11 @@ public class CSVFetchIterator extends FetchIterator 
implements Initializable {
                 if (StringUtils.isBlank(emitKey)) {
                     throw new IOException("emitKey must not be blank in :" + 
record);
                 }
+                if (StringUtils.isBlank(id) && ! 
StringUtils.isBlank(fetchKey)) {
+                    id = fetchKey;
+                }
                 Metadata metadata = loadMetadata(fetchEmitKeyIndices, headers, 
record);
-                tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName, 
fetchKey),
+                tryToAdd(new FetchEmitTuple(id, new FetchKey(fetcherName, 
fetchKey),
                         new EmitKey(emitterName, emitKey), metadata, 
handlerConfig,
                         getOnParseException()));
             }
@@ -169,6 +185,14 @@ public class CSVFetchIterator extends FetchIterator 
implements Initializable {
         }
     }
 
+    private String getId(FetchEmitKeyIndices fetchEmitKeyIndices, CSVRecord 
record) {
+        if (fetchEmitKeyIndices.idIndex > -1) {
+            return record.get(fetchEmitKeyIndices.idIndex);
+        }
+        return StringUtils.EMPTY;
+    }
+
+
     private String getFetchKey(FetchEmitKeyIndices fetchEmitKeyIndices, 
CSVRecord record) {
         if (fetchEmitKeyIndices.fetchKeyIndex > -1) {
             return record.get(fetchEmitKeyIndices.fetchKeyIndex);
@@ -200,7 +224,7 @@ public class CSVFetchIterator extends FetchIterator 
implements Initializable {
             throws IOException {
         int fetchKeyColumnIndex = -1;
         int emitKeyColumnIndex = -1;
-
+        int idIndex = -1;
         for (int col = 0; col < record.size(); col++) {
             String header = record.get(col);
             if (StringUtils.isBlank(header)) {
@@ -212,9 +236,11 @@ public class CSVFetchIterator extends FetchIterator 
implements Initializable {
                 fetchKeyColumnIndex = col;
             } else if (header.equals(emitKeyColumn)) {
                 emitKeyColumnIndex = col;
+            } else if (header.equals(idColumn)) {
+                idIndex = col;
             }
         }
-        return new FetchEmitKeyIndices(fetchKeyColumnIndex, 
emitKeyColumnIndex);
+        return new FetchEmitKeyIndices(idIndex, fetchKeyColumnIndex, 
emitKeyColumnIndex);
     }
 
     @Override
@@ -225,16 +251,18 @@ public class CSVFetchIterator extends FetchIterator 
implements Initializable {
     }
 
     private static class FetchEmitKeyIndices {
+        private final int idIndex;
         private final int fetchKeyIndex;
         private final int emitKeyIndex;
 
-        public FetchEmitKeyIndices(int fetchKeyIndex, int emitKeyIndex) {
+        public FetchEmitKeyIndices(int idIndex, int fetchKeyIndex, int 
emitKeyIndex) {
+            this.idIndex = idIndex;
             this.fetchKeyIndex = fetchKeyIndex;
             this.emitKeyIndex = emitKeyIndex;
         }
 
         public boolean shouldSkip(int index) {
-            return fetchKeyIndex == index || emitKeyIndex == index;
+            return idIndex == index || fetchKeyIndex == index || emitKeyIndex 
== index;
         }
     }
 }
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
index a12cd06..ac8f2a2 100644
--- 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
+++ 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
@@ -69,6 +69,7 @@ public class JDBCFetchIterator extends FetchIterator 
implements Initializable {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(JDBCFetchIterator.class);
 
+    private String idColumn;
     private String fetchKeyColumn;
     private String emitKeyColumn;
     private String connection;
@@ -77,6 +78,11 @@ public class JDBCFetchIterator extends FetchIterator 
implements Initializable {
     private Connection db;
 
     @Field
+    public void setIdColumn(String idColumn) {
+        this.idColumn = idColumn;
+    }
+
+    @Field
     public void setFetchKeyColumn(String fetchKeyColumn) {
         this.fetchKeyColumn = fetchKeyColumn;
     }
@@ -163,6 +169,7 @@ public class JDBCFetchIterator extends FetchIterator 
implements Initializable {
         Metadata metadata = new Metadata();
         String fetchKey = "";
         String emitKey = "";
+        String id = "";
         for (int i = 1; i <= rs.getMetaData().getColumnCount(); i++) {
             if (i == fetchEmitKeyIndices.fetchKeyIndex) {
                 fetchKey = getString(i, rs);
@@ -180,13 +187,21 @@ public class JDBCFetchIterator extends FetchIterator 
implements Initializable {
                 emitKey = (emitKey == null) ? "" : emitKey;
                 continue;
             }
+            if (i == fetchEmitKeyIndices.idIndex) {
+                id = getString(i, rs);
+                if (id == null) {
+                    LOGGER.warn("id is empty for record " + toString(rs));
+                }
+                id = (id == null) ? "" : id;
+                continue;
+            }
             String val = getString(i, rs);
             if (val != null) {
                 metadata.set(headers.get(i - 1), val);
             }
         }
 
-        tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName, fetchKey),
+        tryToAdd(new FetchEmitTuple(id, new FetchKey(fetcherName, fetchKey),
                 new EmitKey(emitterName, emitKey), metadata, handlerConfig, 
getOnParseException()));
     }
 
@@ -213,6 +228,7 @@ public class JDBCFetchIterator extends FetchIterator 
implements Initializable {
 
     private FetchEmitKeyIndices loadHeaders(ResultSetMetaData metaData, 
List<String> headers)
             throws SQLException {
+        int idIndex = -1;
         int fetchKeyIndex = -1;
         int emitKeyIndex = -1;
         for (int i = 1; i <= metaData.getColumnCount(); i++) {
@@ -222,9 +238,13 @@ public class JDBCFetchIterator extends FetchIterator 
implements Initializable {
             if (metaData.getColumnLabel(i).equalsIgnoreCase(emitKeyColumn)) {
                 emitKeyIndex = i;
             }
+            if (metaData.getColumnLabel(i).equalsIgnoreCase(idColumn)) {
+                idIndex = i;
+            }
+
             headers.add(metaData.getColumnLabel(i));
         }
-        return new FetchEmitKeyIndices(fetchKeyIndex, emitKeyIndex);
+        return new FetchEmitKeyIndices(idIndex, fetchKeyIndex, emitKeyIndex);
     }
 
     @Override
@@ -257,16 +277,18 @@ public class JDBCFetchIterator extends FetchIterator 
implements Initializable {
     }
 
     private static class FetchEmitKeyIndices {
+        private final int idIndex;
         private final int fetchKeyIndex;
         private final int emitKeyIndex;
 
-        public FetchEmitKeyIndices(int fetchKeyIndex, int emitKeyIndex) {
+        public FetchEmitKeyIndices(int idIndex, int fetchKeyIndex, int 
emitKeyIndex) {
+            this.idIndex = idIndex;
             this.fetchKeyIndex = fetchKeyIndex;
             this.emitKeyIndex = emitKeyIndex;
         }
 
         public boolean shouldSkip(int index) {
-            return fetchKeyIndex == index || emitKeyIndex == index;
+            return idIndex == index || fetchKeyIndex == index || emitKeyIndex 
== index;
         }
     }
 }
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
index bfe918a..9de5010 100644
--- 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
+++ 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
@@ -132,7 +132,7 @@ public class TestJDBCFetchIterator {
                     fail("failed to find key pattern: " + k);
                 }
                 String aOrB = Integer.parseInt(num) % 2 == 0 ? "a" : "b";
-                assertEquals("id" + num, p.getMetadata().get("MY_ID"));
+                assertEquals("id" + num, p.getId());
                 assertEquals("project" + aOrB, 
p.getMetadata().get("MY_PROJECT"));
                 assertNull(p.getMetadata().get("fetchKey"));
                 assertNull(p.getMetadata().get("MY_FETCHKEY"));
@@ -151,6 +151,7 @@ public class TestJDBCFetchIterator {
                 "                <fetcherName>s3f</fetcherName>\n" +
                 "                <emitterName>s3e</emitterName>\n" +
                 "                <queueSize>57</queueSize>\n" +
+                "                <idColumn>my_id</idColumn>\n" +
                 "                
<fetchKeyColumn>my_fetchkey</fetchKeyColumn>\n" +
                 "                <emitKeyColumn>my_fetchkey</emitKeyColumn>\n" 
+
                 "                <select>" +
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
index c8f8663..11a7dd0 100644
--- 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
+++ 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
@@ -132,7 +132,9 @@ public class S3FetchIterator extends FetchIterator 
implements Initializable {
 
             long elapsed = System.currentTimeMillis() - start;
             LOGGER.debug("adding ({}) {} in {} ms", count, summary.getKey(), 
elapsed);
-            tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName, 
summary.getKey()),
+            //TODO -- allow user specified metadata as the "id"?
+            tryToAdd(new FetchEmitTuple(summary.getKey(), new 
FetchKey(fetcherName,
+                    summary.getKey()),
                     new EmitKey(emitterName, summary.getKey()), new 
Metadata(), handlerConfig,
                     getOnParseException()));
             count++;
diff --git 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
index 870b5e9..342a097 100644
--- 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
+++ 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
@@ -45,11 +45,12 @@ import 
org.apache.cxf.transport.common.gzip.GZIPInInterceptor;
 import org.apache.cxf.transport.common.gzip.GZIPOutInterceptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
 
 import org.apache.tika.Tika;
 import org.apache.tika.config.ServiceLoader;
 import org.apache.tika.config.TikaConfig;
-import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.exception.TikaException;
 import org.apache.tika.parser.DigestingParser;
 import org.apache.tika.parser.digestutils.BouncyCastleDigester;
 import org.apache.tika.parser.digestutils.CommonsDigester;
@@ -57,9 +58,9 @@ import org.apache.tika.pipes.emitter.EmitterManager;
 import org.apache.tika.pipes.fetcher.FetcherManager;
 import org.apache.tika.server.core.resource.AsyncResource;
 import org.apache.tika.server.core.resource.DetectorResource;
-import org.apache.tika.server.core.resource.EmitterResource;
 import org.apache.tika.server.core.resource.LanguageResource;
 import org.apache.tika.server.core.resource.MetadataResource;
+import org.apache.tika.server.core.resource.PipesResource;
 import org.apache.tika.server.core.resource.RecursiveMetadataResource;
 import org.apache.tika.server.core.resource.TikaDetectors;
 import org.apache.tika.server.core.resource.TikaMimeTypes;
@@ -231,7 +232,7 @@ public class TikaServerProcess {
 
         List<ResourceProvider> resourceProviders = new ArrayList<>();
         List<Object> providers = new ArrayList<>();
-        loadAllProviders(tikaServerConfig, asyncResource, fetcherManager,
+        loadAllProviders(tikaServerConfig,
                 serverStatus,
                 resourceProviders,
                 providers);
@@ -258,14 +259,12 @@ public class TikaServerProcess {
     }
 
     private static void loadAllProviders(TikaServerConfig tikaServerConfig,
-                                         AsyncResource asyncResource,
-                                         FetcherManager fetcherManager,
                                          ServerStatus serverStatus,
                                          List<ResourceProvider> 
resourceProviders,
                                          List<Object> writers)
-            throws TikaConfigException, IOException {
+            throws TikaException, SAXException, IOException {
         List<ResourceProvider> tmpCoreProviders =
-                loadCoreProviders(tikaServerConfig, asyncResource, 
fetcherManager, serverStatus);
+                loadCoreProviders(tikaServerConfig, serverStatus);
 
         resourceProviders.addAll(tmpCoreProviders);
         resourceProviders.add(new SingletonResourceProvider(new 
TikaWelcome(tmpCoreProviders)));
@@ -309,11 +308,11 @@ public class TikaServerProcess {
     }
 
     private static List<ResourceProvider> loadCoreProviders(TikaServerConfig 
tikaServerConfig,
-                                                            AsyncResource 
asyncResource,
-                                                            FetcherManager 
fetcherManager,
                                                             ServerStatus 
serverStatus)
-            throws TikaConfigException, IOException {
+            throws TikaException, IOException, SAXException {
         List<ResourceProvider> resourceProviders = new ArrayList<>();
+        boolean addAsyncResource = false;
+        boolean addPipesResource = false;
         if (tikaServerConfig.getEndpoints().size() == 0) {
             resourceProviders.add(new SingletonResourceProvider(new 
MetadataResource()));
             resourceProviders.add(new SingletonResourceProvider(new 
RecursiveMetadataResource()));
@@ -329,54 +328,73 @@ public class TikaServerProcess {
             resourceProviders.add(new SingletonResourceProvider(new 
TikaParsers()));
             resourceProviders.add(new SingletonResourceProvider(new 
TikaVersion()));
             if (tikaServerConfig.isEnableUnsecureFeatures()) {
-                resourceProviders.add(new SingletonResourceProvider(new 
EmitterResource(
-                        fetcherManager, 
EmitterManager.load(tikaServerConfig.getConfigPath())
-                )));
-                resourceProviders.add(new 
SingletonResourceProvider(asyncResource));
+                addAsyncResource = true;
+                addPipesResource = true;
                 resourceProviders
                         .add(new SingletonResourceProvider(new 
TikaServerStatus(serverStatus)));
             }
-            resourceProviders.addAll(loadResourceServices());
-            return resourceProviders;
-        }
-        for (String endPoint : tikaServerConfig.getEndpoints()) {
-            if ("meta".equals(endPoint)) {
-                resourceProviders.add(new SingletonResourceProvider(new 
MetadataResource()));
-            } else if ("rmeta".equals(endPoint)) {
-                resourceProviders
-                        .add(new SingletonResourceProvider(new 
RecursiveMetadataResource()));
-            } else if ("detect".equals(endPoint)) {
-                resourceProviders
-                        .add(new SingletonResourceProvider(new 
DetectorResource(serverStatus)));
-            } else if ("language".equals(endPoint)) {
-                resourceProviders.add(new SingletonResourceProvider(new 
LanguageResource()));
-            } else if ("translate".equals(endPoint)) {
-                resourceProviders
-                        .add(new SingletonResourceProvider(new 
TranslateResource(serverStatus)));
-            } else if ("tika".equals(endPoint)) {
-                resourceProviders.add(new SingletonResourceProvider(new 
TikaResource()));
-            } else if ("unpack".equals(endPoint)) {
-                resourceProviders.add(new SingletonResourceProvider(new 
UnpackerResource()));
-            } else if ("mime".equals(endPoint)) {
-                resourceProviders.add(new SingletonResourceProvider(new 
TikaMimeTypes()));
-            } else if ("detectors".equals(endPoint)) {
-                resourceProviders.add(new SingletonResourceProvider(new 
TikaDetectors()));
-            } else if ("parsers".equals(endPoint)) {
-                resourceProviders.add(new SingletonResourceProvider(new 
TikaParsers()));
-            } else if ("version".equals(endPoint)) {
-                resourceProviders.add(new SingletonResourceProvider(new 
TikaVersion()));
-            } else if ("emit".equals(endPoint)) {
-                resourceProviders.add(new SingletonResourceProvider(
-                        new EmitterResource(fetcherManager,
-                                
EmitterManager.load(tikaServerConfig.getConfigPath()))));
-            } else if ("async".equals(endPoint)) {
-                resourceProviders.add(new 
SingletonResourceProvider(asyncResource));
-            } else if ("status".equals(endPoint)) {
-                resourceProviders
-                        .add(new SingletonResourceProvider(new 
TikaServerStatus(serverStatus)));
+        } else {
+            for (String endPoint : tikaServerConfig.getEndpoints()) {
+                if ("meta".equals(endPoint)) {
+                    resourceProviders.add(new SingletonResourceProvider(new 
MetadataResource()));
+                } else if ("rmeta".equals(endPoint)) {
+                    resourceProviders.add(new SingletonResourceProvider(new 
RecursiveMetadataResource()));
+                } else if ("detect".equals(endPoint)) {
+                    resourceProviders.add(new SingletonResourceProvider(new 
DetectorResource(serverStatus)));
+                } else if ("language".equals(endPoint)) {
+                    resourceProviders.add(new SingletonResourceProvider(new 
LanguageResource()));
+                } else if ("translate".equals(endPoint)) {
+                    resourceProviders.add(new SingletonResourceProvider(new 
TranslateResource(serverStatus)));
+                } else if ("tika".equals(endPoint)) {
+                    resourceProviders.add(new SingletonResourceProvider(new 
TikaResource()));
+                } else if ("unpack".equals(endPoint)) {
+                    resourceProviders.add(new SingletonResourceProvider(new 
UnpackerResource()));
+                } else if ("mime".equals(endPoint)) {
+                    resourceProviders.add(new SingletonResourceProvider(new 
TikaMimeTypes()));
+                } else if ("detectors".equals(endPoint)) {
+                    resourceProviders.add(new SingletonResourceProvider(new 
TikaDetectors()));
+                } else if ("parsers".equals(endPoint)) {
+                    resourceProviders.add(new SingletonResourceProvider(new 
TikaParsers()));
+                } else if ("version".equals(endPoint)) {
+                    resourceProviders.add(new SingletonResourceProvider(new 
TikaVersion()));
+                } else if ("pipes".equals(endPoint)) {
+                    addPipesResource = true;
+                } else if ("async".equals(endPoint)) {
+                    addAsyncResource = true;
+                } else if ("status".equals(endPoint)) {
+                    resourceProviders.add(new SingletonResourceProvider(new 
TikaServerStatus(serverStatus)));
+                }
             }
-            resourceProviders.addAll(loadResourceServices());
         }
+
+        if (addAsyncResource) {
+            final AsyncResource localAsyncResource = new 
AsyncResource(tikaServerConfig.getConfigPath());
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                    public void run() {
+                        try {
+                            localAsyncResource.shutdownNow();
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                });
+            resourceProviders.add(new 
SingletonResourceProvider(localAsyncResource));
+        }
+        if (addPipesResource) {
+            final PipesResource localPipesResource =
+                    new PipesResource(tikaServerConfig.getConfigPath());
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                public void run() {
+                    try {
+                        localPipesResource.close();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+            resourceProviders.add(new 
SingletonResourceProvider(localPipesResource));
+        }
+        resourceProviders.addAll(loadResourceServices());
         return resourceProviders;
     }
 
diff --git 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
index 1c7fe1d..1ee5600 100644
--- 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
+++ 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
@@ -55,7 +55,6 @@ public class AsyncResource {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AsyncResource.class);
 
-    private static final int DEFAULT_FETCH_EMIT_QUEUE_SIZE = 10000;
     long maxQueuePauseMs = 60000;
     private final AsyncProcessor asyncProcessor;
     private final FetcherManager fetcherManager;
diff --git 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
deleted file mode 100644
index 425279c..0000000
--- 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tika.server.core.resource;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.UriInfo;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.HandlerConfig;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.EmitterManager;
-import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.pipes.fetcher.Fetcher;
-import org.apache.tika.pipes.fetcher.FetcherManager;
-import org.apache.tika.utils.ExceptionUtils;
-import org.apache.tika.utils.StringUtils;
-
-@Path("/emit")
-public class EmitterResource {
-
-    /**
-     * key that is safe to pass through http header.
-     * The user _must_ specify this for the fsemitter if calling 'put'
-     */
-    public static final String EMIT_KEY_FOR_HTTP_HEADER = "emit-key";
-    private static final String EMITTER_PARAM = "emitter";
-    private static final String HANDLER_PARAM = "type";
-    private static final String FETCHER_NAME_ABBREV = "fn";
-    private static final String FETCH_KEY_ABBREV = "fk";
-    private static final String EMIT_KEY_ABBREV = "ek";
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(EmitterResource.class);
-
-    private final FetcherManager fetcherManager;
-    private final EmitterManager emitterManager;
-
-    public EmitterResource(FetcherManager fetcherManager, EmitterManager 
emitterManager) {
-        this.fetcherManager = fetcherManager;
-        this.emitterManager = emitterManager;
-    }
-
-    static EmitKey calcEmitKey(FetchEmitTuple t) {
-        //use fetch key if emitter key is not specified
-        //TODO: clean this up?
-        EmitKey emitKey = t.getEmitKey();
-        if (StringUtils.isBlank(emitKey.getEmitKey())) {
-            emitKey = new EmitKey(emitKey.getEmitterName(), 
t.getFetchKey().getFetchKey());
-        }
-        return emitKey;
-    }
-
-    /**
-     * @param is              input stream is ignored in 'get'
-     * @param httpHeaders
-     * @param info
-     * @param emitterName
-     * @param fetcherName     specify the fetcherName in the url's query 
section
-     * @param fetchKey        specify the fetch key in the url's query section
-     * @param handlerTypeName text, html, xml, body, ignore; default is text
-     * @return
-     * @throws Exception
-     */
-    @GET
-    @Produces("application/json")
-    @Path("{" + EMITTER_PARAM + " : (\\w+)?}")
-    public Map<String, String> getRmeta(InputStream is, @Context HttpHeaders 
httpHeaders,
-                                        @Context UriInfo info,
-                                        @PathParam(EMITTER_PARAM) String 
emitterName,
-                                        @QueryParam(FETCHER_NAME_ABBREV) 
String fetcherName,
-                                        @QueryParam(FETCH_KEY_ABBREV) String 
fetchKey,
-                                        @QueryParam(EMIT_KEY_ABBREV) String 
emitKey,
-                                        @QueryParam(HANDLER_PARAM) String 
handlerTypeName)
-            throws Exception {
-        Metadata metadata = new Metadata();
-        Fetcher fetcher = fetcherManager.getFetcher(fetcherName);
-        List<Metadata> metadataList;
-        try (InputStream fetchedIs = fetcher.fetch(fetchKey, metadata)) {
-            HandlerConfig handlerConfig = RecursiveMetadataResource
-                    .buildHandlerConfig(httpHeaders.getRequestHeaders(), 
handlerTypeName);
-            metadataList = RecursiveMetadataResource
-                    .parseMetadata(fetchedIs, metadata, 
httpHeaders.getRequestHeaders(), info,
-                            handlerConfig);
-        }
-        emitKey = StringUtils.isBlank(emitKey) ? fetchKey : emitKey;
-        return emit(new EmitKey(emitterName, emitKey), metadataList);
-    }
-
-
-    /**
-     * The user puts the raw bytes of the file and specifies the emitter
-     * as elsewhere.  This will not trigger a fetcher.  If you want a
-     * fetcher, use the get or post options.
-     * <p>
-     * The extracted text content is stored with the key
-     * {@link TikaCoreProperties#TIKA_CONTENT}
-     * <p>
-     * Must specify an emitter in the path, e.g. /emit/solr
-     * <p>
-     * Optionally, may specify handler, e.g. /emit/solr/xml
-     *
-     * @param info      uri info
-     * @param fullParam which emitter to use; emitters must be configured in
-     *                  the TikaConfig file.
-     * @return InputStream that can be deserialized as a list of {@link 
Metadata} objects
-     * @throws Exception
-     */
-    @PUT
-    @Produces("application/json")
-    @Path("{" + EMITTER_PARAM + " : (\\w+(/(text|body|xml|ignore))?)}")
-    public Map<String, String> putRmeta(InputStream is, @Context HttpHeaders 
httpHeaders,
-                                        @Context UriInfo info,
-                                        @PathParam(EMITTER_PARAM) String 
fullParam)
-            throws Exception {
-
-        Matcher m = Pattern.compile("(\\w+)(?:/(\\w+))?").matcher(fullParam);
-        String emitterName = fullParam;
-        String handlerTypeName = "text";
-        if (m.find()) {
-            emitterName = m.group(1);
-            if (m.groupCount() > 1) {
-                handlerTypeName = m.group(2);
-            }
-        }
-        Metadata metadata = new Metadata();
-        String emitKey = httpHeaders.getHeaderString(EMIT_KEY_FOR_HTTP_HEADER);
-        HandlerConfig handlerConfig = RecursiveMetadataResource
-                .buildHandlerConfig(httpHeaders.getRequestHeaders(), 
handlerTypeName);
-        List<Metadata> metadataList = RecursiveMetadataResource
-                .parseMetadata(is, metadata, httpHeaders.getRequestHeaders(), 
info, handlerConfig);
-        return emit(new EmitKey(emitterName, emitKey), metadataList);
-    }
-
-    /**
-     * The client posts a json request.  At a minimum, this must be a
-     * json object that contains an emitter and a fetcherString key with
-     * the key to fetch the inputStream. Optionally, it may contain a metadata
-     * object that will be used to populate the metadata key for pass
-     * through of metadata from the client. It may also include a handler 
config.
-     * <p>
-     * The extracted text content is stored with the key
-     * {@link TikaCoreProperties#TIKA_CONTENT}
-     * <p>
-     * Must specify a fetcherString and an emitter in the posted json.
-     *
-     * @param info uri info
-     * @return InputStream that can be deserialized as a list of {@link 
Metadata} objects
-     * @throws Exception
-     */
-    @POST
-    @Produces("application/json")
-    public Map<String, String> postRmeta(InputStream is, @Context HttpHeaders 
httpHeaders,
-                                         @Context UriInfo info) throws 
Exception {
-        FetchEmitTuple t = null;
-        try (Reader reader = new InputStreamReader(is, 
StandardCharsets.UTF_8)) {
-            t = JsonFetchEmitTuple.fromJson(reader);
-        }
-        Metadata metadata = new Metadata();
-
-        List<Metadata> metadataList = null;
-        try (InputStream stream = fetcherManager
-                .getFetcher(t.getFetchKey().getFetcherName())
-                .fetch(t.getFetchKey().getFetchKey(), metadata)) {
-
-            metadataList = RecursiveMetadataResource
-                    .parseMetadata(stream, metadata, 
httpHeaders.getRequestHeaders(), info,
-                            t.getHandlerConfig());
-        } catch (Error error) {
-            return returnError(t.getEmitKey().getEmitterName(), error);
-        }
-
-        boolean shouldEmit = checkParseException(t, metadataList);
-        if (!shouldEmit) {
-            return skip(t, metadataList);
-        }
-
-        injectUserMetadata(t.getMetadata(), metadataList.get(0));
-
-        for (String n : metadataList.get(0).names()) {
-            LOG.debug("post parse/pre emit metadata {}: {}", n, 
metadataList.get(0).get(n));
-        }
-        return emit(calcEmitKey(t), metadataList);
-    }
-
-    private Map<String, String> skip(FetchEmitTuple t, List<Metadata> 
metadataList) {
-        Map<String, String> statusMap = new HashMap<>();
-        statusMap.put("status", "ok");
-        statusMap.put("emitter", t.getEmitKey().getEmitterName());
-        statusMap.put("emitKey", t.getEmitKey().getEmitKey());
-        String msg = 
metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
-        statusMap.put("parse_exception", msg);
-        return statusMap;
-    }
-
-    private boolean checkParseException(FetchEmitTuple t, List<Metadata> 
metadataList) {
-        if (metadataList == null || metadataList.size() < 1) {
-            return false;
-        }
-        boolean shouldEmit = true;
-        String stack = 
metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
-        if (stack != null) {
-            if (t.getOnParseException() == 
FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
-                shouldEmit = false;
-            }
-            LOG.warn("fetchKey ({}) caught container parse exception ({})",
-                    t.getFetchKey().getFetchKey(), stack);
-        }
-
-        for (int i = 1; i < metadataList.size(); i++) {
-            Metadata m = metadataList.get(i);
-            String embeddedStack = 
m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
-            if (embeddedStack != null) {
-                LOG.warn("fetchKey ({}) caught embedded parse exception ({})",
-                        t.getFetchKey().getFetchKey(), embeddedStack);
-            }
-        }
-
-        return shouldEmit;
-    }
-
-
-    private void injectUserMetadata(Metadata userMetadata, Metadata metadata) {
-        for (String n : userMetadata.names()) {
-            metadata.set(n, null);
-            for (String v : userMetadata.getValues(n)) {
-                metadata.add(n, v);
-            }
-        }
-    }
-
-    private Map<String, String> returnError(String emitterName, Error error) {
-        Map<String, String> statusMap = new HashMap<>();
-        statusMap.put("status", "parse_error");
-        statusMap.put("emitter", emitterName);
-        String msg = ExceptionUtils.getStackTrace(error);
-        statusMap.put("parse_error", msg);
-        return statusMap;
-    }
-
-    private Map<String, String> emit(EmitKey emitKey, List<Metadata> 
metadataList)
-            throws TikaException {
-        Emitter emitter = emitterManager.getEmitter(emitKey.getEmitterName());
-        String status = "ok";
-        String exceptionMsg = "";
-        try {
-            emitter.emit(emitKey.getEmitKey(), metadataList);
-        } catch (IOException | TikaEmitterException e) {
-            LOG.warn("problem emitting (" + emitKey.getEmitterName() + ")", e);
-            status = "emitter_exception";
-            exceptionMsg = ExceptionUtils.getStackTrace(e);
-        }
-        Map<String, String> statusMap = new HashMap<>();
-        statusMap.put("status", status);
-        statusMap.put("emitter", emitKey.getEmitterName());
-        if (exceptionMsg.length() > 0) {
-            statusMap.put("emitter_exception", exceptionMsg);
-        }
-        String parseStackTrace = 
metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
-        if (parseStackTrace != null) {
-            statusMap.put("parse_exception", parseStackTrace);
-        }
-        return statusMap;
-    }
-
-}
diff --git 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
new file mode 100644
index 0000000..daf4da3
--- /dev/null
+++ 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.server.core.resource;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.UriInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.PipesConfig;
+import org.apache.tika.pipes.PipesException;
+import org.apache.tika.pipes.PipesParser;
+import org.apache.tika.pipes.PipesResult;
+
+@Path("/pipes")
+public class PipesResource {
+
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PipesResource.class);
+
+    private final PipesParser pipesParser;
+    public PipesResource(java.nio.file.Path tikaConfig) throws 
TikaConfigException, IOException {
+        PipesConfig pipesConfig = PipesConfig.load(tikaConfig);
+        this.pipesParser = new PipesParser(pipesConfig);
+    }
+
+
+    /**
+     * The client posts a json request.  At a minimum, this must be a
+     * json object that contains an emitter and a fetcherString key with
+     * the key to fetch the inputStream. Optionally, it may contain a metadata
+     * object that will be used to populate the metadata key for pass
+     * through of metadata from the client. It may also include a handler config.
+     * <p>
+     * The extracted text content is stored with the key
+     * {@link TikaCoreProperties#TIKA_CONTENT}
+     * <p>
+     * Must specify a fetcherString and an emitter in the posted json.
+     *
+     * @param info uri info
+     * @return map of status information; the {@link Metadata} list is emitted, not returned
+     * @throws Exception
+     */
+    @POST
+    @Produces("application/json")
+    public Map<String, String> postRmeta(InputStream is, @Context HttpHeaders 
httpHeaders,
+                                         @Context UriInfo info) throws 
Exception {
+        FetchEmitTuple t = null;
+        try (Reader reader = new InputStreamReader(is, 
StandardCharsets.UTF_8)) {
+            t = JsonFetchEmitTuple.fromJson(reader);
+        }
+        return processTuple(t);
+    }
+
+    private Map<String, String> processTuple(FetchEmitTuple fetchEmitTuple)
+            throws PipesException, IOException {
+
+        PipesResult pipesResult = pipesParser.parse(fetchEmitTuple);
+        switch (pipesResult.getStatus()) {
+            case CLIENT_UNAVAILABLE_WITHIN_MS:
+                throw new IllegalStateException("client not available within " 
+
+                        "allotted amount of time");
+            case EMIT_EXCEPTION:
+                return returnEmitException(pipesResult.getMessage());
+            case PARSE_SUCCESS:
+                throw new IllegalArgumentException("Should have emitted in 
forked process?!");
+            case EMIT_SUCCESS:
+                return returnSuccess();
+            case EMIT_SUCCESS_PARSE_EXCEPTION:
+                return parseException(pipesResult.getMessage(), true);
+            case PARSE_EXCEPTION_EMIT:
+                throw new IllegalArgumentException("Should have tried to emit 
in forked " +
+                        "process?!");
+            case PARSE_EXCEPTION_NO_EMIT:
+                return parseException(pipesResult.getMessage(), false);
+            case TIMEOUT:
+                return returnError("timeout");
+            case OOM:
+                return returnError("oom");
+            case UNSPECIFIED_CRASH:
+                return returnError("unknown_crash");
+            case NO_EMITTER_FOUND: {
+                throw new IllegalArgumentException("Couldn't find emitter that 
matched: " +
+                        fetchEmitTuple.getEmitKey().getEmitterName());
+            }
+            default:
+                throw new IllegalArgumentException("I'm sorry, I don't yet 
handle a status of " +
+                        "this type: " + pipesResult.getStatus());
+        }
+    }
+
+    private Map<String, String> parseException(String msg, boolean emitted) {
+        Map<String, String> statusMap = new HashMap<>();
+        statusMap.put("status", "ok");
+        statusMap.put("parse_exception", msg);
+        statusMap.put("emitted", Boolean.toString(emitted));
+        return statusMap;
+    }
+
+    private Map<String, String> returnEmitException(String msg) {
+        Map<String, String> statusMap = new HashMap<>();
+        statusMap.put("status", "emit_exception");
+        statusMap.put("message", msg);
+        return statusMap;
+    }
+
+    private Map<String, String> returnSuccess() {
+        Map<String, String> statusMap = new HashMap<>();
+        statusMap.put("status", "ok");
+        return statusMap;
+    }
+
+    private Map<String, String> returnError(String type) {
+        Map<String, String> statusMap = new HashMap<>();
+        statusMap.put("status", "parse_error");
+        statusMap.put("parse_error", type);
+        return statusMap;
+    }
+
+    public void close() throws IOException {
+        pipesParser.close();
+    }
+}
diff --git 
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
 
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
similarity index 66%
rename from 
tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
rename to 
tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
index 69b57c5..58e37f5 100644
--- 
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
+++ 
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
@@ -46,6 +47,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
@@ -53,46 +55,50 @@ import 
org.apache.tika.metadata.serialization.JsonMetadataList;
 import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.emitter.EmitterManager;
 import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.pipes.fetcher.FetcherManager;
 import org.apache.tika.sax.BasicContentHandlerFactory;
-import org.apache.tika.server.core.resource.EmitterResource;
+import org.apache.tika.server.core.resource.PipesResource;
 import org.apache.tika.server.core.writer.JSONObjWriter;
+import org.apache.tika.utils.ProcessUtils;
 
 /**
  * This offers basic integration tests with fetchers and emitters.
  * We use file system fetchers and emitters.
  */
-public class TikaEmitterTest extends CXFTestBase {
+public class TikaPipesTest extends CXFTestBase {
 
-    private static final String EMITTER_PATH = "/emit";
-    private static final String EMITTER_PATH_AND_FS = "/emit/fse";
+    private static final String PIPES_PATH = "/pipes";
     private static Path TMP_DIR;
     private static Path TMP_OUTPUT_DIR;
     private static Path TMP_OUTPUT_FILE;
+    private static Path TMP_NPE_OUTPUT_FILE;
+    private static Path TIKA_CONFIG_PATH;
     private static String TIKA_CONFIG_XML;
     private static FetcherManager FETCHER_MANAGER;
-    private static EmitterManager EMITTER_MANAGER;
     private static String HELLO_WORLD = "hello_world.xml";
     private static String HELLO_WORLD_JSON = "hello_world.xml.json";
+    private static String NPE_JSON = "null_pointer.xml.json";
 
     private static String[] VALUE_ARRAY = new String[]{"my-value-1", 
"my-value-2", "my-value-3"};
 
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
-        TMP_DIR = Files.createTempDirectory("tika-emitter-test-");
+        TMP_DIR = Files.createTempDirectory("tika-pipes-test-");
         Path inputDir = TMP_DIR.resolve("input");
         TMP_OUTPUT_DIR = TMP_DIR.resolve("output");
         TMP_OUTPUT_FILE = TMP_OUTPUT_DIR.resolve(HELLO_WORLD_JSON);
+        TMP_NPE_OUTPUT_FILE = TMP_OUTPUT_DIR.resolve("null_pointer.xml.json");
+
         Files.createDirectories(inputDir);
         Files.createDirectories(TMP_OUTPUT_DIR);
 
         for (String mockFile : new String[]{"hello_world.xml", 
"null_pointer.xml"}) {
             Files.copy(
-                    
TikaEmitterTest.class.getResourceAsStream("/test-documents/mock/" + mockFile),
+                    
TikaPipesTest.class.getResourceAsStream("/test-documents/mock/" + mockFile),
                     inputDir.resolve(mockFile));
         }
+        TIKA_CONFIG_PATH = Files.createTempFile(TMP_DIR, "tika-pipes-", 
".xml");
 
         TIKA_CONFIG_XML =
                 "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<properties>" 
+ "<fetchers>" +
@@ -105,11 +111,12 @@ public class TikaEmitterTest extends CXFTestBase {
                         "<basePath>" +
                         TMP_OUTPUT_DIR.toAbsolutePath() + "</basePath>" + 
"</params>" +
                         "</emitter>" +
-                        "</emitters>" + "</properties>";
-        Path tmp = Files.createTempFile("tika-emitter-", ".xml");
-        Files.write(tmp, TIKA_CONFIG_XML.getBytes(StandardCharsets.UTF_8));
-        FETCHER_MANAGER = FetcherManager.load(tmp);
-        EMITTER_MANAGER = EmitterManager.load(tmp);
+                        "</emitters>" + "<pipes><params><tikaConfig>" +
+                
ProcessUtils.escapeCommandLine(TIKA_CONFIG_PATH.toAbsolutePath().toString()) +
+                        
"</tikaConfig><numClients>10</numClients><forkedJvmArgs><arg>-Xmx256m" +
+                        "</arg></forkedJvmArgs>" +
+                        "</params></pipes>" + "</properties>";
+        Files.write(TIKA_CONFIG_PATH, 
TIKA_CONFIG_XML.getBytes(StandardCharsets.UTF_8));
     }
 
     @AfterClass
@@ -122,13 +129,20 @@ public class TikaEmitterTest extends CXFTestBase {
         if (Files.exists(TMP_OUTPUT_FILE)) {
             Files.delete(TMP_OUTPUT_FILE);
         }
+        if (Files.exists(TMP_NPE_OUTPUT_FILE)) {
+            Files.delete(TMP_NPE_OUTPUT_FILE);
+        }
         assertFalse(Files.isRegularFile(TMP_OUTPUT_FILE));
     }
 
     @Override
     protected void setUpResources(JAXRSServerFactoryBean sf) {
         List<ResourceProvider> rCoreProviders = new 
ArrayList<ResourceProvider>();
-        rCoreProviders.add(new SingletonResourceProvider(new 
EmitterResource(FETCHER_MANAGER, EMITTER_MANAGER)));
+        try {
+            rCoreProviders.add(new SingletonResourceProvider(new 
PipesResource(TIKA_CONFIG_PATH)));
+        } catch (IOException | TikaConfigException e) {
+            throw new RuntimeException(e);
+        }
         sf.setResourceProviders(rCoreProviders);
     }
 
@@ -150,41 +164,6 @@ public class TikaEmitterTest extends CXFTestBase {
         return new FetcherStreamFactory(FETCHER_MANAGER);
     }
 
-    @Test
-    public void testGet() throws Exception {
-
-        String q = "?fn=fsf&fk=hello_world.xml&type=text";
-        String getUrl = endPoint + EMITTER_PATH_AND_FS + q;
-        Response response = 
WebClient.create(getUrl).accept("application/json").get();
-        assertEquals(200, response.getStatus());
-        List<Metadata> metadataList = null;
-        try (Reader reader = Files.newBufferedReader(TMP_OUTPUT_FILE)) {
-            metadataList = JsonMetadataList.fromJson(reader);
-        }
-        assertEquals(1, metadataList.size());
-        Metadata metadata = metadataList.get(0);
-        assertEquals("hello world", 
metadata.get(TikaCoreProperties.TIKA_CONTENT).trim());
-        assertEquals("Nikolai Lobachevsky", metadata.get("author"));
-        assertEquals("你好,世界", metadata.get("title"));
-        assertEquals("application/mock+xml", 
metadata.get(Metadata.CONTENT_TYPE));
-    }
-
-    @Test
-    public void testGetXML() throws Exception {
-
-        String q = "?fn=fsf&fk=hello_world.xml&type=xml";
-        String getUrl = endPoint + EMITTER_PATH_AND_FS + q;
-        Response response = 
WebClient.create(getUrl).accept("application/json").get();
-        assertEquals(200, response.getStatus());
-        List<Metadata> metadataList = null;
-        try (Reader reader = Files.newBufferedReader(TMP_OUTPUT_FILE)) {
-            metadataList = JsonMetadataList.fromJson(reader);
-        }
-        assertEquals(1, metadataList.size());
-        Metadata metadata = metadataList.get(0);
-        String xml = metadata.get(TikaCoreProperties.TIKA_CONTENT);
-        assertContains("<p>hello world</p>", xml);
-    }
 
     @Test
     public void testPost() throws Exception {
@@ -196,12 +175,13 @@ public class TikaEmitterTest extends CXFTestBase {
         }
 
         FetchEmitTuple t =
-                new FetchEmitTuple(new FetchKey("fsf", "hello_world.xml"), new 
EmitKey("fse", ""),
+                new FetchEmitTuple("myId",
+                        new FetchKey("fsf", "hello_world.xml"), new 
EmitKey("fse", ""),
                         userMetadata);
         StringWriter writer = new StringWriter();
         JsonFetchEmitTuple.toJson(t, writer);
 
-        String getUrl = endPoint + EMITTER_PATH;
+        String getUrl = endPoint + PIPES_PATH;
         Response response =
                 
WebClient.create(getUrl).accept("application/json").post(writer.toString());
         assertEquals(200, response.getStatus());
@@ -230,14 +210,16 @@ public class TikaEmitterTest extends CXFTestBase {
         }
 
         FetchEmitTuple t =
-                new FetchEmitTuple(new FetchKey("fsf", "hello_world.xml"), new 
EmitKey("fse", ""),
+                new FetchEmitTuple("myId",
+                        new FetchKey("fsf", "hello_world.xml"),
+                        new EmitKey("fse", ""),
                         userMetadata,
                         new 
HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.XML, -1, -1),
                         FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT);
         StringWriter writer = new StringWriter();
         JsonFetchEmitTuple.toJson(t, writer);
 
-        String getUrl = endPoint + EMITTER_PATH;
+        String getUrl = endPoint + PIPES_PATH;
         Response response =
                 
WebClient.create(getUrl).accept("application/json").post(writer.toString());
         assertEquals(200, response.getStatus());
@@ -252,47 +234,6 @@ public class TikaEmitterTest extends CXFTestBase {
     }
 
     @Test
-    public void testPut() throws Exception {
-
-        String getUrl = endPoint + EMITTER_PATH_AND_FS + "/text";
-        String metaPathKey = EmitterResource.EMIT_KEY_FOR_HTTP_HEADER;
-
-        Response response = WebClient.create(getUrl).accept("application/json")
-                .header(metaPathKey, "hello_world.xml")
-                
.put(ClassLoader.getSystemResourceAsStream("test-documents/mock/hello_world.xml"));
-        assertEquals(200, response.getStatus());
-        List<Metadata> metadataList = null;
-        try (Reader reader = Files.newBufferedReader(TMP_OUTPUT_FILE)) {
-            metadataList = JsonMetadataList.fromJson(reader);
-        }
-        assertEquals(1, metadataList.size());
-        Metadata metadata = metadataList.get(0);
-        assertEquals("hello world", 
metadata.get(TikaCoreProperties.TIKA_CONTENT).trim());
-        assertEquals("Nikolai Lobachevsky", metadata.get("author"));
-        assertEquals("你好,世界", metadata.get("title"));
-        assertEquals("application/mock+xml", 
metadata.get(Metadata.CONTENT_TYPE));
-    }
-
-    @Test
-    public void testPutXML() throws Exception {
-
-        String putUrl = endPoint + EMITTER_PATH_AND_FS + "/xml";
-        String metaPathKey = EmitterResource.EMIT_KEY_FOR_HTTP_HEADER;
-
-        Response response = WebClient.create(putUrl).accept("application/json")
-                .header(metaPathKey, "hello_world.xml")
-                
.put(ClassLoader.getSystemResourceAsStream("test-documents/mock/hello_world.xml"));
-        assertEquals(200, response.getStatus());
-        List<Metadata> metadataList = null;
-        try (Reader reader = Files.newBufferedReader(TMP_OUTPUT_FILE)) {
-            metadataList = JsonMetadataList.fromJson(reader);
-        }
-        assertEquals(1, metadataList.size());
-        Metadata metadata = metadataList.get(0);
-        assertContains("<p>hello world</p>", 
metadata.get(TikaCoreProperties.TIKA_CONTENT));
-    }
-
-    @Test
     public void testPostNPE() throws Exception {
         Metadata userMetadata = new Metadata();
         userMetadata.set("my-key", "my-value");
@@ -301,12 +242,14 @@ public class TikaEmitterTest extends CXFTestBase {
         }
 
         FetchEmitTuple t =
-                new FetchEmitTuple(new FetchKey("fsf", "null_pointer.xml"), 
new EmitKey("fse", ""),
+                new FetchEmitTuple("myId",
+                        new FetchKey("fsf", "null_pointer.xml"),
+                        new EmitKey("fse", ""),
                         userMetadata);
         StringWriter writer = new StringWriter();
         JsonFetchEmitTuple.toJson(t, writer);
 
-        String getUrl = endPoint + EMITTER_PATH;
+        String getUrl = endPoint + PIPES_PATH;
         Response response =
                 
WebClient.create(getUrl).accept("application/json").post(writer.toString());
         assertEquals(200, response.getStatus());
@@ -320,7 +263,7 @@ public class TikaEmitterTest extends CXFTestBase {
         String parseException = jsonResponse.get("parse_exception").asText();
         assertNotNull(parseException);
         assertContains("NullPointerException", parseException);
-
+        assertEquals(true, jsonResponse.get("emitted").asBoolean());
         List<Metadata> metadataList = null;
         try (Reader reader = Files
                 
.newBufferedReader(TMP_OUTPUT_DIR.resolve("null_pointer.xml.json"))) {
@@ -335,5 +278,31 @@ public class TikaEmitterTest extends CXFTestBase {
                 metadata.get(TikaCoreProperties.CONTAINER_EXCEPTION));
     }
 
-    //can't test system_exit here because server is in same process
+    @Test
+    public void testPostNPENoEmit() throws Exception {
+        FetchEmitTuple t =
+                new FetchEmitTuple("myId",
+                        new FetchKey("fsf", "null_pointer.xml"),
+                        new EmitKey("fse", ""),
+                        FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP);
+        StringWriter writer = new StringWriter();
+        JsonFetchEmitTuple.toJson(t, writer);
+
+        String getUrl = endPoint + PIPES_PATH;
+        Response response =
+                
WebClient.create(getUrl).accept("application/json").post(writer.toString());
+        assertEquals(200, response.getStatus());
+
+        JsonNode jsonResponse;
+        try (Reader reader = new InputStreamReader((InputStream) 
response.getEntity(),
+                StandardCharsets.UTF_8)) {
+            jsonResponse = new ObjectMapper().readTree(reader);
+        }
+        ;
+        String parseException = jsonResponse.get("parse_exception").asText();
+        assertNotNull(parseException);
+        assertContains("NullPointerException", parseException);
+        assertEquals(false, jsonResponse.get("emitted").asBoolean());
+        assertFalse(Files.isRegularFile(TMP_NPE_OUTPUT_FILE));
+    }
 }
diff --git 
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
 
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
index d5ee1a5..b86e1ee 100644
--- 
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
+++ 
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
@@ -29,6 +29,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import javax.ws.rs.core.Response;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -49,12 +50,13 @@ import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.utils.ProcessUtils;
 
 @Ignore("useful for development...need to turn it into a real unit test")
 public class TikaServerAsyncIntegrationTest extends IntegrationTestBase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TikaServerAsyncIntegrationTest.class);
-    private static final int NUM_FILES = 450;
+    private static final int NUM_FILES = 100;
     private static final String EMITTER_NAME = "fse";
     private static final String FETCHER_NAME = "fsf";
     private static FetchEmitTuple.ON_PARSE_EXCEPTION ON_PARSE_EXCEPTION =
@@ -66,8 +68,9 @@ public class TikaServerAsyncIntegrationTest extends 
IntegrationTestBase {
     private static List<String> FILE_LIST = new ArrayList<>();
     private static String[] FILES = new String[]{
             "hello_world.xml",
-            "null_pointer.xml"
-            // "heavy_hang_30000.xml", "real_oom.xml", "system_exit.xml"
+            "null_pointer.xml",
+            // "heavy_hang_30000.xml", "real_oom.xml",
+            "system_exit.xml"
     };
 
     @BeforeClass
@@ -77,13 +80,18 @@ public class TikaServerAsyncIntegrationTest extends 
IntegrationTestBase {
         TMP_OUTPUT_DIR = TMP_DIR.resolve("output");
         Files.createDirectories(inputDir);
         Files.createDirectories(TMP_OUTPUT_DIR);
-
+        Random rand = new Random();
         for (int i = 0; i < NUM_FILES; i++) {
             for (String mockFile : FILES) {
+                if (mockFile.equals("system_exit.xml")) {
+                    if (rand.nextFloat() > 0.1) {
+                        continue;
+                    }
+                }
                 String targetName = i + "-" + mockFile;
                 Path target = inputDir.resolve(targetName);
                 FILE_LIST.add(targetName);
-                Files.copy(TikaEmitterTest.class
+                Files.copy(TikaPipesTest.class
                         .getResourceAsStream("/test-documents/mock/" + 
mockFile), target);
 
             }
@@ -105,8 +113,13 @@ public class TikaServerAsyncIntegrationTest extends 
IntegrationTestBase {
                         TMP_OUTPUT_DIR.toAbsolutePath() + "</basePath>" + 
"</params>" +
                         "</emitter>" +
                         "</emitters>" +
-                        "<server><params>" +
+                        
"<server><params><endpoints><endpoint>async</endpoint></endpoints>" +
                         
"<enableUnsecureFeatures>true</enableUnsecureFeatures></params></server>" +
+                        "<async><params><tikaConfig>" +
+                        
ProcessUtils.escapeCommandLine(TIKA_CONFIG.toAbsolutePath().toString()) +
+                        
"</tikaConfig><numClients>10</numClients><forkedJvmArgs><arg>-Xmx256m" +
+                        
"</arg></forkedJvmArgs><timeoutMillis>5000</timeoutMillis>" +
+                        "</params></async>" +
                         "</properties>";
 
         FileUtils.write(TIKA_CONFIG.toFile(), TIKA_CONFIG_XML, UTF_8);
@@ -154,13 +167,13 @@ public class TikaServerAsyncIntegrationTest extends 
IntegrationTestBase {
                 fail("bad status: '" + status + "' -> " + 
response.toPrettyString());
             }
             int expected = (ON_PARSE_EXCEPTION == 
FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT) ?
-                    FILE_LIST.size() : FILE_LIST.size() / 2;
+                    FILE_LIST.size() : FILE_LIST.size() / 3;
             int targets = 0;
-            while (targets < FILE_LIST.size()) {
+            while (targets < NUM_FILES * 2) {
                 targets = countTargets();
                 Thread.sleep(100);
             }
-            System.out.println("elapsed : " + (System.currentTimeMillis() - 
start));
+//            System.out.println("elapsed : " + (System.currentTimeMillis() - 
start));
         } finally {
             serverThread.interrupt();
         }
@@ -185,7 +198,7 @@ public class TikaServerAsyncIntegrationTest extends 
IntegrationTestBase {
     }
 
     private FetchEmitTuple getFetchEmitTuple(String fileName) throws 
IOException {
-        return new FetchEmitTuple(new FetchKey(FETCHER_NAME, fileName),
+        return new FetchEmitTuple(fileName, new FetchKey(FETCHER_NAME, 
fileName),
                 new EmitKey(EMITTER_NAME, ""), new Metadata(), 
HandlerConfig.DEFAULT_HANDLER_CONFIG,
                 ON_PARSE_EXCEPTION);
     }
diff --git 
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
 
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerPipesIntegrationTest.java
similarity index 86%
rename from 
tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
rename to 
tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerPipesIntegrationTest.java
index 2677bf1..5ff0f11 100644
--- 
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
+++ 
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerPipesIntegrationTest.java
@@ -52,10 +52,10 @@ import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.utils.ProcessUtils;
 
-public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
+public class TikaServerPipesIntegrationTest extends IntegrationTestBase {
 
     private static final Logger LOG =
-            LoggerFactory.getLogger(TikaServerEmitterIntegrationTest.class);
+            LoggerFactory.getLogger(TikaServerPipesIntegrationTest.class);
     private static final String EMITTER_NAME = "fse";
     private static final String FETCHER_NAME = "fsf";
     private static Path TMP_DIR;
@@ -76,7 +76,7 @@ public class TikaServerEmitterIntegrationTest extends 
IntegrationTestBase {
 
         for (String mockFile : FILES) {
             Files.copy(
-                    
TikaEmitterTest.class.getResourceAsStream("/test-documents/mock/" + mockFile),
+                    
TikaPipesTest.class.getResourceAsStream("/test-documents/mock/" + mockFile),
                     inputDir.resolve(mockFile));
         }
         TIKA_CONFIG = TMP_DIR.resolve("tika-config.xml");
@@ -92,9 +92,14 @@ public class TikaServerEmitterIntegrationTest extends 
IntegrationTestBase {
                 "<basePath>" + TMP_OUTPUT_DIR.toAbsolutePath() +
                 "</basePath>" + "</params>" + "</emitter>" + "</emitters>" + 
"<server><params>" +
                 "<enableUnsecureFeatures>true</enableUnsecureFeatures>" + 
"<port>9999</port>" +
-                "<endpoints>" + "<endpoint>emit</endpoint>" + 
"<endpoint>status</endpoint>" +
+                "<endpoints>" + "<endpoint>pipes</endpoint>" + 
"<endpoint>status</endpoint>" +
                 "</endpoints>";
-        String xml2 = "</params></server>" + "</properties>";
+        String xml2 = "</params></server>" +
+                "<pipes><params><tikaConfig>" +
+                
ProcessUtils.escapeCommandLine(TIKA_CONFIG.toAbsolutePath().toString()) +
+                
"</tikaConfig><numClients>10</numClients><forkedJvmArgs><arg>-Xmx256m" +
+                "</arg></forkedJvmArgs><timeoutMillis>5000</timeoutMillis>" +
+                "</params></pipes>" + "</properties>";
 
         String tikaConfigXML = xml1 + xml2;
 
@@ -138,6 +143,7 @@ public class TikaServerEmitterIntegrationTest extends 
IntegrationTestBase {
             JsonNode node = testOne("hello_world.xml", true);
             assertEquals("ok", node.get("status").asText());
         } catch (Exception e) {
+            e.printStackTrace();
             fail("shouldn't have an exception" + e.getMessage());
         } finally {
             if (p != null) {
@@ -185,13 +191,15 @@ public class TikaServerEmitterIntegrationTest extends 
IntegrationTestBase {
         }
     }
 
-    @Test(expected = ProcessingException.class)
+    @Test
     public void testSystemExit() throws Exception {
         Process p = null;
         try {
             p = startProcess(new String[]{"-config",
                     
ProcessUtils.escapeCommandLine(TIKA_CONFIG.toAbsolutePath().toString())});
-            testOne("system_exit.xml", false);
+            JsonNode node = testOne("system_exit.xml", false);
+            assertEquals("parse_error", node.get("status").asText());
+            assertContains("unknown_crash", node.get("parse_error").asText());
         } finally {
             if (p != null) {
                 p.destroyForcibly();
@@ -206,8 +214,9 @@ public class TikaServerEmitterIntegrationTest extends 
IntegrationTestBase {
         try {
             p = startProcess(new String[]{"-config",
                     
ProcessUtils.escapeCommandLine(TIKA_CONFIG.toAbsolutePath().toString())});
-            JsonNode response = testOne("fake_oom.xml", false);
-            assertContains("oom message", 
response.get("parse_error").asText());
+            JsonNode node = testOne("fake_oom.xml", false);
+            assertEquals("parse_error", node.get("status").asText());
+            assertContains("oom", node.get("parse_error").asText());
         } catch (ProcessingException e) {
             //depending on timing, there may be a connection exception --
             // TODO add more of a delay to server shutdown to ensure message 
is sent
@@ -219,13 +228,15 @@ public class TikaServerEmitterIntegrationTest extends 
IntegrationTestBase {
         }
     }
 
-    @Test(expected = ProcessingException.class)
+    @Test
     public void testTimeout() throws Exception {
         Process p = null;
         try {
             p = startProcess(new String[]{"-config", 
ProcessUtils.escapeCommandLine(
                     TIKA_CONFIG_TIMEOUT.toAbsolutePath().toString())});
-            JsonNode response = testOne("heavy_hang_30000.xml", false);
+            JsonNode node = testOne("heavy_hang_30000.xml", false);
+            assertEquals("parse_error", node.get("status").asText());
+            assertContains("timeout", node.get("parse_error").asText());
         } finally {
             if (p != null) {
                 p.destroyForcibly();
@@ -243,7 +254,7 @@ public class TikaServerEmitterIntegrationTest extends 
IntegrationTestBase {
 
         awaitServerStartup();
         Response response = WebClient
-                .create(endPoint + "/emit")
+                .create(endPoint + "/pipes")
                 .accept("application/json")
                 .post(getJsonString(fileName, onParseException));
         if (response.getStatus() == 200) {
@@ -262,7 +273,7 @@ public class TikaServerEmitterIntegrationTest extends 
IntegrationTestBase {
     private String getJsonString(String fileName,
                                  FetchEmitTuple.ON_PARSE_EXCEPTION 
onParseException)
             throws IOException {
-        FetchEmitTuple t = new FetchEmitTuple(new FetchKey(FETCHER_NAME, 
fileName),
+        FetchEmitTuple t = new FetchEmitTuple(fileName, new 
FetchKey(FETCHER_NAME, fileName),
                 new EmitKey(EMITTER_NAME, ""), new Metadata(), 
HandlerConfig.DEFAULT_HANDLER_CONFIG,
                 onParseException);
         return JsonFetchEmitTuple.toJson(t);

Reply via email to