This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 3712dce  TIKA-3226 - refactor to allow users to specify emit key, add stream emitter
3712dce is described below

commit 3712dce23339737c0c9dc2b8591f43c8123f231f
Author: tballison <[email protected]>
AuthorDate: Thu Jan 28 11:47:42 2021 -0500

    TIKA-3226 - refactor to allow users to specify emit key, add stream emitter
---
 .../java/org/apache/tika/config/TikaConfig.java    |   7 +
 .../emitter/{EmptyEmitter.java => EmitKey.java}    |  29 ++--
 .../org/apache/tika/pipes/emitter/Emitter.java     |   3 +-
 .../apache/tika/pipes/emitter/EmptyEmitter.java    |   2 +-
 .../apache/tika/pipes/emitter/StreamEmitter.java}  |  12 +-
 .../pipes/fetcher/{FetchId.java => FetchKey.java}  |  12 +-
 .../FetchEmitTuple.java}                           |  49 +++----
 .../tika/pipes/fetchiterator/FetchIterator.java    |  21 ++-
 .../fetchiterator/FileSystemFetchIterator.java     |  32 ++---
 .../java/org/apache/tika/utils/StringUtils.java    |   4 +
 .../org/apache/tika/pipes/emitter/MockEmitter.java |   2 +-
 .../fetchiterator/FileSystemFetchIteratorTest.java |   7 +-
 .../tika/pipes/emitter/fs/FileSystemEmitter.java   |  30 ++--
 .../apache/tika/pipes/emitter/s3/S3Emitter.java    |  66 ++++++---
 .../tika/pipes/emitter/solr/SolrEmitter.java       |  33 ++---
 .../apache/tika/pipes/emitter/solr/TestBasic.java  |   3 +-
 .../pipes/fetchiterator/csv/CSVFetchIterator.java  | 153 ++++++++++++++++++---
 .../src/test/java/TestCSVFetchIterator.java        |  30 ++--
 .../fetchiterator/jdbc/JDBCFetchIterator.java      | 111 +++++++++++++--
 .../fetchiterator/jdbc/TestJDBCFetchIterator.java  |  31 ++---
 .../pipes/fetchiterator/s3/S3FetchIterator.java    |  12 +-
 .../fetchiterator/s3/TestS3FetchIterator.java      |  22 ++-
 .../apache/tika/pipes/PipeIntegrationTests.java    |  51 +++----
 .../org/apache/tika/server/client/TikaClient.java  |  15 +-
 .../apache/tika/server/client/TikaClientCLI.java   |  37 ++---
 .../tika/server/client/TikaClientConfig.java       |  85 ++++++++++++
 .../tika/server/core/resource/EmitterResource.java |  68 ++++-----
 .../apache/tika/server/core/TikaEmitterTest.java   |   6 +-
 .../core/TikaServerEmitterIntegrationTest.java     |   2 +-
 29 files changed, 620 insertions(+), 315 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java 
b/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
index bd4c389..e29a337 100644
--- a/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
@@ -1647,4 +1647,11 @@ public class TikaConfig {
                     paramName + "' must be set in the config file");
         }
     }
