This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 112c689  TIKA-3283 -- add s3 emitter
112c689 is described below

commit 112c689b9fff43aa881fa504abab5642b658a090
Author: tballison <[email protected]>
AuthorDate: Wed Jan 27 13:27:59 2021 -0500

    TIKA-3283 -- add s3 emitter
---
 .../main/java/org/apache/tika/config/Param.java    |   9 +-
 .../java/org/apache/tika/config/TikaConfig.java    |   7 +-
 .../apache/tika/pipes/emitter/AbstractEmitter.java |   1 +
 .../org/apache/tika/pipes/emitter/Emitter.java     |   1 -
 .../tika/pipes/emitter/TikaEmitterException.java   |   4 +
 .../tika/pipes/fetchiterator/FetchIterator.java    |   7 -
 .../fetchiterator/FileSystemFetchIteratorTest.java |   4 +-
 tika-pipes/tika-emitters/pom.xml                   |   1 +
 .../{ => pipes}/emitter/fs/FileSystemEmitter.java  |  42 ++--
 .../tika-emitter-s3}/pom.xml                       |  63 ++++--
 .../apache/tika/pipes/emitter/s3/S3Emitter.java    | 224 +++++++++++++++++++++
 .../tika/{ => pipes}/emitter/solr/SolrEmitter.java |   2 +-
 .../tika/{ => pipes}/emitter/solr/TestBasic.java   |   2 +-
 .../test/resources/tika-config-simple-emitter.xml  |   4 +-
 .../pipes/fetchiterator/csv/CSVFetchIterator.java  |   2 +
 .../fetchiterator/jdbc/JDBCFetchIterator.java      |   2 +
 .../pipes/fetchiterator/s3/S3FetchIterator.java    |   6 +-
 tika-pipes/tika-fetchers/tika-fetcher-s3/pom.xml   |   2 +-
 .../apache/tika/pipes/fetcher/s3/S3Fetcher.java    |  31 ++-
 .../src/test/resources/tika-config-s3.xml          |   2 +-
 tika-pipes/tika-pipes-integration-tests/pom.xml    |  36 ++--
 .../apache/tika/pipes/PipeIntegrationTests.java    |  80 +++++++-
 .../src/test/resources/tika-config-s3Tos3.xml      |  52 +++++
 .../resources/tika-config-simple-fs-emitter.xml    |   6 +-
 .../apache/tika/server/core/TikaEmitterTest.java   |   2 +-
 .../core/TikaServerEmitterIntegrationTest.java     |   2 +-
 26 files changed, 497 insertions(+), 97 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/config/Param.java 
b/tika-core/src/main/java/org/apache/tika/config/Param.java
index 74123ac..8e185f0 100644
--- a/tika-core/src/main/java/org/apache/tika/config/Param.java
+++ b/tika-core/src/main/java/org/apache/tika/config/Param.java
@@ -233,8 +233,13 @@ public class Param<T> implements Serializable {
         if (List.class.isAssignableFrom(ret.type)) {
             loadList(ret, node);
         } else {
-            ret.actualValue = getTypedValue(ret.type, value.getTextContent());
-            ret.valueStrings.add(value.getTextContent());
+            //allow the empty string
+            String textContent = "";
+            if (value != null) {
+                textContent = value.getTextContent();
+            }
+            ret.actualValue = getTypedValue(ret.type, textContent);
+            ret.valueStrings.add(textContent);
         }
         return ret;
     }
diff --git a/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java 
b/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
index f852c57..bd4c389 100644
--- a/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
@@ -1641,5 +1641,10 @@ public class TikaConfig {
         }
     }
 
