This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-3288
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 73b0287742c54f21d3a2a6fa442b058d363de68a
Author: tballison <[email protected]>
AuthorDate: Mon Feb 1 17:46:32 2021 -0500

    TIKA-3288 -- WIP do not merge.
---
 .../apache/tika/pipes/emitter/AbstractEmitter.java |  33 ++++++
 .../org/apache/tika/pipes/emitter/EmitData.java    |  32 ++++++
 .../org/apache/tika/pipes/emitter/EmitKey.java     |   2 +-
 .../org/apache/tika/pipes/emitter/Emitter.java     |  10 +-
 .../apache/tika/pipes/emitter/EmptyEmitter.java    |   7 +-
 tika-pipes/tika-emitters/tika-emitter-solr/pom.xml |   5 +
 .../tika/pipes/emitter/solr/SolrEmitter.java       |  92 +++++++++------
 tika-pipes/tika-httpclient-commons/pom.xml         |   5 -
 .../apache/tika/pipes/PipeIntegrationTests.java    |   2 +-
 .../org/apache/tika/server/client/TikaClient.java  |   2 +-
 .../org/apache/tika/server/core/ServerStatus.java  |   2 +-
 .../tika/server/core/ServerStatusWatcher.java      |   2 +-
 .../org/apache/tika/server/core/TikaServerCli.java |   5 +
 .../apache/tika/server/core/TikaServerProcess.java |  80 ++++++++++++--
 .../tika/server/core/resource/AsyncEmitter.java    | 123 +++++++++++++++++++++
 .../tika/server/core/resource/AsyncRequest.java    |  23 ++++
 .../tika/server/core/resource/AsyncResource.java   | 123 +++++++++++++++++++++
 .../server/core/TikaServerIntegrationTest.java     |  10 +-
 18 files changed, 489 insertions(+), 69 deletions(-)

diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java 
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
index 1117537..a54b708 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
@@ -17,6 +17,10 @@
 package org.apache.tika.pipes.emitter;
 
 import org.apache.tika.config.Field;
