This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-3288
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/TIKA-3288 by this push:
     new a116b2f  TIKA-3288 -- add onParseException
a116b2f is described below

commit a116b2f3781b1c16b12aa0de605505b8eb02cb11
Author: tballison <[email protected]>
AuthorDate: Wed Feb 3 12:24:39 2021 -0500

    TIKA-3288 -- add onParseException
---
 .../tika/pipes/fetchiterator/FetchEmitTuple.java   | 20 +++++-
 .../tika/pipes/fetchiterator/FetchIterator.java    | 22 ++++++
 .../tika/pipes/emitter/solr/SolrEmitter.java       |  2 +-
 .../pipes/fetchiterator/csv/CSVFetchIterator.java  |  3 +-
 .../fetchiterator/jdbc/JDBCFetchIterator.java      |  2 +-
 .../pipes/fetchiterator/s3/S3FetchIterator.java    |  2 +-
 .../metadata/serialization/JsonFetchEmitTuple.java | 43 +++++++++---
 .../tika/server/core/resource/AsyncParser.java     | 52 +++++++++++---
 .../tika/server/core/resource/EmitterResource.java | 79 ++++++++++++----------
 .../core/TikaServerAsyncIntegrationTest.java       | 12 ++--
 .../core/TikaServerEmitterIntegrationTest.java     | 79 ++++++++++++++++++++--
 11 files changed, 244 insertions(+), 72 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java