-
+    public static void mustNotBeEmpty(String paramName, String paramValue) 
throws TikaConfigException {
+        if (paramValue == null || paramValue.trim().equals("")) {
+            throw new IllegalArgumentException("parameter '"+
+                    paramName + "' must be set in the config file");
+        }
+    }
 }
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java 
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
index 0aa9e7f..1117537 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
@@ -26,6 +26,7 @@ public abstract class AbstractEmitter implements Emitter {
     public void setName(String name) {
         this.name = name;
     }
+
     @Override
     public String getName() {
         return name;
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java 
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
index 8f31311..445000d 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
@@ -21,7 +21,6 @@ import org.apache.tika.metadata.Metadata;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
 
 public interface Emitter {
 
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/TikaEmitterException.java
 
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/TikaEmitterException.java
index 00a5197..f74a6e5 100644
--- 
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/TikaEmitterException.java
+++ 
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/TikaEmitterException.java
@@ -22,4 +22,8 @@ public class TikaEmitterException extends TikaException {
     public TikaEmitterException(String msg) {
         super(msg);
     }
+
+    public TikaEmitterException(String msg, Throwable t) {
+        super(msg, t);
+    }
 }
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
 
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
index b11615f..d3528ee 100644
--- 
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
+++ 
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
@@ -125,11 +125,4 @@ public abstract class FetchIterator implements 
Callable<Integer>, Initializable
         //no-op
     }
 
-    protected static void mustNotBeEmpty(String paramName, String paramValue) {
-        if (paramValue == null || paramValue.trim().equals("")) {
-            throw new IllegalArgumentException("parameter '"+
-                    paramName + "' must be set in the config file");
-        }
-    }
-
 }
diff --git 
a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
 
b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
index f23489a..1560166 100644
--- 
a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
+++ 
b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
@@ -52,7 +52,7 @@ public class FileSystemFetchIteratorTest {
 
         String fetcherName = "fs";
         ExecutorService es = Executors.newFixedThreadPool(1);
-        ExecutorCompletionService cs = new ExecutorCompletionService(es);
+        ExecutorCompletionService<Integer> cs = new 
ExecutorCompletionService<>(es);
         FetchIterator it = new FileSystemFetchIterator(fetcherName, root);
         it.setQueueSize(20000);
         ArrayBlockingQueue<FetchIdMetadataPair> q = it.init(1);
@@ -60,7 +60,7 @@ public class FileSystemFetchIteratorTest {
         cs.submit(it);
 
 
-        Future f = cs.take();
+        Future<Integer> f = cs.take();
         f.get();
 
         Set<String> iteratorSet = new HashSet<>();
diff --git a/tika-pipes/tika-emitters/pom.xml b/tika-pipes/tika-emitters/pom.xml
index bfb55c8..28b23e4 100644
--- a/tika-pipes/tika-emitters/pom.xml
+++ b/tika-pipes/tika-emitters/pom.xml
@@ -35,6 +35,7 @@
 
     <modules>
         <module>tika-emitter-fs</module>
+        <module>tika-emitter-s3</module>
         <module>tika-emitter-solr</module>
     </modules>
 
diff --git 
a/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/emitter/fs/FileSystemEmitter.java
 
b/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
similarity index 74%
rename from 
tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/emitter/fs/FileSystemEmitter.java
rename to 
tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
index e42b57c..7ebe5bb 100644
--- 
a/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/emitter/fs/FileSystemEmitter.java
+++ 
b/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
@@ -14,9 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.emitter.fs;
+package org.apache.tika.pipes.emitter.fs;
 
 import org.apache.tika.config.Field;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
 import org.apache.tika.pipes.emitter.Emitter;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
 import org.apache.tika.exception.TikaException;
@@ -34,16 +35,33 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
-public class FileSystemEmitter implements Emitter {
+/**
+ * Emitter to write to a file system.
+ *
+ * This calculates the path to write to based on the {@link #basePath}
+ * and the value of the {@link TikaCoreProperties#SOURCE_PATH} value.
+ *
+ * <pre class="prettyprint">
+ *  &lt;properties&gt;
+ *      &lt;emitters&gt;
+ *          &lt;emitter 
class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter&gt;
+ *              &lt;params&gt;
+ *                  &lt;!-- required --&gt;
+ *                  &lt;param name="name" type="string"&gt;fs&lt;/param&gt;
+ *                  &lt;!-- required --&gt;
+ *                  &lt;param name="basePath" 
type="string"&gt;/path/to/output&lt;/param&gt;
+ *                  &lt;!-- optional; default is 'json' --&gt;
+ *                  &lt;param name="fileExtension" 
type="string"&gt;json&lt;/param&gt;
+ *              &lt;/params&gt;
+ *          &lt;/emitter&gt;
+ *      &lt;/emitters&gt;
+ *  &lt;/properties&gt;</pre>
+ */
+public class FileSystemEmitter extends AbstractEmitter {
 
-    private String name = "fs";
     private Path basePath = null;
     private String fileExtension = "json";
 
-    @Override
-    public String getName() {
-        return name;
-    }
 
     @Override
     public void emit(List<Metadata> metadataList) throws IOException, 
TikaException {
@@ -89,14 +107,4 @@ public class FileSystemEmitter implements Emitter {
     public void setFileExtension(String fileExtension) {
         this.fileExtension = fileExtension;
     }
-
-    /**
-     * Set this so to uniquely identify this emitter if
-     * there might be others available. The default is "fs"
-     * @param name
-     */
-    @Field
-    public void setName(String name) {
-        this.name = name;
-    }
 }
diff --git a/tika-pipes/tika-pipes-integration-tests/pom.xml 
b/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
similarity index 51%
copy from tika-pipes/tika-pipes-integration-tests/pom.xml
copy to tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
index ccb138c..61f8c77 100644
--- a/tika-pipes/tika-pipes-integration-tests/pom.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
@@ -21,44 +21,65 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <parent>
-        <artifactId>tika-pipes</artifactId>
+        <artifactId>tika-emitters</artifactId>
         <groupId>org.apache.tika</groupId>
         <version>2.0.0-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>tika-pipes-integration-tests</artifactId>
+    <artifactId>tika-emitter-s3</artifactId>
 
     <dependencies>
+        <!-- should serialization be provided or bundled? -->
         <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <scope>test</scope>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>tika-serialization</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>tika-core</artifactId>
             <version>${project.version}</version>
-            <scope>test</scope>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-s3</artifactId>
+            <version>${aws.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+            <version>${commons.logging.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
         </dependency>
-             <dependency>
-                  <groupId>${project.groupId}</groupId>
-                  <artifactId>tika-fetcher-s3</artifactId>
-                  <version>${project.version}</version>
-                  <scope>test</scope>
-              </dependency>
-              <dependency>
-                  <groupId>${project.groupId}</groupId>
-                  <artifactId>tika-fetch-iterator-s3</artifactId>
-                  <version>${project.version}</version>
-                  <scope>test</scope>
-              </dependency>
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
     </dependencies>
-
-</project>
+</project>
\ No newline at end of file
diff --git 
a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
 
b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
new file mode 100644
index 0000000..bec5e1a
--- /dev/null
+++ 
b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.emitter.s3;
+
+import com.amazonaws.SdkClientException;
+import com.amazonaws.auth.profile.ProfileCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.apache.tika.config.Field;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.io.TemporaryResources;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.apache.tika.metadata.Metadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+/**
+ * Emits to existing s3 bucket
+ * <pre class="prettyprint">
+ *  &lt;properties&gt;
+ *      &lt;emitters&gt;
+ *          &lt;emitter class="org.apache.tika.pipes.emitter.s3.S3Emitter&gt;
+ *              &lt;params&gt;
+ *                  &lt;!-- required --&gt;
+ *                  &lt;param name="name" type="string"&gt;s3e&lt;/param&gt;
+ *                  &lt;!-- required --&gt;
+ *                  &lt;param name="region" 
type="string"&gt;us-east-1&lt;/param&gt;
+ *                  &lt;!-- required --&gt;
+ *                  &lt;param name="profile" 
type="string"&gt;my-profile&lt;/param&gt;
+ *                  &lt;!-- required --&gt;
+ *                  &lt;param name="bucket" 
type="string"&gt;my-bucket&lt;/param&gt;
+ *                  &lt;!-- optional; default is 'json' --&gt;
+ *                  &lt;param name="fileExtension" 
type="string"&gt;json&lt;/param&gt;
+ *                  &lt;!-- optional; default is 'true'-- whether to copy the 
json to a local file before putting to s3 --&gt;
+ *                  &lt;param name="spoolToTemp" 
type="bool"&gt;true&lt;/param&gt;
+ *              &lt;/params&gt;
+ *          &lt;/emitter&gt;
+ *      &lt;/emitters&gt;
+ *  &lt;/properties&gt;</pre>
+ */
+public class S3Emitter extends AbstractEmitter implements Initializable {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(S3Emitter.class);
+    private String region;
+    private String profile;
+    private String bucket;
+    private AmazonS3 s3Client;
+    private String fileExtension = "json";
+    private boolean spoolToTemp = true;
+
+
+    /**
+     * Requires the src-bucket/path/to/my/file.txt in the {@link 
TikaCoreProperties#SOURCE_PATH}.
+     *
+     * @param metadataList
+     * @throws IOException
+     * @throws TikaException
+     */
+    @Override
+    public void emit(List<Metadata> metadataList) throws IOException, 
TikaException {
+        if (metadataList == null || metadataList.size() == 0) {
+            throw new TikaEmitterException("metadata list must not be null or 
of size 0");
+        }
+        String path = metadataList.get(0)
+                .get(TikaCoreProperties.SOURCE_PATH);
+        if (path == null) {
+            throw new TikaEmitterException("Must specify a 
"+TikaCoreProperties.SOURCE_PATH.getName() +
+                    " in the metadata in order for this emitter to generate 
the output file path.");
+        }
+        if (! spoolToTemp) {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            try (Writer writer =
+                         new BufferedWriter(new OutputStreamWriter(bos, 
StandardCharsets.UTF_8))) {
+                JsonMetadataList.toJson(metadataList, writer);
+            }
+            byte[] bytes = bos.toByteArray();
+            long length = bytes.length;
+            try (InputStream is = new ByteArrayInputStream(bytes)) {
+                emit(path, is, length, new Metadata());
+            }
+        } else {
+            TemporaryResources tmp = new TemporaryResources();
+            try {
+                Path tmpPath = tmp.createTempFile();
+                try (Writer writer = Files.newBufferedWriter(tmpPath,
+                        StandardCharsets.UTF_8, StandardOpenOption.CREATE)) {
+                    JsonMetadataList.toJson(metadataList, writer);
+                }
+                long length = Files.size(tmpPath);
+                try (InputStream is = Files.newInputStream(tmpPath)) {
+                    emit(path, is, length, new Metadata());
+                }
+            } finally {
+                tmp.close();
+            }
+        }
+    }
+
+    /**
+     *
+     * @param path -- object path, not including the bucket
+     * @param is inputStream to copy
+     * @param userMetadata this will be written to the s3 ObjectMetadata's 
userMetadata
+     * @throws TikaEmitterException
+     */
+    public void emit(String path, InputStream is, long length, Metadata 
userMetadata) throws TikaEmitterException {
+
+        if (fileExtension != null && fileExtension.length() > 0) {
+            path += "." + fileExtension;
+        }
+
+        LOGGER.debug("about to emit to target bucket: ({}) path:({})",
+                bucket, path);
+        ObjectMetadata objectMetadata = new ObjectMetadata();
+        if (length > 0) {
+            objectMetadata.setContentLength(length);
+        }
+        for (String n : userMetadata.names()) {
+            String[] vals = userMetadata.getValues(n);
+            if (vals.length > 1) {
+                LOGGER.warn("Can only write the first value for key {}. I see 
{} values.",
+                        n, vals.length);
+            }
+            objectMetadata.addUserMetadata(n, vals[0]);
+        }
+        try {
+            s3Client.putObject(bucket, path, is, objectMetadata);
+        } catch (SdkClientException e) {
+            throw new TikaEmitterException("problem writing s3object", e);
+        }
+    }
+
+    /**
+     * Whether or not to spool the metadatalist to a tmp file before putting 
object.
+     * Default: <code>true</code>.  If this is set to <code>false</code>,
+     * this emitter writes the json object to memory and then puts that into 
s3.
+     * @param spoolToTemp
+     */
+    @Field
+    public void setSpoolToTemp(boolean spoolToTemp) {
+        this.spoolToTemp = spoolToTemp;
+    }
+
+    @Field
+    public void setRegion(String region) {
+        this.region = region;
+    }
+
+    @Field
+    public void setProfile(String profile) {
+        this.profile = profile;
+    }
+
+    @Field
+    public void setBucket(String bucket) {
+        this.bucket = bucket;
+    }
+
+    /**
+     * If you want to customize the output file's file extension.
+     * Do not include the "."
+     * @param fileExtension
+     */
+    @Field
+    public void setFileExtension(String fileExtension) {
+        this.fileExtension = fileExtension;
+    }
+
+    @Override
+    public void initialize(Map<String, Param> params) throws 
TikaConfigException {
+        //params have already been set
+        //ignore them
+        s3Client = AmazonS3ClientBuilder.standard()
+                .withRegion(region)
+                .withCredentials(new ProfileCredentialsProvider(profile))
+                .build();
+    }
+
+    @Override
+    public void checkInitialization(InitializableProblemHandler 
problemHandler) throws TikaConfigException {
+        mustNotBeEmpty("bucket", this.bucket);
+        mustNotBeEmpty("profile", this.profile);
+        mustNotBeEmpty("region", this.region);
+    }
+
+}
diff --git 
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/emitter/solr/SolrEmitter.java
 
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
similarity index 99%
rename from 
tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/emitter/solr/SolrEmitter.java
rename to 
tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index df78bb7..14d0a88 100644
--- 
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/emitter/solr/SolrEmitter.java
+++ 
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.emitter.solr;
+package org.apache.tika.pipes.emitter.solr;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonArray;
diff --git 
a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/emitter/solr/TestBasic.java
 
b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
similarity index 98%
rename from 
tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/emitter/solr/TestBasic.java
rename to 
tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
index 62b8c87..5f1e75d 100644
--- 
a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/emitter/solr/TestBasic.java
+++ 
b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.emitter.solr;
+package org.apache.tika.pipes.emitter.solr;
 
 
 import org.apache.tika.config.TikaConfig;
diff --git 
a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/tika-config-simple-emitter.xml
 
b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/tika-config-simple-emitter.xml
index 18c65c6..644f0a4 100644
--- 
a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/tika-config-simple-emitter.xml
+++ 
b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/tika-config-simple-emitter.xml
@@ -32,7 +32,7 @@
         </metadataFilter>
     </metadataFilters>
     <emitters>
-        <emitter class="org.apache.tika.emitter.solr.SolrEmitter">
+        <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
             <params>
                 <param name="name" type="string">solr1</param>
                 <param name="url" 
type="string">http://localhost:8983/solr/tika-test</param>
@@ -41,7 +41,7 @@
                 <param name="commitWithin" type="int">10</param>
             </params>
         </emitter>
-        <emitter class="org.apache.tika.emitter.solr.SolrEmitter">
+        <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
             <params>
                 <param name="name" type="string">solr2</param>
                 <param name="url" 
type="string">http://localhost:8983/solr/tika-test</param>
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
index e87f730..5737eb5 100644
--- 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
+++ 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
@@ -38,6 +38,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
 public class CSVFetchIterator extends FetchIterator implements Initializable {
 
     private Charset charset = StandardCharsets.UTF_8;
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
index 9908681..057fecd 100644
--- 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
+++ 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
@@ -41,6 +41,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
 public class JDBCFetchIterator extends FetchIterator implements Initializable {
 
 
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
index 0fd88b6..bd85658 100644
--- 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
+++ 
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
@@ -38,6 +38,8 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
 public class S3FetchIterator extends FetchIterator implements Initializable {
 
 
@@ -96,10 +98,10 @@ public class S3FetchIterator extends FetchIterator 
implements Initializable {
         int count = 0;
         for (S3ObjectSummary summary : S3Objects.withPrefix(s3Client, bucket, 
s3PathPrefix)) {
             long elapsed = System.currentTimeMillis() - start;
-            LOGGER.debug("adding ({}) {} in {} ms", count, 
bucket+"/"+summary.getKey(),
+            LOGGER.debug("adding ({}) {} in {} ms", count, summary.getKey(),
                     elapsed);
             tryToAdd( new FetchIdMetadataPair(
-                    new FetchId(fetcherName, bucket+"/"+summary.getKey()),
+                    new FetchId(fetcherName, summary.getKey()),
                     new Metadata()));
             count++;
         }
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/pom.xml 
b/tika-pipes/tika-fetchers/tika-fetcher-s3/pom.xml
index 1d561aa..55afa8d 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/pom.xml
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/pom.xml
@@ -85,7 +85,7 @@
                 <configuration>
                     <archive>
                         <manifestEntries>
-                            
<Automatic-Module-Name>org.apache.tika.fetcher.s3</Automatic-Module-Name>
+                            
<Automatic-Module-Name>org.apache.tika.pipes.fetcher.s3</Automatic-Module-Name>
                         </manifestEntries>
                     </archive>
                 </configuration>
diff --git 
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
 
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index 126f8fa..25026a1 100644
--- 
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++ 
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -47,6 +47,8 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
 /**
  * Fetches files from s3. Example string: s3://my_bucket/path/to/my_file.pdf
  * This will parse the bucket out of that string and retrieve the path.
@@ -56,6 +58,7 @@ public class S3Fetcher extends AbstractFetcher implements 
Initializable {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(S3Fetcher.class);
     private static final String PREFIX = "s3";
     private String region;
+    private String bucket;
     private String profile;
     private boolean extractUserMetadata = true;
     private AmazonS3 s3Client;
@@ -65,22 +68,10 @@ public class S3Fetcher extends AbstractFetcher implements 
Initializable {
     public InputStream fetch(String fetchKey, Metadata metadata)
             throws TikaException, IOException {
 
-        LOGGER.debug("about to fetch fetchkey={}", fetchKey);
-        final String origFetchKey = fetchKey;
-        if (fetchKey.startsWith("//")) {
-            fetchKey = fetchKey.substring(2);
-        } else if (fetchKey.startsWith("/")) {
-            fetchKey = fetchKey.substring(1);
-        }
-        int i = fetchKey.indexOf("/");
-        if (i < 0) {
-            throw new FetcherStringException("Couldn't find bucket:" +
-                    origFetchKey);
-        }
-        String bucket = fetchKey.substring(0, i);
-        String key = fetchKey.substring(i + 1);
-        LOGGER.debug("about to fetch bucket: ({}); key: ({})", bucket, key);
-        S3Object fullObject = s3Client.getObject(new GetObjectRequest(bucket, 
key));
+        LOGGER.debug("about to fetch fetchkey={} from bucket ({})",
+                fetchKey, bucket);
+
+        S3Object fullObject = s3Client.getObject(new GetObjectRequest(bucket, 
fetchKey));
         if (extractUserMetadata) {
             for (Map.Entry<String, String> e :
                     
fullObject.getObjectMetadata().getUserMetadata().entrySet()) {
@@ -116,6 +107,11 @@ public class S3Fetcher extends AbstractFetcher implements 
Initializable {
         this.profile = profile;
     }
 
+    @Field
+    public void setBucket(String bucket) {
+        this.bucket = bucket;
+    }
+
     /**
      * Whether or not to extract user metadata from the S3Object
      *
@@ -138,6 +134,9 @@ public class S3Fetcher extends AbstractFetcher implements 
Initializable {
 
     @Override
     public void checkInitialization(InitializableProblemHandler 
problemHandler) throws TikaConfigException {
+        mustNotBeEmpty("bucket", this.bucket);
+        mustNotBeEmpty("profile", this.profile);
+        mustNotBeEmpty("region", this.region);
 
     }
 }
diff --git 
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/resources/tika-config-s3.xml
 
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/resources/tika-config-s3.xml
index 5c1de6c..740058a 100644
--- 
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/resources/tika-config-s3.xml
+++ 
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/resources/tika-config-s3.xml
@@ -17,7 +17,7 @@
 -->
 <properties>
     <fetchers>
-        <fetcher class="org.apache.tika.fetcher.s3.S3Fetcher">
+        <fetcher class="org.apache.tika.pipes.fetcher.s3.S3Fetcher">
             <params>
                 <param name="name" type="string">s3</param>
                 <param name="region" type="string">us-east-1</param>
diff --git a/tika-pipes/tika-pipes-integration-tests/pom.xml 
b/tika-pipes/tika-pipes-integration-tests/pom.xml
index ccb138c..1e17963 100644
--- a/tika-pipes/tika-pipes-integration-tests/pom.xml
+++ b/tika-pipes/tika-pipes-integration-tests/pom.xml
@@ -42,18 +42,30 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
-             <dependency>
-                  <groupId>${project.groupId}</groupId>
-                  <artifactId>tika-fetcher-s3</artifactId>
-                  <version>${project.version}</version>
-                  <scope>test</scope>
-              </dependency>
-              <dependency>
-                  <groupId>${project.groupId}</groupId>
-                  <artifactId>tika-fetch-iterator-s3</artifactId>
-                  <version>${project.version}</version>
-                  <scope>test</scope>
-              </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>tika-fetcher-s3</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>tika-fetch-iterator-s3</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>tika-emitter-s3</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>tika-serialization</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
index 590a125..05423ce 100644
--- 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
+++ 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
@@ -19,20 +19,23 @@ package org.apache.tika.pipes;
 
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.iterable.S3Objects;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.TikaException;
+import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.s3.S3Emitter;
 import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
 import org.apache.tika.pipes.fetcher.Fetcher;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
@@ -94,7 +97,7 @@ public class PipeIntegrationTests {
         ArrayBlockingQueue<FetchIdMetadataPair> queue = it.init(numConsumers);
         completionService.submit(it);
         for (int i = 0; i < numConsumers; i++) {
-            completionService.submit(new FetcherEmitter(
+            completionService.submit(new FSFetcherEmitter(
                     queue, tikaConfig.getFetcherManager().getFetcher("s3"), 
null));
         }
         int finished = 0;
@@ -106,8 +109,31 @@ public class PipeIntegrationTests {
         } finally {
             es.shutdownNow();
         }
+    }
 
-
+    @Test
+    public void testS3ToS3() throws Exception {
+        TikaConfig tikaConfig = getConfig("tika-config-s3Tos3.xml");
+        FetchIterator it = tikaConfig.getFetchIterator();
+        int numConsumers = 20;
+        ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
+        ExecutorCompletionService<Integer> completionService = new 
ExecutorCompletionService<>(es);
+        ArrayBlockingQueue<FetchIdMetadataPair> queue = it.init(numConsumers);
+        completionService.submit(it);
+        for (int i = 0; i < numConsumers; i++) {
+            completionService.submit(new S3FetcherEmitter(
+                    queue, tikaConfig.getFetcherManager().getFetcher("s3f"),
+                    
(S3Emitter)tikaConfig.getEmitterManager().getEmitter("s3e")));
+        }
+        int finished = 0;
+        try {
+            while (finished++ < numConsumers+1) {
+                Future<Integer> future = completionService.take();
+                future.get();
+            }
+        } finally {
+            es.shutdownNow();
+        }
     }
 
     private TikaConfig getConfig(String fileName) throws Exception {
@@ -118,14 +144,14 @@ public class PipeIntegrationTests {
     }
 
 
-    private static class FetcherEmitter implements Callable<Integer> {
+    private static class FSFetcherEmitter implements Callable<Integer> {
         private static final AtomicInteger counter = new AtomicInteger(0);
 
         private final Fetcher fetcher;
         private final Emitter emitter;
         private final ArrayBlockingQueue<FetchIdMetadataPair> queue;
 
-        FetcherEmitter(ArrayBlockingQueue<FetchIdMetadataPair> queue, Fetcher
+        FSFetcherEmitter(ArrayBlockingQueue<FetchIdMetadataPair> queue, Fetcher
                 fetcher, Emitter emitter) {
             this.queue = queue;
             this.fetcher = fetcher;
@@ -159,4 +185,48 @@ public class PipeIntegrationTests {
             }
         }
     }
+
+    private static class S3FetcherEmitter implements Callable<Integer> {
+        private static final AtomicInteger counter = new AtomicInteger(0);
+
+        private final Fetcher fetcher;
+        private final S3Emitter emitter;
+        private final ArrayBlockingQueue<FetchIdMetadataPair> queue;
+
+        S3FetcherEmitter(ArrayBlockingQueue<FetchIdMetadataPair> queue, Fetcher
+                fetcher, S3Emitter emitter) {
+            this.queue = queue;
+            this.fetcher = fetcher;
+            this.emitter = emitter;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+
+            while (true) {
+                FetchIdMetadataPair p = queue.poll(5, TimeUnit.MINUTES);
+                if (p == null) {
+                    throw new TimeoutException("");
+                }
+                if (p == FetchIterator.COMPLETED_SEMAPHORE) {
+                    return 1;
+                }
+                process(p);
+            }
+        }
+
+        private void process(FetchIdMetadataPair p) throws IOException, 
TikaException {
+            Metadata userMetadata = new Metadata();
+            userMetadata.set("project", "my-project");
+
+            try (InputStream is = fetcher.fetch(p.getFetchId().getFetchKey(), 
p.getMetadata())) {
+                long length = -1;
+                if (is instanceof TikaInputStream &&
+                        ((TikaInputStream) is).hasFile()) {
+                    length = ((TikaInputStream)is).getLength();
+                }
+                emitter.emit(p.getFetchId().getFetchKey(), is, length, 
userMetadata);
+            }
+        }
+    }
 }
diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3Tos3.xml
 
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3Tos3.xml
new file mode 100644
index 0000000..c16d64c
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3Tos3.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<properties>
+    <fetchers>
+        <fetcher class="org.apache.tika.pipes.fetcher.s3.S3Fetcher">
+            <params>
+                <param name="name" type="string">s3f</param>
+                <param name="region" type="string">us-east-1</param>
+                <param name="bucket" type="string"><!-- fill in here 
-->></param>
+                <param name="profile" type="string"><!-- fill in here 
--></param>
+            </params>
+        </fetcher>
+    </fetchers>
+    <fetchIterators>
+        <fetchIterator 
class="org.apache.tika.pipes.fetchiterator.s3.S3FetchIterator">
+            <params>
+                <param name="fetcherName" type="string">s3f</param>
+                <param name="region" type="string">us-east-1</param>
+                <param name="bucket" type="string"><!-- fill in here 
-->></param>
+                <param name="profile" type="string"><!-- fill in here 
--></param>
+            </params>
+        </fetchIterator>
+    </fetchIterators>
+    <emitters>
+        <emitter class="org.apache.tika.pipes.emitter.s3.S3Emitter">
+            <params>
+                <param name="name" type="string">s3e</param>
+                <param name="region" type="string">us-east-1</param>
+                <param name="bucket" type="string"><!-- fill in here 
-->></param>
+                <param name="profile" type="string"><!-- fill in here 
--></param>
+                <param name="fileExtension" type="string"></param>
+            </params>
+        </emitter>
+    </emitters>
+</properties>
\ No newline at end of file
diff --git 
a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
 
b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
index bfa3716..743bed9 100644
--- 
a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
+++ 
b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
@@ -49,14 +49,14 @@
         </metadataFilter>
     </metadataFilters>
     <emitters>
-        <emitter class="org.apache.tika.emitter.fs.FileSystemEmitter">
+        <emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter">
             <params>
                 <param name="name" type="string">fs</param>
                 <param name="basePath" type="string">fix</param>
             </params>
         </emitter>
         <!--
-        <emitter class="org.apache.tika.emitter.solr.SolrEmitter">
+        <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
             <params>
                 <param name="name" type="string">solr1</param>
                 <param name="url" 
type="string">http://localhost:8983/solr/tika-test</param>
@@ -65,7 +65,7 @@
                 <param name="commitWithin" type="int">10</param>
             </params>
         </emitter>
-        <emitter class="org.apache.tika.emitter.solr.SolrEmitter">
+        <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
             <params>
                 <param name="name" type="string">solr2</param>
                 <param name="url" 
type="string">http://localhost:8983/solr/tika-test</param>
diff --git 
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
 
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
index 4e41d2f..c3f5e31 100644
--- 
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
+++ 
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
@@ -95,7 +95,7 @@ public class TikaEmitterTest extends CXFTestBase {
                         "</fetcher>"+
                     "</fetchers>"+
                     "<emitters>"+
-                        "<emitter 
class=\"org.apache.tika.emitter.fs.FileSystemEmitter\">"+
+                        "<emitter 
class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">"+
                             "<params>"+
                                 "<param name=\"name\" 
type=\"string\">fse</param>"+
                                 "<param name=\"basePath\" type=\"string\">"+ 
TMP_OUTPUT_DIR.toAbsolutePath()+"</param>"+
diff --git 
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
 
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
index 3fa10f5..65e0f57 100644
--- 
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
+++ 
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
@@ -94,7 +94,7 @@ public class TikaServerEmitterIntegrationTest extends 
IntegrationTestBase {
                 "</fetcher>"+
                 "</fetchers>"+
                 "<emitters>"+
-                "<emitter 
class=\"org.apache.tika.emitter.fs.FileSystemEmitter\">"+
+                "<emitter 
class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">"+
                 "<params>"+
                 "<param name=\"name\" 
type=\"string\">"+EMITTER_NAME+"</param>"+
 

Reply via email to