This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch TIKA-3288
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/TIKA-3288 by this push:
new a116b2f TIKA-3288 -- add onParseException
a116b2f is described below
commit a116b2f3781b1c16b12aa0de605505b8eb02cb11
Author: tballison <[email protected]>
AuthorDate: Wed Feb 3 12:24:39 2021 -0500
TIKA-3288 -- add onParseException
---
.../tika/pipes/fetchiterator/FetchEmitTuple.java | 20 +++++-
.../tika/pipes/fetchiterator/FetchIterator.java | 22 ++++++
.../tika/pipes/emitter/solr/SolrEmitter.java | 2 +-
.../pipes/fetchiterator/csv/CSVFetchIterator.java | 3 +-
.../fetchiterator/jdbc/JDBCFetchIterator.java | 2 +-
.../pipes/fetchiterator/s3/S3FetchIterator.java | 2 +-
.../metadata/serialization/JsonFetchEmitTuple.java | 43 +++++++++---
.../tika/server/core/resource/AsyncParser.java | 52 +++++++++++---
.../tika/server/core/resource/EmitterResource.java | 79 ++++++++++++----------
.../core/TikaServerAsyncIntegrationTest.java | 12 ++--
.../core/TikaServerEmitterIntegrationTest.java | 79 ++++++++++++++++++++--
11 files changed, 244 insertions(+), 72 deletions(-)
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java
index 2792d9b..efab236 100644
---
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java
+++
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java
@@ -22,14 +22,25 @@ import org.apache.tika.pipes.fetcher.FetchKey;
public class FetchEmitTuple {
+ public enum ON_PARSE_EXCEPTION {
+ SKIP,
+ EMIT
+ }
+ public static final ON_PARSE_EXCEPTION DEFAULT_ON_PARSE_EXCEPTION =
ON_PARSE_EXCEPTION.EMIT;
private final FetchKey fetchKey;
private final EmitKey emitKey;
private final Metadata metadata;
+ private final ON_PARSE_EXCEPTION onParseException;
public FetchEmitTuple(FetchKey fetchKey, EmitKey emitKey, Metadata
metadata) {
+ this(fetchKey, emitKey, metadata, DEFAULT_ON_PARSE_EXCEPTION);
+ }
+ public FetchEmitTuple(FetchKey fetchKey, EmitKey emitKey, Metadata
metadata,
+ ON_PARSE_EXCEPTION onParseException) {
this.fetchKey = fetchKey;
this.emitKey = emitKey;
this.metadata = metadata;
+ this.onParseException = onParseException;
}
public FetchKey getFetchKey() {
@@ -44,12 +55,17 @@ public class FetchEmitTuple {
return metadata;
}
+ public ON_PARSE_EXCEPTION getOnParseException() {
+ return onParseException;
+ }
+
@Override
public String toString() {
return "FetchEmitTuple{" +
"fetchKey=" + fetchKey +
", emitKey=" + emitKey +
", metadata=" + metadata +
+ ", onParseException=" + onParseException +
'}';
}
@@ -62,7 +78,8 @@ public class FetchEmitTuple {
if (fetchKey != null ? !fetchKey.equals(that.fetchKey) : that.fetchKey
!= null) return false;
if (emitKey != null ? !emitKey.equals(that.emitKey) : that.emitKey !=
null) return false;
- return metadata != null ? metadata.equals(that.metadata) :
that.metadata == null;
+ if (metadata != null ? !metadata.equals(that.metadata) : that.metadata
!= null) return false;
+ return onParseException == that.onParseException;
}
@Override
@@ -70,6 +87,7 @@ public class FetchEmitTuple {
int result = fetchKey != null ? fetchKey.hashCode() : 0;
result = 31 * result + (emitKey != null ? emitKey.hashCode() : 0);
result = 31 * result + (metadata != null ? metadata.hashCode() : 0);
+ result = 31 * result + (onParseException != null ?
onParseException.hashCode() : 0);
return result;
}
}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
index 6b9457e..505d042 100644
---
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
+++
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
@@ -49,6 +49,8 @@ public abstract class FetchIterator implements
Callable<Integer>, Initializable
private String fetcherName;
private String emitterName;
private int added = 0;
+ private FetchEmitTuple.ON_PARSE_EXCEPTION onParseException =
FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT;
+
public FetchIterator() {
}
@@ -95,6 +97,26 @@ public abstract class FetchIterator implements
Callable<Integer>, Initializable
this.queueSize = queueSize;
}
+ @Field
+ public void setOnParseException(String onParseException) throws
TikaConfigException {
+ if ("skip".equalsIgnoreCase(onParseException)) {
+ setOnParseException(FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP);
+ } else if ("emit".equalsIgnoreCase(onParseException)) {
+ setOnParseException(FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT);
+ } else {
+ throw new TikaConfigException("must be either 'skip' or 'emit': "
+ + onParseException);
+ }
+ }
+
+ public void setOnParseException(FetchEmitTuple.ON_PARSE_EXCEPTION
onParseException) {
+ this.onParseException = onParseException;
+ }
+
+ public FetchEmitTuple.ON_PARSE_EXCEPTION getOnParseException() {
+ return onParseException;
+ }
+
@Override
public Integer call() throws Exception {
if (queue == null || numConsumers < 0) {
diff --git
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index 2e70ebf..0e09e9c 100644
---
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
+++
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -58,7 +58,7 @@ public class SolrEmitter extends AbstractEmitter implements
Initializable {
private static final String ATTACHMENTS = "attachments";
private static final String UPDATE_PATH = "/update";
private static final Logger LOG =
LoggerFactory.getLogger(SolrEmitter.class);
- //one day this will be allowed?
+ //one day this will be allowed or can be configured?
private final boolean gzipJson = false;
private AttachmentStrategy attachmentStrategy =
AttachmentStrategy.PARENT_CHILD;
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
index 12268ea..7cb0c02 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
+++
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
@@ -120,7 +120,8 @@ public class CSVFetchIterator extends FetchIterator
implements Initializable {
Metadata metadata = loadMetadata(fetchEmitKeyIndices, headers,
record);
tryToAdd(new FetchEmitTuple(
new FetchKey(fetcherName, fetchKey),
- new EmitKey(emitterName, emitKey), metadata));
+ new EmitKey(emitterName, emitKey), metadata,
+ getOnParseException()));
}
}
}
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
index 2c5ac1a..19da544 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
+++
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
@@ -171,7 +171,7 @@ public class JDBCFetchIterator extends FetchIterator
implements Initializable {
tryToAdd(new FetchEmitTuple(
new FetchKey(fetcherName, fetchKey),
new EmitKey(emitterName, emitKey),
- metadata));
+ metadata, getOnParseException()));
}
private String toString(ResultSet rs) throws SQLException {
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
index 2d575e9..dd14463 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
+++
b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
@@ -104,7 +104,7 @@ public class S3FetchIterator extends FetchIterator
implements Initializable {
tryToAdd(new FetchEmitTuple(
new FetchKey(fetcherName, summary.getKey()),
new EmitKey(fetcherName, summary.getKey()),
- new Metadata()));
+ new Metadata(), getOnParseException()));
count++;
}
long elapsed = System.currentTimeMillis() - start;
diff --git
a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
index 345873f..55d10ef 100644
---
a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
+++
b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
@@ -30,20 +30,23 @@ import java.io.IOException;
import java.io.Reader;
import java.io.StringWriter;
import java.io.Writer;
+import java.util.Locale;
public class JsonFetchEmitTuple {
- private static final String FETCHER = "fetcher";
- private static final String FETCHKEY = "fetchKey";
+ public static final String FETCHER = "fetcher";
+ public static final String FETCHKEY = "fetchKey";
public static final String EMITTER = "emitter";
public static final String EMITKEY = "emitKey";
- private static final String METADATAKEY = "metadata";
+ public static final String METADATAKEY = "metadata";
+ public static final String ON_PARSE_EXCEPTION = "onParseException";
+
public static FetchEmitTuple fromJson(Reader reader) throws IOException {
JsonParser jParser = new JsonFactory().createParser(reader);
- JsonToken token =jParser.nextToken();
+ JsonToken token = jParser.nextToken();
if (token != JsonToken.START_OBJECT) {
- throw new IOException("require start object, but see:
"+token.name());
+ throw new IOException("require start object, but see: " +
token.name());
}
return parseFetchEmitTuple(jParser);
}
@@ -58,6 +61,7 @@ public class JsonFetchEmitTuple {
String fetchKey = null;
String emitterName = null;
String emitKey = null;
+ FetchEmitTuple.ON_PARSE_EXCEPTION onParseException = null;
Metadata metadata = new Metadata();
while (token != JsonToken.END_OBJECT) {
if (token != JsonToken.FIELD_NAME) {
@@ -78,20 +82,37 @@ public class JsonFetchEmitTuple {
throw new IOException("required start object, but see: " +
token.name());
}
metadata = JsonMetadata.readMetadataObject(jParser);
+ } else if (ON_PARSE_EXCEPTION.equals(name)) {
+ String value = getValue(jParser);
+ if ("skip".equalsIgnoreCase(value)) {
+ onParseException = FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP;
+ } else if ("emit".equalsIgnoreCase(value)) {
+ onParseException = FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT;
+ } else {
+ throw new IOException(ON_PARSE_EXCEPTION +
+ " must be either 'skip' or 'emit'");
+ }
}
token = jParser.nextToken();
}
- return new FetchEmitTuple(
- new FetchKey(fetcherName, fetchKey),
- new EmitKey(emitterName, emitKey), metadata
- );
+ if (onParseException == null) {
+ return new FetchEmitTuple(
+ new FetchKey(fetcherName, fetchKey),
+ new EmitKey(emitterName, emitKey), metadata
+ );
+ } else {
+ return new FetchEmitTuple(
+ new FetchKey(fetcherName, fetchKey),
+ new EmitKey(emitterName, emitKey), metadata,
onParseException
+ );
+ }
}
private static String getValue(JsonParser jParser) throws IOException {
JsonToken token = jParser.nextToken();
if (token != JsonToken.VALUE_STRING) {
- throw new IOException("required value string, but see:
"+token.name());
+ throw new IOException("required value string, but see: " +
token.name());
}
return jParser.getValueAsString();
}
@@ -121,6 +142,8 @@ public class JsonFetchEmitTuple {
jsonGenerator.writeFieldName(METADATAKEY);
JsonMetadata.writeMetadataObject(t.getMetadata(), jsonGenerator,
false);
}
+ jsonGenerator.writeStringField(ON_PARSE_EXCEPTION,
+ t.getOnParseException().name().toLowerCase(Locale.US));
jsonGenerator.writeEndObject();
}
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
index 46098e9..c20fc75 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
@@ -19,6 +19,7 @@ package org.apache.tika.server.core.resource;
import org.apache.cxf.jaxrs.impl.UriInfoImpl;
import org.apache.cxf.message.MessageImpl;
import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
@@ -35,33 +36,37 @@ import java.util.concurrent.TimeUnit;
/**
* Worker thread that takes {@link FetchEmitTuple} off the queue, parses
- * the file and puts the {@link EmitData} on the queue for the {@link
AsyncEmitter}.
+ * the file and puts the {@link EmitData} on the emitDataQueue for the {@link
AsyncEmitter}.
*
*/
public class AsyncParser implements Callable<Integer> {
private static final Logger LOG =
LoggerFactory.getLogger(AsyncParser.class);
- private final ArrayBlockingQueue<FetchEmitTuple> queue;
+ private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTupleQueue;
private final ArrayBlockingQueue<EmitData> emitDataQueue;
public AsyncParser(ArrayBlockingQueue<FetchEmitTuple> queue,
ArrayBlockingQueue<EmitData> emitData) {
- this.queue = queue;
+ this.fetchEmitTupleQueue = queue;
this.emitDataQueue = emitData;
}
@Override
public Integer call() throws Exception {
- int parsed = 0;
while (true) {
- FetchEmitTuple request = queue.poll(1, TimeUnit.MINUTES);
+ FetchEmitTuple request = fetchEmitTupleQueue.poll(1,
TimeUnit.MINUTES);
if (request != null) {
EmitData emitData = processTuple(request);
- boolean offered = emitDataQueue.offer(emitData, 10,
TimeUnit.MINUTES);
- parsed++;
- if (! offered) {
- //TODO: deal with this
+ boolean shouldEmit = checkForParseException(request, emitData);
+ if (shouldEmit) {
+ boolean offered = emitDataQueue.offer(emitData, 10,
TimeUnit.MINUTES);
+ if (!offered) {
+ //TODO: deal with this
+ LOG.warn("Failed to add ({}) " +
+ "to emit queue after 10 minutes.",
+ request.getFetchKey().getKey());
+ }
}
} else {
LOG.trace("Nothing on the async queue");
@@ -69,6 +74,34 @@ public class AsyncParser implements Callable<Integer> {
}
}
+ private boolean checkForParseException(FetchEmitTuple request, EmitData
emitData) {
+ if (emitData == null || emitData.getMetadataList() == null ||
+ emitData.getMetadataList().size() == 0) {
+ LOG.warn("empty or null emit data ({})",
request.getFetchKey().getKey());
+ return false;
+ }
+ boolean shouldEmit = true;
+ Metadata container = emitData.getMetadataList().get(0);
+ String stack = container.get(TikaCoreProperties.CONTAINER_EXCEPTION);
+ if (stack != null) {
+ LOG.warn("fetchKey ({}) container parse exception ({})",
+ request.getFetchKey().getKey(), stack);
+ if (request.getOnParseException() ==
FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
+ shouldEmit = false;
+ }
+ }
+
+ for (int i = 1; i < emitData.getMetadataList().size(); i++) {
+ Metadata m = emitData.getMetadataList().get(i);
+ String embeddedStack =
m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
+ if (embeddedStack != null) {
+ LOG.warn("fetchKey ({}) embedded parse exception ({})",
+ request.getFetchKey().getKey(), embeddedStack);
+ }
+ }
+ return shouldEmit;
+ }
+
private EmitData processTuple(FetchEmitTuple t) {
Metadata userMetadata = t.getMetadata();
Metadata metadata = new Metadata();
@@ -88,6 +121,7 @@ public class AsyncParser implements Callable<Integer> {
} catch (Exception e) {
LOG.warn(t.toString(), e);
}
+
injectUserMetadata(userMetadata, metadataList);
EmitKey emitKey = t.getEmitKey();
if (StringUtils.isBlank(emitKey.getKey())) {
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
index 871af8b..6338dc4 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
@@ -19,6 +19,7 @@ package org.apache.tika.server.core.resource;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
+import org.apache.tika.Tika;
import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.emitter.Emitter;
@@ -178,63 +179,67 @@ public class EmitterResource {
return returnError(t.getEmitKey().getEmitterName(), error);
}
+ boolean shouldEmit = checkParseException(t, metadataList);
+ if (! shouldEmit) {
+ return skip(t, metadataList);
+ }
+
injectUserMetadata(t.getMetadata(), metadataList.get(0));
for (String n : metadataList.get(0).names()) {
LOG.debug("post parse/pre emit metadata {}: {}",
n, metadataList.get(0).get(n));
}
+ return emit(calcEmitKey(t), metadataList);
+ }
+
+ static EmitKey calcEmitKey(FetchEmitTuple t) {
//use fetch key if emitter key is not specified
- //clean this up?
+ //TODO: clean this up?
EmitKey emitKey = t.getEmitKey();
if (StringUtils.isBlank(emitKey.getKey())) {
emitKey = new EmitKey(emitKey.getEmitterName(),
t.getFetchKey().getKey());
}
- return emit(emitKey, metadataList);
+ return emitKey;
}
- private FetchEmitTuple deserializeTuple(JsonParser jParser) throws
IOException {
- String fetcherName = null;
- String fetchKey = null;
- String emitterName = null;
- String emitKey = null;
+ private Map<String, String> skip(FetchEmitTuple t, List<Metadata>
metadataList) {
+ Map<String, String> statusMap = new HashMap<>();
+ statusMap.put("status", "ok");
+ statusMap.put("emitter", t.getEmitKey().getEmitterName());
+ statusMap.put("emitKey", t.getEmitKey().getKey());
+ String msg =
metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
+ statusMap.put("parse_exception", msg);
+ return statusMap;
+ }
- Metadata metadata = null;
- while (jParser.nextToken() != JsonToken.END_OBJECT) {
- String currentName = jParser.getCurrentName();
- if ("fetcherName".equals(currentName)) {
- fetcherName = currentName;
- } else if ("fetchKey".equals(currentName)) {
- fetchKey = currentName;
- } else if ("emitterName".equals(currentName)) {
- emitterName = currentName;
- } else if ("emitKey".equals(currentName)) {
- emitKey = currentName;
- } else if ("metadata".equals(currentName)) {
- metadata = deserializeMetadata(jParser);
+ private boolean checkParseException(FetchEmitTuple t, List<Metadata>
metadataList) {
+ if (metadataList == null || metadataList.size() < 1) {
+ return false;
+ }
+ boolean shouldEmit = true;
+ String stack =
metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
+ if (stack != null) {
+ if (t.getOnParseException() ==
FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
+ shouldEmit = false;
}
+ LOG.warn("fetchKey ({}) container parse exception ({})",
+ t.getFetchKey().getKey(), stack);
}
- return new FetchEmitTuple(new FetchKey(fetcherName, fetchKey),
- new EmitKey(emitterName, emitKey), metadata);
- }
-
- private Metadata deserializeMetadata(JsonParser jParser) throws
IOException {
- Metadata metadata = new Metadata();
- while (jParser.nextToken() != JsonToken.END_OBJECT) {
- String key = jParser.getCurrentName();
- JsonToken token = jParser.nextToken();
- if (jParser.isExpectedStartArrayToken()) {
- while (jParser.nextToken() != JsonToken.END_ARRAY) {
- metadata.add(key, jParser.getText());
- }
- } else {
- metadata.set(key, token.asString());
+ for (int i = 1; i < metadataList.size(); i++) {
+ Metadata m = metadataList.get(i);
+ String embeddedStack =
m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
+ if (embeddedStack != null) {
+ LOG.warn("fetchKey ({}) embedded parse exception ({})",
+ t.getFetchKey().getKey(), embeddedStack);
}
}
- return metadata;
+ return shouldEmit;
}
+
+
private void injectUserMetadata(Metadata userMetadata, Metadata metadata) {
for (String n : userMetadata.names()) {
metadata.set(n, null);
@@ -260,7 +265,7 @@ public class EmitterResource {
try {
emitter.emit(emitKey.getKey(), metadataList);
} catch (IOException | TikaEmitterException e) {
- LOG.warn("problem with emitting", e);
+ LOG.warn("problem emitting ("+emitKey.getKey()+")", e);
status = "emitter_exception";
exceptionMsg = ExceptionUtils.getStackTrace(e);
}
diff --git
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
index 2d188d9..e1812dc 100644
---
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
+++
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
@@ -56,18 +56,20 @@ public class TikaServerAsyncIntegrationTest extends
IntegrationTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(TikaServerAsyncIntegrationTest.class);
+ private static FetchEmitTuple.ON_PARSE_EXCEPTION ON_PARSE_EXCEPTION
+ = FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT;
private static Path TMP_DIR;
private static Path TMP_OUTPUT_DIR;
private static String TIKA_CONFIG_XML;
private static Path TIKA_CONFIG;
- private static final int NUM_FILES = 8034;
+ private static final int NUM_FILES = 1000;
private static final String EMITTER_NAME = "fse";
private static final String FETCHER_NAME = "fsf";
private static List<String> FILE_LIST = new ArrayList<>();
private static String[] FILES = new String[] {
- "hello_world.xml",
+ "hello_world.xml", "null_pointer.xml"
// "heavy_hang_30000.xml", "real_oom.xml", "system_exit.xml"
};
@@ -155,8 +157,10 @@ public class TikaServerAsyncIntegrationTest extends
IntegrationTestBase {
try {
JsonNode response = sendAsync(FILE_LIST);
System.out.println(response);
+ int expected = (ON_PARSE_EXCEPTION ==
FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT) ?
+ FILE_LIST.size() : FILE_LIST.size()/2;
int targets = 0;
- while (targets < NUM_FILES) {
+ while (targets < FILE_LIST.size()) {
System.out.println("targets "+targets);
targets = countTargets();
Thread.sleep(1000);
@@ -219,7 +223,7 @@ public class TikaServerAsyncIntegrationTest extends
IntegrationTestBase {
return new FetchEmitTuple(
new FetchKey(FETCHER_NAME, fileName),
new EmitKey(EMITTER_NAME, ""),
- new Metadata()
+ new Metadata(), ON_PARSE_EXCEPTION
);
}
}
diff --git
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
index 04488d1..bea4cee 100644
---
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
+++
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
@@ -65,7 +65,8 @@ public class TikaServerEmitterIntegrationTest extends
IntegrationTestBase {
private static String[] FILES = new String[]{
"hello_world.xml",
- "heavy_hang_30000.xml", "real_oom.xml", "system_exit.xml"
+ "heavy_hang_30000.xml", "real_oom.xml", "system_exit.xml",
+ "null_pointer.xml"
};
@BeforeClass
@@ -124,7 +125,6 @@ public class TikaServerEmitterIntegrationTest extends
IntegrationTestBase {
}
}
-
@Test
public void testBasic() throws Exception {
@@ -152,6 +152,65 @@ public class TikaServerEmitterIntegrationTest extends
IntegrationTestBase {
}
}
+ @Test
+ public void testNPEDefault() throws Exception {
+
+ Thread serverThread = new Thread() {
+ @Override
+ public void run() {
+ TikaServerCli.main(
+ new String[]{
+ "-enableUnsecureFeatures",
+ "-maxFiles", "2000",
+ "-p", INTEGRATION_TEST_PORT,
+ "-tmpFilePrefix", "basic-",
+ "-config",
TIKA_CONFIG.toAbsolutePath().toString()
+ });
+ }
+ };
+ serverThread.start();
+ try {
+ JsonNode node = testOne("null_pointer.xml", true);
+ assertEquals("ok", node.get("status").asText());
+ assertContains("java.lang.NullPointerException",
+ node.get("parse_exception").asText());
+ } catch (Exception e) {
+ fail("shouldn't have an exception" + e.getMessage());
+ } finally {
+ serverThread.interrupt();
+ }
+ }
+
+ @Test
+ public void testNPESkip() throws Exception {
+
+ Thread serverThread = new Thread() {
+ @Override
+ public void run() {
+ TikaServerCli.main(
+ new String[]{
+ "-enableUnsecureFeatures",
+ "-maxFiles", "2000",
+ "-p", INTEGRATION_TEST_PORT,
+ "-tmpFilePrefix", "basic-",
+ "-config",
TIKA_CONFIG.toAbsolutePath().toString()
+ });
+ }
+ };
+ serverThread.start();
+ try {
+ JsonNode node = testOne("null_pointer.xml", false,
+ FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP);
+ assertEquals("ok", node.get("status").asText());
+ assertContains("java.lang.NullPointerException",
+ node.get("parse_exception").asText());
+ } catch (Exception e) {
+ fail("shouldn't have an exception" + e.getMessage());
+ } finally {
+ serverThread.interrupt();
+ }
+ }
+
@Test(expected = ProcessingException.class)
public void testSystemExit() throws Exception {
@@ -252,17 +311,23 @@ public class TikaServerEmitterIntegrationTest extends
IntegrationTestBase {
throw new TimeoutException("couldn't connect to server after " +
elapsed + " ms");
}
-
private JsonNode testOne(String fileName, boolean shouldFileExist) throws
Exception {
+ return testOne(fileName, shouldFileExist,
FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT);
+ }
+
+ private JsonNode testOne(String fileName, boolean shouldFileExist,
FetchEmitTuple.ON_PARSE_EXCEPTION onParseException) throws Exception {
+
awaitServerStartup();
Response response = WebClient
.create(endPoint + "/emit")
.accept("application/json")
- .post(getJsonString(fileName));
+ .post(getJsonString(fileName, onParseException));
if (response.getStatus() == 200) {
+ Path targFile = TMP_OUTPUT_DIR.resolve(fileName + ".json");
if (shouldFileExist) {
- Path targFile = TMP_OUTPUT_DIR.resolve(fileName + ".json");
assertTrue(Files.size(targFile) > 1);
+ } else {
+ assertFalse(Files.isRegularFile(targFile));
}
Reader reader = new InputStreamReader((InputStream)
response.getEntity(), UTF_8);
return new ObjectMapper().readTree(reader);
@@ -270,11 +335,11 @@ public class TikaServerEmitterIntegrationTest extends
IntegrationTestBase {
return null;
}
- private String getJsonString(String fileName) throws IOException {
+ private String getJsonString(String fileName,
FetchEmitTuple.ON_PARSE_EXCEPTION onParseException) throws IOException {
FetchEmitTuple t = new FetchEmitTuple(
new FetchKey(FETCHER_NAME, fileName),
new EmitKey(EMITTER_NAME, ""),
- new Metadata()
+ new Metadata(), onParseException
);
return JsonFetchEmitTuple.toJson(t);
}