+
+    public static void mustNotBeEmpty(String paramName, Path paramValue) 
throws TikaConfigException {
+        if (paramValue == null) {
+            throw new IllegalArgumentException("parameter '"+
+                    paramName + "' must be set in the config file");
+        }
+    }
 }
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java 
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
similarity index 61%
copy from 
tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
copy to tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
index 9a1a10a..47a8ee7 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
@@ -16,20 +16,29 @@
  */
 package org.apache.tika.pipes.emitter;
 
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
+public class EmitKey {
 
-import java.io.IOException;
-import java.util.List;
+    private final String emitterName;
+    private final String emitKey;
 
-public class EmptyEmitter implements Emitter {
-    @Override
-    public String getName() {
-        return "empty";
+    public EmitKey(String emitterName, String emitKey) {
+        this.emitterName = emitterName;
+        this.emitKey = emitKey;
     }
 
-    @Override
-    public void emit(List<Metadata> metadataList) throws IOException, 
TikaException {
+    public String getEmitterName() {
+        return emitterName;
+    }
 
+    public String getEmitKey() {
+        return emitKey;
+    }
+
+    @Override
+    public String toString() {
+        return "EmitterKey{" +
+                "emitterName='" + emitterName + '\'' +
+                ", emitterKey='" + emitKey + '\'' +
+                '}';
     }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java 
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
index 445000d..4dc2291 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
@@ -16,7 +16,6 @@
  */
 package org.apache.tika.pipes.emitter;
 
-import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 
 import java.io.IOException;
@@ -26,7 +25,7 @@ public interface Emitter {
 
     String getName();
 
-    void emit(List<Metadata> metadataList) throws IOException, TikaException;
+    void emit(String emitKey, List<Metadata> metadataList) throws IOException, 
TikaEmitterException;
     //TODO we can add this later?
     //void emit(String txt, Metadata metadata) throws IOException, 
TikaException;
 
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java 
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
index 9a1a10a..8c0ebda 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
@@ -29,7 +29,7 @@ public class EmptyEmitter implements Emitter {
     }
 
     @Override
-    public void emit(List<Metadata> metadataList) throws IOException, 
TikaException {
+    public void emit(String emitKey, List<Metadata> metadataList) throws 
IOException, TikaEmitterException {
 
     }
 }
diff --git 
a/tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java 
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/StreamEmitter.java
similarity index 79%
copy from tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java
copy to tika-core/src/main/java/org/apache/tika/pipes/emitter/StreamEmitter.java
index f00d815..d7f5921 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/StreamEmitter.java
@@ -16,16 +16,12 @@
  */
 package org.apache.tika.pipes.emitter;
 
-import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 
 import java.io.IOException;
-import java.util.List;
+import java.io.InputStream;
 
-public class MockEmitter extends AbstractEmitter {
-
-    @Override
-    public void emit(List<Metadata> metadataList) throws IOException, 
TikaException {
-
-    }
+public interface StreamEmitter extends Emitter {
+    void emit(String emitKey, InputStream inputStream, Metadata userMetadata)
+            throws IOException, TikaEmitterException;
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchId.java 
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
similarity index 82%
rename from tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchId.java
rename to tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
index a327fc6..6e86bcf 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchId.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
@@ -20,11 +20,11 @@ package org.apache.tika.pipes.fetcher;
  * Pair of fetcherName (which fetcher to call) and the key
  * to send to that fetcher to retrieve a specific file.
  */
-public class FetchId {
+public class FetchKey {
     private final String fetcherName;
     private final String fetchKey;
 
-    public FetchId(String fetcherName, String fetchKey) {
+    public FetchKey(String fetcherName, String fetchKey) {
         this.fetcherName = fetcherName;
         this.fetchKey = fetchKey;
     }
@@ -33,7 +33,7 @@ public class FetchId {
         return fetcherName;
     }
 
-    public String getFetchKey() {
+    public String getKey() {
         return fetchKey;
     }
 
@@ -50,10 +50,10 @@ public class FetchId {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
 
-        FetchId fetchId = (FetchId) o;
+        FetchKey fetchKey = (FetchKey) o;
 
-        if (fetcherName != null ? !fetcherName.equals(fetchId.fetcherName) : 
fetchId.fetcherName != null) return false;
-        return fetchKey != null ? fetchKey.equals(fetchId.fetchKey) : 
fetchId.fetchKey == null;
+        if (fetcherName != null ? !fetcherName.equals(fetchKey.fetcherName) : 
fetchKey.fetcherName != null) return false;
+        return this.fetchKey != null ? this.fetchKey.equals(fetchKey.fetchKey) 
: fetchKey.fetchKey == null;
     }
 
     @Override
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchIdMetadataPair.java
 
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java
similarity index 53%
rename from 
tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchIdMetadataPair.java
rename to 
tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java
index 25d89db..9023378 100644
--- 
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchIdMetadataPair.java
+++ 
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java
@@ -14,51 +14,42 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.fetcher;
+package org.apache.tika.pipes.fetchiterator;
 
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
 
-public class FetchIdMetadataPair {
+public class FetchEmitTuple {
 
-    private final FetchId fetchId;
+    private final FetchKey fetchKey;
+    private final EmitKey emitKey;
     private final Metadata metadata;
 
-    public FetchIdMetadataPair(FetchId fetchId, Metadata metadata) {
-        this.fetchId = fetchId;
+    public FetchEmitTuple(FetchKey fetchKey, EmitKey emitKey, Metadata 
metadata) {
+        this.fetchKey = fetchKey;
+        this.emitKey = emitKey;
         this.metadata = metadata;
     }
 
-    public Metadata getMetadata() {
-        return metadata;
+    public FetchKey getFetchKey() {
+        return fetchKey;
     }
 
-    public FetchId getFetchId() {
-        return fetchId;
+    public EmitKey getEmitKey() {
+        return emitKey;
+    }
+
+    public Metadata getMetadata() {
+        return metadata;
     }
 
     @Override
     public String toString() {
-        return "FetchIdMetadataPair{" +
-                "fetchId=" + fetchId +
+        return "FetchEmitTuple{" +
+                "fetchKey=" + fetchKey +
+                ", emitKey=" + emitKey +
                 ", metadata=" + metadata +
                 '}';
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        FetchIdMetadataPair that = (FetchIdMetadataPair) o;
-
-        if (fetchId != null ? !fetchId.equals(that.fetchId) : that.fetchId != 
null) return false;
-        return metadata != null ? metadata.equals(that.metadata) : 
that.metadata == null;
-    }
-
-    @Override
-    public int hashCode() {
-        int result = fetchId != null ? fetchId.hashCode() : 0;
-        result = 31 * result + (metadata != null ? metadata.hashCode() : 0);
-        return result;
-    }
 }
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
 
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
index d3528ee..6b9457e 100644
--- 
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
+++ 
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
@@ -21,7 +21,6 @@ import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
 import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
 
 import java.io.IOException;
 import java.util.Map;
@@ -40,14 +39,15 @@ public abstract class FetchIterator implements 
Callable<Integer>, Initializable
 
     public static final long DEFAULT_MAX_WAIT_MS = 300_000;
     public static final int DEFAULT_QUEUE_SIZE = 1000;
-    public static final FetchIdMetadataPair COMPLETED_SEMAPHORE =
-            new FetchIdMetadataPair(null, null);
+    public static final FetchEmitTuple COMPLETED_SEMAPHORE =
+            new FetchEmitTuple(null, null, null);
 
     private long maxWaitMs = DEFAULT_MAX_WAIT_MS;
     private int numConsumers = -1;
-    private ArrayBlockingQueue<FetchIdMetadataPair> queue = null;
+    private ArrayBlockingQueue<FetchEmitTuple> queue = null;
     private int queueSize = DEFAULT_QUEUE_SIZE;
     private String fetcherName;
+    private String emitterName;
     private int added = 0;
     public FetchIterator() {
 
@@ -61,7 +61,7 @@ public abstract class FetchIterator implements 
Callable<Integer>, Initializable
      * This must be called before 'calling' this object.
      * @param numConsumers
      */
-    public ArrayBlockingQueue<FetchIdMetadataPair> init(int numConsumers) {
+    public ArrayBlockingQueue<FetchEmitTuple> init(int numConsumers) {
         this.queue = new ArrayBlockingQueue<>(queueSize);
         this.numConsumers = numConsumers;
         return queue;
@@ -76,6 +76,14 @@ public abstract class FetchIterator implements 
Callable<Integer>, Initializable
         return fetcherName;
     }
 
+    @Field
+    public void setEmitterName(String emitterName) {
+        this.emitterName = emitterName;
+    }
+
+    public String getEmitterName() {
+        return emitterName;
+    }
 
     @Field
     public void setMaxWaitMs(long maxWaitMs) {
@@ -86,6 +94,7 @@ public abstract class FetchIterator implements 
Callable<Integer>, Initializable
     public void setQueueSize(int queueSize) {
         this.queueSize = queueSize;
     }
+
     @Override
     public Integer call() throws Exception {
         if (queue == null || numConsumers < 0) {
@@ -105,7 +114,7 @@ public abstract class FetchIterator implements 
Callable<Integer>, Initializable
 
     protected abstract void enqueue() throws IOException, TimeoutException, 
InterruptedException;
 
-    protected void tryToAdd(FetchIdMetadataPair p) throws 
InterruptedException, TimeoutException {
+    protected void tryToAdd(FetchEmitTuple p) throws InterruptedException, 
TimeoutException {
         if (p != COMPLETED_SEMAPHORE) {
             added++;
         }
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
 
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
index 35e2ea8..68a96a1 100644
--- 
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
+++ 
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
@@ -19,11 +19,11 @@ package org.apache.tika.pipes.fetchiterator;
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
-import org.apache.tika.config.Param;
+import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.fetcher.FetchId;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
 
 import java.io.IOException;
 import java.nio.file.FileVisitResult;
@@ -32,7 +32,6 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.attribute.BasicFileAttributes;
-import java.util.Map;
 import java.util.concurrent.TimeoutException;
 
 public class FileSystemFetchIterator
@@ -60,9 +59,9 @@ public class FileSystemFetchIterator
             throw new IllegalArgumentException("\"basePath\" directory does 
not exist: " +
                     basePath.toAbsolutePath());
         }
-        String fetcherName = getFetcherName();
+
         try {
-            Files.walkFileTree(basePath, new FSFileVisitor(fetcherName));
+            Files.walkFileTree(basePath, new FSFileVisitor(getFetcherName(), 
getEmitterName()));
         } catch (IOException e) {
             Throwable cause = e.getCause();
             if (cause != null && cause instanceof TimeoutException) {
@@ -75,22 +74,21 @@ public class FileSystemFetchIterator
 
     @Override
     public void checkInitialization(InitializableProblemHandler 
problemHandler) throws TikaConfigException {
+        //these should all be fatal
+        TikaConfig.mustNotBeEmpty("basePath", basePath);
+        TikaConfig.mustNotBeEmpty("fetcherName", getFetcherName());
+        TikaConfig.mustNotBeEmpty("emitterName", getFetcherName());
 
-        //ignore problem handler.  These should all be fatal.
-        if (basePath == null) {
-            throw new TikaConfigException("Must specify a \"basePath\"");
-        }
-        if (getFetcherName() == null || getFetcherName().trim().length() == 0) 
{
-            throw new TikaConfigException("\"fetcherName\" must be specified 
and must be not blank");
-        }
     }
 
 
     private class FSFileVisitor implements FileVisitor<Path> {
 
         private final String fetcherName;
-        private FSFileVisitor(String fetcherName) {
+        private final String emitterName;
+        private FSFileVisitor(String fetcherName, String emitterName) {
             this.fetcherName = fetcherName;
+            this.emitterName = emitterName;
         }
         @Override
         public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes 
attrs) throws IOException {
@@ -102,8 +100,10 @@ public class FileSystemFetchIterator
             String relPath = basePath.relativize(file).toString();
 
             try {
-                tryToAdd(new FetchIdMetadataPair(
-                        new FetchId(fetcherName, relPath), new Metadata()));
+                tryToAdd(new FetchEmitTuple(
+                        new FetchKey(fetcherName, relPath),
+                        new EmitKey(emitterName, relPath), new Metadata())
+                );
             } catch (TimeoutException e) {
                 throw new IOException(e);
             } catch (InterruptedException e) {
diff --git a/tika-core/src/main/java/org/apache/tika/utils/StringUtils.java 
b/tika-core/src/main/java/org/apache/tika/utils/StringUtils.java
index 0b39b36..d59827c 100644
--- a/tika-core/src/main/java/org/apache/tika/utils/StringUtils.java
+++ b/tika-core/src/main/java/org/apache/tika/utils/StringUtils.java
@@ -37,6 +37,10 @@ public class StringUtils {
         return cs == null || cs.length() == 0;
     }
 
+    public static boolean isBlank(final String s) {
+        return s == null || s.trim().length() == 0;
+    }
+
     /**
      * <p>Left pad a String with a specified String.</p>
      *
diff --git 
a/tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java 
b/tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java
index f00d815..b25dfca 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java
@@ -25,7 +25,7 @@ import java.util.List;
 public class MockEmitter extends AbstractEmitter {
 
     @Override
-    public void emit(List<Metadata> metadataList) throws IOException, 
TikaException {
+    public void emit(String emitKey, List<Metadata> metadataList) throws 
IOException, TikaEmitterException {
 
     }
 }
diff --git 
a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
 
b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
index 1560166..c5b5558 100644
--- 
a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
+++ 
b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.tika.pipes.fetchiterator;
 
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -55,7 +54,7 @@ public class FileSystemFetchIteratorTest {
         ExecutorCompletionService<Integer> cs = new 
ExecutorCompletionService<>(es);
         FetchIterator it = new FileSystemFetchIterator(fetcherName, root);
         it.setQueueSize(20000);
-        ArrayBlockingQueue<FetchIdMetadataPair> q = it.init(1);
+        ArrayBlockingQueue<FetchEmitTuple> q = it.init(1);
 
         cs.submit(it);
 
@@ -64,11 +63,11 @@ public class FileSystemFetchIteratorTest {
         f.get();
 
         Set<String> iteratorSet = new HashSet<>();
-        for (FetchIdMetadataPair p : q) {
+        for (FetchEmitTuple p : q) {
             if (p == FetchIterator.COMPLETED_SEMAPHORE) {
                 break;
             }
-            iteratorSet.add(p.getFetchId().getFetchKey());
+            iteratorSet.add(p.getFetchKey().getKey());
         }
 
         assertEquals(truthSet, iteratorSet);
diff --git 
a/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
 
b/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
index 7ebe5bb..9398096 100644
--- 
a/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
+++ 
b/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
@@ -19,6 +19,7 @@ package org.apache.tika.pipes.emitter.fs;
 import org.apache.tika.config.Field;
 import org.apache.tika.pipes.emitter.AbstractEmitter;
 import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.StreamEmitter;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
@@ -26,6 +27,7 @@ import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.metadata.serialization.JsonMetadataList;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.Writer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -57,39 +59,37 @@ import java.util.Set;
  *      &lt;/emitters&gt;
  *  &lt;/properties&gt;</pre>
  */
-public class FileSystemEmitter extends AbstractEmitter {
+public class FileSystemEmitter extends AbstractEmitter implements 
StreamEmitter {
 
     private Path basePath = null;
     private String fileExtension = "json";
 
 
     @Override
-    public void emit(List<Metadata> metadataList) throws IOException, 
TikaException {
+    public void emit(String emitKey, List<Metadata> metadataList) throws 
IOException, TikaEmitterException {
         Path output;
         if (metadataList == null || metadataList.size() == 0) {
             throw new TikaEmitterException("metadata list must not be null or 
of size 0");
         }
 
-        String relPath = metadataList.get(0)
-                .get(TikaCoreProperties.SOURCE_PATH);
-        if (relPath == null) {
-            throw new TikaEmitterException("Must specify a 
"+TikaCoreProperties.SOURCE_PATH.getName() +
-                    " in the metadata in order for this emitter to generate 
the output file path.");
-        }
         if (fileExtension != null && fileExtension.length() > 0) {
-            relPath += "." + fileExtension;
+            emitKey += "." + fileExtension;
         }
         if (basePath != null) {
-            output = basePath.resolve(relPath);
+            output = basePath.resolve(emitKey);
         } else {
-            output = Paths.get(relPath);
+            output = Paths.get(emitKey);
         }
 
         if (!Files.isDirectory(output.getParent())) {
             Files.createDirectories(output.getParent());
         }
         try (Writer writer = Files.newBufferedWriter(output, 
StandardCharsets.UTF_8)) {
-            JsonMetadataList.toJson(metadataList, writer);
+            try {
+                JsonMetadataList.toJson(metadataList, writer);
+            } catch (TikaException e) {
+                throw new TikaEmitterException("can't create json", e);
+            }
         }
     }
 
@@ -107,4 +107,10 @@ public class FileSystemEmitter extends AbstractEmitter {
     public void setFileExtension(String fileExtension) {
         this.fileExtension = fileExtension;
     }
+
+    @Override
+    public void emit(String path, InputStream inputStream, Metadata 
userMetadata) throws IOException,
+            TikaEmitterException {
+        Files.copy(inputStream, basePath.resolve(path));
+    }
 }
diff --git 
a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
 
b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
index bec5e1a..d72d8fb 100644
--- 
a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
+++ 
b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
@@ -28,11 +28,14 @@ import org.apache.tika.config.Param;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.io.TemporaryResources;
+import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.metadata.serialization.JsonMetadataList;
 import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.StreamEmitter;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,7 +70,10 @@ import static 
org.apache.tika.config.TikaConfig.mustNotBeEmpty;
  *                  &lt;param name="profile" 
type="string"&gt;my-profile&lt;/param&gt;
  *                  &lt;!-- required --&gt;
  *                  &lt;param name="bucket" 
type="string"&gt;my-bucket&lt;/param&gt;
- *                  &lt;!-- optional; default is 'json' --&gt;
+ *                  &lt;!-- optional; prefix to add to the path before 
emitting; default is no prefix --&gt;
+ *                  &lt;param name="prefix" 
type="string"&gt;my-prefix&lt;/param&gt;
+ *                  &lt;!-- optional; default is 'json'; this will be added to the SOURCE_PATH
+ *                                    if no emitter key is specified --&gt;
  *                  &lt;param name="fileExtension" 
type="string"&gt;json&lt;/param&gt;
  *                  &lt;!-- optional; default is 'true'-- whether to copy the 
json to a local file before putting to s3 --&gt;
  *                  &lt;param name="spoolToTemp" 
type="bool"&gt;true&lt;/param&gt;
@@ -76,16 +82,16 @@ import static 
org.apache.tika.config.TikaConfig.mustNotBeEmpty;
  *      &lt;/emitters&gt;
  *  &lt;/properties&gt;</pre>
  */
-public class S3Emitter extends AbstractEmitter implements Initializable {
+public class S3Emitter extends AbstractEmitter implements Initializable, 
StreamEmitter {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(S3Emitter.class);
     private String region;
     private String profile;
     private String bucket;
-    private AmazonS3 s3Client;
     private String fileExtension = "json";
     private boolean spoolToTemp = true;
-
+    private String prefix = null;
+    private AmazonS3 s3Client;
 
     /**
      * Requires the src-bucket/path/to/my/file.txt in the {@link 
TikaCoreProperties#SOURCE_PATH}.
@@ -95,26 +101,22 @@ public class S3Emitter extends AbstractEmitter implements 
Initializable {
      * @throws TikaException
      */
     @Override
-    public void emit(List<Metadata> metadataList) throws IOException, 
TikaException {
+    public void emit(String emitKey, List<Metadata> metadataList) throws 
IOException, TikaEmitterException {
         if (metadataList == null || metadataList.size() == 0) {
             throw new TikaEmitterException("metadata list must not be null or 
of size 0");
         }
-        String path = metadataList.get(0)
-                .get(TikaCoreProperties.SOURCE_PATH);
-        if (path == null) {
-            throw new TikaEmitterException("Must specify a 
"+TikaCoreProperties.SOURCE_PATH.getName() +
-                    " in the metadata in order for this emitter to generate 
the output file path.");
-        }
+
         if (! spoolToTemp) {
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
             try (Writer writer =
                          new BufferedWriter(new OutputStreamWriter(bos, 
StandardCharsets.UTF_8))) {
                 JsonMetadataList.toJson(metadataList, writer);
+            } catch (TikaException e) {
+                throw new TikaEmitterException("can't jsonify", e);
             }
             byte[] bytes = bos.toByteArray();
-            long length = bytes.length;
-            try (InputStream is = new ByteArrayInputStream(bytes)) {
-                emit(path, is, length, new Metadata());
+            try (InputStream is = TikaInputStream.get(bytes)) {
+                emit(emitKey, is, new Metadata());
             }
         } else {
             TemporaryResources tmp = new TemporaryResources();
@@ -123,10 +125,11 @@ public class S3Emitter extends AbstractEmitter implements 
Initializable {
                 try (Writer writer = Files.newBufferedWriter(tmpPath,
                         StandardCharsets.UTF_8, StandardOpenOption.CREATE)) {
                     JsonMetadataList.toJson(metadataList, writer);
+                } catch (TikaException e) {
+                    throw new TikaEmitterException("can't jsonify", e);
                 }
-                long length = Files.size(tmpPath);
-                try (InputStream is = Files.newInputStream(tmpPath)) {
-                    emit(path, is, length, new Metadata());
+                try (InputStream is = TikaInputStream.get(tmpPath)) {
+                    emit(emitKey, is, new Metadata());
                 }
             } finally {
                 tmp.close();
@@ -141,14 +144,29 @@ public class S3Emitter extends AbstractEmitter implements 
Initializable {
      * @param userMetadata this will be written to the s3 ObjectMetadata's 
userMetadata
      * @throws TikaEmitterException
      */
-    public void emit(String path, InputStream is, long length, Metadata 
userMetadata) throws TikaEmitterException {
+    @Override
+    public void emit(String path, InputStream is, Metadata userMetadata) 
throws IOException, TikaEmitterException {
+
+        if (!StringUtils.isBlank(prefix)) {
+            path = prefix + "/" + path;
+        }
 
-        if (fileExtension != null && fileExtension.length() > 0) {
+        if (! StringUtils.isBlank(fileExtension)) {
             path += "." + fileExtension;
         }
 
         LOGGER.debug("about to emit to target bucket: ({}) path:({})",
                 bucket, path);
+        long length = -1;
+        if (is instanceof TikaInputStream) {
+            if (((TikaInputStream)is).hasFile()) {
+                try {
+                    length = ((TikaInputStream) is).getLength();
+                } catch (IOException e) {
+                    throw new TikaEmitterException("exception getting length", 
e);
+                }
+            }
+        }
         ObjectMetadata objectMetadata = new ObjectMetadata();
         if (length > 0) {
             objectMetadata.setContentLength(length);
@@ -194,6 +212,16 @@ public class S3Emitter extends AbstractEmitter implements 
Initializable {
         this.bucket = bucket;
     }
 
+    /**
+     * Sets the prefix that {@code emit} prepends (followed by "/") to the emit path.
+     * A single trailing "/" is stripped so the joined path does not contain "//".
+     *
+     * @param prefix the key prefix; a null value is stored unchanged
+     */
+    @Field
+    public void setPrefix(String prefix) {
+        //strip final "/" if it exists; guard against null to avoid an NPE
+        if (prefix != null && prefix.endsWith("/")) {
+            this.prefix = prefix.substring(0, prefix.length() - 1);
+        } else {
+            this.prefix = prefix;
+        }
+    }
+
     /**
      * If you want to customize the output file's file extension.
      * Do not include the "."
diff --git 
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
 
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index 14d0a88..2391424 100644
--- 
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
+++ 
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -20,24 +20,25 @@ import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import org.apache.tika.client.HttpClientUtil;
+import org.apache.tika.client.TikaClientException;
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
 import org.apache.tika.pipes.emitter.Emitter;
 import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-public class SolrEmitter implements Emitter, Initializable {
+public class SolrEmitter extends AbstractEmitter implements Initializable {
 
     enum AttachmentStrategy {
         SKIP,
@@ -50,7 +51,6 @@ public class SolrEmitter implements Emitter, Initializable {
     private static final String UPDATE_PATH = "/update";
     private static final Logger LOG = 
LoggerFactory.getLogger(SolrEmitter.class);
 
-    private String name = "solr";
     private AttachmentStrategy attachmentStrategy = 
AttachmentStrategy.PARENT_CHILD;
     private String url;
     private String contentField = "content";
@@ -58,23 +58,23 @@ public class SolrEmitter implements Emitter, Initializable {
     private int commitWithin = 100;
 
     @Override
-    public String getName() {
-        return name;
-    }
-
-    @Override
-    public void emit(List<Metadata> metadataList) throws IOException,
-            TikaException {
+    public void emit(String emitKey, List<Metadata> metadataList) throws 
IOException,
+            TikaEmitterException {
         if (metadataList == null || metadataList.size() == 0) {
             LOG.warn("metadataList is null or empty");
             return;
         }
-        String json = jsonify(metadataList);
+        String json = jsonify(emitKey, metadataList);
         LOG.debug("emitting json:"+json);
-        
HttpClientUtil.postJson(url+UPDATE_PATH+"?commitWithin="+getCommitWithin(), 
json);
+        try {
+            
HttpClientUtil.postJson(url+UPDATE_PATH+"?commitWithin="+getCommitWithin(), 
json);
+        } catch (TikaClientException e) {
+            throw new TikaEmitterException("can't post", e);
+        }
     }
 
-    private String jsonify(List<Metadata> metadataList) {
+    private String jsonify(String emitKey, List<Metadata> metadataList) {
+        metadataList.get(0).set(idField, emitKey);
         if (attachmentStrategy == AttachmentStrategy.SKIP) {
             return toJsonString(jsonify(metadataList.get(0)));
         } else if (attachmentStrategy == 
AttachmentStrategy.CONCATENATE_CONTENT) {
@@ -163,11 +163,6 @@ public class SolrEmitter implements Emitter, Initializable 
{
         }
     }
 
-    @Field
-    public void setName(String name) {
-        this.name = name;
-    }
-
     /**
      * Specify the url for Solr
      * @param url
diff --git 
a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
 
b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
index 5f1e75d..14a091a 100644
--- 
a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
+++ 
b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
@@ -38,7 +38,6 @@ public class TestBasic {
         Emitter emitter = tikaConfig.getEmitterManager().getEmitter("solr1");
         List<Metadata> metadataList = new ArrayList<>();
         Metadata m1 = new Metadata();
-        m1.set("id", "1");
         m1.set(Metadata.CONTENT_LENGTH, "314159");
         m1.set(TikaCoreProperties.TIKA_CONTENT, "the quick brown");
         m1.set(TikaCoreProperties.TITLE, "this is the first title");
@@ -54,6 +53,6 @@ public class TestBasic {
         metadataList.add(m1);
         metadataList.add(m2);
 
-        emitter.emit(metadataList);
+        emitter.emit("1", metadataList);
     }
 }
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
index 5737eb5..12268ea 100644
--- 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
+++ 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
@@ -22,10 +22,15 @@ import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.fetcher.FetchId;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Reader;
@@ -40,11 +45,32 @@ import java.util.concurrent.TimeoutException;
 
 import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
+/**
+ * Iterates through a UTF-8 CSV file. This adds all columns
+ * (except for the 'fetchKeyColumn' and 'emitKeyColumn', if specified)
+ * to the metadata object.
+ * <p>
+ *  <ul>
+ *      <li>If a 'fetchKeyColumn' is specified, this will use that column's 
value as the fetchKey.</li>
+ *      <li>If no 'fetchKeyColumn' is specified, this will send the metadata 
from the other columns.</li>
+ *      <li>The 'fetchKeyColumn' value is not added to the metadata.</li>
+ *  </ul>
+ * <p>
+ *  <ul>
+ *      <li>If an 'emitKeyColumn' is specified, this will use that column's 
value as the emit key.</li>
+ *      <li>If an 'emitKeyColumn' is not specified, this will use the value 
from the 'fetchKeyColumn'.</li>
+ *      <li>The 'emitKeyColumn' value is not added to the metadata.</li>
+ *  </ul>
+ *
+ */
 public class CSVFetchIterator extends FetchIterator implements Initializable {
 
-    private Charset charset = StandardCharsets.UTF_8;
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CSVFetchIterator.class);
+
+    private final Charset charset = StandardCharsets.UTF_8;
     private Path csvPath;
     private String fetchKeyColumn;
+    private String emitKeyColumn;
 
 
     @Field
@@ -57,6 +83,11 @@ public class CSVFetchIterator extends FetchIterator 
implements Initializable {
         this.fetchKeyColumn = fetchKeyColumn;
     }
 
+    @Field
+    public void setEmitKeyColumn(String emitKeyColumn) {
+        this.emitKeyColumn = emitKeyColumn;
+    }
+
     public void setCsvPath(Path csvPath) {
         this.csvPath = csvPath;
     }
@@ -64,35 +95,93 @@ public class CSVFetchIterator extends FetchIterator 
implements Initializable {
     @Override
     protected void enqueue() throws InterruptedException, IOException, 
TimeoutException {
         String fetcherName = getFetcherName();
+        String emitterName = getEmitterName();
         try (Reader reader = Files.newBufferedReader(csvPath, charset)) {
             Iterable<CSVRecord> records = CSVFormat.EXCEL.parse(reader);
-            int fetchKeyIndex = -1;
             List<String> headers = new ArrayList<>();
+            FetchEmitKeyIndices fetchEmitKeyIndices = null;
             for (CSVRecord record : records) {
-                fetchKeyIndex = loadHeaders(record, headers);
+                fetchEmitKeyIndices = loadHeaders(record, headers);
                 break;
             }
-            //check that if a user set a fetchKeyColumn, but we didn't find it
-            if (fetchKeyIndex < 0 && (fetchKeyColumn != null)) {
-                throw new IllegalArgumentException("Can't find " + 
fetchKeyColumn +
-                        " in the csv. I see:"+
-                        headers);
-            }
+
+            checkFetchEmitValidity(fetcherName, emitterName, 
fetchEmitKeyIndices, headers);
+
             for (CSVRecord record : records) {
-                String fetchKey = "";
-                if (fetchKeyIndex > -1) {
-                    fetchKey = record.get(fetchKeyIndex);
+                String fetchKey = getFetchKey(fetchEmitKeyIndices, record);
+                String emitKey = getEmitKey(fetchEmitKeyIndices, record);
+                if (StringUtils.isBlank(fetchKey) && ! 
StringUtils.isBlank(fetcherName)) {
+                    LOGGER.debug("Fetcher specified ({}), but no fetchkey was 
found in ({})",
+                            fetcherName, record);
+                }
+                if (StringUtils.isBlank(emitKey)) {
+                    throw new IOException("emitKey must not be blank in 
:"+record);
                 }
-                Metadata metadata = loadMetadata(fetchKeyIndex, headers, 
record);
-                tryToAdd(new FetchIdMetadataPair(new FetchId(fetcherName, 
fetchKey), metadata));
+                Metadata metadata = loadMetadata(fetchEmitKeyIndices, headers, 
record);
+                tryToAdd(new FetchEmitTuple(
+                        new FetchKey(fetcherName, fetchKey),
+                        new EmitKey(emitterName, emitKey), metadata));
             }
         }
     }
 
-    private Metadata loadMetadata(int fetchKeyIndex, List<String> headers, 
CSVRecord record) {
+    /**
+     * Validates the fetcher/emitter configuration against the header row
+     * before any records are enqueued.
+     *
+     * @param fetcherName name of the fetcher; may be blank for metadata-only runs
+     * @param emitterName name of the emitter; must not be blank
+     * @param fetchEmitKeyIndices key column indices found in the header row,
+     *                            or null if the csv contained no rows at all
+     * @param headers the header names that were read
+     * @throws IOException wrapping a TikaConfigException if the configuration
+     *                     is inconsistent or a configured column is missing
+     */
+    private void checkFetchEmitValidity(String fetcherName,
+                                        String emitterName,
+                                        FetchEmitKeyIndices fetchEmitKeyIndices,
+                                        List<String> headers) throws IOException {
+
+        if (StringUtils.isBlank(emitterName)) {
+            throw new IOException(
+                    new TikaConfigException("must specify at least an emitterName"));
+        }
+
+        if (StringUtils.isBlank(fetcherName) && ! StringUtils.isBlank(fetchKeyColumn)) {
+            throw new IOException(new TikaConfigException("If specifying a 'fetchKeyColumn', " +
+                    "you must also specify a 'fetcherName'"));
+        }
+
+        if (StringUtils.isBlank(fetcherName)) {
+            LOGGER.debug("No fetcher specified. This will be metadata only");
+        }
+
+        //an empty csv never yields a header row, so the indices may be null;
+        //treat that as "column not found" instead of dereferencing null
+        int fetchKeyIndex = (fetchEmitKeyIndices == null) ?
+                -1 : fetchEmitKeyIndices.fetchKeyIndex;
+        int emitKeyIndex = (fetchEmitKeyIndices == null) ?
+                -1 : fetchEmitKeyIndices.emitKeyIndex;
+
+        //if a fetchkeycolumn is specified, make sure that it was found
+        if (! StringUtils.isBlank(fetchKeyColumn) && fetchKeyIndex < 0) {
+            throw new IOException(new TikaConfigException("Couldn't find fetchKeyColumn ("+
+                    fetchKeyColumn+") in header.\n" +
+                    "These are the headers I see: " + headers));
+        }
+
+        //if an emitkeycolumn is specified, make sure that it was found
+        if (! StringUtils.isBlank(emitKeyColumn) && emitKeyIndex < 0) {
+            throw new IOException(new TikaConfigException("Couldn't find emitKeyColumn ("+
+                    emitKeyColumn+") in header.\n" +
+                    "These are the headers I see: " + headers));
+        }
+
+        if (StringUtils.isBlank(emitKeyColumn)) {
+            LOGGER.debug("No emitKeyColumn specified. " +
+                            "Will use fetchKeyColumn ({}) for both the fetch key and emit key",
+                    fetchKeyColumn);
+        }
+    }
+
+    private String getFetchKey(FetchEmitKeyIndices fetchEmitKeyIndices, 
CSVRecord record) {
+        if (fetchEmitKeyIndices.fetchKeyIndex > -1) {
+            return record.get(fetchEmitKeyIndices.fetchKeyIndex);
+        }
+        return StringUtils.EMPTY;
+    }
+
+    private String getEmitKey(FetchEmitKeyIndices fetchEmitKeyIndices, 
CSVRecord record) {
+        if (fetchEmitKeyIndices.emitKeyIndex > -1) {
+            return record.get(fetchEmitKeyIndices.emitKeyIndex);
+        }
+        return getFetchKey(fetchEmitKeyIndices, record);
+    }
+
+    private Metadata loadMetadata(FetchEmitKeyIndices fetchEmitKeyIndices, 
List<String> headers, CSVRecord record) {
         Metadata metadata = new Metadata();
         for (int i = 0; i < record.size(); i++) {
-            if (fetchKeyIndex == i) {
+            if (fetchEmitKeyIndices.shouldSkip(i)) {
                 continue;
             }
             metadata.set(headers.get(i), record.get(i));
@@ -101,23 +190,45 @@ public class CSVFetchIterator extends FetchIterator 
implements Initializable {
     }
 
 
-    private int loadHeaders(CSVRecord record, List<String> headers) {
+    /**
+     * Reads the header row into {@code headers} and locates the fetch key and
+     * emit key columns.
+     *
+     * @param record the first record of the csv, i.e. the header row
+     * @param headers list that receives every header name, in column order
+     * @return the indices of the key columns; an index is -1 if its column was not found
+     * @throws IOException wrapping a TikaException if a header cell is blank
+     */
+    private FetchEmitKeyIndices loadHeaders(CSVRecord record, List<String> headers)
+        throws IOException {
         int fetchKeyColumnIndex = -1;
+        int emitKeyColumnIndex = -1;
 
         for (int col = 0; col < record.size(); col++) {
             String header = record.get(col);
+            if (StringUtils.isBlank(header)) {
+                throw new IOException(
+                        new TikaException("Header in column (" +col +") must not be empty"));
+            }
             headers.add(header);
             if (header.equals(fetchKeyColumn)) {
                 fetchKeyColumnIndex = col;
+            }
+            //checked independently of the fetch key: one column may serve as both keys
+            if (header.equals(emitKeyColumn)) {
+                emitKeyColumnIndex = col;
             }
         }
-        return fetchKeyColumnIndex;
+        return new FetchEmitKeyIndices(fetchKeyColumnIndex, emitKeyColumnIndex);
     }
 
     @Override
     public void checkInitialization(InitializableProblemHandler problemHandler)
             throws TikaConfigException {
         super.checkInitialization(problemHandler);
-        mustNotBeEmpty("csvPath", this.csvPath.toString());
+        mustNotBeEmpty("csvPath", this.csvPath);
+    }
+
+    private static class FetchEmitKeyIndices {
+        private final int fetchKeyIndex;
+        private final int emitKeyIndex;
+
+        public FetchEmitKeyIndices(int fetchKeyIndex, int emitKeyIndex) {
+            this.fetchKeyIndex = fetchKeyIndex;
+            this.emitKeyIndex = emitKeyIndex;
+        }
+
+        public boolean shouldSkip(int index) {
+            return fetchKeyIndex == index || emitKeyIndex == index;
+        }
     }
 }
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
index 2a6ab52..74f3ebe 100644
--- 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
+++ 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
@@ -14,12 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.fetcher.FetchId;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
 import org.apache.tika.pipes.fetchiterator.csv.CSVFetchIterator;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.nio.file.Path;
@@ -44,12 +41,13 @@ public class TestCSVFetchIterator {
     public void testSimple() throws Exception {
         Path p = get("test-simple.csv");
         CSVFetchIterator it = new CSVFetchIterator();
-        it.setFetcherName("fs");
+        it.setFetcherName("fsf");
+        it.setEmitterName("fse");
         it.setCsvPath(p);
         it.setFetchKeyColumn("fetchKey");
         int numConsumers = 2;
         ExecutorService es = Executors.newFixedThreadPool(numConsumers+1);
-        ArrayBlockingQueue<FetchIdMetadataPair> queue = it.init(numConsumers);
+        ArrayBlockingQueue<FetchEmitTuple> queue = it.init(numConsumers);
         ExecutorCompletionService c = new ExecutorCompletionService(es);
         c.submit(it);
         List<MockFetcher> fetchers = new ArrayList<>();
@@ -70,13 +68,13 @@ public class TestCSVFetchIterator {
         }
         assertEquals(10, completed);
         for (MockFetcher f : fetchers) {
-            for (FetchIdMetadataPair pair : f.pairs) {
-                String id = pair.getMetadata().get("id");
+            for (FetchEmitTuple t : f.pairs) {
+                String id = t.getMetadata().get("id");
                 assertEquals("path/to/my/file"+id,
-                        pair.getFetchId().getFetchKey());
+                        t.getFetchKey().getKey());
                 assertEquals("project"+
                                 (Integer.parseInt(id) % 2 == 1 ? "a" : "b"),
-                        pair.getMetadata().get("project"));
+                        t.getMetadata().get("project"));
             }
         }
     }
@@ -109,20 +107,20 @@ public class TestCSVFetchIterator {
     }
 
     private static class MockFetcher implements Callable<Integer> {
-        private final ArrayBlockingQueue<FetchIdMetadataPair> queue;
-        private final List<FetchIdMetadataPair> pairs = new ArrayList<>();
-        private MockFetcher(ArrayBlockingQueue<FetchIdMetadataPair> queue) {
+        private final ArrayBlockingQueue<FetchEmitTuple> queue;
+        private final List<FetchEmitTuple> pairs = new ArrayList<>();
+        private MockFetcher(ArrayBlockingQueue<FetchEmitTuple> queue) {
             this.queue = queue;
         }
 
         @Override
         public Integer call() throws Exception {
             while (true) {
-                FetchIdMetadataPair p = queue.poll(1, TimeUnit.HOURS);
-                if (p == FetchIterator.COMPLETED_SEMAPHORE) {
+                FetchEmitTuple t = queue.poll(1, TimeUnit.HOURS);
+                if (t == FetchIterator.COMPLETED_SEMAPHORE) {
                     return pairs.size();
                 }
-                pairs.add(p);
+                pairs.add(t);
             }
         }
     }
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
index 057fecd..2c5ac1a 100644
--- 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
+++ 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
@@ -22,8 +22,9 @@ import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.fetcher.FetchId;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
 import org.apache.tika.utils.StringUtils;
 import org.slf4j.Logger;
@@ -43,12 +44,30 @@ import java.util.concurrent.TimeoutException;
 
 import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
+/**
+ * Iterates through the results from a sql call via jdbc. This adds all columns
+ * (except for the 'fetchKeyColumn' and 'emitKeyColumn', if specified)
+ * to the metadata object.
+ * <p>
+ *  <ul>
+ *      <li>If a 'fetchKeyColumn' is specified, this will use that column's 
value as the fetchKey.</li>
+ *      <li>If no 'fetchKeyColumn' is specified, this will send the metadata 
from the other columns.</li>
+ *      <li>The 'fetchKeyColumn' value is not added to the metadata.</li>
+ *  </ul>
+ * <p>
+ *  <ul>
+ *      <li>An 'emitKeyColumn' must be specified</li>
+ *      <li>The 'emitKeyColumn' value is not added to the metadata.</li>
+ *  </ul>
+ *
+ */
 public class JDBCFetchIterator extends FetchIterator implements Initializable {
 
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(JDBCFetchIterator.class);
 
     private String fetchKeyColumn;
+    private String emitKeyColumn;
     private String connection;
     private String select;
 
@@ -60,6 +79,11 @@ public class JDBCFetchIterator extends FetchIterator 
implements Initializable {
     }
 
     @Field
+    public void setEmitKeyColumn(String emitKeyColumn) {
+        //label of the result set column whose value is used as the emit key
+        this.emitKeyColumn = emitKeyColumn;
+    }
+
+    @Field
     public void setConnection(String connection) {
         this.connection = connection;
     }
@@ -72,7 +96,8 @@ public class JDBCFetchIterator extends FetchIterator 
implements Initializable {
     @Override
     protected void enqueue() throws InterruptedException, IOException, 
TimeoutException {
         String fetcherName = getFetcherName();
-        int fetchKeyIndex = -1;
+        String emitterName = getEmitterName();
+        FetchEmitKeyIndices fetchEmitKeyIndices = null;
         List<String> headers = new ArrayList<>();
         int rowCount = 0;
         LOGGER.debug("select: {}", select);
@@ -80,10 +105,11 @@ public class JDBCFetchIterator extends FetchIterator 
implements Initializable {
             try (ResultSet rs = st.executeQuery(select)) {
                 while (rs.next()) {
                     if (headers.size() == 0) {
-                        fetchKeyIndex = loadHeaders(rs.getMetaData(), headers);
+                        fetchEmitKeyIndices = loadHeaders(rs.getMetaData(), 
headers);
+                        checkFetchEmitValidity(fetcherName, emitterName, 
fetchEmitKeyIndices, headers);
                     }
                     try {
-                        processRow(fetcherName, headers, fetchKeyIndex, rs);
+                        processRow(fetcherName, emitterName, headers, 
fetchEmitKeyIndices, rs);
                     } catch (SQLException e) {
                         LOGGER.warn("Failed to insert: "+rs, e);
                     }
@@ -104,16 +130,36 @@ public class JDBCFetchIterator extends FetchIterator 
implements Initializable {
             }
         }
     }
+    private void checkFetchEmitValidity(String fetcherName,
+                                        String emitterName,
+                                        FetchEmitKeyIndices 
fetchEmitKeyIndices,
+                                        List<String> headers) throws 
IOException {
+
+        if (! StringUtils.isBlank(fetchKeyColumn) && 
fetchEmitKeyIndices.fetchKeyIndex < 0) {
+            throw new IOException(new TikaConfigException("Couldn't find 
column: "+fetchKeyColumn));
+        }
+        if (! StringUtils.isBlank(emitKeyColumn) && 
fetchEmitKeyIndices.emitKeyIndex < 0) {
+            throw new IOException(new TikaConfigException("Couldn't find 
column: "+emitKeyColumn));
+        }
+    }
 
-    private void processRow(String fetcherName, List<String> headers,
-                            int fetchKeyIndex, ResultSet rs)
+    private void processRow(String fetcherName, String emitterName, 
List<String> headers,
+                            FetchEmitKeyIndices fetchEmitKeyIndices, ResultSet 
rs)
             throws SQLException, TimeoutException, InterruptedException {
         Metadata metadata = new Metadata();
-        String fetchKey = null;
+        String fetchKey = "";
+        String emitKey = "";
         for (int i = 1; i <= rs.getMetaData().getColumnCount(); i++) {
-            if (i == fetchKeyIndex) {
+            if (i == fetchEmitKeyIndices.fetchKeyIndex) {
                 fetchKey = getString(i, rs);
                 fetchKey = (fetchKey == null) ? "" : fetchKey;
+                //only report rows that really lack a fetch key
+                if (StringUtils.isBlank(fetchKey)) {
+                    LOGGER.debug("fetchKey is empty for record "+toString(rs));
+                }
+            }
+            if (i == fetchEmitKeyIndices.emitKeyIndex) {
+                //the same column may serve as both keys; reuse the value already read
+                emitKey = (i == fetchEmitKeyIndices.fetchKeyIndex) ?
+                        fetchKey : getString(i, rs);
+                emitKey = (emitKey == null) ? "" : emitKey;
+                //only report rows that really lack an emit key
+                if (StringUtils.isBlank(emitKey)) {
+                    LOGGER.warn("emitKey is empty for record "+toString(rs));
+                }
+            }
+            //key columns are never copied into the metadata
+            if (fetchEmitKeyIndices.shouldSkip(i)) {
                 continue;
             }
             String val = getString(i, rs);
@@ -122,7 +168,21 @@ public class JDBCFetchIterator extends FetchIterator 
implements Initializable {
             }
         }
 
-        tryToAdd(new FetchIdMetadataPair(new FetchId(fetcherName, fetchKey), 
metadata));
+        tryToAdd(new FetchEmitTuple(
+                new FetchKey(fetcherName, fetchKey),
+                new EmitKey(emitterName, emitKey),
+                metadata));
+    }
+
+    private String toString(ResultSet rs) throws SQLException {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 1; i <= rs.getMetaData().getColumnCount(); i++) {
+            String val = rs.getString(i);
+            val = (val == null) ? "" : val;
+            val = (val.length() > 100) ? val.substring(0, 100) : val;
+            sb.append(rs.getMetaData().getColumnLabel(i)+":"+val+"\n");
+        }
+        return sb.toString();
     }
 
     private String getString(int i, ResultSet rs) throws SQLException {
@@ -135,15 +195,19 @@ public class JDBCFetchIterator extends FetchIterator 
implements Initializable {
     }
 
 
-    private int loadHeaders(ResultSetMetaData metaData, List<String> headers) 
throws SQLException {
+    private FetchEmitKeyIndices loadHeaders(ResultSetMetaData metaData, 
List<String> headers) throws SQLException {
         int fetchKeyIndex = -1;
+        int emitKeyIndex = -1;
         for (int i = 1; i <= metaData.getColumnCount(); i++) {
             if (metaData.getColumnLabel(i).equalsIgnoreCase(fetchKeyColumn)) {
                 fetchKeyIndex = i;
             }
+            if (metaData.getColumnLabel(i).equalsIgnoreCase(emitKeyColumn)) {
+                emitKeyIndex = i;
+            }
             headers.add(metaData.getColumnLabel(i));
         }
-        return fetchKeyIndex;
+        return new FetchEmitKeyIndices(fetchKeyIndex, emitKeyIndex);
     }
 
     @Override
@@ -161,8 +225,31 @@ public class JDBCFetchIterator extends FetchIterator 
implements Initializable {
         super.checkInitialization(problemHandler);
         mustNotBeEmpty("connection", this.connection);
         mustNotBeEmpty("select", this.select);
+        mustNotBeEmpty("emitterName", this.getEmitterName());
+        mustNotBeEmpty("emitKeyColumn", this.emitKeyColumn);
+
+        if (StringUtils.isBlank(getFetcherName()) && ! 
StringUtils.isBlank(fetchKeyColumn)) {
+            throw new TikaConfigException(
+                    "If you specify a 'fetchKeyColumn', you must specify a 
'fetcherName'");
+        }
+
         if (StringUtils.isEmpty(fetchKeyColumn)) {
             LOGGER.info("no fetch key column has been specified");
         }
+
+    }
+
+    private static class FetchEmitKeyIndices {
+        private final int fetchKeyIndex;
+        private final int emitKeyIndex;
+
+        public FetchEmitKeyIndices(int fetchKeyIndex, int emitKeyIndex) {
+            this.fetchKeyIndex = fetchKeyIndex;
+            this.emitKeyIndex = emitKeyIndex;
+        }
+
+        public boolean shouldSkip(int index) {
+            return fetchKeyIndex == index || emitKeyIndex == index;
+        }
     }
 }
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
index fb0277b..ee3fcee 100644
--- 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
+++ 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
@@ -18,12 +18,9 @@ package org.apache.tika.pipes.fetchiterator.jdbc;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.tika.config.TikaConfig;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.fetcher.FetchId;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -32,22 +29,18 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -99,7 +92,7 @@ public class TestJDBCFetchIterator {
         FetchIterator fetchIterator = tk.getFetchIterator();
         ExecutorService es = Executors.newFixedThreadPool(numConsumers+1);
         ExecutorCompletionService<Integer> completionService = new 
ExecutorCompletionService<>(es);
-        ArrayBlockingQueue<FetchIdMetadataPair> queue = 
fetchIterator.init(numConsumers);
+        ArrayBlockingQueue<FetchEmitTuple> queue = 
fetchIterator.init(numConsumers);
         completionService.submit(fetchIterator);
         List<MockFetcher> fetchers = new ArrayList<>();
         for (int i = 0; i < numConsumers; i++) {
@@ -119,8 +112,8 @@ public class TestJDBCFetchIterator {
         int cnt = 0;
         Matcher m = Pattern.compile("fk(\\d+)").matcher("");
         for (MockFetcher f : fetchers) {
-            for (FetchIdMetadataPair p : f.pairs) {
-                String k = p.getFetchId().getFetchKey();
+            for (FetchEmitTuple p : f.pairs) {
+                String k = p.getFetchKey().getKey();
                 String num = "";
                 if (m.reset(k).find()) {
                     num = m.group(1);
@@ -143,9 +136,11 @@ public class TestJDBCFetchIterator {
                 "    <fetchIterators>\n" +
                 "        <fetchIterator 
class=\"org.apache.tika.pipes.fetchiterator.jdbc.JDBCFetchIterator\">\n" +
                 "            <params>\n" +
-                "                <param name=\"fetcherName\" 
type=\"string\">s3</param>\n" +
+                "                <param name=\"fetcherName\" 
type=\"string\">s3f</param>\n" +
+                "                <param name=\"emitterName\" 
type=\"string\">s3e</param>\n" +
                 "                <param name=\"queueSize\" 
type=\"int\">57</param>\n" +
                 "                <param name=\"fetchKeyColumn\" 
type=\"string\">my_fetchkey</param>\n" +
+                "                <param name=\"emitKeyColumn\" 
type=\"string\">my_fetchkey</param>\n" +
                 "                <param name=\"select\" type=\"string\">" +
                 "select id as my_id, project as my_project, fetchKey as 
my_fetchKey from fetchkeys</param>\n" +
                 "                <param name=\"connection\" 
type=\"string\">jdbc:h2:file:"+
@@ -158,20 +153,20 @@ public class TestJDBCFetchIterator {
     }
 
     private static class MockFetcher implements Callable<Integer> {
-        private final ArrayBlockingQueue<FetchIdMetadataPair> queue;
-        private final List<FetchIdMetadataPair> pairs = new ArrayList<>();
-        private MockFetcher(ArrayBlockingQueue<FetchIdMetadataPair> queue) {
+        private final ArrayBlockingQueue<FetchEmitTuple> queue;
+        private final List<FetchEmitTuple> pairs = new ArrayList<>();
+        private MockFetcher(ArrayBlockingQueue<FetchEmitTuple> queue) {
             this.queue = queue;
         }
 
         @Override
         public Integer call() throws Exception {
             while (true) {
-                FetchIdMetadataPair p = queue.poll(1, TimeUnit.HOURS);
-                if (p == FetchIterator.COMPLETED_SEMAPHORE) {
+                FetchEmitTuple t = queue.poll(1, TimeUnit.HOURS);
+                if (t == FetchIterator.COMPLETED_SEMAPHORE) {
                     return pairs.size();
                 }
-                pairs.add(p);
+                pairs.add(t);
             }
         }
     }
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
index bd85658..2d575e9 100644
--- 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
+++ 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
@@ -20,7 +20,6 @@ import com.amazonaws.auth.profile.ProfileCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.iterable.S3Objects;
-import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
@@ -28,8 +27,9 @@ import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.fetcher.FetchId;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -97,11 +97,13 @@ public class S3FetchIterator extends FetchIterator 
implements Initializable {
         long start = System.currentTimeMillis();
         int count = 0;
         for (S3ObjectSummary summary : S3Objects.withPrefix(s3Client, bucket, 
s3PathPrefix)) {
+
             long elapsed = System.currentTimeMillis() - start;
             LOGGER.debug("adding ({}) {} in {} ms", count, summary.getKey(),
                     elapsed);
-            tryToAdd( new FetchIdMetadataPair(
-                    new FetchId(fetcherName, summary.getKey()),
+            tryToAdd(new FetchEmitTuple(
+                    new FetchKey(fetcherName, summary.getKey()),
+                    new EmitKey(fetcherName, summary.getKey()),
                     new Metadata()));
             count++;
         }
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
index 1984577..51d20ac 100644
--- 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
+++ 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
@@ -15,22 +15,16 @@
  * limitations under the License.
  */
 package org.apache.tika.pipes.fetchiterator.s3;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.fetcher.FetchId;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -52,7 +46,7 @@ public class TestS3FetchIterator {
         it.setRegion("");//select one
         it.initialize(Collections.EMPTY_MAP);
         int numConsumers = 6;
-        ArrayBlockingQueue<FetchIdMetadataPair> queue = it.init(numConsumers);
+        ArrayBlockingQueue<FetchEmitTuple> queue = it.init(numConsumers);
 
         ExecutorService es = Executors.newFixedThreadPool(numConsumers+1);
         ExecutorCompletionService c = new ExecutorCompletionService(es);
@@ -78,20 +72,20 @@ public class TestS3FetchIterator {
     }
 
     private static class MockFetcher implements Callable<Integer> {
-        private final ArrayBlockingQueue<FetchIdMetadataPair> queue;
-        private final List<FetchIdMetadataPair> pairs = new ArrayList<>();
-        private MockFetcher(ArrayBlockingQueue<FetchIdMetadataPair> queue) {
+        private final ArrayBlockingQueue<FetchEmitTuple> queue;
+        private final List<FetchEmitTuple> pairs = new ArrayList<>();
+        private MockFetcher(ArrayBlockingQueue<FetchEmitTuple> queue) {
             this.queue = queue;
         }
 
         @Override
         public Integer call() throws Exception {
             while (true) {
-                FetchIdMetadataPair p = queue.poll(1, TimeUnit.HOURS);
-                if (p == FetchIterator.COMPLETED_SEMAPHORE) {
+                FetchEmitTuple t = queue.poll(1, TimeUnit.HOURS);
+                if (t == FetchIterator.COMPLETED_SEMAPHORE) {
                     return pairs.size();
                 }
-                pairs.add(p);
+                pairs.add(t);
             }
         }
     }
diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
index 05423ce..7df2bb4 100644
--- 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
+++ 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
@@ -25,17 +25,15 @@ import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.TikaException;
-import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.emitter.Emitter;
 import org.apache.tika.pipes.emitter.s3.S3Emitter;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
 import org.apache.tika.pipes.fetcher.Fetcher;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
@@ -94,7 +92,7 @@ public class PipeIntegrationTests {
         int numConsumers = 1;
         ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
         ExecutorCompletionService<Integer> completionService = new 
ExecutorCompletionService<>(es);
-        ArrayBlockingQueue<FetchIdMetadataPair> queue = it.init(numConsumers);
+        ArrayBlockingQueue<FetchEmitTuple> queue = it.init(numConsumers);
         completionService.submit(it);
         for (int i = 0; i < numConsumers; i++) {
             completionService.submit(new FSFetcherEmitter(
@@ -118,7 +116,7 @@ public class PipeIntegrationTests {
         int numConsumers = 20;
         ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
         ExecutorCompletionService<Integer> completionService = new 
ExecutorCompletionService<>(es);
-        ArrayBlockingQueue<FetchIdMetadataPair> queue = it.init(numConsumers);
+        ArrayBlockingQueue<FetchEmitTuple> queue = it.init(numConsumers);
         completionService.submit(it);
         for (int i = 0; i < numConsumers; i++) {
             completionService.submit(new S3FetcherEmitter(
@@ -149,9 +147,9 @@ public class PipeIntegrationTests {
 
         private final Fetcher fetcher;
         private final Emitter emitter;
-        private final ArrayBlockingQueue<FetchIdMetadataPair> queue;
+        private final ArrayBlockingQueue<FetchEmitTuple> queue;
 
-        FSFetcherEmitter(ArrayBlockingQueue<FetchIdMetadataPair> queue, Fetcher
+        FSFetcherEmitter(ArrayBlockingQueue<FetchEmitTuple> queue, Fetcher
                 fetcher, Emitter emitter) {
             this.queue = queue;
             this.fetcher = fetcher;
@@ -162,24 +160,24 @@ public class PipeIntegrationTests {
         public Integer call() throws Exception {
 
             while (true) {
-                FetchIdMetadataPair p = queue.poll(5, TimeUnit.MINUTES);
-                if (p == null) {
+                FetchEmitTuple t = queue.poll(5, TimeUnit.MINUTES);
+                if (t == null) {
                     throw new TimeoutException("");
                 }
-                if (p == FetchIterator.COMPLETED_SEMAPHORE) {
+                if (t == FetchIterator.COMPLETED_SEMAPHORE) {
                     return 1;
                 }
-                process(p);
+                process(t);
             }
         }
 
-        private void process(FetchIdMetadataPair p) throws IOException, 
TikaException {
-            Path targ = OUTDIR.resolve(p.getFetchId().getFetchKey());
+        private void process(FetchEmitTuple t) throws IOException, 
TikaException {
+            Path targ = OUTDIR.resolve(t.getFetchKey().getKey());
             if (Files.isRegularFile(targ)) {
                 return;
             }
-            try (InputStream is = fetcher.fetch(p.getFetchId().getFetchKey(), 
p.getMetadata())) {
-                System.out.println(counter.getAndIncrement() + " : "+p );
+            try (InputStream is = fetcher.fetch(t.getFetchKey().getKey(), 
t.getMetadata())) {
+                System.out.println(counter.getAndIncrement() + " : "+t );
                 Files.createDirectories(targ.getParent());
                 Files.copy(is, targ);
             }
@@ -191,9 +189,9 @@ public class PipeIntegrationTests {
 
         private final Fetcher fetcher;
         private final S3Emitter emitter;
-        private final ArrayBlockingQueue<FetchIdMetadataPair> queue;
+        private final ArrayBlockingQueue<FetchEmitTuple> queue;
 
-        S3FetcherEmitter(ArrayBlockingQueue<FetchIdMetadataPair> queue, Fetcher
+        S3FetcherEmitter(ArrayBlockingQueue<FetchEmitTuple> queue, Fetcher
                 fetcher, S3Emitter emitter) {
             this.queue = queue;
             this.fetcher = fetcher;
@@ -204,28 +202,23 @@ public class PipeIntegrationTests {
         public Integer call() throws Exception {
 
             while (true) {
-                FetchIdMetadataPair p = queue.poll(5, TimeUnit.MINUTES);
-                if (p == null) {
+                FetchEmitTuple t = queue.poll(5, TimeUnit.MINUTES);
+                if (t == null) {
                     throw new TimeoutException("");
                 }
-                if (p == FetchIterator.COMPLETED_SEMAPHORE) {
+                if (t == FetchIterator.COMPLETED_SEMAPHORE) {
                     return 1;
                 }
-                process(p);
+                process(t);
             }
         }
 
-        private void process(FetchIdMetadataPair p) throws IOException, 
TikaException {
+        private void process(FetchEmitTuple t) throws IOException, 
TikaException {
             Metadata userMetadata = new Metadata();
             userMetadata.set("project", "my-project");
 
-            try (InputStream is = fetcher.fetch(p.getFetchId().getFetchKey(), 
p.getMetadata())) {
-                long length = -1;
-                if (is instanceof TikaInputStream &&
-                        ((TikaInputStream) is).hasFile()) {
-                    length = ((TikaInputStream)is).getLength();
-                }
-                emitter.emit(p.getFetchId().getFetchKey(), is, length, 
userMetadata);
+            try (InputStream is = fetcher.fetch(t.getFetchKey().getKey(), 
t.getMetadata())) {
+                emitter.emit(t.getEmitKey().getEmitKey(), is, userMetadata);
             }
         }
     }
diff --git 
a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
 
b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
index d7ebcf0..6e0e26e 100644
--- 
a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
+++ 
b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
@@ -24,7 +24,7 @@ import com.google.gson.JsonPrimitive;
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.fetcher.FetchId;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -57,19 +57,20 @@ public class TikaClient {
 
     }*/
 
-    public TikaEmitterResult parse(FetchId fetchId, Metadata metadata, String 
emitter)
+    public TikaEmitterResult parse(FetchEmitTuple fetchEmit, Metadata metadata)
             throws IOException, TikaException {
         TikaHttpClient client = getHttpClient();
-        String jsonRequest = jsonifyRequest(fetchId, metadata, emitter);
+        String jsonRequest = jsonifyRequest(fetchEmit, metadata);
         return client.postJson(jsonRequest);
 
     }
 
-    private String jsonifyRequest(FetchId fetchId, Metadata metadata, String 
emitter) {
+    private String jsonifyRequest(FetchEmitTuple fetchEmit, Metadata metadata) 
{
         JsonObject root = new JsonObject();
-        root.add("fetcherName", new JsonPrimitive(fetchId.getFetcherName()));
-        root.add("fetchKey", new JsonPrimitive(fetchId.getFetchKey()));
-        root.add("emitter", new JsonPrimitive(emitter));
+        root.add("fetcher", new 
JsonPrimitive(fetchEmit.getFetchKey().getFetcherName()));
+        root.add("fetchKey", new 
JsonPrimitive(fetchEmit.getFetchKey().getKey()));
+        root.add("emitter", new 
JsonPrimitive(fetchEmit.getEmitKey().getEmitterName()));
+        root.add("emitKey", new 
JsonPrimitive(fetchEmit.getEmitKey().getEmitKey()));
         if (metadata.size() > 0) {
             JsonObject m = new JsonObject();
             for (String n : metadata.names()) {
diff --git 
a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
 
b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
index 2ca9f97..1274d1f 100644
--- 
a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
+++ 
b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
@@ -18,9 +18,8 @@ package org.apache.tika.server.client;
 
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.TikaException;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
-import org.apache.tika.pipes.fetchiterator.FileSystemFetchIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xml.sax.SAXException;
@@ -53,21 +52,18 @@ public class TikaClientCLI {
         //TODO -- add an actual commandline
         Path tikaConfigPath = Paths.get(args[0]);
         List<String> tikaServerUrls = Arrays.asList(args[1].split(","));
-        String fetcherString = args[2];
-
         TikaClientCLI cli = new TikaClientCLI();
-        cli.execute(tikaConfigPath, tikaServerUrls, fetcherString);
+        cli.execute(tikaConfigPath, tikaServerUrls);
     }
 
-    private void execute(Path tikaConfigPath, List<String> tikaServerUrls, 
String fetcherString)
+    private void execute(Path tikaConfigPath, List<String> tikaServerUrls)
             throws TikaException, IOException, SAXException {
         TikaConfig config = new TikaConfig(tikaConfigPath);
 
         ExecutorService executorService = 
Executors.newFixedThreadPool(numThreads+1);
         ExecutorCompletionService<Integer> completionService = new 
ExecutorCompletionService<>(executorService);
-        //TODO: fix this!
         final FetchIterator fetchIterator = config.getFetchIterator();
-        ArrayBlockingQueue<FetchIdMetadataPair> queue = 
fetchIterator.init(numThreads);
+        final ArrayBlockingQueue<FetchEmitTuple> queue = 
fetchIterator.init(numThreads);
 
         completionService.submit(fetchIterator);
         if (tikaServerUrls.size() == numThreads) {
@@ -75,12 +71,12 @@ public class TikaClientCLI {
             for (int i = 0; i < numThreads; i++) {
                 TikaClient client = TikaClient.get(config,
                         Collections.singletonList(tikaServerUrls.get(i)));
-                completionService.submit(new FetchWorker(queue, client, 
fetcherString));
+                completionService.submit(new FetchWorker(queue, client));
             }
         } else {
             for (int i = 0; i < numThreads; i++) {
                 TikaClient client = TikaClient.get(config, tikaServerUrls);
-                completionService.submit(new FetchWorker(queue, client, 
fetcherString));
+                completionService.submit(new FetchWorker(queue, client));
             }
         }
 
@@ -114,14 +110,11 @@ public class TikaClientCLI {
     }
 
     private class FetchWorker implements Callable<Integer> {
-        private final ArrayBlockingQueue<FetchIdMetadataPair> queue;
+        private final ArrayBlockingQueue<FetchEmitTuple> queue;
         private final TikaClient client;
-        private final String emitterString;
-        public FetchWorker(ArrayBlockingQueue<FetchIdMetadataPair> queue, 
TikaClient client,
-                           String emitterString) {
+        public FetchWorker(ArrayBlockingQueue<FetchEmitTuple> queue, 
TikaClient client) {
             this.queue = queue;
             this.client = client;
-            this.emitterString = emitterString;
         }
 
         @Override
@@ -129,20 +122,20 @@ public class TikaClientCLI {
 
             while (true) {
 
-                FetchIdMetadataPair p = queue.poll(maxWaitMs, 
TimeUnit.MILLISECONDS);
-                if (p == null) {
+                FetchEmitTuple t = queue.poll(maxWaitMs, 
TimeUnit.MILLISECONDS);
+                if (t == null) {
                     throw new TimeoutException("exceeded maxWaitMs");
                 }
-                if (p == FetchIterator.COMPLETED_SEMAPHORE) {
+                if (t == FetchIterator.COMPLETED_SEMAPHORE) {
                     return 1;
                 }
                 try {
-                    LOGGER.debug("about to parse: {}", p.getFetchId());
-                    client.parse(p.getFetchId(), p.getMetadata(), 
emitterString);
+                    LOGGER.debug("about to parse: {}", t.getFetchKey());
+                    client.parse(t, t.getMetadata());
                 } catch (IOException e) {
-                    LOGGER.warn(p.getFetchId().toString(), e);
+                    LOGGER.warn(t.getFetchKey().toString(), e);
                 } catch (TikaException e) {
-                    LOGGER.warn(p.getFetchId().toString(), e);
+                    LOGGER.warn(t.getFetchKey().toString(), e);
                 }
             }
         }
diff --git 
a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientConfig.java
 
b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientConfig.java
new file mode 100644
index 0000000..7034c89
--- /dev/null
+++ 
b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientConfig.java
@@ -0,0 +1,85 @@
+package org.apache.tika.server.client;
+
+import org.apache.tika.config.Param;
+import org.apache.tika.config.ServiceLoader;
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.mime.MimeTypeException;
+import org.apache.tika.mime.MimeTypes;
+import org.apache.tika.pipes.fetchiterator.EmptyFetchIterator;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.xml.sax.SAXException;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TikaClientConfig extends TikaConfig {
+    public TikaClientConfig(String file) throws TikaException, IOException, 
SAXException {
+        super(file);
+    }
+
+    public TikaClientConfig(Path path) throws TikaException, IOException, 
SAXException {
+        super(path);
+    }
+
+    public TikaClientConfig(Path path, ServiceLoader loader) throws 
TikaException, IOException, SAXException {
+        super(path, loader);
+    }
+
+    public TikaClientConfig(File file) throws TikaException, IOException, 
SAXException {
+        super(file);
+    }
+
+    public TikaClientConfig(File file, ServiceLoader loader) throws 
TikaException, IOException, SAXException {
+        super(file, loader);
+    }
+
+    public TikaClientConfig(URL url) throws TikaException, IOException, 
SAXException {
+        super(url);
+    }
+
+    public TikaClientConfig(URL url, ClassLoader loader) throws TikaException, 
IOException, SAXException {
+        super(url, loader);
+    }
+
+    public TikaClientConfig(URL url, ServiceLoader loader) throws 
TikaException, IOException, SAXException {
+        super(url, loader);
+    }
+
+    public TikaClientConfig(InputStream stream) throws TikaException, 
IOException, SAXException {
+        super(stream);
+    }
+
+    public TikaClientConfig(Document document) throws TikaException, 
IOException {
+        super(document);
+    }
+
+    public TikaClientConfig(Document document, ServiceLoader loader) throws 
TikaException, IOException {
+        super(document, loader);
+    }
+
+    public TikaClientConfig(Element element) throws TikaException, IOException 
{
+        super(element);
+    }
+
+    public TikaClientConfig(Element element, ClassLoader loader) throws 
TikaException, IOException {
+        super(element, loader);
+    }
+
+    public TikaClientConfig(ClassLoader loader) throws MimeTypeException, 
IOException {
+        super(loader);
+    }
+
+    public TikaClientConfig() throws TikaException, IOException {
+    }
+
+}
diff --git 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
index 9752608..9689ba8 100644
--- 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
+++ 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
@@ -27,6 +27,7 @@ import org.apache.tika.pipes.fetcher.Fetcher;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.utils.ExceptionUtils;
+import org.apache.tika.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,22 +56,23 @@ public class EmitterResource {
     private static final String EMITTER_PARAM = "emitter";
     private static final String FETCHER_NAME_ABBREV = "fn";
     private static final String FETCH_KEY_ABBREV = "fk";
+    private static final String EMIT_KEY_ABBREV = "ek";
 
     /**
      * key that is safe to pass through http header.
      * The user _must_ specify this for the fsemitter if calling 'put'
      */
-    public static final String PATH_KEY_FOR_HTTP_HEADER = 
TikaCoreProperties.SOURCE_PATH.getName().replaceAll(":", "-");
+    public static final String EMIT_KEY_FOR_HTTP_HEADER = "emit-key";
     private static final Logger LOG = 
LoggerFactory.getLogger(EmitterResource.class);
 
 
     /**
-     * @param is input stream is ignored in 'get'
+     * @param is          input stream is ignored in 'get'
      * @param httpHeaders
      * @param info
      * @param emitterName
      * @param fetcherName specify the fetcherName in the url's query section
-     * @param fetchKey specify the fetch key in the url's query section
+     * @param fetchKey    specify the fetch key in the url's query section
      * @return
      * @throws Exception
      */
@@ -78,23 +80,22 @@ public class EmitterResource {
     @Produces("application/json")
     @Path("{" + EMITTER_PARAM + " : (\\w+)?}")
     public Map<String, String> getRmeta(InputStream is, @Context HttpHeaders 
httpHeaders,
-                                           @Context UriInfo info,
-                                           @PathParam(EMITTER_PARAM) String 
emitterName,
-                                           @QueryParam(FETCHER_NAME_ABBREV) 
String fetcherName,
-                                                 @QueryParam(FETCH_KEY_ABBREV) 
String fetchKey) throws Exception {
+                                        @Context UriInfo info,
+                                        @PathParam(EMITTER_PARAM) String 
emitterName,
+                                        @QueryParam(FETCHER_NAME_ABBREV) 
String fetcherName,
+                                        @QueryParam(FETCH_KEY_ABBREV) String 
fetchKey,
+                                        @QueryParam(EMIT_KEY_ABBREV) String 
emitKey) throws Exception {
         Metadata metadata = new Metadata();
         Fetcher fetcher = 
TikaResource.getConfig().getFetcherManager().getFetcher(fetcherName);
         List<Metadata> metadataList;
         try (InputStream fetchedIs = fetcher.fetch(fetchKey, metadata)) {
-            for (String n : metadata.names()) {
-                System.out.println(n + " ; "+metadata.get(n));
-            }
             metadataList =
                     RecursiveMetadataResource.parseMetadata(fetchedIs,
                             metadata,
                             httpHeaders.getRequestHeaders(), info, "text");
         }
-        return emit(emitterName, metadataList);
+        emitKey = StringUtils.isBlank(emitKey) ? fetchKey : emitKey;
+        return emit(emitterName, emitKey, metadataList);
     }
 
     /**
@@ -106,7 +107,8 @@ public class EmitterResource {
      * {@link TikaCoreProperties#TIKA_CONTENT}
      * <p>
      * Must specify an emitter in the path, e.g. /emit/solr
-     * @param info uri info
+     *
+     * @param info        uri info
      * @param emitterName which emitter to use; emitters must be configured in
      *                    the TikaConfig file.
      * @return InputStream that can be deserialized as a list of {@link 
Metadata} objects
@@ -116,19 +118,18 @@ public class EmitterResource {
     @Produces("application/json")
     @Path("{" + EMITTER_PARAM + " : (\\w+)?}")
     public Map<String, String> putRmeta(InputStream is,
-                                           @Context HttpHeaders httpHeaders,
-                                           @Context UriInfo info,
-                                           @PathParam(EMITTER_PARAM) String 
emitterName
+                                        @Context HttpHeaders httpHeaders,
+                                        @Context UriInfo info,
+                                        @PathParam(EMITTER_PARAM) String 
emitterName
     ) throws Exception {
 
         Metadata metadata = new Metadata();
-        String path = httpHeaders.getHeaderString(PATH_KEY_FOR_HTTP_HEADER);
-        metadata.set(TikaCoreProperties.SOURCE_PATH, path);
+        String emitKey = httpHeaders.getHeaderString(EMIT_KEY_FOR_HTTP_HEADER);
         List<Metadata> metadataList =
                 RecursiveMetadataResource.parseMetadata(is,
                         metadata,
                         httpHeaders.getRequestHeaders(), info, "text");
-        return emit(emitterName, metadataList);
+        return emit(emitterName, emitKey, metadataList);
     }
 
     /**
@@ -142,6 +143,7 @@ public class EmitterResource {
      * {@link TikaCoreProperties#TIKA_CONTENT}
      * <p>
      * Must specify a fetcherString and an emitter in the posted json.
+     *
      * @param info uri info
      * @return InputStream that can be deserialized as a list of {@link 
Metadata} objects
      * @throws Exception
@@ -149,16 +151,18 @@ public class EmitterResource {
     @POST
     @Produces("application/json")
     public Map<String, String> postRmeta(InputStream is,
-                                @Context HttpHeaders httpHeaders,
-                                @Context UriInfo info
-                                ) throws Exception {
-        JsonElement root = null;
+                                         @Context HttpHeaders httpHeaders,
+                                         @Context UriInfo info
+    ) throws Exception {
+        JsonObject root = null;
         try (Reader reader = new InputStreamReader(is, 
StandardCharsets.UTF_8)) {
-            root = JsonParser.parseReader(reader);
+            root = JsonParser.parseReader(reader).getAsJsonObject();
         }
-        String fetcherName = 
root.getAsJsonObject().get("fetcherName").getAsString();
-        String fetchKey = root.getAsJsonObject().get("fetchKey").getAsString();
-        String emitterName = 
root.getAsJsonObject().get("emitter").getAsString();
+        String fetcherName = root.get("fetcher").getAsString();
+        String fetchKey = root.get("fetchKey").getAsString();
+        String emitKey = (root.has("emitKey")) ?
+                root.get("emitKey").getAsString() : fetchKey;
+        String emitterName = root.get("emitter").getAsString();
         Metadata metadata = new Metadata();
 
 
@@ -181,12 +185,12 @@ public class EmitterResource {
             LOG.debug("post parse/pre emit metadata {}: {}",
                     n, metadataList.get(0).get(n));
         }
-        return emit(emitterName, metadataList);
+        return emit(emitterName, emitKey, metadataList);
     }
 
     private void injectUserMetadata(Metadata metadata, JsonObject root) {
-        if (root.getAsJsonObject().has("metadata")) {
-            JsonObject meta = root.getAsJsonObject().getAsJsonObject("metadata");
+        if (root.has("metadata")) {
+            JsonObject meta = root.getAsJsonObject("metadata");
             for (String k : meta.keySet()) {
                 JsonElement val = meta.get(k);
                 if (val.isJsonArray()) {
@@ -209,13 +213,13 @@ public class EmitterResource {
         return statusMap;
     }
 
-    private Map<String, String> emit(String emitterName, List<Metadata> metadataList) throws TikaException {
+    private Map<String, String> emit(String emitterName, String emitKey, List<Metadata> metadataList) throws TikaException {
         Emitter emitter = TikaResource.getConfig().getEmitterManager().getEmitter(emitterName);
         String status = "ok";
         String exceptionMsg = "";
         try {
-            emitter.emit(metadataList);
-        } catch (IOException|TikaEmitterException e) {
+            emitter.emit(emitKey, metadataList);
+        } catch (IOException | TikaEmitterException e) {
             LOG.warn("problem with emitting", e);
             status = "emitter_exception";
             exceptionMsg = ExceptionUtils.getStackTrace(e);
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
index c3f5e31..31a3cd0 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
@@ -168,7 +168,7 @@ public class TikaEmitterTest extends CXFTestBase {
     public void testPost() throws Exception {
 
         JsonObject root = new JsonObject();
-        root.add("fetcherName", new JsonPrimitive("fsf"));
+        root.add("fetcher", new JsonPrimitive("fsf"));
         root.add("fetchKey", new JsonPrimitive("hello_world.xml"));
         root.add("emitter", new JsonPrimitive("fse"));
         JsonObject userMetadata = new JsonObject();
@@ -209,7 +209,7 @@ public class TikaEmitterTest extends CXFTestBase {
     public void testPut() throws Exception {
 
         String getUrl = endPoint+EMITTER_PATH_AND_FS;
-        String metaPathKey = EmitterResource.PATH_KEY_FOR_HTTP_HEADER;
+        String metaPathKey = EmitterResource.EMIT_KEY_FOR_HTTP_HEADER;
 
         Response response = WebClient
                 .create(getUrl)
@@ -237,7 +237,7 @@ public class TikaEmitterTest extends CXFTestBase {
     public void testPostNPE() throws Exception {
 
         JsonObject root = new JsonObject();
-        root.add("fetcherName", new JsonPrimitive("fsf"));
+        root.add("fetcher", new JsonPrimitive("fsf"));
         root.add("fetchKey", new JsonPrimitive("null_pointer.xml"));
         root.add("emitter", new JsonPrimitive("fse"));
         JsonObject userMetadata = new JsonObject();
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
index 65e0f57..554aee1 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
@@ -267,7 +267,7 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
 
     private String getJsonString(String fileName) {
         JsonObject root = new JsonObject();
-        root.add("fetcherName", new JsonPrimitive(FETCHER_NAME));
+        root.add("fetcher", new JsonPrimitive(FETCHER_NAME));
         root.add("fetchKey", new JsonPrimitive(fileName));
         root.add("emitter", new JsonPrimitive(EMITTER_NAME));
         return GSON.toJson(root);

Reply via email to