index 2792d9b..efab236 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java
@@ -22,14 +22,25 @@ import org.apache.tika.pipes.fetcher.FetchKey;
 
 public class FetchEmitTuple {
 
+    public enum ON_PARSE_EXCEPTION {
+        SKIP,
+        EMIT
+    }
+    public static final ON_PARSE_EXCEPTION DEFAULT_ON_PARSE_EXCEPTION = 
ON_PARSE_EXCEPTION.EMIT;
     private final FetchKey fetchKey;
     private final EmitKey emitKey;
     private final Metadata metadata;
+    private final ON_PARSE_EXCEPTION onParseException;
 
     public FetchEmitTuple(FetchKey fetchKey, EmitKey emitKey, Metadata 
metadata) {
+        this(fetchKey, emitKey, metadata, DEFAULT_ON_PARSE_EXCEPTION);
+    }
+    public FetchEmitTuple(FetchKey fetchKey, EmitKey emitKey, Metadata 
metadata,
+                          ON_PARSE_EXCEPTION onParseException) {
         this.fetchKey = fetchKey;
         this.emitKey = emitKey;
         this.metadata = metadata;
+        this.onParseException = onParseException;
     }
 
     public FetchKey getFetchKey() {
@@ -44,12 +55,17 @@ public class FetchEmitTuple {
         return metadata;
     }
 
+    public ON_PARSE_EXCEPTION getOnParseException() {
+        return onParseException;
+    }
+
     @Override
     public String toString() {
         return "FetchEmitTuple{" +
                 "fetchKey=" + fetchKey +
                 ", emitKey=" + emitKey +
                 ", metadata=" + metadata +
+                ", onParseException=" + onParseException +
                 '}';
     }
 
@@ -62,7 +78,8 @@ public class FetchEmitTuple {
 
         if (fetchKey != null ? !fetchKey.equals(that.fetchKey) : that.fetchKey 
!= null) return false;
         if (emitKey != null ? !emitKey.equals(that.emitKey) : that.emitKey != 
null) return false;
-        return metadata != null ? metadata.equals(that.metadata) : 
that.metadata == null;
+        if (metadata != null ? !metadata.equals(that.metadata) : that.metadata 
!= null) return false;
+        return onParseException == that.onParseException;
     }
 
     @Override
@@ -70,6 +87,7 @@ public class FetchEmitTuple {
         int result = fetchKey != null ? fetchKey.hashCode() : 0;
         result = 31 * result + (emitKey != null ? emitKey.hashCode() : 0);
         result = 31 * result + (metadata != null ? metadata.hashCode() : 0);
+        result = 31 * result + (onParseException != null ? 
onParseException.hashCode() : 0);
         return result;
     }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
index 6b9457e..505d042 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
@@ -49,6 +49,8 @@ public abstract class FetchIterator implements 
Callable<Integer>, Initializable
     private String fetcherName;
     private String emitterName;
     private int added = 0;
+    private FetchEmitTuple.ON_PARSE_EXCEPTION onParseException = 
FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT;
+
     public FetchIterator() {
 
     }
@@ -95,6 +97,26 @@ public abstract class FetchIterator implements 
Callable<Integer>, Initializable
         this.queueSize = queueSize;
     }
 
+    @Field
+    public void setOnParseException(String onParseException) throws 
TikaConfigException {
+        if ("skip".equalsIgnoreCase(onParseException)) {
+            setOnParseException(FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP);
+        } else if ("emit".equalsIgnoreCase(onParseException)) {
+            setOnParseException(FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT);
+        } else {
+            throw new TikaConfigException("must be either 'skip' or 'emit': "
+                    + onParseException);
+        }
+    }
+
+    public void setOnParseException(FetchEmitTuple.ON_PARSE_EXCEPTION 
onParseException) {
+        this.onParseException = onParseException;
+    }
+
+    public FetchEmitTuple.ON_PARSE_EXCEPTION getOnParseException() {
+        return onParseException;
+    }
+
     @Override
     public Integer call() throws Exception {
         if (queue == null || numConsumers < 0) {
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index 2e70ebf..0e09e9c 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -58,7 +58,7 @@ public class SolrEmitter extends AbstractEmitter implements 
Initializable {
     private static final String ATTACHMENTS = "attachments";
     private static final String UPDATE_PATH = "/update";
     private static final Logger LOG = 
LoggerFactory.getLogger(SolrEmitter.class);
-    //one day this will be allowed?
+    //one day this will be allowed or can be configured?
     private final boolean gzipJson = false;
 
     private AttachmentStrategy attachmentStrategy = 
AttachmentStrategy.PARENT_CHILD;
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
index 12268ea..7cb0c02 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
@@ -120,7 +120,8 @@ public class CSVFetchIterator extends FetchIterator 
implements Initializable {
                 Metadata metadata = loadMetadata(fetchEmitKeyIndices, headers, 
record);
                 tryToAdd(new FetchEmitTuple(
                         new FetchKey(fetcherName, fetchKey),
-                        new EmitKey(emitterName, emitKey), metadata));
+                        new EmitKey(emitterName, emitKey), metadata,
+                        getOnParseException()));
             }
         }
     }
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
index 2c5ac1a..19da544 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
@@ -171,7 +171,7 @@ public class JDBCFetchIterator extends FetchIterator 
implements Initializable {
         tryToAdd(new FetchEmitTuple(
                 new FetchKey(fetcherName, fetchKey),
                 new EmitKey(emitterName, emitKey),
-                metadata));
+                metadata, getOnParseException()));
     }
 
     private String toString(ResultSet rs) throws SQLException {
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
index 2d575e9..dd14463 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
@@ -104,7 +104,7 @@ public class S3FetchIterator extends FetchIterator 
implements Initializable {
             tryToAdd(new FetchEmitTuple(
                     new FetchKey(fetcherName, summary.getKey()),
                     new EmitKey(fetcherName, summary.getKey()),
-                    new Metadata()));
+                    new Metadata(), getOnParseException()));
             count++;
         }
         long elapsed = System.currentTimeMillis() - start;
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
index 345873f..55d10ef 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
@@ -30,20 +30,23 @@ import java.io.IOException;
 import java.io.Reader;
 import java.io.StringWriter;
 import java.io.Writer;
