This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 112c689 TIKA-3283 -- add s3 emitter
112c689 is described below
commit 112c689b9fff43aa881fa504abab5642b658a090
Author: tballison <[email protected]>
AuthorDate: Wed Jan 27 13:27:59 2021 -0500
TIKA-3283 -- add s3 emitter
---
.../main/java/org/apache/tika/config/Param.java | 9 +-
.../java/org/apache/tika/config/TikaConfig.java | 7 +-
.../apache/tika/pipes/emitter/AbstractEmitter.java | 1 +
.../org/apache/tika/pipes/emitter/Emitter.java | 1 -
.../tika/pipes/emitter/TikaEmitterException.java | 4 +
.../tika/pipes/fetchiterator/FetchIterator.java | 7 -
.../fetchiterator/FileSystemFetchIteratorTest.java | 4 +-
tika-pipes/tika-emitters/pom.xml | 1 +
.../{ => pipes}/emitter/fs/FileSystemEmitter.java | 42 ++--
.../tika-emitter-s3}/pom.xml | 63 ++++--
.../apache/tika/pipes/emitter/s3/S3Emitter.java | 224 +++++++++++++++++++++
.../tika/{ => pipes}/emitter/solr/SolrEmitter.java | 2 +-
.../tika/{ => pipes}/emitter/solr/TestBasic.java | 2 +-
.../test/resources/tika-config-simple-emitter.xml | 4 +-
.../pipes/fetchiterator/csv/CSVFetchIterator.java | 2 +
.../fetchiterator/jdbc/JDBCFetchIterator.java | 2 +
.../pipes/fetchiterator/s3/S3FetchIterator.java | 6 +-
tika-pipes/tika-fetchers/tika-fetcher-s3/pom.xml | 2 +-
.../apache/tika/pipes/fetcher/s3/S3Fetcher.java | 31 ++-
.../src/test/resources/tika-config-s3.xml | 2 +-
tika-pipes/tika-pipes-integration-tests/pom.xml | 36 ++--
.../apache/tika/pipes/PipeIntegrationTests.java | 80 +++++++-
.../src/test/resources/tika-config-s3Tos3.xml | 52 +++++
.../resources/tika-config-simple-fs-emitter.xml | 6 +-
.../apache/tika/server/core/TikaEmitterTest.java | 2 +-
.../core/TikaServerEmitterIntegrationTest.java | 2 +-
26 files changed, 497 insertions(+), 97 deletions(-)
diff --git a/tika-core/src/main/java/org/apache/tika/config/Param.java
b/tika-core/src/main/java/org/apache/tika/config/Param.java
index 74123ac..8e185f0 100644
--- a/tika-core/src/main/java/org/apache/tika/config/Param.java
+++ b/tika-core/src/main/java/org/apache/tika/config/Param.java
@@ -233,8 +233,13 @@ public class Param<T> implements Serializable {
if (List.class.isAssignableFrom(ret.type)) {
loadList(ret, node);
} else {
- ret.actualValue = getTypedValue(ret.type, value.getTextContent());
- ret.valueStrings.add(value.getTextContent());
+ //allow the empty string
+ String textContent = "";
+ if (value != null) {
+ textContent = value.getTextContent();
+ }
+ ret.actualValue = getTypedValue(ret.type, textContent);
+ ret.valueStrings.add(textContent);
}
return ret;
}
diff --git a/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
b/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
index f852c57..bd4c389 100644
--- a/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
@@ -1641,5 +1641,10 @@ public class TikaConfig {
}
}
-
+ public static void mustNotBeEmpty(String paramName, String paramValue)
throws TikaConfigException {
+ if (paramValue == null || paramValue.trim().equals("")) {
+ throw new IllegalArgumentException("parameter '"+
+ paramName + "' must be set in the config file");
+ }
+ }
}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
index 0aa9e7f..1117537 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
@@ -26,6 +26,7 @@ public abstract class AbstractEmitter implements Emitter {
public void setName(String name) {
this.name = name;
}
+
@Override
public String getName() {
return name;
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
index 8f31311..445000d 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
@@ -21,7 +21,6 @@ import org.apache.tika.metadata.Metadata;
import java.io.IOException;
import java.util.List;
-import java.util.Set;
public interface Emitter {
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/TikaEmitterException.java
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/TikaEmitterException.java
index 00a5197..f74a6e5 100644
---
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/TikaEmitterException.java
+++
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/TikaEmitterException.java
@@ -22,4 +22,8 @@ public class TikaEmitterException extends TikaException {
public TikaEmitterException(String msg) {
super(msg);
}
+
+ public TikaEmitterException(String msg, Throwable t) {
+ super(msg, t);
+ }
}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
index b11615f..d3528ee 100644
---
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
+++
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
@@ -125,11 +125,4 @@ public abstract class FetchIterator implements
Callable<Integer>, Initializable
//no-op
}
- protected static void mustNotBeEmpty(String paramName, String paramValue) {
- if (paramValue == null || paramValue.trim().equals("")) {
- throw new IllegalArgumentException("parameter '"+
- paramName + "' must be set in the config file");
- }
- }
-
}
diff --git
a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
index f23489a..1560166 100644
---
a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
+++
b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
@@ -52,7 +52,7 @@ public class FileSystemFetchIteratorTest {
String fetcherName = "fs";
ExecutorService es = Executors.newFixedThreadPool(1);
- ExecutorCompletionService cs = new ExecutorCompletionService(es);
+ ExecutorCompletionService<Integer> cs = new
ExecutorCompletionService<>(es);
FetchIterator it = new FileSystemFetchIterator(fetcherName, root);
it.setQueueSize(20000);
ArrayBlockingQueue<FetchIdMetadataPair> q = it.init(1);
@@ -60,7 +60,7 @@ public class FileSystemFetchIteratorTest {
cs.submit(it);
- Future f = cs.take();
+ Future<Integer> f = cs.take();
f.get();
Set<String> iteratorSet = new HashSet<>();
diff --git a/tika-pipes/tika-emitters/pom.xml b/tika-pipes/tika-emitters/pom.xml
index bfb55c8..28b23e4 100644
--- a/tika-pipes/tika-emitters/pom.xml
+++ b/tika-pipes/tika-emitters/pom.xml
@@ -35,6 +35,7 @@
<modules>
<module>tika-emitter-fs</module>
+ <module>tika-emitter-s3</module>
<module>tika-emitter-solr</module>
</modules>
diff --git
a/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/emitter/fs/FileSystemEmitter.java
b/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
similarity index 74%
rename from
tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/emitter/fs/FileSystemEmitter.java
rename to
tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
index e42b57c..7ebe5bb 100644
---
a/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/emitter/fs/FileSystemEmitter.java
+++
b/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
@@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.emitter.fs;
+package org.apache.tika.pipes.emitter.fs;
import org.apache.tika.config.Field;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
import org.apache.tika.pipes.emitter.Emitter;
import org.apache.tika.pipes.emitter.TikaEmitterException;
import org.apache.tika.exception.TikaException;
@@ -34,16 +35,33 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
-public class FileSystemEmitter implements Emitter {
+/**
+ * Emitter to write to a file system.
+ *
+ * This calculates the path to write to based on the {@link #basePath}
+ * and the value of the {@link TikaCoreProperties#SOURCE_PATH} value.
+ *
+ * <pre class="prettyprint">
+ * <properties>
+ * <emitters>
+ * <emitter
class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter">
+ * <params>
+ * <!-- required -->
+ * <param name="name" type="string">fs</param>
+ * <!-- required -->
+ * <param name="basePath"
type="string">/path/to/output</param>
+ * <!-- optional; default is 'json' -->
+ * <param name="fileExtension"
type="string">json</param>
+ * </params>
+ * </emitter>
+ * </emitters>
+ * </properties></pre>
+ */
+public class FileSystemEmitter extends AbstractEmitter {
- private String name = "fs";
private Path basePath = null;
private String fileExtension = "json";
- @Override
- public String getName() {
- return name;
- }
@Override
public void emit(List<Metadata> metadataList) throws IOException,
TikaException {
@@ -89,14 +107,4 @@ public class FileSystemEmitter implements Emitter {
public void setFileExtension(String fileExtension) {
this.fileExtension = fileExtension;
}
-
- /**
- * Set this so to uniquely identify this emitter if
- * there might be others available. The default is "fs"
- * @param name
- */
- @Field
- public void setName(String name) {
- this.name = name;
- }
}
diff --git a/tika-pipes/tika-pipes-integration-tests/pom.xml
b/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
similarity index 51%
copy from tika-pipes/tika-pipes-integration-tests/pom.xml
copy to tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
index ccb138c..61f8c77 100644
--- a/tika-pipes/tika-pipes-integration-tests/pom.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
@@ -21,44 +21,65 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>tika-pipes</artifactId>
+ <artifactId>tika-emitters</artifactId>
<groupId>org.apache.tika</groupId>
<version>2.0.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>tika-pipes-integration-tests</artifactId>
+ <artifactId>tika-emitter-s3</artifactId>
<dependencies>
+ <!-- should serialization be provided or bundled? -->
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <scope>test</scope>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-serialization</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>tika-core</artifactId>
<version>${project.version}</version>
- <scope>test</scope>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-s3</artifactId>
+ <version>${aws.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ <version>${commons.logging.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
</dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>tika-fetcher-s3</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>tika-fetch-iterator-s3</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
-
-</project>
+</project>
\ No newline at end of file
diff --git
a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
new file mode 100644
index 0000000..bec5e1a
--- /dev/null
+++
b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.emitter.s3;
+
+import com.amazonaws.SdkClientException;
+import com.amazonaws.auth.profile.ProfileCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.apache.tika.config.Field;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.io.TemporaryResources;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.apache.tika.metadata.Metadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+/**
+ * Emits to existing s3 bucket
+ * <pre class="prettyprint">
+ * <properties>
+ * <emitters>
+ * <emitter class="org.apache.tika.pipes.emitter.s3.S3Emitter">
+ * <params>
+ * <!-- required -->
+ * <param name="name" type="string">s3e</param>
+ * <!-- required -->
+ * <param name="region"
type="string">us-east-1</param>
+ * <!-- required -->
+ * <param name="profile"
type="string">my-profile</param>
+ * <!-- required -->
+ * <param name="bucket"
type="string">my-bucket</param>
+ * <!-- optional; default is 'json' -->
+ * <param name="fileExtension"
type="string">json</param>
+ * <!-- optional; default is 'true'-- whether to copy the
json to a local file before putting to s3 -->
+ * <param name="spoolToTemp"
type="bool">true</param>
+ * </params>
+ * </emitter>
+ * </emitters>
+ * </properties></pre>
+ */
+public class S3Emitter extends AbstractEmitter implements Initializable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(S3Emitter.class);
+ private String region;
+ private String profile;
+ private String bucket;
+ private AmazonS3 s3Client;
+ private String fileExtension = "json";
+ private boolean spoolToTemp = true;
+
+
+ /**
+ * Requires the src-bucket/path/to/my/file.txt in the {@link
TikaCoreProperties#SOURCE_PATH}.
+ *
+ * @param metadataList
+ * @throws IOException
+ * @throws TikaException
+ */
+ @Override
+ public void emit(List<Metadata> metadataList) throws IOException,
TikaException {
+ if (metadataList == null || metadataList.size() == 0) {
+ throw new TikaEmitterException("metadata list must not be null or
of size 0");
+ }
+ String path = metadataList.get(0)
+ .get(TikaCoreProperties.SOURCE_PATH);
+ if (path == null) {
+ throw new TikaEmitterException("Must specify a
"+TikaCoreProperties.SOURCE_PATH.getName() +
+ " in the metadata in order for this emitter to generate
the output file path.");
+ }
+ if (! spoolToTemp) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try (Writer writer =
+ new BufferedWriter(new OutputStreamWriter(bos,
StandardCharsets.UTF_8))) {
+ JsonMetadataList.toJson(metadataList, writer);
+ }
+ byte[] bytes = bos.toByteArray();
+ long length = bytes.length;
+ try (InputStream is = new ByteArrayInputStream(bytes)) {
+ emit(path, is, length, new Metadata());
+ }
+ } else {
+ TemporaryResources tmp = new TemporaryResources();
+ try {
+ Path tmpPath = tmp.createTempFile();
+ try (Writer writer = Files.newBufferedWriter(tmpPath,
+ StandardCharsets.UTF_8, StandardOpenOption.CREATE)) {
+ JsonMetadataList.toJson(metadataList, writer);
+ }
+ long length = Files.size(tmpPath);
+ try (InputStream is = Files.newInputStream(tmpPath)) {
+ emit(path, is, length, new Metadata());
+ }
+ } finally {
+ tmp.close();
+ }
+ }
+ }
+
+ /**
+ *
+ * @param path -- object path, not including the bucket
+ * @param is inputStream to copy
+ * @param userMetadata this will be written to the s3 ObjectMetadata's
userMetadata
+ * @throws TikaEmitterException
+ */
+ public void emit(String path, InputStream is, long length, Metadata
userMetadata) throws TikaEmitterException {
+
+ if (fileExtension != null && fileExtension.length() > 0) {
+ path += "." + fileExtension;
+ }
+
+ LOGGER.debug("about to emit to target bucket: ({}) path:({})",
+ bucket, path);
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ if (length > 0) {
+ objectMetadata.setContentLength(length);
+ }
+ for (String n : userMetadata.names()) {
+ String[] vals = userMetadata.getValues(n);
+ if (vals.length > 1) {
+ LOGGER.warn("Can only write the first value for key {}. I see
{} values.",
+ n, vals.length);
+ }
+ objectMetadata.addUserMetadata(n, vals[0]);
+ }
+ try {
+ s3Client.putObject(bucket, path, is, objectMetadata);
+ } catch (SdkClientException e) {
+ throw new TikaEmitterException("problem writing s3object", e);
+ }
+ }
+
+ /**
+ * Whether or not to spool the metadatalist to a tmp file before putting
object.
+ * Default: <code>true</code>. If this is set to <code>false</code>,
+ * this emitter writes the json object to memory and then puts that into
s3.
+ * @param spoolToTemp
+ */
+ @Field
+ public void setSpoolToTemp(boolean spoolToTemp) {
+ this.spoolToTemp = spoolToTemp;
+ }
+
+ @Field
+ public void setRegion(String region) {
+ this.region = region;
+ }
+
+ @Field
+ public void setProfile(String profile) {
+ this.profile = profile;
+ }
+
+ @Field
+ public void setBucket(String bucket) {
+ this.bucket = bucket;
+ }
+
+ /**
+ * If you want to customize the output file's file extension.
+ * Do not include the "."
+ * @param fileExtension
+ */
+ @Field
+ public void setFileExtension(String fileExtension) {
+ this.fileExtension = fileExtension;
+ }
+
+ @Override
+ public void initialize(Map<String, Param> params) throws
TikaConfigException {
+ //params have already been set
+ //ignore them
+ s3Client = AmazonS3ClientBuilder.standard()
+ .withRegion(region)
+ .withCredentials(new ProfileCredentialsProvider(profile))
+ .build();
+ }
+
+ @Override
+ public void checkInitialization(InitializableProblemHandler
problemHandler) throws TikaConfigException {
+ mustNotBeEmpty("bucket", this.bucket);
+ mustNotBeEmpty("profile", this.profile);
+ mustNotBeEmpty("region", this.region);
+ }
+
+}
diff --git
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/emitter/solr/SolrEmitter.java
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
similarity index 99%
rename from
tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/emitter/solr/SolrEmitter.java
rename to
tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index df78bb7..14d0a88 100644
---
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/emitter/solr/SolrEmitter.java
+++
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.emitter.solr;
+package org.apache.tika.pipes.emitter.solr;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
diff --git
a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/emitter/solr/TestBasic.java
b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
similarity index 98%
rename from
tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/emitter/solr/TestBasic.java
rename to
tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
index 62b8c87..5f1e75d 100644
---
a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/emitter/solr/TestBasic.java
+++
b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.emitter.solr;
+package org.apache.tika.pipes.emitter.solr;
import org.apache.tika.config.TikaConfig;
diff --git
a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/tika-config-simple-emitter.xml
b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/tika-config-simple-emitter.xml
index 18c65c6..644f0a4 100644
---
a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/tika-config-simple-emitter.xml
+++
b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/tika-config-simple-emitter.xml
@@ -32,7 +32,7 @@
</metadataFilter>
</metadataFilters>
<emitters>
- <emitter class="org.apache.tika.emitter.solr.SolrEmitter">
+ <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
<params>
<param name="name" type="string">solr1</param>
<param name="url"
type="string">http://localhost:8983/solr/tika-test</param>
@@ -41,7 +41,7 @@
<param name="commitWithin" type="int">10</param>
</params>
</emitter>
- <emitter class="org.apache.tika.emitter.solr.SolrEmitter">
+ <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
<params>
<param name="name" type="string">solr2</param>
<param name="url"
type="string">http://localhost:8983/solr/tika-test</param>
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
index e87f730..5737eb5 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
+++
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
@@ -38,6 +38,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
public class CSVFetchIterator extends FetchIterator implements Initializable {
private Charset charset = StandardCharsets.UTF_8;
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
index 9908681..057fecd 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
+++
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
@@ -41,6 +41,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
public class JDBCFetchIterator extends FetchIterator implements Initializable {
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
index 0fd88b6..bd85658 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
+++
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
@@ -38,6 +38,8 @@ import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
public class S3FetchIterator extends FetchIterator implements Initializable {
@@ -96,10 +98,10 @@ public class S3FetchIterator extends FetchIterator
implements Initializable {
int count = 0;
for (S3ObjectSummary summary : S3Objects.withPrefix(s3Client, bucket,
s3PathPrefix)) {
long elapsed = System.currentTimeMillis() - start;
- LOGGER.debug("adding ({}) {} in {} ms", count,
bucket+"/"+summary.getKey(),
+ LOGGER.debug("adding ({}) {} in {} ms", count, summary.getKey(),
elapsed);
tryToAdd( new FetchIdMetadataPair(
- new FetchId(fetcherName, bucket+"/"+summary.getKey()),
+ new FetchId(fetcherName, summary.getKey()),
new Metadata()));
count++;
}
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/pom.xml
b/tika-pipes/tika-fetchers/tika-fetcher-s3/pom.xml
index 1d561aa..55afa8d 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/pom.xml
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/pom.xml
@@ -85,7 +85,7 @@
<configuration>
<archive>
<manifestEntries>
-
<Automatic-Module-Name>org.apache.tika.fetcher.s3</Automatic-Module-Name>
+
<Automatic-Module-Name>org.apache.tika.pipes.fetcher.s3</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
diff --git
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index 126f8fa..25026a1 100644
---
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -47,6 +47,8 @@ import java.util.Collections;
import java.util.Map;
import java.util.Set;
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
/**
* Fetches files from s3. Example string: s3://my_bucket/path/to/my_file.pdf
* This will parse the bucket out of that string and retrieve the path.
@@ -56,6 +58,7 @@ public class S3Fetcher extends AbstractFetcher implements
Initializable {
private static final Logger LOGGER =
LoggerFactory.getLogger(S3Fetcher.class);
private static final String PREFIX = "s3";
private String region;
+ private String bucket;
private String profile;
private boolean extractUserMetadata = true;
private AmazonS3 s3Client;
@@ -65,22 +68,10 @@ public class S3Fetcher extends AbstractFetcher implements
Initializable {
public InputStream fetch(String fetchKey, Metadata metadata)
throws TikaException, IOException {
- LOGGER.debug("about to fetch fetchkey={}", fetchKey);
- final String origFetchKey = fetchKey;
- if (fetchKey.startsWith("//")) {
- fetchKey = fetchKey.substring(2);
- } else if (fetchKey.startsWith("/")) {
- fetchKey = fetchKey.substring(1);
- }
- int i = fetchKey.indexOf("/");
- if (i < 0) {
- throw new FetcherStringException("Couldn't find bucket:" +
- origFetchKey);
- }
- String bucket = fetchKey.substring(0, i);
- String key = fetchKey.substring(i + 1);
- LOGGER.debug("about to fetch bucket: ({}); key: ({})", bucket, key);
- S3Object fullObject = s3Client.getObject(new GetObjectRequest(bucket,
key));
+ LOGGER.debug("about to fetch fetchkey={} from bucket ({})",
+ fetchKey, bucket);
+
+ S3Object fullObject = s3Client.getObject(new GetObjectRequest(bucket,
fetchKey));
if (extractUserMetadata) {
for (Map.Entry<String, String> e :
fullObject.getObjectMetadata().getUserMetadata().entrySet()) {
@@ -116,6 +107,11 @@ public class S3Fetcher extends AbstractFetcher implements
Initializable {
this.profile = profile;
}
+ @Field
+ public void setBucket(String bucket) {
+ this.bucket = bucket;
+ }
+
/**
* Whether or not to extract user metadata from the S3Object
*
@@ -138,6 +134,9 @@ public class S3Fetcher extends AbstractFetcher implements
Initializable {
@Override
public void checkInitialization(InitializableProblemHandler
problemHandler) throws TikaConfigException {
+ mustNotBeEmpty("bucket", this.bucket);
+ mustNotBeEmpty("profile", this.profile);
+ mustNotBeEmpty("region", this.region);
}
}
diff --git
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/resources/tika-config-s3.xml
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/resources/tika-config-s3.xml
index 5c1de6c..740058a 100644
---
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/resources/tika-config-s3.xml
+++
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/resources/tika-config-s3.xml
@@ -17,7 +17,7 @@
-->
<properties>
<fetchers>
- <fetcher class="org.apache.tika.fetcher.s3.S3Fetcher">
+ <fetcher class="org.apache.tika.pipes.fetcher.s3.S3Fetcher">
<params>
<param name="name" type="string">s3</param>
<param name="region" type="string">us-east-1</param>
diff --git a/tika-pipes/tika-pipes-integration-tests/pom.xml
b/tika-pipes/tika-pipes-integration-tests/pom.xml
index ccb138c..1e17963 100644
--- a/tika-pipes/tika-pipes-integration-tests/pom.xml
+++ b/tika-pipes/tika-pipes-integration-tests/pom.xml
@@ -42,18 +42,30 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>tika-fetcher-s3</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>tika-fetch-iterator-s3</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-fetcher-s3</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-fetch-iterator-s3</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-emitter-s3</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-serialization</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
index 590a125..05423ce 100644
---
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
@@ -19,20 +19,23 @@ package org.apache.tika.pipes;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.iterable.S3Objects;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.TikaException;
+import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.s3.S3Emitter;
import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
import org.apache.tika.pipes.fetcher.Fetcher;
import org.apache.tika.pipes.fetchiterator.FetchIterator;
import org.junit.Ignore;
import org.junit.Test;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
@@ -94,7 +97,7 @@ public class PipeIntegrationTests {
ArrayBlockingQueue<FetchIdMetadataPair> queue = it.init(numConsumers);
completionService.submit(it);
for (int i = 0; i < numConsumers; i++) {
- completionService.submit(new FetcherEmitter(
+ completionService.submit(new FSFetcherEmitter(
queue, tikaConfig.getFetcherManager().getFetcher("s3"),
null));
}
int finished = 0;
@@ -106,8 +109,31 @@ public class PipeIntegrationTests {
} finally {
es.shutdownNow();
}
+ }
-
+ @Test
+ public void testS3ToS3() throws Exception { // runs the configured fetch iterator plus 20 S3FetcherEmitter workers and waits for all of them
+ TikaConfig tikaConfig = getConfig("tika-config-s3Tos3.xml");
+ FetchIterator it = tikaConfig.getFetchIterator();
+ int numConsumers = 20;
+ ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1); // one thread per worker plus one for the fetch iterator
+ ExecutorCompletionService<Integer> completionService = new
ExecutorCompletionService<>(es);
+ ArrayBlockingQueue<FetchIdMetadataPair> queue = it.init(numConsumers); // queue handed to every worker below
+ completionService.submit(it); // the iterator itself runs as a task on the pool
+ for (int i = 0; i < numConsumers; i++) {
+ completionService.submit(new S3FetcherEmitter(
+ queue, tikaConfig.getFetcherManager().getFetcher("s3f"), // fetcher looked up by the name "s3f"
+
(S3Emitter)tikaConfig.getEmitterManager().getEmitter("s3e"))); // emitter looked up by the name "s3e" and cast to S3Emitter
+ }
+ int finished = 0;
+ try {
+ while (finished++ < numConsumers+1) { // wait for all numConsumers + 1 submitted tasks
+ Future<Integer> future = completionService.take();
+ future.get(); // result is ignored; get() surfaces any exception thrown by the task
+ }
+ } finally {
+ es.shutdownNow(); // always stop the pool, including when a task failed
+ }
}
private TikaConfig getConfig(String fileName) throws Exception {
@@ -118,14 +144,14 @@ public class PipeIntegrationTests {
}
- private static class FetcherEmitter implements Callable<Integer> {
+ private static class FSFetcherEmitter implements Callable<Integer> {
private static final AtomicInteger counter = new AtomicInteger(0);
private final Fetcher fetcher;
private final Emitter emitter;
private final ArrayBlockingQueue<FetchIdMetadataPair> queue;
- FetcherEmitter(ArrayBlockingQueue<FetchIdMetadataPair> queue, Fetcher
+ FSFetcherEmitter(ArrayBlockingQueue<FetchIdMetadataPair> queue, Fetcher
fetcher, Emitter emitter) {
this.queue = queue;
this.fetcher = fetcher;
@@ -159,4 +185,48 @@ public class PipeIntegrationTests {
}
}
}
+
+    /**
+     * Worker that drains the shared queue: each entry is fetched with the
+     * supplied {@link Fetcher} and the resulting stream is handed to the
+     * supplied {@link S3Emitter} under the same fetch key.
+     * <p>
+     * {@link #call()} returns {@code 1} once it takes
+     * {@link FetchIterator#COMPLETED_SEMAPHORE} from the queue.
+     */
+    private static class S3FetcherEmitter implements Callable<Integer> {
+
+        /** How long to wait for the next queue entry before giving up. */
+        private static final long POLL_TIMEOUT_MINUTES = 5;
+
+        private final Fetcher fetcher;
+        private final S3Emitter emitter;
+        private final ArrayBlockingQueue<FetchIdMetadataPair> queue;
+
+        /**
+         * @param queue   queue of fetch ids to process
+         * @param fetcher source of the input streams
+         * @param emitter destination for the fetched streams
+         */
+        S3FetcherEmitter(ArrayBlockingQueue<FetchIdMetadataPair> queue, Fetcher
+                fetcher, S3Emitter emitter) {
+            this.queue = queue;
+            this.fetcher = fetcher;
+            this.emitter = emitter;
+        }
+
+        /**
+         * Processes queue entries until the completion marker is seen.
+         *
+         * @return {@code 1} when the completion marker is taken from the queue
+         * @throws TimeoutException if no entry arrives within
+         *                          {@link #POLL_TIMEOUT_MINUTES} minutes
+         * @throws Exception        anything thrown while fetching or emitting
+         */
+        @Override
+        public Integer call() throws Exception {
+            while (true) {
+                FetchIdMetadataPair p = queue.poll(POLL_TIMEOUT_MINUTES, TimeUnit.MINUTES);
+                if (p == null) {
+                    // A non-empty message makes a stalled iterator diagnosable from the stack trace.
+                    throw new TimeoutException("no fetch id received from the queue within "
+                            + POLL_TIMEOUT_MINUTES + " minutes");
+                }
+                // Identity comparison: the marker is a shared sentinel instance.
+                if (p == FetchIterator.COMPLETED_SEMAPHORE) {
+                    return 1;
+                }
+                process(p);
+            }
+        }
+
+        /**
+         * Fetches one entry and emits its bytes with a fixed piece of user metadata.
+         *
+         * @param p the fetch id / metadata pair taken from the queue
+         * @throws IOException   on failure reading or writing the stream
+         * @throws TikaException on fetcher or emitter failure
+         */
+        private void process(FetchIdMetadataPair p) throws IOException, TikaException {
+            Metadata userMetadata = new Metadata();
+            userMetadata.set("project", "my-project");
+
+            try (InputStream is = fetcher.fetch(p.getFetchId().getFetchKey(), p.getMetadata())) {
+                // -1 is passed through when the length is not known up front.
+                long length = -1;
+                // Only ask for the length when the stream already has a backing file;
+                // presumably this avoids spooling to disk just to learn the size -- TODO confirm.
+                if (is instanceof TikaInputStream &&
+                        ((TikaInputStream) is).hasFile()) {
+                    length = ((TikaInputStream) is).getLength();
+                }
+                emitter.emit(p.getFetchId().getFetchKey(), is, length, userMetadata);
+            }
+        }
+    }
}
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3Tos3.xml
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3Tos3.xml
new file mode 100644
index 0000000..c16d64c
--- /dev/null
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3Tos3.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<properties>
+ <fetchers>
+ <fetcher class="org.apache.tika.pipes.fetcher.s3.S3Fetcher">
+ <params>
+ <param name="name" type="string">s3f</param>
+ <param name="region" type="string">us-east-1</param>
+ <param name="bucket" type="string"><!-- fill in here
--></param>
+ <param name="profile" type="string"><!-- fill in here
--></param>
+ </params>
+ </fetcher>
+ </fetchers>
+ <fetchIterators>
+ <fetchIterator
class="org.apache.tika.pipes.fetchiterator.s3.S3FetchIterator">
+ <params>
+ <param name="fetcherName" type="string">s3f</param>
+ <param name="region" type="string">us-east-1</param>
+ <param name="bucket" type="string"><!-- fill in here
--></param>
+ <param name="profile" type="string"><!-- fill in here
--></param>
+ </params>
+ </fetchIterator>
+ </fetchIterators>
+ <emitters>
+ <emitter class="org.apache.tika.pipes.emitter.s3.S3Emitter">
+ <params>
+ <param name="name" type="string">s3e</param>
+ <param name="region" type="string">us-east-1</param>
+ <param name="bucket" type="string"><!-- fill in here
--></param>
+ <param name="profile" type="string"><!-- fill in here
--></param>
+ <param name="fileExtension" type="string"></param>
+ </params>
+ </emitter>
+ </emitters>
+</properties>
\ No newline at end of file
diff --git
a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
index bfa3716..743bed9 100644
---
a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
+++
b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
@@ -49,14 +49,14 @@
</metadataFilter>
</metadataFilters>
<emitters>
- <emitter class="org.apache.tika.emitter.fs.FileSystemEmitter">
+ <emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter">
<params>
<param name="name" type="string">fs</param>
<param name="basePath" type="string">fix</param>
</params>
</emitter>
<!--
- <emitter class="org.apache.tika.emitter.solr.SolrEmitter">
+ <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
<params>
<param name="name" type="string">solr1</param>
<param name="url"
type="string">http://localhost:8983/solr/tika-test</param>
@@ -65,7 +65,7 @@
<param name="commitWithin" type="int">10</param>
</params>
</emitter>
- <emitter class="org.apache.tika.emitter.solr.SolrEmitter">
+ <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
<params>
<param name="name" type="string">solr2</param>
<param name="url"
type="string">http://localhost:8983/solr/tika-test</param>
diff --git
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
index 4e41d2f..c3f5e31 100644
---
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
+++
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
@@ -95,7 +95,7 @@ public class TikaEmitterTest extends CXFTestBase {
"</fetcher>"+
"</fetchers>"+
"<emitters>"+
- "<emitter
class=\"org.apache.tika.emitter.fs.FileSystemEmitter\">"+
+ "<emitter
class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">"+
"<params>"+
"<param name=\"name\"
type=\"string\">fse</param>"+
"<param name=\"basePath\" type=\"string\">"+
TMP_OUTPUT_DIR.toAbsolutePath()+"</param>"+
diff --git
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
index 3fa10f5..65e0f57 100644
---
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
+++
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
@@ -94,7 +94,7 @@ public class TikaServerEmitterIntegrationTest extends
IntegrationTestBase {
"</fetcher>"+
"</fetchers>"+
"<emitters>"+
- "<emitter
class=\"org.apache.tika.emitter.fs.FileSystemEmitter\">"+
+ "<emitter
class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">"+
"<params>"+
"<param name=\"name\"
type=\"string\">"+EMITTER_NAME+"</param>"+