+import org.apache.tika.metadata.Metadata;
+
+import java.io.IOException;
+import java.util.List;
 
 public abstract class AbstractEmitter implements Emitter {
 
@@ -31,4 +35,33 @@ public abstract class AbstractEmitter implements Emitter {
     public String getName() {
         return name;
     }
+
+    /**
+     * The default behavior is to call {@link #emit(String, List)} on each 
item.
+     * Some implementations, e.g. Solr/ES/Vespa, can benefit from subclassing 
this and
+     * emitting a batch of documents at once.
+     *
+     * @param emitData
+     * @throws IOException
+     * @throws TikaEmitterException
+     */
+    @Override
+    public void emit(List<EmitData> emitData) throws IOException, 
TikaEmitterException {
+        for (EmitData d : emitData) {
+            emit(d.getEmitKey().getKey(), d.getMetadataList());
+        }
+    }
+
+    public static long estimateSizeInBytes(String id, List<Metadata> 
metadataList) {
+        long sz = 36 + id.length() * 2;
+        for (Metadata m : metadataList) {
+            for (String n : m.names()) {
+                sz += 36 + n.length() * 2;
+                for (String v : m.getValues(n)) {
+                    sz += 36 + v.length() * 2;
+                }
+            }
+        }
+        return sz;
+    }
 }
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java 
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
new file mode 100644
index 0000000..0063c6a
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
@@ -0,0 +1,32 @@
+package org.apache.tika.pipes.emitter;
+
+import org.apache.tika.metadata.Metadata;
+
+import java.util.List;
+
+public class EmitData {
+
+    private final EmitKey emitKey;
+    private final List<Metadata> metadataList;
+
+    public EmitData(EmitKey emitKey, List<Metadata> metadataList) {
+        this.emitKey = emitKey;
+        this.metadataList = metadataList;
+    }
+
+    public EmitKey getEmitKey() {
+        return emitKey;
+    }
+
+    public List<Metadata> getMetadataList() {
+        return metadataList;
+    }
+
+    @Override
+    public String toString() {
+        return "EmitData{" +
+                "emitKey=" + emitKey +
+                ", metadataList=" + metadataList +
+                '}';
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java 
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
index 47a8ee7..aa53dfd 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
@@ -30,7 +30,7 @@ public class EmitKey {
         return emitterName;
     }
 
-    public String getEmitKey() {
+    public String getKey() {
         return emitKey;
     }
 
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java 
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
index 4f6bdae..dbea669 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
@@ -27,14 +27,8 @@ public interface Emitter {
 
     void emit(String emitKey, List<Metadata> metadataList) throws IOException, 
TikaEmitterException;
 
+    void emit(List<EmitData> emitData) throws IOException, 
TikaEmitterException;
+    //TODO -- add this later for xhtml?
     //void emit(String txt, Metadata metadata) throws IOException, 
TikaException;
 
-   /*
-    TODO we can add this later?
-    void addBatch(String emitKey, List<Metadata> metadataList) throws 
IOException, TikaEmitterException;
-
-    void executeBatch() throws IOException, TikaEmitterException;
-
-    */
-
 }
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java 
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
index 8c0ebda..5f83db7 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
@@ -16,13 +16,13 @@
  */
 package org.apache.tika.pipes.emitter;
 
-import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 
 import java.io.IOException;
 import java.util.List;
 
 public class EmptyEmitter implements Emitter {
+
     @Override
     public String getName() {
         return "empty";
@@ -32,4 +32,9 @@ public class EmptyEmitter implements Emitter {
     public void emit(String emitKey, List<Metadata> metadataList) throws 
IOException, TikaEmitterException {
 
     }
+
+    @Override
+    public void emit(List<EmitData> emitData) throws IOException, 
TikaEmitterException {
+
+    }
 }
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml 
b/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
index 1edfff0..e36d1c8 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
@@ -42,6 +42,11 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
         </dependency>
diff --git 
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
 
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index 5830b14..4c15889 100644
--- 
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
+++ 
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -16,9 +16,8 @@
  */
 package org.apache.tika.pipes.emitter.solr;
 
-import com.google.gson.Gson;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.http.client.HttpClient;
 import org.apache.tika.client.HttpClientFactory;
 import org.apache.tika.client.HttpClientUtil;
@@ -28,7 +27,7 @@ import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
 import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
@@ -36,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.StringWriter;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -48,7 +48,6 @@ public class SolrEmitter extends AbstractEmitter implements 
Initializable {
         PARENT_CHILD,
         //anything else?
     }
-    private static final Gson GSON = new Gson();
     private static final String ATTACHMENTS = "attachments";
     private static final String UPDATE_PATH = "/update";
     private static final Logger LOG = 
LoggerFactory.getLogger(SolrEmitter.class);
@@ -61,14 +60,23 @@ public class SolrEmitter extends AbstractEmitter implements 
Initializable {
     private HttpClientFactory httpClientFactory;
     private HttpClient httpClient;
 
+    public SolrEmitter() throws TikaConfigException {
+        httpClientFactory = new HttpClientFactory();
+    }
     @Override
     public void emit(String emitKey, List<Metadata> metadataList) throws 
IOException,
             TikaEmitterException {
+
         if (metadataList == null || metadataList.size() == 0) {
             LOG.warn("metadataList is null or empty");
             return;
         }
-        String json = jsonify(emitKey, metadataList);
+        StringWriter writer = new StringWriter();
+        JsonGenerator jsonGenerator = new 
JsonFactory().createGenerator(writer);
+        jsonGenerator.writeStartArray();
+        jsonify(jsonGenerator, emitKey, metadataList);
+        jsonGenerator.writeEndArray();
+        String json = writer.toString();
         LOG.debug("emitting json:"+json);
         try {
             HttpClientUtil.postJson(httpClient,
@@ -78,10 +86,35 @@ public class SolrEmitter extends AbstractEmitter implements 
Initializable {
         }
     }
 
-    private String jsonify(String emitKey, List<Metadata> metadataList) {
+    @Override
+    public void emit(List<EmitData> batch) throws IOException,
+            TikaEmitterException {
+        if (batch == null || batch.size() == 0) {
+            LOG.warn("batch is null or empty");
+            return;
+        }
+        StringWriter writer = new StringWriter();
+        JsonGenerator jsonGenerator = new 
JsonFactory().createGenerator(writer);
+        jsonGenerator.writeStartArray();
+        for (EmitData d : batch) {
+            jsonify(jsonGenerator, d.getKey(), d.getMetadataList());
+        }
+        jsonGenerator.writeEndArray();
+        String json = writer.toString();
+        LOG.debug("emitting json:"+json);
+        try {
+            HttpClientUtil.postJson(httpClient,
+                    url+UPDATE_PATH+"?commitWithin="+getCommitWithin(), json);
+        } catch (TikaClientException e) {
+            throw new TikaEmitterException("can't post", e);
+        }
+    }
+
+    private void jsonify(JsonGenerator jsonGenerator, String emitKey, 
List<Metadata> metadataList) throws IOException {
         metadataList.get(0).set(idField, emitKey);
-        if (attachmentStrategy == AttachmentStrategy.SKIP) {
-            return toJsonString(jsonify(metadataList.get(0)));
+        if (attachmentStrategy == AttachmentStrategy.SKIP ||
+            metadataList.size() == 1) {
+            jsonify(metadataList.get(0), jsonGenerator);
         } else if (attachmentStrategy == 
AttachmentStrategy.CONCATENATE_CONTENT) {
             //this only handles text for now, not xhtml
             StringBuilder sb = new StringBuilder();
@@ -93,53 +126,40 @@ public class SolrEmitter extends AbstractEmitter 
implements Initializable {
             }
             Metadata parent = metadataList.get(0);
             parent.set(getContentField(), sb.toString());
-            return toJsonString(jsonify(parent));
+            jsonify(parent, jsonGenerator);
         } else if (attachmentStrategy == AttachmentStrategy.PARENT_CHILD) {
-            if (metadataList.size() == 1) {
-                JsonObject obj = jsonify(metadataList.get(0));
-                return toJsonString(obj);
-            }
-            JsonObject parent = jsonify(metadataList.get(0));
-            JsonArray children = new JsonArray();
+            jsonify(metadataList.get(0), jsonGenerator);
+            jsonGenerator.writeArrayFieldStart(ATTACHMENTS);
+
             for (int i = 1; i < metadataList.size(); i++) {
                 Metadata m = metadataList.get(i);
                 m.set(idField, UUID.randomUUID().toString());
-                children.add(jsonify(m));
+                jsonify(m, jsonGenerator);
             }
-            parent.add(ATTACHMENTS, children);
-            return toJsonString(parent);
+            jsonGenerator.writeEndArray();
         } else {
             throw new IllegalArgumentException("I don't yet support this 
attachment strategy: "
                     + attachmentStrategy);
         }
     }
 
-    private String toJsonString(JsonObject obj) {
-        //wrap the document into an array
-        //so that Solr correctly interprets this as
-        //upload docs vs a command.
-        JsonArray docs = new JsonArray();
-        docs.add(obj);
-        return GSON.toJson(docs);
-    }
 
-    private JsonObject jsonify(Metadata metadata) {
-        JsonObject obj = new JsonObject();
+    private void jsonify(Metadata metadata, JsonGenerator jsonGenerator) 
throws IOException {
+        jsonGenerator.writeStartObject();
         for (String n : metadata.names()) {
+
             String[] vals = metadata.getValues(n);
             if (vals.length == 0) {
                 continue;
             } else if (vals.length == 1) {
-                obj.addProperty(n, vals[0]);
+                jsonGenerator.writeStringField(n, vals[0]);
             } else if (vals.length > 1) {
-                JsonArray valArr = new JsonArray();
-                for (int i = 0; i < vals.length; i++) {
-                    valArr.add(vals[i]);
-                }
-                obj.add(n, valArr);
+                jsonGenerator.writeArrayFieldStart(n);
+                jsonGenerator.writeArray(vals, 0, vals.length);
+                jsonGenerator.writeEndArray();
             }
         }
-        return obj;
+        jsonGenerator.writeEndObject();
     }
 
     /**
diff --git a/tika-pipes/tika-httpclient-commons/pom.xml 
b/tika-pipes/tika-httpclient-commons/pom.xml
index ef7797a..c33d3fb 100644
--- a/tika-pipes/tika-httpclient-commons/pom.xml
+++ b/tika-pipes/tika-httpclient-commons/pom.xml
@@ -42,11 +42,6 @@
             <artifactId>httpclient</artifactId>
             <version>${httpcomponents.version}</version>
         </dependency>
-        <dependency>
-            <groupId>com.google.code.gson</groupId>
-            <artifactId>gson</artifactId>
-            <version>${gson.version}</version>
-        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
index 7df2bb4..3831f07 100644
--- 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
+++ 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
@@ -218,7 +218,7 @@ public class PipeIntegrationTests {
             userMetadata.set("project", "my-project");
 
             try (InputStream is = fetcher.fetch(t.getFetchKey().getKey(), 
t.getMetadata())) {
-                emitter.emit(t.getEmitKey().getEmitKey(), is, userMetadata);
+                emitter.emit(t.getEmitKey().getKey(), is, userMetadata);
             }
         }
     }
diff --git 
a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
 
b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
index 6e0e26e..73ff3f4 100644
--- 
a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
+++ 
b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
@@ -70,7 +70,7 @@ public class TikaClient {
         root.add("fetcher", new 
JsonPrimitive(fetchEmit.getFetchKey().getFetcherName()));
         root.add("fetchKey", new 
JsonPrimitive(fetchEmit.getFetchKey().getKey()));
         root.add("emitter", new 
JsonPrimitive(fetchEmit.getEmitKey().getEmitterName()));
-        root.add("emitKey", new 
JsonPrimitive(fetchEmit.getEmitKey().getEmitKey()));
+        root.add("emitKey", new 
JsonPrimitive(fetchEmit.getEmitKey().getKey()));
         if (metadata.size() > 0) {
             JsonObject m = new JsonObject();
             for (String n : metadata.names()) {
diff --git 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatus.java
 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatus.java
index a15f7a5..92068f5 100644
--- 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatus.java
+++ 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatus.java
@@ -41,7 +41,7 @@ public class ServerStatus {
     public enum STATUS {
         INITIALIZING(0),
         OPERATING(1),
-        HIT_MAX(2),
+        HIT_MAX_FILES(2),
         TIMEOUT(3),
         ERROR(4),
         PARENT_REQUESTED_SHUTDOWN(5),
diff --git 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatusWatcher.java
 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatusWatcher.java
index a2ad313..7ac0ffd 100644
--- 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatusWatcher.java
+++ 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatusWatcher.java
@@ -136,7 +136,7 @@ public class ServerStatusWatcher implements Runnable {
         }
         long filesProcessed = serverStatus.getFilesProcessed();
         if (filesProcessed >= maxFiles) {
-            serverStatus.setStatus(ServerStatus.STATUS.HIT_MAX);
+            serverStatus.setStatus(ServerStatus.STATUS.HIT_MAX_FILES);
         }
     }
 
diff --git 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
index dbcbeb1..4ca9bb4 100644
--- 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
+++ 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
@@ -140,7 +140,10 @@ public class TikaServerCli {
             try {
                 mainLoop(line, newArgs);
             } catch (InterruptedException e) {
+                e.printStackTrace();
                 //swallow
+            } catch (Exception e) {
+                e.printStackTrace();
             }
         }
     }
@@ -173,11 +176,13 @@ public class TikaServerCli {
                     WatchDogResult result = future.get();
                     LOG.debug("main loop future: ({}); about to restart", 
result);
                     if (maxRestarts < 0 || result.getNumRestarts() < 
maxRestarts) {
+                        System.err.println("starting up again");
                         executorCompletionService.submit(
                                 new TikaServerWatchDog(args, result.getPort(),
                                 result.getId(),
                                 result.getNumRestarts(), serverTimeoutConfig));
                     } else {
+                        System.err.println("finished!");
                         LOG.warn("id {} with port {} has exceeded maxRestarts 
{}. Shutting down and not restarting.",
                                 result.getId(), result.getPort(), maxRestarts);
                         finished++;
diff --git 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
index 0a8fd91..94f2190 100644
--- 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
+++ 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
@@ -23,6 +23,7 @@ import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.cxf.binding.BindingFactoryManager;
+import org.apache.cxf.endpoint.Server;
 import org.apache.cxf.jaxrs.JAXRSBindingFactory;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.ResourceProvider;
@@ -36,6 +37,8 @@ import org.apache.tika.config.TikaConfig;
 import org.apache.tika.parser.DigestingParser;
 import org.apache.tika.parser.digestutils.BouncyCastleDigester;
 import org.apache.tika.parser.digestutils.CommonsDigester;
+import org.apache.tika.server.core.resource.AsyncEmitter;
+import org.apache.tika.server.core.resource.AsyncResource;
 import org.apache.tika.server.core.resource.DetectorResource;
 import org.apache.tika.server.core.resource.EmitterResource;
 import org.apache.tika.server.core.resource.LanguageResource;
@@ -72,13 +75,18 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 public class TikaServerProcess {
 
 
     //used in fork mode -- restart after processing this many files
     private static final long DEFAULT_MAX_FILES = 100000;
-
+    private static String ENABLE_UNSECURE_FEATURES = "enableUnsecureFeatures";
 
     private static final int DEFAULT_DIGEST_MARK_LIMIT = 20 * 1024 * 1024;
     public static final Set<String> LOG_LEVELS = new 
HashSet<>(Arrays.asList("debug", "info"));
@@ -132,7 +140,7 @@ public class TikaServerProcess {
 
             CommandLineParser cliParser = new DefaultParser();
             CommandLine line = cliParser.parse(options, args);
-            runServer(line, options);
+            mainLoop(line, options);
         } catch (Exception e) {
             e.printStackTrace();
             LOG.error("Can't start: ", e);
@@ -140,10 +148,35 @@ public class TikaServerProcess {
         }
     }
 
+    private static void mainLoop(CommandLine commandLine, Options options) 
throws Exception {
+        AsyncResource asyncResource = null;
+        ArrayBlockingQueue asyncQueue = null;
+        int numAsyncThreads = 10;
+        if (commandLine.hasOption("unsecureFeatures")) {
+            asyncResource = new AsyncResource();
+            asyncQueue = asyncResource.getQueue(numAsyncThreads);
+        }
+
+        ServerDetails serverDetails = initServer(commandLine, asyncResource);
+        ExecutorService executorService = 
Executors.newFixedThreadPool(numAsyncThreads+1);
+        ExecutorCompletionService<Integer> executorCompletionService = new 
ExecutorCompletionService<>(executorService);
+
+        executorCompletionService.submit(new ServerThread(serverDetails));
+        if (asyncQueue != null) {
+            for (int i = 0; i < numAsyncThreads; i++) {
+                executorCompletionService.submit(new AsyncEmitter(asyncQueue));
+            }
+        }
+        while (true) {
+
+        }
+    }
+
     //This starts the server in this process.  This can
     //be either a direct call from -noFork or the process that is forked
     //in the 2.0 default mode.
-    private static void runServer(CommandLine line, Options options) throws 
Exception {
+    private static ServerDetails initServer(CommandLine line,
+                                     AsyncResource asyncResource) throws 
Exception {
 
         String host = null;
 
@@ -224,12 +257,12 @@ public class TikaServerProcess {
         }
 
         InputStreamFactory inputStreamFactory = null;
-        if (line.hasOption("enableUnsecureFeatures")) {
+        if (line.hasOption(ENABLE_UNSECURE_FEATURES)) {
             inputStreamFactory = new 
FetcherStreamFactory(tika.getFetcherManager());
         } else {
             inputStreamFactory = new DefaultInputStreamFactory();
         }
-        logFetchersAndEmitters(line.hasOption("enableUnsecureFeatures"), tika);
+        logFetchersAndEmitters(line.hasOption(ENABLE_UNSECURE_FEATURES), tika);
         String serverId = line.hasOption("i") ? line.getOptionValue("i") : 
UUID.randomUUID().toString();
         LOG.debug("SERVER ID:" + serverId);
         ServerStatus serverStatus;
@@ -272,8 +305,9 @@ public class TikaServerProcess {
         rCoreProviders.add(new SingletonResourceProvider(new TikaDetectors()));
         rCoreProviders.add(new SingletonResourceProvider(new TikaParsers()));
         rCoreProviders.add(new SingletonResourceProvider(new TikaVersion()));
-        if (line.hasOption("enableUnsecureFeatures")) {
+        if (line.hasOption(ENABLE_UNSECURE_FEATURES)) {
             rCoreProviders.add(new SingletonResourceProvider(new 
EmitterResource()));
+            rCoreProviders.add(new SingletonResourceProvider(asyncResource));
         }
         rCoreProviders.addAll(loadResourceServices());
         if (line.hasOption("status")) {
@@ -315,11 +349,11 @@ public class TikaServerProcess {
         JAXRSBindingFactory factory = new JAXRSBindingFactory();
         factory.setBus(sf.getBus());
         manager.registerBindingFactory(JAXRSBindingFactory.JAXRS_BINDING_ID, 
factory);
-        sf.create();
-        LOG.info("Started Apache Tika server {} at {}",
-                serverId,
-                url);
-
+        ServerDetails details = new ServerDetails();
+        details.sf = sf;
+        details.url = url;
+        details.serverId = serverId;
+        return details;
     }
 
     private static void logFetchersAndEmitters(boolean enableUnsecureFeatures, 
TikaConfig tika) {
@@ -421,4 +455,28 @@ public class TikaServerProcess {
         return serverTimeouts;
     }
 
+    private static class ServerThread implements Callable<Integer> {
+        private final ServerDetails serverDetails;
+        public ServerThread(ServerDetails serverDetails) {
+            this.serverDetails = serverDetails;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+
+            Server server = serverDetails.sf.create();
+            System.err.println("started : "+serverDetails.serverId);
+                LOG.info("Started Apache Tika server {} at {}",
+                        serverDetails.serverId,
+                        serverDetails.url);
+            System.err.println("returning : "+serverDetails.serverId);
+                return 2;
+        }
+    }
+
+    private static class ServerDetails {
+        JAXRSServerFactoryBean sf;
+        String serverId;
+        String url;
+    }
 }
diff --git 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
new file mode 100644
index 0000000..f473699
--- /dev/null
+++ 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
@@ -0,0 +1,123 @@
+package org.apache.tika.server.core.resource;
+
+import org.apache.cxf.jaxrs.impl.UriInfoImpl;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.utils.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MultivaluedHashMap;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Worker thread that takes AsyncRequests off the queue and
+ * processes them.
+ */
+public class AsyncEmitter implements Callable<Integer> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncEmitter.class);
+
+
+    private long maxCacheSizeBytes = 10_000_000;
+
+    private final ArrayBlockingQueue<AsyncRequest> queue;
+
+    public AsyncEmitter(ArrayBlockingQueue<AsyncRequest> queue) {
+        this.queue = queue;
+    }
+
+    @Override
+    public Integer call() throws Exception {
+        while (true) {
+            AsyncRequest request = queue.poll(1, TimeUnit.MINUTES);
+            if (request != null) {
+                processTuple(request);
+            } else {
+                LOG.trace("Nothing on the async queue");
+            }
+        }
+    }
+
+    private void processTuple(AsyncRequest request) {
+        LOG.debug("Starting request id ({}) of size ({})",
+                request.getId(), request.getTuples().size());
+        List<EmitData> cachedEmitData = new ArrayList<>();
+        Emitter emitter = TikaResource.getConfig()
+                .getEmitterManager()
+                .getEmitter(
+                    request.getTuples().get(0).getEmitKey().getEmitterName());
+        long currSize = 0;
+        for (FetchEmitTuple t : request.getTuples()) {
+            EmitData emitData = processTuple(t);
+            long estimated = AbstractEmitter.estimateSizeInBytes(
+                    emitData.getEmitKey().getKey(), 
emitData.getMetadataList());
+            if (estimated + currSize > maxCacheSizeBytes) {
+                tryToEmit(emitter, cachedEmitData, request);
+                cachedEmitData.clear();
+            }
+            cachedEmitData.add(emitData);
+            currSize += estimated;
+        }
+        tryToEmit(emitter, cachedEmitData, request);
+        cachedEmitData.clear();
+        LOG.debug("Completed request id ({})",
+                request.getId(), request.getTuples().size());
+    }
+
+    private void tryToEmit(Emitter emitter, List<EmitData> cachedEmitData,
+                           AsyncRequest request) {
+        try {
+            emitter.emit(cachedEmitData);
+        } catch (IOException|TikaEmitterException e) {
+            LOG.warn("async id ({}) emitter class ({}): {}",
+                    request.getId(), emitter.getClass(),
+                    ExceptionUtils.getStackTrace(e));
+        }
+    }
+
+    private EmitData processTuple(FetchEmitTuple t) {
+        Metadata userMetadata = t.getMetadata();
+        Metadata metadata = new Metadata();
+        String fetcherName = t.getFetchKey().getFetcherName();
+        String fetchKey = t.getFetchKey().getKey();
+        List<Metadata> metadataList = null;
+        try (InputStream stream =
+                     TikaResource.getConfig().getFetcherManager()
+                             .getFetcher(fetcherName).fetch(fetchKey, 
metadata)) {
+
+            metadataList = RecursiveMetadataResource.parseMetadata(
+                    stream,
+                    metadata,
+                    new MultivaluedHashMap<>(),
+                    new UriInfoImpl(new MessageImpl()), "text");
+        } catch (SecurityException e) {
+            throw e;
+        } catch (Exception e) {
+            LOG.warn(t.toString(), e);
+        }
+        injectUserMetadata(userMetadata, metadataList);
+        return new EmitData(t.getEmitKey(), metadataList);
+    }
+
+    private void injectUserMetadata(Metadata userMetadata, List<Metadata> 
metadataList) {
+        for (String n : userMetadata.names()) {
+            //overwrite whatever was there
+            metadataList.get(0).set(n, null);
+            for (String val : userMetadata.getValues(n)) {
+                metadataList.get(0).add(n, val);
+            }
+        }
+    }
+}
diff --git 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java
 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java
new file mode 100644
index 0000000..f46105c
--- /dev/null
+++ 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java
@@ -0,0 +1,23 @@
+package org.apache.tika.server.core.resource;
+
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+
+import java.util.List;
+
+public class AsyncRequest {
+    private final String id;
+    private final List<FetchEmitTuple> tuples;
+
+    public AsyncRequest(String id, List<FetchEmitTuple> tuples) {
+        this.id = id;
+        this.tuples = tuples;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public List<FetchEmitTuple> getTuples() {
+        return tuples;
+    }
+}
diff --git 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
new file mode 100644
index 0000000..240933f
--- /dev/null
+++ 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.server.core.resource;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetcher.FetcherManager;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.UriInfo;
+import java.io.InputStream;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+@Path("/async")
+public class AsyncResource {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncResource.class);
+
+    private static final int DEFAULT_QUEUE_SIZE = 100;
+    private int queueSize = DEFAULT_QUEUE_SIZE;
+    private ArrayBlockingQueue<AsyncRequest> queue;
+
+    public ArrayBlockingQueue<AsyncRequest> getQueue(int numThreads) {
+        this.queue = new ArrayBlockingQueue<>(queueSize+numThreads);
+        return queue;
+    }
+    /**
+     * The client posts a json request.  At a minimum, this must be a
+     * json object that contains an emitter and a fetcherString key with
+     * the key to fetch the inputStream. Optionally, it may contain a metadata
+     * object that will be used to populate the metadata key for pass
+     * through of metadata from the client.
+     * <p>
+     * The extracted text content is stored with the key
+     * {@link TikaCoreProperties#TIKA_CONTENT}
+     * <p>
+     * Must specify a fetcherString and an emitter in the posted json.
+     *
+     * @param info uri info
+     * @return InputStream that can be deserialized as a list of {@link 
Metadata} objects
+     * @throws Exception
+     */
+    @POST
+    @Produces("application/json")
+    public Map<String, String> post(InputStream is,
+                                         @Context HttpHeaders httpHeaders,
+                                         @Context UriInfo info
+    ) throws Exception {
+
+        AsyncRequest request = deserializeASyncRequest(is);
+        FetcherManager fetcherManager = 
TikaConfig.getDefaultConfig().getFetcherManager();
+        EmitterManager emitterManager = 
TikaConfig.getDefaultConfig().getEmitterManager();
+        for (FetchEmitTuple t : request.getTuples()) {
+            if (! 
fetcherManager.getSupported().contains(t.getFetchKey().getFetcherName())) {
+                return badFetcher(t.getFetchKey());
+            }
+            if (! 
emitterManager.getSupported().contains(t.getEmitKey().getEmitterName())) {
+                return badEmitter(t.getEmitKey());
+            }
+        }
+
+        //parameterize
+        boolean offered = queue.offer(request, 60, TimeUnit.SECONDS);
+        if (! offered) {
+            return throttleResponse();
+        }
+        return ok(request.getId(), request.getTuples().size());
+    }
+
+    private Map<String, String> ok(String id, int size) {
+        return null;
+    }
+
+    private Map<String, String> throttleResponse() {
+        Map<String, String> map = new HashMap<>();
+        return map;
+    }
+
+    private Map<String, String> badEmitter(EmitKey emitKey) {
+        return null;
+    }
+
+    private Map<String, String> badFetcher(FetchKey fetchKey) {
+        return null;
+    }
+
+    private AsyncRequest deserializeASyncRequest(InputStream is) {
+        return new AsyncRequest("", Collections.EMPTY_LIST);
+    }
+
+}
diff --git 
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java
 
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java
index 8d25d83..16699ed 100644
--- 
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java
+++ 
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java
@@ -64,7 +64,7 @@ public class TikaServerIntegrationTest extends 
IntegrationTestBase {
             public void run() {
                 TikaServerCli.main(
                         new String[]{
-                                "-maxFiles", "2000",
+                                "-maxFiles", "100",
                                 "-p", INTEGRATION_TEST_PORT,
                                 "-tmpFilePrefix", "basic-"
                         });
@@ -72,9 +72,12 @@ public class TikaServerIntegrationTest extends 
IntegrationTestBase {
         };
         serverThread.start();
         try {
-            testBaseline();
+            for (int i = 0; i < 500; i++) {
+                System.out.println("base "+i);
+                testBaseline();
+            }
         } finally {
-            serverThread.interrupt();
+            //serverThread.interrupt();
         }
     }
 
@@ -563,6 +566,7 @@ public class TikaServerIntegrationTest extends 
IntegrationTestBase {
                 .put(ClassLoader
                         .getSystemResourceAsStream(TEST_HELLO_WORLD));
         Reader reader = new InputStreamReader((InputStream) 
response.getEntity(), UTF_8);
+        System.out.println(response.getStatus());
         List<Metadata> metadataList = JsonMetadataList.fromJson(reader);
         assertEquals(1, metadataList.size());
         assertEquals("Nikolai Lobachevsky", metadataList.get(0).get("author"));

Reply via email to