+import java.util.Locale;
 
 public class JsonFetchEmitTuple {
 
-    private static final String FETCHER = "fetcher";
-    private static final String FETCHKEY = "fetchKey";
+    public static final String FETCHER = "fetcher";
+    public static final String FETCHKEY = "fetchKey";
     public static final String EMITTER = "emitter";
     public static final String EMITKEY = "emitKey";
-    private static final String METADATAKEY = "metadata";
+    public static final String METADATAKEY = "metadata";
+    public static final String ON_PARSE_EXCEPTION = "onParseException";
+
 
     public static FetchEmitTuple fromJson(Reader reader) throws IOException {
         JsonParser jParser = new JsonFactory().createParser(reader);
-        JsonToken token =jParser.nextToken();
+        JsonToken token = jParser.nextToken();
         if (token != JsonToken.START_OBJECT) {
-            throw new IOException("require start object, but see: 
"+token.name());
+            throw new IOException("require start object, but see: " + 
token.name());
         }
         return parseFetchEmitTuple(jParser);
     }
@@ -58,6 +61,7 @@ public class JsonFetchEmitTuple {
         String fetchKey = null;
         String emitterName = null;
         String emitKey = null;
+        FetchEmitTuple.ON_PARSE_EXCEPTION onParseException = null;
         Metadata metadata = new Metadata();
         while (token != JsonToken.END_OBJECT) {
             if (token != JsonToken.FIELD_NAME) {
@@ -78,20 +82,37 @@ public class JsonFetchEmitTuple {
                     throw new IOException("required start object, but see: " + 
token.name());
                 }
                 metadata = JsonMetadata.readMetadataObject(jParser);
+            } else if (ON_PARSE_EXCEPTION.equals(name)) {
+                String value = getValue(jParser);
+                if ("skip".equalsIgnoreCase(value)) {
+                    onParseException = FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP;
+                } else if ("emit".equalsIgnoreCase(value)) {
+                    onParseException = FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT;
+                } else {
+                    throw new IOException(ON_PARSE_EXCEPTION +
+                            " must be either 'skip' or 'emit'");
+                }
             }
             token = jParser.nextToken();
         }
 
-        return new FetchEmitTuple(
-                new FetchKey(fetcherName, fetchKey),
-                new EmitKey(emitterName, emitKey), metadata
-        );
+        if (onParseException == null) {
+            return new FetchEmitTuple(
+                    new FetchKey(fetcherName, fetchKey),
+                    new EmitKey(emitterName, emitKey), metadata
+            );
+        } else {
+            return new FetchEmitTuple(
+                    new FetchKey(fetcherName, fetchKey),
+                    new EmitKey(emitterName, emitKey), metadata, 
onParseException
+            );
+        }
     }
 
     private static String getValue(JsonParser jParser) throws IOException {
         JsonToken token = jParser.nextToken();
         if (token != JsonToken.VALUE_STRING) {
-            throw new IOException("required value string, but see: 
"+token.name());
+            throw new IOException("required value string, but see: " + 
token.name());
         }
         return jParser.getValueAsString();
     }
@@ -121,6 +142,8 @@ public class JsonFetchEmitTuple {
             jsonGenerator.writeFieldName(METADATAKEY);
             JsonMetadata.writeMetadataObject(t.getMetadata(), jsonGenerator, 
false);
         }
+        jsonGenerator.writeStringField(ON_PARSE_EXCEPTION,
+                t.getOnParseException().name().toLowerCase(Locale.US));
         jsonGenerator.writeEndObject();
 
     }
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
index 46098e9..c20fc75 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
@@ -19,6 +19,7 @@ package org.apache.tika.server.core.resource;
 import org.apache.cxf.jaxrs.impl.UriInfoImpl;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
@@ -35,33 +36,37 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * Worker thread that takes {@link FetchEmitTuple} off the queue, parses
- * the file and puts the {@link EmitData} on the queue for the {@link 
AsyncEmitter}.
+ * the file and puts the {@link EmitData} on the emitDataQueue for the {@link 
AsyncEmitter}.
  *
  */
 public class AsyncParser implements Callable<Integer> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AsyncParser.class);
 
-    private final ArrayBlockingQueue<FetchEmitTuple> queue;
+    private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTupleQueue;
     private final ArrayBlockingQueue<EmitData> emitDataQueue;
 
     public AsyncParser(ArrayBlockingQueue<FetchEmitTuple> queue,
                        ArrayBlockingQueue<EmitData> emitData) {
-        this.queue = queue;
+        this.fetchEmitTupleQueue = queue;
         this.emitDataQueue = emitData;
     }
 
     @Override
     public Integer call() throws Exception {
-        int parsed = 0;
         while (true) {
-            FetchEmitTuple request = queue.poll(1, TimeUnit.MINUTES);
+            FetchEmitTuple request = fetchEmitTupleQueue.poll(1, 
TimeUnit.MINUTES);
             if (request != null) {
                 EmitData emitData = processTuple(request);
-                boolean offered = emitDataQueue.offer(emitData, 10, 
TimeUnit.MINUTES);
-                parsed++;
-                if (! offered) {
-                    //TODO: deal with this
+                boolean shouldEmit = checkForParseException(request, emitData);
+                if (shouldEmit) {
+                    boolean offered = emitDataQueue.offer(emitData, 10, 
TimeUnit.MINUTES);
+                    if (!offered) {
+                        //TODO: deal with this
+                        LOG.warn("Failed to add ({}) " +
+                                "to emit queue after 10 minutes.",
+                                request.getFetchKey().getKey());
+                    }
                 }
             } else {
                 LOG.trace("Nothing on the async queue");
@@ -69,6 +74,34 @@ public class AsyncParser implements Callable<Integer> {
         }
     }
 
+    private boolean checkForParseException(FetchEmitTuple request, EmitData 
emitData) {
+        if (emitData == null || emitData.getMetadataList() == null ||
+            emitData.getMetadataList().size() == 0) {
+            LOG.warn("empty or null emit data ({})", 
request.getFetchKey().getKey());
+            return false;
+        }
+        boolean shouldEmit = true;
+        Metadata container = emitData.getMetadataList().get(0);
+        String stack = container.get(TikaCoreProperties.CONTAINER_EXCEPTION);
+        if (stack != null) {
+            LOG.warn("fetchKey ({}) container parse exception ({})",
+                    request.getFetchKey().getKey(), stack);
+            if (request.getOnParseException() == 
FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
+                shouldEmit = false;
+            }
+        }
+
+        for (int i = 1; i < emitData.getMetadataList().size(); i++) {
+            Metadata m = emitData.getMetadataList().get(i);
+            String embeddedStack = 
m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
+            if (embeddedStack != null) {
+                LOG.warn("fetchKey ({}) embedded parse exception ({})",
+                        request.getFetchKey().getKey(), embeddedStack);
+            }
+        }
+        return shouldEmit;
+    }
+
     private EmitData processTuple(FetchEmitTuple t) {
         Metadata userMetadata = t.getMetadata();
         Metadata metadata = new Metadata();
@@ -88,6 +121,7 @@ public class AsyncParser implements Callable<Integer> {
         } catch (Exception e) {
             LOG.warn(t.toString(), e);
         }
+
         injectUserMetadata(userMetadata, metadataList);
         EmitKey emitKey = t.getEmitKey();
         if (StringUtils.isBlank(emitKey.getKey())) {
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
index 871af8b..6338dc4 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
@@ -19,6 +19,7 @@ package org.apache.tika.server.core.resource;
 
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import org.apache.tika.Tika;
 import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.emitter.Emitter;
@@ -178,63 +179,67 @@ public class EmitterResource {
             return returnError(t.getEmitKey().getEmitterName(), error);
         }
 
+        boolean shouldEmit = checkParseException(t, metadataList);
+        if (! shouldEmit) {
+            return skip(t, metadataList);
+        }
+
         injectUserMetadata(t.getMetadata(), metadataList.get(0));
 
         for (String n : metadataList.get(0).names()) {
             LOG.debug("post parse/pre emit metadata {}: {}",
                     n, metadataList.get(0).get(n));
         }
+        return emit(calcEmitKey(t), metadataList);
+    }
+
+    static EmitKey calcEmitKey(FetchEmitTuple t) {
         //use fetch key if emitter key is not specified
-        //clean this up?
+        //TODO: clean this up?
         EmitKey emitKey = t.getEmitKey();
         if (StringUtils.isBlank(emitKey.getKey())) {
             emitKey = new EmitKey(emitKey.getEmitterName(), 
t.getFetchKey().getKey());
         }
-        return emit(emitKey, metadataList);
+        return emitKey;
     }
 
-    private FetchEmitTuple deserializeTuple(JsonParser jParser) throws 
IOException {
-        String fetcherName = null;
-        String fetchKey = null;
-        String emitterName = null;
-        String emitKey = null;
+    private Map<String, String> skip(FetchEmitTuple t, List<Metadata> 
metadataList) {
+            Map<String, String> statusMap = new HashMap<>();
+            statusMap.put("status", "ok");
+            statusMap.put("emitter", t.getEmitKey().getEmitterName());
+            statusMap.put("emitKey", t.getEmitKey().getKey());
+            String msg = 
metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
+            statusMap.put("parse_exception", msg);
+            return statusMap;
+    }
 
-        Metadata metadata = null;
-        while (jParser.nextToken() != JsonToken.END_OBJECT) {
-            String currentName = jParser.getCurrentName();
-            if ("fetcherName".equals(currentName)) {
-                fetcherName = currentName;
-            } else if ("fetchKey".equals(currentName)) {
-                fetchKey = currentName;
-            } else if ("emitterName".equals(currentName)) {
-                emitterName = currentName;
-            } else if ("emitKey".equals(currentName)) {
-                emitKey = currentName;
-            } else if ("metadata".equals(currentName)) {
-                metadata = deserializeMetadata(jParser);
+    private boolean checkParseException(FetchEmitTuple t, List<Metadata> 
metadataList) {
+        if (metadataList == null || metadataList.size() < 1) {
+            return false;
+        }
+        boolean shouldEmit = true;
+        String stack = 
metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
+        if (stack != null) {
+            if (t.getOnParseException() == 
FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
+                shouldEmit = false;
             }
+            LOG.warn("fetchKey ({}) container parse exception ({})",
+                    t.getFetchKey().getKey(), stack);
         }
-        return new FetchEmitTuple(new FetchKey(fetcherName, fetchKey),
-                new EmitKey(emitterName, emitKey), metadata);
-    }
-
-    private Metadata deserializeMetadata(JsonParser jParser) throws 
IOException {
-        Metadata metadata = new Metadata();
 
-        while (jParser.nextToken() != JsonToken.END_OBJECT) {
-            String key = jParser.getCurrentName();
-            JsonToken token = jParser.nextToken();
-            if (jParser.isExpectedStartArrayToken()) {
-                while (jParser.nextToken() != JsonToken.END_ARRAY) {
-                    metadata.add(key, jParser.getText());
-                }
-            } else {
-                metadata.set(key, token.asString());
+        for (int i = 1; i < metadataList.size(); i++) {
+            Metadata m = metadataList.get(i);
+            String embeddedStack = 
m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
+            if (embeddedStack != null) {
+                LOG.warn("fetchKey ({}) embedded parse exception ({})",
+                        t.getFetchKey().getKey(), embeddedStack);
             }
         }
-        return metadata;
 
+        return shouldEmit;
     }
+
+
     private void injectUserMetadata(Metadata userMetadata, Metadata metadata) {
         for (String n : userMetadata.names()) {
             metadata.set(n, null);
@@ -260,7 +265,7 @@ public class EmitterResource {
         try {
             emitter.emit(emitKey.getKey(), metadataList);
         } catch (IOException | TikaEmitterException e) {
-            LOG.warn("problem with emitting", e);
+            LOG.warn("problem emitting ("+emitKey.getKey()+")", e);
             status = "emitter_exception";
             exceptionMsg = ExceptionUtils.getStackTrace(e);
         }
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
index 2d188d9..e1812dc 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
@@ -56,18 +56,20 @@ public class TikaServerAsyncIntegrationTest extends 
IntegrationTestBase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TikaServerAsyncIntegrationTest.class);
 
+    private static FetchEmitTuple.ON_PARSE_EXCEPTION ON_PARSE_EXCEPTION
+            = FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT;
     private static Path TMP_DIR;
     private static Path TMP_OUTPUT_DIR;
     private static String TIKA_CONFIG_XML;
     private static Path TIKA_CONFIG;
 
-    private static final int NUM_FILES = 8034;
+    private static final int NUM_FILES = 1000;
     private static final String EMITTER_NAME = "fse";
     private static final String FETCHER_NAME = "fsf";
 
     private static List<String> FILE_LIST = new ArrayList<>();
     private static String[] FILES = new String[] {
-            "hello_world.xml",
+            "hello_world.xml", "null_pointer.xml"
            // "heavy_hang_30000.xml", "real_oom.xml", "system_exit.xml"
     };
 
@@ -155,8 +157,10 @@ public class TikaServerAsyncIntegrationTest extends 
IntegrationTestBase {
         try {
             JsonNode response = sendAsync(FILE_LIST);
             System.out.println(response);
+            int expected = (ON_PARSE_EXCEPTION == 
FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT) ?
+                    FILE_LIST.size() : FILE_LIST.size()/2;
             int targets = 0;
-            while (targets < NUM_FILES) {
+            while (targets < FILE_LIST.size()) {
                 System.out.println("targets "+targets);
                 targets = countTargets();
                 Thread.sleep(1000);
@@ -219,7 +223,7 @@ public class TikaServerAsyncIntegrationTest extends 
IntegrationTestBase {
         return new FetchEmitTuple(
                 new FetchKey(FETCHER_NAME, fileName),
                 new EmitKey(EMITTER_NAME, ""),
-                new Metadata()
+                new Metadata(), ON_PARSE_EXCEPTION
         );
     }
 }
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
index 04488d1..bea4cee 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
@@ -65,7 +65,8 @@ public class TikaServerEmitterIntegrationTest extends 
IntegrationTestBase {
 
     private static String[] FILES = new String[]{
             "hello_world.xml",
-            "heavy_hang_30000.xml", "real_oom.xml", "system_exit.xml"
+            "heavy_hang_30000.xml", "real_oom.xml", "system_exit.xml",
+            "null_pointer.xml"
     };
 
     @BeforeClass
@@ -124,7 +125,6 @@ public class TikaServerEmitterIntegrationTest extends 
IntegrationTestBase {
         }
     }
 
-
     @Test
     public void testBasic() throws Exception {
 
@@ -152,6 +152,65 @@ public class TikaServerEmitterIntegrationTest extends 
IntegrationTestBase {
         }
     }
 
+    @Test
+    public void testNPEDefault() throws Exception {
+
+        Thread serverThread = new Thread() {
+            @Override
+            public void run() {
+                TikaServerCli.main(
+                        new String[]{
+                                "-enableUnsecureFeatures",
+                                "-maxFiles", "2000",
+                                "-p", INTEGRATION_TEST_PORT,
+                                "-tmpFilePrefix", "basic-",
+                                "-config", 
TIKA_CONFIG.toAbsolutePath().toString()
+                        });
+            }
+        };
+        serverThread.start();
+        try {
+            JsonNode node = testOne("null_pointer.xml", true);
+            assertEquals("ok", node.get("status").asText());
+            assertContains("java.lang.NullPointerException",
+                    node.get("parse_exception").asText());
+        } catch (Exception e) {
+            fail("shouldn't have an exception" + e.getMessage());
+        } finally {
+            serverThread.interrupt();
+        }
+    }
+
+    @Test
+    public void testNPESkip() throws Exception {
+
+        Thread serverThread = new Thread() {
+            @Override
+            public void run() {
+                TikaServerCli.main(
+                        new String[]{
+                                "-enableUnsecureFeatures",
+                                "-maxFiles", "2000",
+                                "-p", INTEGRATION_TEST_PORT,
+                                "-tmpFilePrefix", "basic-",
+                                "-config", 
TIKA_CONFIG.toAbsolutePath().toString()
+                        });
+            }
+        };
+        serverThread.start();
+        try {
+            JsonNode node = testOne("null_pointer.xml", false,
+                    FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP);
+            assertEquals("ok", node.get("status").asText());
+            assertContains("java.lang.NullPointerException",
+                    node.get("parse_exception").asText());
+        } catch (Exception e) {
+            fail("shouldn't have an exception" + e.getMessage());
+        } finally {
+            serverThread.interrupt();
+        }
+    }
+
     @Test(expected = ProcessingException.class)
     public void testSystemExit() throws Exception {
 
@@ -252,17 +311,23 @@ public class TikaServerEmitterIntegrationTest extends 
IntegrationTestBase {
         throw new TimeoutException("couldn't connect to server after " +
                 elapsed + " ms");
     }
-
     private JsonNode testOne(String fileName, boolean shouldFileExist) throws 
Exception {
+        return testOne(fileName, shouldFileExist, 
FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT);
+    }
+
+    private JsonNode testOne(String fileName, boolean shouldFileExist, 
FetchEmitTuple.ON_PARSE_EXCEPTION onParseException) throws Exception {
+
         awaitServerStartup();
         Response response = WebClient
                 .create(endPoint + "/emit")
                 .accept("application/json")
-                .post(getJsonString(fileName));
+                .post(getJsonString(fileName, onParseException));
         if (response.getStatus() == 200) {
+            Path targFile = TMP_OUTPUT_DIR.resolve(fileName + ".json");
             if (shouldFileExist) {
-                Path targFile = TMP_OUTPUT_DIR.resolve(fileName + ".json");
                 assertTrue(Files.size(targFile) > 1);
+            } else {
+                assertFalse(Files.isRegularFile(targFile));
             }
             Reader reader = new InputStreamReader((InputStream) 
response.getEntity(), UTF_8);
             return new ObjectMapper().readTree(reader);
@@ -270,11 +335,11 @@ public class TikaServerEmitterIntegrationTest extends 
IntegrationTestBase {
         return null;
     }
 
-    private String getJsonString(String fileName) throws IOException {
+    private String getJsonString(String fileName, 
FetchEmitTuple.ON_PARSE_EXCEPTION onParseException) throws IOException {
         FetchEmitTuple t = new FetchEmitTuple(
                 new FetchKey(FETCHER_NAME, fileName),
                 new EmitKey(EMITTER_NAME, ""),
-                new Metadata()
+                new Metadata(), onParseException
         );
         return JsonFetchEmitTuple.toJson(t);
     }

Reply via email to