This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 3712dce TIKA-3226 - refactor to allow users to specify emit key, add
stream emitter
3712dce is described below
commit 3712dce23339737c0c9dc2b8591f43c8123f231f
Author: tballison <[email protected]>
AuthorDate: Thu Jan 28 11:47:42 2021 -0500
TIKA-3226 - refactor to allow users to specify emit key, add stream emitter
---
.../java/org/apache/tika/config/TikaConfig.java | 7 +
.../emitter/{EmptyEmitter.java => EmitKey.java} | 29 ++--
.../org/apache/tika/pipes/emitter/Emitter.java | 3 +-
.../apache/tika/pipes/emitter/EmptyEmitter.java | 2 +-
.../apache/tika/pipes/emitter/StreamEmitter.java} | 12 +-
.../pipes/fetcher/{FetchId.java => FetchKey.java} | 12 +-
.../FetchEmitTuple.java} | 49 +++----
.../tika/pipes/fetchiterator/FetchIterator.java | 21 ++-
.../fetchiterator/FileSystemFetchIterator.java | 32 ++---
.../java/org/apache/tika/utils/StringUtils.java | 4 +
.../org/apache/tika/pipes/emitter/MockEmitter.java | 2 +-
.../fetchiterator/FileSystemFetchIteratorTest.java | 7 +-
.../tika/pipes/emitter/fs/FileSystemEmitter.java | 30 ++--
.../apache/tika/pipes/emitter/s3/S3Emitter.java | 66 ++++++---
.../tika/pipes/emitter/solr/SolrEmitter.java | 33 ++---
.../apache/tika/pipes/emitter/solr/TestBasic.java | 3 +-
.../pipes/fetchiterator/csv/CSVFetchIterator.java | 153 ++++++++++++++++++---
.../src/test/java/TestCSVFetchIterator.java | 30 ++--
.../fetchiterator/jdbc/JDBCFetchIterator.java | 111 +++++++++++++--
.../fetchiterator/jdbc/TestJDBCFetchIterator.java | 31 ++---
.../pipes/fetchiterator/s3/S3FetchIterator.java | 12 +-
.../fetchiterator/s3/TestS3FetchIterator.java | 22 ++-
.../apache/tika/pipes/PipeIntegrationTests.java | 51 +++----
.../org/apache/tika/server/client/TikaClient.java | 15 +-
.../apache/tika/server/client/TikaClientCLI.java | 37 ++---
.../tika/server/client/TikaClientConfig.java | 85 ++++++++++++
.../tika/server/core/resource/EmitterResource.java | 68 ++++-----
.../apache/tika/server/core/TikaEmitterTest.java | 6 +-
.../core/TikaServerEmitterIntegrationTest.java | 2 +-
29 files changed, 620 insertions(+), 315 deletions(-)
diff --git a/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
b/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
index bd4c389..e29a337 100644
--- a/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
@@ -1647,4 +1647,11 @@ public class TikaConfig {
paramName + "' must be set in the config file");
}
}
+
+ public static void mustNotBeEmpty(String paramName, Path paramValue)
throws TikaConfigException {
+ if (paramValue == null) {
+ throw new IllegalArgumentException("parameter '"+
+ paramName + "' must be set in the config file");
+ }
+ }
}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
similarity index 61%
copy from
tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
copy to tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
index 9a1a10a..47a8ee7 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
@@ -16,20 +16,29 @@
*/
package org.apache.tika.pipes.emitter;
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
+public class EmitKey {
-import java.io.IOException;
-import java.util.List;
+ private final String emitterName;
+ private final String emitKey;
-public class EmptyEmitter implements Emitter {
- @Override
- public String getName() {
- return "empty";
+ public EmitKey(String emitterName, String emitKey) {
+ this.emitterName = emitterName;
+ this.emitKey = emitKey;
}
- @Override
- public void emit(List<Metadata> metadataList) throws IOException,
TikaException {
+ public String getEmitterName() {
+ return emitterName;
+ }
+ public String getEmitKey() {
+ return emitKey;
+ }
+
+ @Override
+ public String toString() {
+ return "EmitKey{" +
+ "emitterName='" + emitterName + '\'' +
+ ", emitKey='" + emitKey + '\'' +
+ '}';
}
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
index 445000d..4dc2291 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
@@ -16,7 +16,6 @@
*/
package org.apache.tika.pipes.emitter;
-import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
import java.io.IOException;
@@ -26,7 +25,7 @@ public interface Emitter {
String getName();
- void emit(List<Metadata> metadataList) throws IOException, TikaException;
+ void emit(String emitKey, List<Metadata> metadataList) throws IOException,
TikaEmitterException;
//TODO we can add this later?
//void emit(String txt, Metadata metadata) throws IOException,
TikaException;
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
index 9a1a10a..8c0ebda 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
@@ -29,7 +29,7 @@ public class EmptyEmitter implements Emitter {
}
@Override
- public void emit(List<Metadata> metadataList) throws IOException,
TikaException {
+ public void emit(String emitKey, List<Metadata> metadataList) throws
IOException, TikaEmitterException {
}
}
diff --git
a/tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/StreamEmitter.java
similarity index 79%
copy from tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java
copy to tika-core/src/main/java/org/apache/tika/pipes/emitter/StreamEmitter.java
index f00d815..d7f5921 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/StreamEmitter.java
@@ -16,16 +16,12 @@
*/
package org.apache.tika.pipes.emitter;
-import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
import java.io.IOException;
-import java.util.List;
+import java.io.InputStream;
-public class MockEmitter extends AbstractEmitter {
-
- @Override
- public void emit(List<Metadata> metadataList) throws IOException,
TikaException {
-
- }
+public interface StreamEmitter extends Emitter {
+ void emit(String emitKey, InputStream inputStream, Metadata userMetadata)
+ throws IOException, TikaEmitterException;
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchId.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
similarity index 82%
rename from tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchId.java
rename to tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
index a327fc6..6e86bcf 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchId.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
@@ -20,11 +20,11 @@ package org.apache.tika.pipes.fetcher;
* Pair of fetcherName (which fetcher to call) and the key
* to send to that fetcher to retrieve a specific file.
*/
-public class FetchId {
+public class FetchKey {
private final String fetcherName;
private final String fetchKey;
- public FetchId(String fetcherName, String fetchKey) {
+ public FetchKey(String fetcherName, String fetchKey) {
this.fetcherName = fetcherName;
this.fetchKey = fetchKey;
}
@@ -33,7 +33,7 @@ public class FetchId {
return fetcherName;
}
- public String getFetchKey() {
+ public String getKey() {
return fetchKey;
}
@@ -50,10 +50,10 @@ public class FetchId {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- FetchId fetchId = (FetchId) o;
+ FetchKey fetchKey = (FetchKey) o;
- if (fetcherName != null ? !fetcherName.equals(fetchId.fetcherName) :
fetchId.fetcherName != null) return false;
- return fetchKey != null ? fetchKey.equals(fetchId.fetchKey) :
fetchId.fetchKey == null;
+ if (fetcherName != null ? !fetcherName.equals(fetchKey.fetcherName) :
fetchKey.fetcherName != null) return false;
+ return this.fetchKey != null ? this.fetchKey.equals(fetchKey.fetchKey)
: fetchKey.fetchKey == null;
}
@Override
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchIdMetadataPair.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java
similarity index 53%
rename from
tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchIdMetadataPair.java
rename to
tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java
index 25d89db..9023378 100644
---
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchIdMetadataPair.java
+++
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java
@@ -14,51 +14,42 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.pipes.fetcher;
+package org.apache.tika.pipes.fetchiterator;
import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
-public class FetchIdMetadataPair {
+public class FetchEmitTuple {
- private final FetchId fetchId;
+ private final FetchKey fetchKey;
+ private final EmitKey emitKey;
private final Metadata metadata;
- public FetchIdMetadataPair(FetchId fetchId, Metadata metadata) {
- this.fetchId = fetchId;
+ public FetchEmitTuple(FetchKey fetchKey, EmitKey emitKey, Metadata
metadata) {
+ this.fetchKey = fetchKey;
+ this.emitKey = emitKey;
this.metadata = metadata;
}
- public Metadata getMetadata() {
- return metadata;
+ public FetchKey getFetchKey() {
+ return fetchKey;
}
- public FetchId getFetchId() {
- return fetchId;
+ public EmitKey getEmitKey() {
+ return emitKey;
+ }
+
+ public Metadata getMetadata() {
+ return metadata;
}
@Override
public String toString() {
- return "FetchIdMetadataPair{" +
- "fetchId=" + fetchId +
+ return "FetchEmitTuple{" +
+ "fetchKey=" + fetchKey +
+ ", emitKey=" + emitKey +
", metadata=" + metadata +
'}';
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- FetchIdMetadataPair that = (FetchIdMetadataPair) o;
-
- if (fetchId != null ? !fetchId.equals(that.fetchId) : that.fetchId !=
null) return false;
- return metadata != null ? metadata.equals(that.metadata) :
that.metadata == null;
- }
-
- @Override
- public int hashCode() {
- int result = fetchId != null ? fetchId.hashCode() : 0;
- result = 31 * result + (metadata != null ? metadata.hashCode() : 0);
- return result;
- }
}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
index d3528ee..6b9457e 100644
---
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
+++
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
@@ -21,7 +21,6 @@ import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
import java.io.IOException;
import java.util.Map;
@@ -40,14 +39,15 @@ public abstract class FetchIterator implements
Callable<Integer>, Initializable
public static final long DEFAULT_MAX_WAIT_MS = 300_000;
public static final int DEFAULT_QUEUE_SIZE = 1000;
- public static final FetchIdMetadataPair COMPLETED_SEMAPHORE =
- new FetchIdMetadataPair(null, null);
+ public static final FetchEmitTuple COMPLETED_SEMAPHORE =
+ new FetchEmitTuple(null, null, null);
private long maxWaitMs = DEFAULT_MAX_WAIT_MS;
private int numConsumers = -1;
- private ArrayBlockingQueue<FetchIdMetadataPair> queue = null;
+ private ArrayBlockingQueue<FetchEmitTuple> queue = null;
private int queueSize = DEFAULT_QUEUE_SIZE;
private String fetcherName;
+ private String emitterName;
private int added = 0;
public FetchIterator() {
@@ -61,7 +61,7 @@ public abstract class FetchIterator implements
Callable<Integer>, Initializable
* This must be called before 'calling' this object.
* @param numConsumers
*/
- public ArrayBlockingQueue<FetchIdMetadataPair> init(int numConsumers) {
+ public ArrayBlockingQueue<FetchEmitTuple> init(int numConsumers) {
this.queue = new ArrayBlockingQueue<>(queueSize);
this.numConsumers = numConsumers;
return queue;
@@ -76,6 +76,14 @@ public abstract class FetchIterator implements
Callable<Integer>, Initializable
return fetcherName;
}
+ @Field
+ public void setEmitterName(String emitterName) {
+ this.emitterName = emitterName;
+ }
+
+ public String getEmitterName() {
+ return emitterName;
+ }
@Field
public void setMaxWaitMs(long maxWaitMs) {
@@ -86,6 +94,7 @@ public abstract class FetchIterator implements
Callable<Integer>, Initializable
public void setQueueSize(int queueSize) {
this.queueSize = queueSize;
}
+
@Override
public Integer call() throws Exception {
if (queue == null || numConsumers < 0) {
@@ -105,7 +114,7 @@ public abstract class FetchIterator implements
Callable<Integer>, Initializable
protected abstract void enqueue() throws IOException, TimeoutException,
InterruptedException;
- protected void tryToAdd(FetchIdMetadataPair p) throws
InterruptedException, TimeoutException {
+ protected void tryToAdd(FetchEmitTuple p) throws InterruptedException,
TimeoutException {
if (p != COMPLETED_SEMAPHORE) {
added++;
}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
index 35e2ea8..68a96a1 100644
---
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
+++
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
@@ -19,11 +19,11 @@ package org.apache.tika.pipes.fetchiterator;
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
-import org.apache.tika.config.Param;
+import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.fetcher.FetchId;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
import java.io.IOException;
import java.nio.file.FileVisitResult;
@@ -32,7 +32,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
-import java.util.Map;
import java.util.concurrent.TimeoutException;
public class FileSystemFetchIterator
@@ -60,9 +59,9 @@ public class FileSystemFetchIterator
throw new IllegalArgumentException("\"basePath\" directory does
not exist: " +
basePath.toAbsolutePath());
}
- String fetcherName = getFetcherName();
+
try {
- Files.walkFileTree(basePath, new FSFileVisitor(fetcherName));
+ Files.walkFileTree(basePath, new FSFileVisitor(getFetcherName(),
getEmitterName()));
} catch (IOException e) {
Throwable cause = e.getCause();
if (cause != null && cause instanceof TimeoutException) {
@@ -75,22 +74,21 @@ public class FileSystemFetchIterator
@Override
public void checkInitialization(InitializableProblemHandler
problemHandler) throws TikaConfigException {
+ //ignore problemHandler: missing basePath, fetcherName or emitterName is always fatal
+ TikaConfig.mustNotBeEmpty("basePath", basePath);
+ TikaConfig.mustNotBeEmpty("fetcherName", getFetcherName());
+ TikaConfig.mustNotBeEmpty("emitterName", getEmitterName());
- //ignore problem handler. These should all be fatal.
- if (basePath == null) {
- throw new TikaConfigException("Must specify a \"basePath\"");
- }
- if (getFetcherName() == null || getFetcherName().trim().length() == 0)
{
- throw new TikaConfigException("\"fetcherName\" must be specified
and must be not blank");
- }
}
private class FSFileVisitor implements FileVisitor<Path> {
private final String fetcherName;
- private FSFileVisitor(String fetcherName) {
+ private final String emitterName;
+ private FSFileVisitor(String fetcherName, String emitterName) {
this.fetcherName = fetcherName;
+ this.emitterName = emitterName;
}
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes
attrs) throws IOException {
@@ -102,8 +100,10 @@ public class FileSystemFetchIterator
String relPath = basePath.relativize(file).toString();
try {
- tryToAdd(new FetchIdMetadataPair(
- new FetchId(fetcherName, relPath), new Metadata()));
+ tryToAdd(new FetchEmitTuple(
+ new FetchKey(fetcherName, relPath),
+ new EmitKey(emitterName, relPath), new Metadata())
+ );
} catch (TimeoutException e) {
throw new IOException(e);
} catch (InterruptedException e) {
diff --git a/tika-core/src/main/java/org/apache/tika/utils/StringUtils.java
b/tika-core/src/main/java/org/apache/tika/utils/StringUtils.java
index 0b39b36..d59827c 100644
--- a/tika-core/src/main/java/org/apache/tika/utils/StringUtils.java
+++ b/tika-core/src/main/java/org/apache/tika/utils/StringUtils.java
@@ -37,6 +37,10 @@ public class StringUtils {
return cs == null || cs.length() == 0;
}
+ public static boolean isBlank(final String s) {
+ return s == null || s.trim().length() == 0;
+ }
+
/**
* <p>Left pad a String with a specified String.</p>
*
diff --git
a/tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java
b/tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java
index f00d815..b25dfca 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java
@@ -25,7 +25,7 @@ import java.util.List;
public class MockEmitter extends AbstractEmitter {
@Override
- public void emit(List<Metadata> metadataList) throws IOException,
TikaException {
+ public void emit(String emitKey, List<Metadata> metadataList) throws
IOException, TikaEmitterException {
}
}
diff --git
a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
index 1560166..c5b5558 100644
---
a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
+++
b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.tika.pipes.fetchiterator;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
import org.junit.Test;
import java.io.IOException;
@@ -55,7 +54,7 @@ public class FileSystemFetchIteratorTest {
ExecutorCompletionService<Integer> cs = new
ExecutorCompletionService<>(es);
FetchIterator it = new FileSystemFetchIterator(fetcherName, root);
it.setQueueSize(20000);
- ArrayBlockingQueue<FetchIdMetadataPair> q = it.init(1);
+ ArrayBlockingQueue<FetchEmitTuple> q = it.init(1);
cs.submit(it);
@@ -64,11 +63,11 @@ public class FileSystemFetchIteratorTest {
f.get();
Set<String> iteratorSet = new HashSet<>();
- for (FetchIdMetadataPair p : q) {
+ for (FetchEmitTuple p : q) {
if (p == FetchIterator.COMPLETED_SEMAPHORE) {
break;
}
- iteratorSet.add(p.getFetchId().getFetchKey());
+ iteratorSet.add(p.getFetchKey().getKey());
}
assertEquals(truthSet, iteratorSet);
diff --git
a/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
b/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
index 7ebe5bb..9398096 100644
---
a/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
+++
b/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
@@ -19,6 +19,7 @@ package org.apache.tika.pipes.emitter.fs;
import org.apache.tika.config.Field;
import org.apache.tika.pipes.emitter.AbstractEmitter;
import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.StreamEmitter;
import org.apache.tika.pipes.emitter.TikaEmitterException;
import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
@@ -26,6 +27,7 @@ import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.metadata.serialization.JsonMetadataList;
import java.io.IOException;
+import java.io.InputStream;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@@ -57,39 +59,37 @@ import java.util.Set;
* </emitters>
* </properties></pre>
*/
-public class FileSystemEmitter extends AbstractEmitter {
+public class FileSystemEmitter extends AbstractEmitter implements
StreamEmitter {
private Path basePath = null;
private String fileExtension = "json";
@Override
- public void emit(List<Metadata> metadataList) throws IOException,
TikaException {
+ public void emit(String emitKey, List<Metadata> metadataList) throws
IOException, TikaEmitterException {
Path output;
if (metadataList == null || metadataList.size() == 0) {
throw new TikaEmitterException("metadata list must not be null or
of size 0");
}
- String relPath = metadataList.get(0)
- .get(TikaCoreProperties.SOURCE_PATH);
- if (relPath == null) {
- throw new TikaEmitterException("Must specify a
"+TikaCoreProperties.SOURCE_PATH.getName() +
- " in the metadata in order for this emitter to generate
the output file path.");
- }
if (fileExtension != null && fileExtension.length() > 0) {
- relPath += "." + fileExtension;
+ emitKey += "." + fileExtension;
}
if (basePath != null) {
- output = basePath.resolve(relPath);
+ output = basePath.resolve(emitKey);
} else {
- output = Paths.get(relPath);
+ output = Paths.get(emitKey);
}
if (!Files.isDirectory(output.getParent())) {
Files.createDirectories(output.getParent());
}
try (Writer writer = Files.newBufferedWriter(output,
StandardCharsets.UTF_8)) {
- JsonMetadataList.toJson(metadataList, writer);
+ try {
+ JsonMetadataList.toJson(metadataList, writer);
+ } catch (TikaException e) {
+ throw new TikaEmitterException("can't create json", e);
+ }
}
}
@@ -107,4 +107,10 @@ public class FileSystemEmitter extends AbstractEmitter {
public void setFileExtension(String fileExtension) {
this.fileExtension = fileExtension;
}
+
+ @Override
+ public void emit(String path, InputStream inputStream, Metadata
userMetadata) throws IOException,
+ TikaEmitterException {
+ Files.copy(inputStream, basePath != null ? basePath.resolve(path) : Paths.get(path));
+ }
}
diff --git
a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
index bec5e1a..d72d8fb 100644
---
a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
+++
b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
@@ -28,11 +28,14 @@ import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
import org.apache.tika.io.TemporaryResources;
+import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.metadata.serialization.JsonMetadataList;
import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.StreamEmitter;
import org.apache.tika.pipes.emitter.TikaEmitterException;
import org.apache.tika.metadata.Metadata;
+import org.apache.tika.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +70,10 @@ import static
org.apache.tika.config.TikaConfig.mustNotBeEmpty;
* <param name="profile"
type="string">my-profile</param>
* <!-- required -->
* <param name="bucket"
type="string">my-bucket</param>
- * <!-- optional; default is 'json' -->
+ * <!-- optional; prefix to add to the path before
emitting; default is no prefix -->
+ * <param name="prefix"
type="string">my-prefix</param>
+ * <!-- optional; default is 'json' this will be added to
the SOURCE_PATH
+ * if no emitter key is specified -->
* <param name="fileExtension"
type="string">json</param>
* <!-- optional; default is 'true'-- whether to copy the
json to a local file before putting to s3 -->
* <param name="spoolToTemp"
type="bool">true</param>
@@ -76,16 +82,16 @@ import static
org.apache.tika.config.TikaConfig.mustNotBeEmpty;
* </emitters>
* </properties></pre>
*/
-public class S3Emitter extends AbstractEmitter implements Initializable {
+public class S3Emitter extends AbstractEmitter implements Initializable,
StreamEmitter {
private static final Logger LOGGER =
LoggerFactory.getLogger(S3Emitter.class);
private String region;
private String profile;
private String bucket;
- private AmazonS3 s3Client;
private String fileExtension = "json";
private boolean spoolToTemp = true;
-
+ private String prefix = null;
+ private AmazonS3 s3Client;
/**
* Requires the src-bucket/path/to/my/file.txt in the {@link
TikaCoreProperties#SOURCE_PATH}.
@@ -95,26 +101,22 @@ public class S3Emitter extends AbstractEmitter implements
Initializable {
* @throws TikaException
*/
@Override
- public void emit(List<Metadata> metadataList) throws IOException,
TikaException {
+ public void emit(String emitKey, List<Metadata> metadataList) throws
IOException, TikaEmitterException {
if (metadataList == null || metadataList.size() == 0) {
throw new TikaEmitterException("metadata list must not be null or
of size 0");
}
- String path = metadataList.get(0)
- .get(TikaCoreProperties.SOURCE_PATH);
- if (path == null) {
- throw new TikaEmitterException("Must specify a
"+TikaCoreProperties.SOURCE_PATH.getName() +
- " in the metadata in order for this emitter to generate
the output file path.");
- }
+
if (! spoolToTemp) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try (Writer writer =
new BufferedWriter(new OutputStreamWriter(bos,
StandardCharsets.UTF_8))) {
JsonMetadataList.toJson(metadataList, writer);
+ } catch (TikaException e) {
+ throw new TikaEmitterException("can't jsonify", e);
}
byte[] bytes = bos.toByteArray();
- long length = bytes.length;
- try (InputStream is = new ByteArrayInputStream(bytes)) {
- emit(path, is, length, new Metadata());
+ try (InputStream is = TikaInputStream.get(bytes)) {
+ emit(emitKey, is, new Metadata());
}
} else {
TemporaryResources tmp = new TemporaryResources();
@@ -123,10 +125,11 @@ public class S3Emitter extends AbstractEmitter implements
Initializable {
try (Writer writer = Files.newBufferedWriter(tmpPath,
StandardCharsets.UTF_8, StandardOpenOption.CREATE)) {
JsonMetadataList.toJson(metadataList, writer);
+ } catch (TikaException e) {
+ throw new TikaEmitterException("can't jsonify", e);
}
- long length = Files.size(tmpPath);
- try (InputStream is = Files.newInputStream(tmpPath)) {
- emit(path, is, length, new Metadata());
+ try (InputStream is = TikaInputStream.get(tmpPath)) {
+ emit(emitKey, is, new Metadata());
}
} finally {
tmp.close();
@@ -141,14 +144,29 @@ public class S3Emitter extends AbstractEmitter implements
Initializable {
* @param userMetadata this will be written to the s3 ObjectMetadata's
userMetadata
* @throws TikaEmitterException
*/
- public void emit(String path, InputStream is, long length, Metadata
userMetadata) throws TikaEmitterException {
+ @Override
+ public void emit(String path, InputStream is, Metadata userMetadata)
throws IOException, TikaEmitterException {
+
+ if (!StringUtils.isBlank(prefix)) {
+ path = prefix + "/" + path;
+ }
- if (fileExtension != null && fileExtension.length() > 0) {
+ if (! StringUtils.isBlank(fileExtension)) {
path += "." + fileExtension;
}
LOGGER.debug("about to emit to target bucket: ({}) path:({})",
bucket, path);
+ long length = -1;
+ if (is instanceof TikaInputStream) {
+ if (((TikaInputStream)is).hasFile()) {
+ try {
+ length = ((TikaInputStream) is).getLength();
+ } catch (IOException e) {
+ throw new TikaEmitterException("exception getting length",
e);
+ }
+ }
+ }
ObjectMetadata objectMetadata = new ObjectMetadata();
if (length > 0) {
objectMetadata.setContentLength(length);
@@ -194,6 +212,16 @@ public class S3Emitter extends AbstractEmitter implements
Initializable {
this.bucket = bucket;
}
+ @Field
+ public void setPrefix(String prefix) {
+ //strip final "/" if it exists
+ if (prefix.endsWith("/")) {
+ this.prefix = prefix.substring(0, prefix.length()-1);
+ } else {
+ this.prefix = prefix;
+ }
+ }
+
/**
* If you want to customize the output file's file extension.
* Do not include the "."
diff --git
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index 14d0a88..2391424 100644
---
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
+++
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -20,24 +20,25 @@ import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.tika.client.HttpClientUtil;
+import org.apache.tika.client.TikaClientException;
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
import org.apache.tika.pipes.emitter.Emitter;
import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-public class SolrEmitter implements Emitter, Initializable {
+public class SolrEmitter extends AbstractEmitter implements Initializable {
enum AttachmentStrategy {
SKIP,
@@ -50,7 +51,6 @@ public class SolrEmitter implements Emitter, Initializable {
private static final String UPDATE_PATH = "/update";
private static final Logger LOG =
LoggerFactory.getLogger(SolrEmitter.class);
- private String name = "solr";
private AttachmentStrategy attachmentStrategy =
AttachmentStrategy.PARENT_CHILD;
private String url;
private String contentField = "content";
@@ -58,23 +58,23 @@ public class SolrEmitter implements Emitter, Initializable {
private int commitWithin = 100;
@Override
- public String getName() {
- return name;
- }
-
- @Override
- public void emit(List<Metadata> metadataList) throws IOException,
- TikaException {
+ public void emit(String emitKey, List<Metadata> metadataList) throws
IOException,
+ TikaEmitterException {
if (metadataList == null || metadataList.size() == 0) {
LOG.warn("metadataList is null or empty");
return;
}
- String json = jsonify(metadataList);
+ String json = jsonify(emitKey, metadataList);
LOG.debug("emitting json:"+json);
-
HttpClientUtil.postJson(url+UPDATE_PATH+"?commitWithin="+getCommitWithin(),
json);
+ try {
+
HttpClientUtil.postJson(url+UPDATE_PATH+"?commitWithin="+getCommitWithin(),
json);
+ } catch (TikaClientException e) {
+ throw new TikaEmitterException("can't post", e);
+ }
}
- private String jsonify(List<Metadata> metadataList) {
+ private String jsonify(String emitKey, List<Metadata> metadataList) {
+ metadataList.get(0).set(idField, emitKey);
if (attachmentStrategy == AttachmentStrategy.SKIP) {
return toJsonString(jsonify(metadataList.get(0)));
} else if (attachmentStrategy ==
AttachmentStrategy.CONCATENATE_CONTENT) {
@@ -163,11 +163,6 @@ public class SolrEmitter implements Emitter, Initializable
{
}
}
- @Field
- public void setName(String name) {
- this.name = name;
- }
-
/**
* Specify the url for Solr
* @param url
diff --git
a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
index 5f1e75d..14a091a 100644
---
a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
+++
b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
@@ -38,7 +38,6 @@ public class TestBasic {
Emitter emitter = tikaConfig.getEmitterManager().getEmitter("solr1");
List<Metadata> metadataList = new ArrayList<>();
Metadata m1 = new Metadata();
- m1.set("id", "1");
m1.set(Metadata.CONTENT_LENGTH, "314159");
m1.set(TikaCoreProperties.TIKA_CONTENT, "the quick brown");
m1.set(TikaCoreProperties.TITLE, "this is the first title");
@@ -54,6 +53,6 @@ public class TestBasic {
metadataList.add(m1);
metadataList.add(m2);
- emitter.emit(metadataList);
+ emitter.emit("1", metadataList);
}
}
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
index 5737eb5..12268ea 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
+++
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
@@ -22,10 +22,15 @@ import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.fetcher.FetchId;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Reader;
@@ -40,11 +45,32 @@ import java.util.concurrent.TimeoutException;
import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+/**
+ * Iterates through a UTF-8 CSV file. This adds all columns
+ * (except for the 'fetchKeyColumn' and 'emitKeyColumn', if specified)
+ * to the metadata object.
+ * <p>
+ * <ul>
+ * <li>If a 'fetchKeyColumn' is specified, this will use that column's
value as the fetchKey.</li>
+ * <li>If no 'fetchKeyColumn' is specified, this will send the metadata
from the other columns.</li>
+ * <li>The 'fetchKeyColumn' value is not added to the metadata.</li>
+ * </ul>
+ * <p>
+ * <ul>
+ * <li>If an 'emitKeyColumn' is specified, this will use that column's
value as the emit key.</li>
+ * <li>If an 'emitKeyColumn' is not specified, this will use the value
from the 'fetchKeyColumn'.</li>
+ * <li>The 'emitKeyColumn' value is not added to the metadata.</li>
+ * </ul>
+ *
+ */
public class CSVFetchIterator extends FetchIterator implements Initializable {
- private Charset charset = StandardCharsets.UTF_8;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CSVFetchIterator.class);
+
+ private final Charset charset = StandardCharsets.UTF_8;
private Path csvPath;
private String fetchKeyColumn;
+ private String emitKeyColumn;
@Field
@@ -57,6 +83,11 @@ public class CSVFetchIterator extends FetchIterator
implements Initializable {
this.fetchKeyColumn = fetchKeyColumn;
}
+ @Field
+ public void setEmitKeyColumn(String emitKeyColumn) {
+ this.emitKeyColumn = emitKeyColumn;
+ }
+
public void setCsvPath(Path csvPath) {
this.csvPath = csvPath;
}
@@ -64,35 +95,93 @@ public class CSVFetchIterator extends FetchIterator
implements Initializable {
@Override
protected void enqueue() throws InterruptedException, IOException,
TimeoutException {
String fetcherName = getFetcherName();
+ String emitterName = getEmitterName();
try (Reader reader = Files.newBufferedReader(csvPath, charset)) {
Iterable<CSVRecord> records = CSVFormat.EXCEL.parse(reader);
- int fetchKeyIndex = -1;
List<String> headers = new ArrayList<>();
+ FetchEmitKeyIndices fetchEmitKeyIndices = null;
for (CSVRecord record : records) {
- fetchKeyIndex = loadHeaders(record, headers);
+ fetchEmitKeyIndices = loadHeaders(record, headers);
break;
}
- //check that if a user set a fetchKeyColumn, but we didn't find it
- if (fetchKeyIndex < 0 && (fetchKeyColumn != null)) {
- throw new IllegalArgumentException("Can't find " +
fetchKeyColumn +
- " in the csv. I see:"+
- headers);
- }
+
+ checkFetchEmitValidity(fetcherName, emitterName,
fetchEmitKeyIndices, headers);
+
for (CSVRecord record : records) {
- String fetchKey = "";
- if (fetchKeyIndex > -1) {
- fetchKey = record.get(fetchKeyIndex);
+ String fetchKey = getFetchKey(fetchEmitKeyIndices, record);
+ String emitKey = getEmitKey(fetchEmitKeyIndices, record);
+ if (StringUtils.isBlank(fetchKey) && !
StringUtils.isBlank(fetcherName)) {
+ LOGGER.debug("Fetcher specified ({}), but no fetchkey was
found in ({})",
+ fetcherName, record);
+ }
+ if (StringUtils.isBlank(emitKey)) {
+ throw new IOException("emitKey must not be blank in
:"+record);
}
- Metadata metadata = loadMetadata(fetchKeyIndex, headers,
record);
- tryToAdd(new FetchIdMetadataPair(new FetchId(fetcherName,
fetchKey), metadata));
+ Metadata metadata = loadMetadata(fetchEmitKeyIndices, headers,
record);
+ tryToAdd(new FetchEmitTuple(
+ new FetchKey(fetcherName, fetchKey),
+ new EmitKey(emitterName, emitKey), metadata));
}
}
}
- private Metadata loadMetadata(int fetchKeyIndex, List<String> headers,
CSVRecord record) {
+ private void checkFetchEmitValidity(String fetcherName,
+ String emitterName,
+ FetchEmitKeyIndices
fetchEmitKeyIndices,
+ List<String> headers) throws
IOException {
+
+ if (StringUtils.isBlank(emitterName)) {
+ throw new IOException(new TikaConfigException("must specify at
least an emitterName"));
+ }
+
+ if (StringUtils.isBlank(fetcherName) && !
StringUtils.isBlank(fetchKeyColumn)) {
+ throw new IOException(new TikaConfigException("If specifying a
'fetchKeyColumn', " +
+ "you must also specify a 'fetcherName'"));
+ }
+
+ if (StringUtils.isBlank(fetcherName)) {
+ LOGGER.debug("No fetcher specified. This will be metadata only");
+ }
+
+ //if a fetchkeycolumn is specified, make sure that it was found
+ if (! StringUtils.isBlank(fetchKeyColumn) &&
fetchEmitKeyIndices.fetchKeyIndex < 0) {
+ throw new IOException(new TikaConfigException("Couldn't find
fetchKeyColumn ("+
+ fetchKeyColumn+" in header.\n" +
+ "These are the headers I see: " + headers));
+ }
+
+ //if an emitkeycolumn is specified, make sure that it was found
+ if (! StringUtils.isBlank(emitKeyColumn) &&
fetchEmitKeyIndices.emitKeyIndex < 0) {
+ throw new IOException(new TikaConfigException("Couldn't find
emitKeyColumn ("+
+ emitKeyColumn+" in header.\n" +
+ "These are the headers I see: " + headers));
+ }
+
+ if (StringUtils.isBlank(emitKeyColumn)) {
+ LOGGER.debug("No emitKeyColumn specified. " +
+ "Will use fetchKeyColumn ({}) for both the fetch
key and emit key",
+ fetchKeyColumn);
+ }
+ }
+
+ private String getFetchKey(FetchEmitKeyIndices fetchEmitKeyIndices,
CSVRecord record) {
+ if (fetchEmitKeyIndices.fetchKeyIndex > -1) {
+ return record.get(fetchEmitKeyIndices.fetchKeyIndex);
+ }
+ return StringUtils.EMPTY;
+ }
+
+ private String getEmitKey(FetchEmitKeyIndices fetchEmitKeyIndices,
CSVRecord record) {
+ if (fetchEmitKeyIndices.emitKeyIndex > -1) {
+ return record.get(fetchEmitKeyIndices.emitKeyIndex);
+ }
+ return getFetchKey(fetchEmitKeyIndices, record);
+ }
+
+ private Metadata loadMetadata(FetchEmitKeyIndices fetchEmitKeyIndices,
List<String> headers, CSVRecord record) {
Metadata metadata = new Metadata();
for (int i = 0; i < record.size(); i++) {
- if (fetchKeyIndex == i) {
+ if (fetchEmitKeyIndices.shouldSkip(i)) {
continue;
}
metadata.set(headers.get(i), record.get(i));
@@ -101,23 +190,45 @@ public class CSVFetchIterator extends FetchIterator
implements Initializable {
}
- private int loadHeaders(CSVRecord record, List<String> headers) {
+ private FetchEmitKeyIndices loadHeaders(CSVRecord record, List<String>
headers)
+ throws IOException {
int fetchKeyColumnIndex = -1;
+ int emitKeyColumnIndex = -1;
for (int col = 0; col < record.size(); col++) {
String header = record.get(col);
+ if (StringUtils.isBlank(header)) {
+ throw new IOException(
+ new TikaException("Header in column (" +col +") must
not be empty"));
+ }
headers.add(header);
if (header.equals(fetchKeyColumn)) {
fetchKeyColumnIndex = col;
+ } else if (header.equals(emitKeyColumn)) {
+ emitKeyColumnIndex = col;
}
}
- return fetchKeyColumnIndex;
+ return new FetchEmitKeyIndices(fetchKeyColumnIndex,
emitKeyColumnIndex);
}
@Override
public void checkInitialization(InitializableProblemHandler problemHandler)
throws TikaConfigException {
super.checkInitialization(problemHandler);
- mustNotBeEmpty("csvPath", this.csvPath.toString());
+ mustNotBeEmpty("csvPath", this.csvPath);
+ }
+
+ private static class FetchEmitKeyIndices {
+ private final int fetchKeyIndex;
+ private final int emitKeyIndex;
+
+ public FetchEmitKeyIndices(int fetchKeyIndex, int emitKeyIndex) {
+ this.fetchKeyIndex = fetchKeyIndex;
+ this.emitKeyIndex = emitKeyIndex;
+ }
+
+ public boolean shouldSkip(int index) {
+ return fetchKeyIndex == index || emitKeyIndex == index;
+ }
}
}
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
index 2a6ab52..74f3ebe 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
+++
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
@@ -14,12 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.fetcher.FetchId;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import org.apache.tika.pipes.fetchiterator.FetchIterator;
import org.apache.tika.pipes.fetchiterator.csv.CSVFetchIterator;
-import org.junit.Before;
import org.junit.Test;
import java.nio.file.Path;
@@ -44,12 +41,13 @@ public class TestCSVFetchIterator {
public void testSimple() throws Exception {
Path p = get("test-simple.csv");
CSVFetchIterator it = new CSVFetchIterator();
- it.setFetcherName("fs");
+ it.setFetcherName("fsf");
+ it.setEmitterName("fse");
it.setCsvPath(p);
it.setFetchKeyColumn("fetchKey");
int numConsumers = 2;
ExecutorService es = Executors.newFixedThreadPool(numConsumers+1);
- ArrayBlockingQueue<FetchIdMetadataPair> queue = it.init(numConsumers);
+ ArrayBlockingQueue<FetchEmitTuple> queue = it.init(numConsumers);
ExecutorCompletionService c = new ExecutorCompletionService(es);
c.submit(it);
List<MockFetcher> fetchers = new ArrayList<>();
@@ -70,13 +68,13 @@ public class TestCSVFetchIterator {
}
assertEquals(10, completed);
for (MockFetcher f : fetchers) {
- for (FetchIdMetadataPair pair : f.pairs) {
- String id = pair.getMetadata().get("id");
+ for (FetchEmitTuple t : f.pairs) {
+ String id = t.getMetadata().get("id");
assertEquals("path/to/my/file"+id,
- pair.getFetchId().getFetchKey());
+ t.getFetchKey().getKey());
assertEquals("project"+
(Integer.parseInt(id) % 2 == 1 ? "a" : "b"),
- pair.getMetadata().get("project"));
+ t.getMetadata().get("project"));
}
}
}
@@ -109,20 +107,20 @@ public class TestCSVFetchIterator {
}
private static class MockFetcher implements Callable<Integer> {
- private final ArrayBlockingQueue<FetchIdMetadataPair> queue;
- private final List<FetchIdMetadataPair> pairs = new ArrayList<>();
- private MockFetcher(ArrayBlockingQueue<FetchIdMetadataPair> queue) {
+ private final ArrayBlockingQueue<FetchEmitTuple> queue;
+ private final List<FetchEmitTuple> pairs = new ArrayList<>();
+ private MockFetcher(ArrayBlockingQueue<FetchEmitTuple> queue) {
this.queue = queue;
}
@Override
public Integer call() throws Exception {
while (true) {
- FetchIdMetadataPair p = queue.poll(1, TimeUnit.HOURS);
- if (p == FetchIterator.COMPLETED_SEMAPHORE) {
+ FetchEmitTuple t = queue.poll(1, TimeUnit.HOURS);
+ if (t == FetchIterator.COMPLETED_SEMAPHORE) {
return pairs.size();
}
- pairs.add(p);
+ pairs.add(t);
}
}
}
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
index 057fecd..2c5ac1a 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
+++
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
@@ -22,8 +22,9 @@ import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.fetcher.FetchId;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import org.apache.tika.pipes.fetchiterator.FetchIterator;
import org.apache.tika.utils.StringUtils;
import org.slf4j.Logger;
@@ -43,12 +44,30 @@ import java.util.concurrent.TimeoutException;
import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+/**
+ * Iterates through a the results from a sql call via jdbc. This adds all
columns
+ * (except for the 'fetchKeyColumn' and 'emitKeyColumn', if specified)
+ * to the metadata object.
+ * <p>
+ * <ul>
+ * <li>If a 'fetchKeyColumn' is specified, this will use that column's
value as the fetchKey.</li>
+ * <li>If no 'fetchKeyColumn' is specified, this will send the metadata
from the other columns.</li>
+ * <li>The 'fetchKeyColumn' value is not added to the metadata.</li>
+ * </ul>
+ * <p>
+ * <ul>
+ * <li>An 'emitKeyColumn' must be specified</li>
+ * <li>The 'emitKeyColumn' value is not added to the metadata.</li>
+ * </ul>
+ *
+ */
public class JDBCFetchIterator extends FetchIterator implements Initializable {
private static final Logger LOGGER =
LoggerFactory.getLogger(JDBCFetchIterator.class);
private String fetchKeyColumn;
+ private String emitKeyColumn;
private String connection;
private String select;
@@ -60,6 +79,11 @@ public class JDBCFetchIterator extends FetchIterator
implements Initializable {
}
@Field
+ public void setEmitKeyColumn(String fetchKeyColumn) {
+ this.emitKeyColumn = fetchKeyColumn;
+ }
+
+ @Field
public void setConnection(String connection) {
this.connection = connection;
}
@@ -72,7 +96,8 @@ public class JDBCFetchIterator extends FetchIterator
implements Initializable {
@Override
protected void enqueue() throws InterruptedException, IOException,
TimeoutException {
String fetcherName = getFetcherName();
- int fetchKeyIndex = -1;
+ String emitterName = getEmitterName();
+ FetchEmitKeyIndices fetchEmitKeyIndices = null;
List<String> headers = new ArrayList<>();
int rowCount = 0;
LOGGER.debug("select: {}", select);
@@ -80,10 +105,11 @@ public class JDBCFetchIterator extends FetchIterator
implements Initializable {
try (ResultSet rs = st.executeQuery(select)) {
while (rs.next()) {
if (headers.size() == 0) {
- fetchKeyIndex = loadHeaders(rs.getMetaData(), headers);
+ fetchEmitKeyIndices = loadHeaders(rs.getMetaData(),
headers);
+ checkFetchEmitValidity(fetcherName, emitterName,
fetchEmitKeyIndices, headers);
}
try {
- processRow(fetcherName, headers, fetchKeyIndex, rs);
+ processRow(fetcherName, emitterName, headers,
fetchEmitKeyIndices, rs);
} catch (SQLException e) {
LOGGER.warn("Failed to insert: "+rs, e);
}
@@ -104,16 +130,36 @@ public class JDBCFetchIterator extends FetchIterator
implements Initializable {
}
}
}
+ private void checkFetchEmitValidity(String fetcherName,
+ String emitterName,
+ FetchEmitKeyIndices
fetchEmitKeyIndices,
+ List<String> headers) throws
IOException {
+
+ if (! StringUtils.isBlank(fetchKeyColumn) &&
fetchEmitKeyIndices.fetchKeyIndex < 0) {
+ throw new IOException(new TikaConfigException("Couldn't find
column: "+fetchKeyColumn));
+ }
+ if (! StringUtils.isBlank(emitKeyColumn) &&
fetchEmitKeyIndices.emitKeyIndex < 0) {
+ throw new IOException(new TikaConfigException("Couldn't find
column: "+emitKeyColumn));
+ }
+ }
- private void processRow(String fetcherName, List<String> headers,
- int fetchKeyIndex, ResultSet rs)
+ private void processRow(String fetcherName, String emitterName,
List<String> headers,
+ FetchEmitKeyIndices fetchEmitKeyIndices, ResultSet
rs)
throws SQLException, TimeoutException, InterruptedException {
Metadata metadata = new Metadata();
- String fetchKey = null;
+ String fetchKey = "";
+ String emitKey = "";
for (int i = 1; i <= rs.getMetaData().getColumnCount(); i++) {
- if (i == fetchKeyIndex) {
+ if (i == fetchEmitKeyIndices.fetchKeyIndex) {
fetchKey = getString(i, rs);
fetchKey = (fetchKey == null) ? "" : fetchKey;
+ LOGGER.debug("fetchKey is empty for record "+toString(rs));
+ continue;
+ }
+ if (i == fetchEmitKeyIndices.emitKeyIndex) {
+ emitKey = getString(i, rs);
+ emitKey = (emitKey == null) ? "" : emitKey;
+ LOGGER.warn("emitKey is empty for record "+toString(rs));
continue;
}
String val = getString(i, rs);
@@ -122,7 +168,21 @@ public class JDBCFetchIterator extends FetchIterator
implements Initializable {
}
}
- tryToAdd(new FetchIdMetadataPair(new FetchId(fetcherName, fetchKey),
metadata));
+ tryToAdd(new FetchEmitTuple(
+ new FetchKey(fetcherName, fetchKey),
+ new EmitKey(emitterName, emitKey),
+ metadata));
+ }
+
+ private String toString(ResultSet rs) throws SQLException {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 1; i <= rs.getMetaData().getColumnCount(); i++) {
+ String val = rs.getString(i);
+ val = (val == null) ? "" : val;
+ val = (val.length() > 100) ? val.substring(0, 100) : val;
+ sb.append(rs.getMetaData().getColumnLabel(i)+":"+val+"\n");
+ }
+ return sb.toString();
}
private String getString(int i, ResultSet rs) throws SQLException {
@@ -135,15 +195,19 @@ public class JDBCFetchIterator extends FetchIterator
implements Initializable {
}
- private int loadHeaders(ResultSetMetaData metaData, List<String> headers)
throws SQLException {
+ private FetchEmitKeyIndices loadHeaders(ResultSetMetaData metaData,
List<String> headers) throws SQLException {
int fetchKeyIndex = -1;
+ int emitKeyIndex = -1;
for (int i = 1; i <= metaData.getColumnCount(); i++) {
if (metaData.getColumnLabel(i).equalsIgnoreCase(fetchKeyColumn)) {
fetchKeyIndex = i;
}
+ if (metaData.getColumnLabel(i).equalsIgnoreCase(emitKeyColumn)) {
+ emitKeyIndex = i;
+ }
headers.add(metaData.getColumnLabel(i));
}
- return fetchKeyIndex;
+ return new FetchEmitKeyIndices(fetchKeyIndex, emitKeyIndex);
}
@Override
@@ -161,8 +225,31 @@ public class JDBCFetchIterator extends FetchIterator
implements Initializable {
super.checkInitialization(problemHandler);
mustNotBeEmpty("connection", this.connection);
mustNotBeEmpty("select", this.select);
+ mustNotBeEmpty("emitterName", this.getEmitterName());
+ mustNotBeEmpty("emitKeyColumn", this.emitKeyColumn);
+
+ if (StringUtils.isBlank(getFetcherName()) && !
StringUtils.isBlank(fetchKeyColumn)) {
+ throw new TikaConfigException(
+ "If you specify a 'fetchKeyColumn', you must specify a
'fetcherName'");
+ }
+
if (StringUtils.isEmpty(fetchKeyColumn)) {
LOGGER.info("no fetch key column has been specified");
}
+
+ }
+
+ private static class FetchEmitKeyIndices {
+ private final int fetchKeyIndex;
+ private final int emitKeyIndex;
+
+ public FetchEmitKeyIndices(int fetchKeyIndex, int emitKeyIndex) {
+ this.fetchKeyIndex = fetchKeyIndex;
+ this.emitKeyIndex = emitKeyIndex;
+ }
+
+ public boolean shouldSkip(int index) {
+ return fetchKeyIndex == index || emitKeyIndex == index;
+ }
}
}
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
index fb0277b..ee3fcee 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
+++
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
@@ -18,12 +18,9 @@ package org.apache.tika.pipes.fetchiterator.jdbc;
import org.apache.commons.io.FileUtils;
import org.apache.tika.config.TikaConfig;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.fetcher.FetchId;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import org.apache.tika.pipes.fetchiterator.FetchIterator;
import org.junit.AfterClass;
-import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -32,22 +29,18 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
-import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -99,7 +92,7 @@ public class TestJDBCFetchIterator {
FetchIterator fetchIterator = tk.getFetchIterator();
ExecutorService es = Executors.newFixedThreadPool(numConsumers+1);
ExecutorCompletionService<Integer> completionService = new
ExecutorCompletionService<>(es);
- ArrayBlockingQueue<FetchIdMetadataPair> queue =
fetchIterator.init(numConsumers);
+ ArrayBlockingQueue<FetchEmitTuple> queue =
fetchIterator.init(numConsumers);
completionService.submit(fetchIterator);
List<MockFetcher> fetchers = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
@@ -119,8 +112,8 @@ public class TestJDBCFetchIterator {
int cnt = 0;
Matcher m = Pattern.compile("fk(\\d+)").matcher("");
for (MockFetcher f : fetchers) {
- for (FetchIdMetadataPair p : f.pairs) {
- String k = p.getFetchId().getFetchKey();
+ for (FetchEmitTuple p : f.pairs) {
+ String k = p.getFetchKey().getKey();
String num = "";
if (m.reset(k).find()) {
num = m.group(1);
@@ -143,9 +136,11 @@ public class TestJDBCFetchIterator {
" <fetchIterators>\n" +
" <fetchIterator
class=\"org.apache.tika.pipes.fetchiterator.jdbc.JDBCFetchIterator\">\n" +
" <params>\n" +
- " <param name=\"fetcherName\"
type=\"string\">s3</param>\n" +
+ " <param name=\"fetcherName\"
type=\"string\">s3f</param>\n" +
+ " <param name=\"emitterName\"
type=\"string\">s3e</param>\n" +
" <param name=\"queueSize\"
type=\"int\">57</param>\n" +
" <param name=\"fetchKeyColumn\"
type=\"string\">my_fetchkey</param>\n" +
+ " <param name=\"emitKeyColumn\"
type=\"string\">my_fetchkey</param>\n" +
" <param name=\"select\" type=\"string\">" +
"select id as my_id, project as my_project, fetchKey as
my_fetchKey from fetchkeys</param>\n" +
" <param name=\"connection\"
type=\"string\">jdbc:h2:file:"+
@@ -158,20 +153,20 @@ public class TestJDBCFetchIterator {
}
private static class MockFetcher implements Callable<Integer> {
- private final ArrayBlockingQueue<FetchIdMetadataPair> queue;
- private final List<FetchIdMetadataPair> pairs = new ArrayList<>();
- private MockFetcher(ArrayBlockingQueue<FetchIdMetadataPair> queue) {
+ private final ArrayBlockingQueue<FetchEmitTuple> queue;
+ private final List<FetchEmitTuple> pairs = new ArrayList<>();
+ private MockFetcher(ArrayBlockingQueue<FetchEmitTuple> queue) {
this.queue = queue;
}
@Override
public Integer call() throws Exception {
while (true) {
- FetchIdMetadataPair p = queue.poll(1, TimeUnit.HOURS);
- if (p == FetchIterator.COMPLETED_SEMAPHORE) {
+ FetchEmitTuple t = queue.poll(1, TimeUnit.HOURS);
+ if (t == FetchIterator.COMPLETED_SEMAPHORE) {
return pairs.size();
}
- pairs.add(p);
+ pairs.add(t);
}
}
}
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
index bd85658..2d575e9 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
+++
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
@@ -20,7 +20,6 @@ import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.iterable.S3Objects;
-import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
@@ -28,8 +27,9 @@ import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.fetcher.FetchId;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import org.apache.tika.pipes.fetchiterator.FetchIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,11 +97,13 @@ public class S3FetchIterator extends FetchIterator
implements Initializable {
long start = System.currentTimeMillis();
int count = 0;
for (S3ObjectSummary summary : S3Objects.withPrefix(s3Client, bucket,
s3PathPrefix)) {
+
long elapsed = System.currentTimeMillis() - start;
LOGGER.debug("adding ({}) {} in {} ms", count, summary.getKey(),
elapsed);
- tryToAdd( new FetchIdMetadataPair(
- new FetchId(fetcherName, summary.getKey()),
+ tryToAdd(new FetchEmitTuple(
+ new FetchKey(fetcherName, summary.getKey()),
+ new EmitKey(fetcherName, summary.getKey()),
new Metadata()));
count++;
}
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
index 1984577..51d20ac 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
+++
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
@@ -15,22 +15,16 @@
* limitations under the License.
*/
package org.apache.tika.pipes.fetchiterator.s3;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.fetcher.FetchId;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -52,7 +46,7 @@ public class TestS3FetchIterator {
it.setRegion("");//select one
it.initialize(Collections.EMPTY_MAP);
int numConsumers = 6;
- ArrayBlockingQueue<FetchIdMetadataPair> queue = it.init(numConsumers);
+ ArrayBlockingQueue<FetchEmitTuple> queue = it.init(numConsumers);
ExecutorService es = Executors.newFixedThreadPool(numConsumers+1);
ExecutorCompletionService c = new ExecutorCompletionService(es);
@@ -78,20 +72,20 @@ public class TestS3FetchIterator {
}
private static class MockFetcher implements Callable<Integer> {
- private final ArrayBlockingQueue<FetchIdMetadataPair> queue;
- private final List<FetchIdMetadataPair> pairs = new ArrayList<>();
- private MockFetcher(ArrayBlockingQueue<FetchIdMetadataPair> queue) {
+ private final ArrayBlockingQueue<FetchEmitTuple> queue;
+ private final List<FetchEmitTuple> pairs = new ArrayList<>();
+ private MockFetcher(ArrayBlockingQueue<FetchEmitTuple> queue) {
this.queue = queue;
}
@Override
public Integer call() throws Exception {
while (true) {
- FetchIdMetadataPair p = queue.poll(1, TimeUnit.HOURS);
- if (p == FetchIterator.COMPLETED_SEMAPHORE) {
+ FetchEmitTuple t = queue.poll(1, TimeUnit.HOURS);
+ if (t == FetchIterator.COMPLETED_SEMAPHORE) {
return pairs.size();
}
- pairs.add(p);
+ pairs.add(t);
}
}
}
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
index 05423ce..7df2bb4 100644
---
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
@@ -25,17 +25,15 @@ import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.TikaException;
-import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.emitter.Emitter;
import org.apache.tika.pipes.emitter.s3.S3Emitter;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
import org.apache.tika.pipes.fetcher.Fetcher;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import org.apache.tika.pipes.fetchiterator.FetchIterator;
import org.junit.Ignore;
import org.junit.Test;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
@@ -94,7 +92,7 @@ public class PipeIntegrationTests {
int numConsumers = 1;
ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
ExecutorCompletionService<Integer> completionService = new
ExecutorCompletionService<>(es);
- ArrayBlockingQueue<FetchIdMetadataPair> queue = it.init(numConsumers);
+ ArrayBlockingQueue<FetchEmitTuple> queue = it.init(numConsumers);
completionService.submit(it);
for (int i = 0; i < numConsumers; i++) {
completionService.submit(new FSFetcherEmitter(
@@ -118,7 +116,7 @@ public class PipeIntegrationTests {
int numConsumers = 20;
ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
ExecutorCompletionService<Integer> completionService = new
ExecutorCompletionService<>(es);
- ArrayBlockingQueue<FetchIdMetadataPair> queue = it.init(numConsumers);
+ ArrayBlockingQueue<FetchEmitTuple> queue = it.init(numConsumers);
completionService.submit(it);
for (int i = 0; i < numConsumers; i++) {
completionService.submit(new S3FetcherEmitter(
@@ -149,9 +147,9 @@ public class PipeIntegrationTests {
private final Fetcher fetcher;
private final Emitter emitter;
- private final ArrayBlockingQueue<FetchIdMetadataPair> queue;
+ private final ArrayBlockingQueue<FetchEmitTuple> queue;
- FSFetcherEmitter(ArrayBlockingQueue<FetchIdMetadataPair> queue, Fetcher
+ FSFetcherEmitter(ArrayBlockingQueue<FetchEmitTuple> queue, Fetcher
fetcher, Emitter emitter) {
this.queue = queue;
this.fetcher = fetcher;
@@ -162,24 +160,24 @@ public class PipeIntegrationTests {
public Integer call() throws Exception {
while (true) {
- FetchIdMetadataPair p = queue.poll(5, TimeUnit.MINUTES);
- if (p == null) {
+ FetchEmitTuple t = queue.poll(5, TimeUnit.MINUTES);
+ if (t == null) {
throw new TimeoutException("");
}
- if (p == FetchIterator.COMPLETED_SEMAPHORE) {
+ if (t == FetchIterator.COMPLETED_SEMAPHORE) {
return 1;
}
- process(p);
+ process(t);
}
}
- private void process(FetchIdMetadataPair p) throws IOException,
TikaException {
- Path targ = OUTDIR.resolve(p.getFetchId().getFetchKey());
+ private void process(FetchEmitTuple t) throws IOException,
TikaException {
+ Path targ = OUTDIR.resolve(t.getFetchKey().getKey());
if (Files.isRegularFile(targ)) {
return;
}
- try (InputStream is = fetcher.fetch(p.getFetchId().getFetchKey(),
p.getMetadata())) {
- System.out.println(counter.getAndIncrement() + " : "+p );
+ try (InputStream is = fetcher.fetch(t.getFetchKey().getKey(),
t.getMetadata())) {
+ System.out.println(counter.getAndIncrement() + " : "+t );
Files.createDirectories(targ.getParent());
Files.copy(is, targ);
}
@@ -191,9 +189,9 @@ public class PipeIntegrationTests {
private final Fetcher fetcher;
private final S3Emitter emitter;
- private final ArrayBlockingQueue<FetchIdMetadataPair> queue;
+ private final ArrayBlockingQueue<FetchEmitTuple> queue;
- S3FetcherEmitter(ArrayBlockingQueue<FetchIdMetadataPair> queue, Fetcher
+ S3FetcherEmitter(ArrayBlockingQueue<FetchEmitTuple> queue, Fetcher
fetcher, S3Emitter emitter) {
this.queue = queue;
this.fetcher = fetcher;
@@ -204,28 +202,23 @@ public class PipeIntegrationTests {
public Integer call() throws Exception {
while (true) {
- FetchIdMetadataPair p = queue.poll(5, TimeUnit.MINUTES);
- if (p == null) {
+ FetchEmitTuple t = queue.poll(5, TimeUnit.MINUTES);
+ if (t == null) {
throw new TimeoutException("");
}
- if (p == FetchIterator.COMPLETED_SEMAPHORE) {
+ if (t == FetchIterator.COMPLETED_SEMAPHORE) {
return 1;
}
- process(p);
+ process(t);
}
}
- private void process(FetchIdMetadataPair p) throws IOException,
TikaException {
+ private void process(FetchEmitTuple t) throws IOException,
TikaException {
Metadata userMetadata = new Metadata();
userMetadata.set("project", "my-project");
- try (InputStream is = fetcher.fetch(p.getFetchId().getFetchKey(),
p.getMetadata())) {
- long length = -1;
- if (is instanceof TikaInputStream &&
- ((TikaInputStream) is).hasFile()) {
- length = ((TikaInputStream)is).getLength();
- }
- emitter.emit(p.getFetchId().getFetchKey(), is, length,
userMetadata);
+ try (InputStream is = fetcher.fetch(t.getFetchKey().getKey(),
t.getMetadata())) {
+ emitter.emit(t.getEmitKey().getEmitKey(), is, userMetadata);
}
}
}
diff --git
a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
index d7ebcf0..6e0e26e 100644
---
a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
+++
b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
@@ -24,7 +24,7 @@ import com.google.gson.JsonPrimitive;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.fetcher.FetchId;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import java.io.IOException;
import java.util.ArrayList;
@@ -57,19 +57,20 @@ public class TikaClient {
}*/
- public TikaEmitterResult parse(FetchId fetchId, Metadata metadata, String
emitter)
+ public TikaEmitterResult parse(FetchEmitTuple fetchEmit, Metadata metadata)
throws IOException, TikaException {
TikaHttpClient client = getHttpClient();
- String jsonRequest = jsonifyRequest(fetchId, metadata, emitter);
+ String jsonRequest = jsonifyRequest(fetchEmit, metadata);
return client.postJson(jsonRequest);
}
- private String jsonifyRequest(FetchId fetchId, Metadata metadata, String
emitter) {
+ private String jsonifyRequest(FetchEmitTuple fetchEmit, Metadata metadata)
{
JsonObject root = new JsonObject();
- root.add("fetcherName", new JsonPrimitive(fetchId.getFetcherName()));
- root.add("fetchKey", new JsonPrimitive(fetchId.getFetchKey()));
- root.add("emitter", new JsonPrimitive(emitter));
+ root.add("fetcher", new
JsonPrimitive(fetchEmit.getFetchKey().getFetcherName()));
+ root.add("fetchKey", new
JsonPrimitive(fetchEmit.getFetchKey().getKey()));
+ root.add("emitter", new
JsonPrimitive(fetchEmit.getEmitKey().getEmitterName()));
+ root.add("emitKey", new
JsonPrimitive(fetchEmit.getEmitKey().getEmitKey()));
if (metadata.size() > 0) {
JsonObject m = new JsonObject();
for (String n : metadata.names()) {
diff --git
a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
index 2ca9f97..1274d1f 100644
---
a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
+++
b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
@@ -18,9 +18,8 @@ package org.apache.tika.server.client;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.TikaException;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.apache.tika.pipes.fetcher.FetchIdMetadataPair;
-import org.apache.tika.pipes.fetchiterator.FileSystemFetchIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
@@ -53,21 +52,18 @@ public class TikaClientCLI {
//TODO -- add an actual commandline
Path tikaConfigPath = Paths.get(args[0]);
List<String> tikaServerUrls = Arrays.asList(args[1].split(","));
- String fetcherString = args[2];
-
TikaClientCLI cli = new TikaClientCLI();
- cli.execute(tikaConfigPath, tikaServerUrls, fetcherString);
+ cli.execute(tikaConfigPath, tikaServerUrls);
}
- private void execute(Path tikaConfigPath, List<String> tikaServerUrls,
String fetcherString)
+ private void execute(Path tikaConfigPath, List<String> tikaServerUrls)
throws TikaException, IOException, SAXException {
TikaConfig config = new TikaConfig(tikaConfigPath);
ExecutorService executorService =
Executors.newFixedThreadPool(numThreads+1);
ExecutorCompletionService<Integer> completionService = new
ExecutorCompletionService<>(executorService);
- //TODO: fix this!
final FetchIterator fetchIterator = config.getFetchIterator();
- ArrayBlockingQueue<FetchIdMetadataPair> queue =
fetchIterator.init(numThreads);
+ final ArrayBlockingQueue<FetchEmitTuple> queue =
fetchIterator.init(numThreads);
completionService.submit(fetchIterator);
if (tikaServerUrls.size() == numThreads) {
@@ -75,12 +71,12 @@ public class TikaClientCLI {
for (int i = 0; i < numThreads; i++) {
TikaClient client = TikaClient.get(config,
Collections.singletonList(tikaServerUrls.get(i)));
- completionService.submit(new FetchWorker(queue, client,
fetcherString));
+ completionService.submit(new FetchWorker(queue, client));
}
} else {
for (int i = 0; i < numThreads; i++) {
TikaClient client = TikaClient.get(config, tikaServerUrls);
- completionService.submit(new FetchWorker(queue, client,
fetcherString));
+ completionService.submit(new FetchWorker(queue, client));
}
}
@@ -114,14 +110,11 @@ public class TikaClientCLI {
}
private class FetchWorker implements Callable<Integer> {
- private final ArrayBlockingQueue<FetchIdMetadataPair> queue;
+ private final ArrayBlockingQueue<FetchEmitTuple> queue;
private final TikaClient client;
- private final String emitterString;
- public FetchWorker(ArrayBlockingQueue<FetchIdMetadataPair> queue,
TikaClient client,
- String emitterString) {
+ public FetchWorker(ArrayBlockingQueue<FetchEmitTuple> queue,
TikaClient client) {
this.queue = queue;
this.client = client;
- this.emitterString = emitterString;
}
@Override
@@ -129,20 +122,20 @@ public class TikaClientCLI {
while (true) {
- FetchIdMetadataPair p = queue.poll(maxWaitMs,
TimeUnit.MILLISECONDS);
- if (p == null) {
+ FetchEmitTuple t = queue.poll(maxWaitMs,
TimeUnit.MILLISECONDS);
+ if (t == null) {
throw new TimeoutException("exceeded maxWaitMs");
}
- if (p == FetchIterator.COMPLETED_SEMAPHORE) {
+ if (t == FetchIterator.COMPLETED_SEMAPHORE) {
return 1;
}
try {
- LOGGER.debug("about to parse: {}", p.getFetchId());
- client.parse(p.getFetchId(), p.getMetadata(),
emitterString);
+ LOGGER.debug("about to parse: {}", t.getFetchKey());
+ client.parse(t, t.getMetadata());
} catch (IOException e) {
- LOGGER.warn(p.getFetchId().toString(), e);
+ LOGGER.warn(t.getFetchKey().toString(), e);
} catch (TikaException e) {
- LOGGER.warn(p.getFetchId().toString(), e);
+ LOGGER.warn(t.getFetchKey().toString(), e);
}
}
}
diff --git
a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientConfig.java
b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientConfig.java
new file mode 100644
index 0000000..7034c89
--- /dev/null
+++
b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientConfig.java
@@ -0,0 +1,85 @@
+package org.apache.tika.server.client;
+
+import org.apache.tika.config.Param;
+import org.apache.tika.config.ServiceLoader;
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.mime.MimeTypeException;
+import org.apache.tika.mime.MimeTypes;
+import org.apache.tika.pipes.fetchiterator.EmptyFetchIterator;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.xml.sax.SAXException;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TikaClientConfig extends TikaConfig {
+ public TikaClientConfig(String file) throws TikaException, IOException,
SAXException {
+ super(file);
+ }
+
+ public TikaClientConfig(Path path) throws TikaException, IOException,
SAXException {
+ super(path);
+ }
+
+ public TikaClientConfig(Path path, ServiceLoader loader) throws
TikaException, IOException, SAXException {
+ super(path, loader);
+ }
+
+ public TikaClientConfig(File file) throws TikaException, IOException,
SAXException {
+ super(file);
+ }
+
+ public TikaClientConfig(File file, ServiceLoader loader) throws
TikaException, IOException, SAXException {
+ super(file, loader);
+ }
+
+ public TikaClientConfig(URL url) throws TikaException, IOException,
SAXException {
+ super(url);
+ }
+
+ public TikaClientConfig(URL url, ClassLoader loader) throws TikaException,
IOException, SAXException {
+ super(url, loader);
+ }
+
+ public TikaClientConfig(URL url, ServiceLoader loader) throws
TikaException, IOException, SAXException {
+ super(url, loader);
+ }
+
+ public TikaClientConfig(InputStream stream) throws TikaException,
IOException, SAXException {
+ super(stream);
+ }
+
+ public TikaClientConfig(Document document) throws TikaException,
IOException {
+ super(document);
+ }
+
+ public TikaClientConfig(Document document, ServiceLoader loader) throws
TikaException, IOException {
+ super(document, loader);
+ }
+
+ public TikaClientConfig(Element element) throws TikaException, IOException
{
+ super(element);
+ }
+
+ public TikaClientConfig(Element element, ClassLoader loader) throws
TikaException, IOException {
+ super(element, loader);
+ }
+
+ public TikaClientConfig(ClassLoader loader) throws MimeTypeException,
IOException {
+ super(loader);
+ }
+
+ public TikaClientConfig() throws TikaException, IOException {
+ }
+
+}
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
index 9752608..9689ba8 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
@@ -27,6 +27,7 @@ import org.apache.tika.pipes.fetcher.Fetcher;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.utils.ExceptionUtils;
+import org.apache.tika.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,22 +56,23 @@ public class EmitterResource {
private static final String EMITTER_PARAM = "emitter";
private static final String FETCHER_NAME_ABBREV = "fn";
private static final String FETCH_KEY_ABBREV = "fk";
+ private static final String EMIT_KEY_ABBREV = "ek";
/**
* key that is safe to pass through http header.
* The user _must_ specify this for the fsemitter if calling 'put'
*/
- public static final String PATH_KEY_FOR_HTTP_HEADER =
TikaCoreProperties.SOURCE_PATH.getName().replaceAll(":", "-");
+ public static final String EMIT_KEY_FOR_HTTP_HEADER = "emit-key";
private static final Logger LOG =
LoggerFactory.getLogger(EmitterResource.class);
/**
- * @param is input stream is ignored in 'get'
+ * @param is input stream is ignored in 'get'
* @param httpHeaders
* @param info
* @param emitterName
* @param fetcherName specify the fetcherName in the url's query section
- * @param fetchKey specify the fetch key in the url's query section
+ * @param fetchKey specify the fetch key in the url's query section
* @return
* @throws Exception
*/
@@ -78,23 +80,22 @@ public class EmitterResource {
@Produces("application/json")
@Path("{" + EMITTER_PARAM + " : (\\w+)?}")
public Map<String, String> getRmeta(InputStream is, @Context HttpHeaders
httpHeaders,
- @Context UriInfo info,
- @PathParam(EMITTER_PARAM) String
emitterName,
- @QueryParam(FETCHER_NAME_ABBREV)
String fetcherName,
- @QueryParam(FETCH_KEY_ABBREV)
String fetchKey) throws Exception {
+ @Context UriInfo info,
+ @PathParam(EMITTER_PARAM) String
emitterName,
+ @QueryParam(FETCHER_NAME_ABBREV)
String fetcherName,
+ @QueryParam(FETCH_KEY_ABBREV) String
fetchKey,
+ @QueryParam(EMIT_KEY_ABBREV) String
emitKey) throws Exception {
Metadata metadata = new Metadata();
Fetcher fetcher =
TikaResource.getConfig().getFetcherManager().getFetcher(fetcherName);
List<Metadata> metadataList;
try (InputStream fetchedIs = fetcher.fetch(fetchKey, metadata)) {
- for (String n : metadata.names()) {
- System.out.println(n + " ; "+metadata.get(n));
- }
metadataList =
RecursiveMetadataResource.parseMetadata(fetchedIs,
metadata,
httpHeaders.getRequestHeaders(), info, "text");
}
- return emit(emitterName, metadataList);
+ emitKey = StringUtils.isBlank(emitKey) ? fetchKey : emitKey;
+ return emit(emitterName, emitKey, metadataList);
}
/**
@@ -106,7 +107,8 @@ public class EmitterResource {
* {@link TikaCoreProperties#TIKA_CONTENT}
* <p>
* Must specify an emitter in the path, e.g. /emit/solr
- * @param info uri info
+ *
+ * @param info uri info
* @param emitterName which emitter to use; emitters must be configured in
* the TikaConfig file.
* @return InputStream that can be deserialized as a list of {@link
Metadata} objects
@@ -116,19 +118,18 @@ public class EmitterResource {
@Produces("application/json")
@Path("{" + EMITTER_PARAM + " : (\\w+)?}")
public Map<String, String> putRmeta(InputStream is,
- @Context HttpHeaders httpHeaders,
- @Context UriInfo info,
- @PathParam(EMITTER_PARAM) String
emitterName
+ @Context HttpHeaders httpHeaders,
+ @Context UriInfo info,
+ @PathParam(EMITTER_PARAM) String
emitterName
) throws Exception {
Metadata metadata = new Metadata();
- String path = httpHeaders.getHeaderString(PATH_KEY_FOR_HTTP_HEADER);
- metadata.set(TikaCoreProperties.SOURCE_PATH, path);
+ String emitKey = httpHeaders.getHeaderString(EMIT_KEY_FOR_HTTP_HEADER);
List<Metadata> metadataList =
RecursiveMetadataResource.parseMetadata(is,
metadata,
httpHeaders.getRequestHeaders(), info, "text");
- return emit(emitterName, metadataList);
+ return emit(emitterName, emitKey, metadataList);
}
/**
@@ -142,6 +143,7 @@ public class EmitterResource {
* {@link TikaCoreProperties#TIKA_CONTENT}
* <p>
* Must specify a fetcherString and an emitter in the posted json.
+ *
* @param info uri info
* @return InputStream that can be deserialized as a list of {@link
Metadata} objects
* @throws Exception
@@ -149,16 +151,18 @@ public class EmitterResource {
@POST
@Produces("application/json")
public Map<String, String> postRmeta(InputStream is,
- @Context HttpHeaders httpHeaders,
- @Context UriInfo info
- ) throws Exception {
- JsonElement root = null;
+ @Context HttpHeaders httpHeaders,
+ @Context UriInfo info
+ ) throws Exception {
+ JsonObject root = null;
try (Reader reader = new InputStreamReader(is,
StandardCharsets.UTF_8)) {
- root = JsonParser.parseReader(reader);
+ root = JsonParser.parseReader(reader).getAsJsonObject();
}
- String fetcherName =
root.getAsJsonObject().get("fetcherName").getAsString();
- String fetchKey = root.getAsJsonObject().get("fetchKey").getAsString();
- String emitterName =
root.getAsJsonObject().get("emitter").getAsString();
+ String fetcherName = root.get("fetcher").getAsString();
+ String fetchKey = root.get("fetchKey").getAsString();
+ String emitKey = (root.has("emitKey")) ?
+ root.get("emitKey").getAsString() : fetchKey;
+ String emitterName = root.get("emitter").getAsString();
Metadata metadata = new Metadata();
@@ -181,12 +185,12 @@ public class EmitterResource {
LOG.debug("post parse/pre emit metadata {}: {}",
n, metadataList.get(0).get(n));
}
- return emit(emitterName, metadataList);
+ return emit(emitterName, emitKey, metadataList);
}
private void injectUserMetadata(Metadata metadata, JsonObject root) {
- if (root.getAsJsonObject().has("metadata")) {
- JsonObject meta =
root.getAsJsonObject().getAsJsonObject("metadata");
+ if (root.has("metadata")) {
+ JsonObject meta = root.getAsJsonObject("metadata");
for (String k : meta.keySet()) {
JsonElement val = meta.get(k);
if (val.isJsonArray()) {
@@ -209,13 +213,13 @@ public class EmitterResource {
return statusMap;
}
- private Map<String, String> emit(String emitterName, List<Metadata>
metadataList) throws TikaException {
+ private Map<String, String> emit(String emitterName, String emitKey,
List<Metadata> metadataList) throws TikaException {
Emitter emitter =
TikaResource.getConfig().getEmitterManager().getEmitter(emitterName);
String status = "ok";
String exceptionMsg = "";
try {
- emitter.emit(metadataList);
- } catch (IOException|TikaEmitterException e) {
+ emitter.emit(emitKey, metadataList);
+ } catch (IOException | TikaEmitterException e) {
LOG.warn("problem with emitting", e);
status = "emitter_exception";
exceptionMsg = ExceptionUtils.getStackTrace(e);
diff --git
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
index c3f5e31..31a3cd0 100644
---
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
+++
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
@@ -168,7 +168,7 @@ public class TikaEmitterTest extends CXFTestBase {
public void testPost() throws Exception {
JsonObject root = new JsonObject();
- root.add("fetcherName", new JsonPrimitive("fsf"));
+ root.add("fetcher", new JsonPrimitive("fsf"));
root.add("fetchKey", new JsonPrimitive("hello_world.xml"));
root.add("emitter", new JsonPrimitive("fse"));
JsonObject userMetadata = new JsonObject();
@@ -209,7 +209,7 @@ public class TikaEmitterTest extends CXFTestBase {
public void testPut() throws Exception {
String getUrl = endPoint+EMITTER_PATH_AND_FS;
- String metaPathKey = EmitterResource.PATH_KEY_FOR_HTTP_HEADER;
+ String metaPathKey = EmitterResource.EMIT_KEY_FOR_HTTP_HEADER;
Response response = WebClient
.create(getUrl)
@@ -237,7 +237,7 @@ public class TikaEmitterTest extends CXFTestBase {
public void testPostNPE() throws Exception {
JsonObject root = new JsonObject();
- root.add("fetcherName", new JsonPrimitive("fsf"));
+ root.add("fetcher", new JsonPrimitive("fsf"));
root.add("fetchKey", new JsonPrimitive("null_pointer.xml"));
root.add("emitter", new JsonPrimitive("fse"));
JsonObject userMetadata = new JsonObject();
diff --git
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
index 65e0f57..554aee1 100644
---
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
+++
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
@@ -267,7 +267,7 @@ public class TikaServerEmitterIntegrationTest extends
IntegrationTestBase {
private String getJsonString(String fileName) {
JsonObject root = new JsonObject();
- root.add("fetcherName", new JsonPrimitive(FETCHER_NAME));
+ root.add("fetcher", new JsonPrimitive(FETCHER_NAME));
root.add("fetchKey", new JsonPrimitive(fileName));
root.add("emitter", new JsonPrimitive(EMITTER_NAME));
return GSON.toJson(root);