This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-3304
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/TIKA-3304 by this push:
     new 025eded  WIP -- do not merge...still a bunch to do
025eded is described below

commit 025eded4def96c9dfdd2923386bb38566b8eac26
Author: tballison <[email protected]>
AuthorDate: Fri Mar 5 17:48:39 2021 -0500

    WIP -- do not merge...still a bunch to do
---
 .../apache/tika/pipes/emitter/AbstractEmitter.java |   2 +-
 .../org/apache/tika/pipes/emitter/Emitter.java     |   2 +-
 .../apache/tika/pipes/emitter/EmptyEmitter.java    |   2 +-
 .../tika/pipes/emitter/solr/SolrEmitter.java       |   2 +-
 .../src/test/resources/tika-config-s3.xml          |   1 +
 tika-pipes/tika-pipes-app/pom.xml                  |  21 +-
 .../java/org/apache/tika/pipes/async/AsyncCli.java |   6 +-
 .../org/apache/tika/pipes/async/AsyncEmitter.java  |  11 +-
 .../tika/pipes/async/AsyncEmitterProcess.java      | 156 +++++++++++++++
 .../apache/tika/pipes/async/AsyncProcessor.java    | 222 ++++++++++++++-------
 .../tika/pipes/async/AsyncRuntimeException.java    |  27 +--
 .../org/apache/tika/pipes/async/AsyncTask.java     |   5 +-
 .../org/apache/tika/pipes/async/AsyncWorker.java   |  43 ++--
 .../tika/pipes/async/AsyncWorkerProcess.java       | 122 +++++++----
 .../tika/pipes/{driver => async}/AsyncCliTest.java |   3 +-
 .../tika/pipes/async/AsyncProcessorTest.java       |  88 ++++++++
 .../org/apache/tika/pipes/async/MockEmitter.java   |  76 +++++++
 .../org/apache/tika/pipes/async/MockFetcher.java   |  29 +++
 .../pipes/{driver => async}/TestPipesDriver.java   |  11 +-
 .../apache/tika/pipes/PipeIntegrationTests.java    |  20 +-
 .../serialization/JsonFetchEmitTupleTest.java      |   6 +-
 .../core/TikaServerEmitterIntegrationTest.java     |   1 +
 22 files changed, 676 insertions(+), 180 deletions(-)

diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java 
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
index a54b708..3c340ce 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
@@ -46,7 +46,7 @@ public abstract class AbstractEmitter implements Emitter {
      * @throws TikaEmitterException
      */
     @Override
-    public void emit(List<EmitData> emitData) throws IOException, 
TikaEmitterException {
+    public void emit(List<? extends EmitData> emitData) throws IOException, 
TikaEmitterException {
         for (EmitData d : emitData) {
             emit(d.getEmitKey().getKey(), d.getMetadataList());
         }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java 
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
index dbea669..8f88985 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
@@ -27,7 +27,7 @@ public interface Emitter {
 
     void emit(String emitKey, List<Metadata> metadataList) throws IOException, 
TikaEmitterException;
 
-    void emit(List<EmitData> emitData) throws IOException, 
TikaEmitterException;
+    void emit(List<? extends EmitData> emitData) throws IOException, 
TikaEmitterException;
     //TODO -- add this later for xhtml?
     //void emit(String txt, Metadata metadata) throws IOException, 
TikaException;
 
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java 
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
index 5f83db7..dfded00 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
@@ -34,7 +34,7 @@ public class EmptyEmitter implements Emitter {
     }
 
     @Override
-    public void emit(List<EmitData> emitData) throws IOException, 
TikaEmitterException {
+    public void emit(List<? extends EmitData> emitData) throws IOException, 
TikaEmitterException {
 
     }
 }
diff --git 
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
 
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index 0e09e9c..f299913 100644
--- 
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
+++ 
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -104,7 +104,7 @@ public class SolrEmitter extends AbstractEmitter implements 
Initializable {
     }
 
     @Override
-    public void emit(List<EmitData> batch) throws IOException,
+    public void emit(List<? extends EmitData> batch) throws IOException,
             TikaEmitterException {
         if (batch == null || batch.size() == 0) {
             LOG.warn("batch is null or empty");
diff --git 
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/resources/tika-config-s3.xml
 
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/resources/tika-config-s3.xml
index 740058a..176513b 100644
--- 
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/resources/tika-config-s3.xml
+++ 
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/resources/tika-config-s3.xml
@@ -22,6 +22,7 @@
                 <param name="name" type="string">s3</param>
                 <param name="region" type="string">us-east-1</param>
                 <param name="profile" type="string">my_profile</param>
+                <param name="credentialsProvider" type="string">profile</param>
             </params>
         </fetcher>
     </fetchers>
diff --git a/tika-pipes/tika-pipes-app/pom.xml 
b/tika-pipes/tika-pipes-app/pom.xml
index 1042d42..9e26eb2 100644
--- a/tika-pipes/tika-pipes-app/pom.xml
+++ b/tika-pipes/tika-pipes-app/pom.xml
@@ -61,6 +61,16 @@
             <version>${h2.version}</version>
         </dependency>
         <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>net.jpountz.lz4</groupId>
+            <artifactId>lz4</artifactId>
+            <version>1.3.0</version>
+        </dependency>
+        <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>tika-emitter-fs</artifactId>
             <version>${project.version}</version>
@@ -68,15 +78,24 @@
         </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
-            <artifactId>tika-parsers-classic</artifactId>
+            <artifactId>tika-core</artifactId>
             <version>${project.version}</version>
+            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <!--
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>tika-parsers-classic-package</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>-->
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
 
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
index 2804a7a..cdf8855 100644
--- 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
+++ 
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
@@ -60,7 +60,7 @@ public class AsyncCli {
             if (fetchIterator instanceof EmptyFetchIterator) {
                 throw new IllegalArgumentException("can't have empty fetch 
iterator");
             }
-            ArrayBlockingQueue<FetchEmitTuple> q = 
fetchIterator.init(maxConsumers);
+            ArrayBlockingQueue<FetchEmitTuple> q = new 
ArrayBlockingQueue<>(10000);//fetchIterator.init(maxConsumers);
             AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(q, connection);
             executorCompletionService.submit(fetchIterator);
             executorCompletionService.submit(enqueuer);
@@ -169,7 +169,7 @@ public class AsyncCli {
             int workerId = workers.size() == 1 ? workers.get(0) :
                     workers.get(random.nextInt(workers.size()));
             insert.clearParameters();
-            insert.setByte(1, (byte) 
AsyncWorkerProcess.STATUS_CODES.AVAILABLE.ordinal());
+            insert.setByte(1, (byte) 
AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
             insert.setInt(2, workerId);
             insert.setShort(3, (short) 0);
             insert.setString(4, JsonFetchEmitTuple.toJson(t));
@@ -220,7 +220,7 @@ public class AsyncCli {
             reallocate = connection.prepareStatement(sql);
 
             sql = "select count(1) from parse_queue where status="
-                    + AsyncWorkerProcess.STATUS_CODES.AVAILABLE.ordinal();
+                    + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal();
             countAvailableTasks = connection.prepareStatement(sql);
 
             sql = "insert into workers_shutdown (worker_id) values (?)";
diff --git 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
 
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
index 22ede76..bdc6cd3 100644
--- 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
+++ 
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
@@ -1,4 +1,4 @@
-package org.apache.tika.pipes.async;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,6 +14,8 @@ package org.apache.tika.pipes.async;/*
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.tika.pipes.async;
+
 
 import org.apache.tika.pipes.emitter.AbstractEmitter;
 import org.apache.tika.pipes.emitter.EmitData;
@@ -37,10 +39,9 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
-/**
- * Worker thread that takes EmitData off the queue, batches it
- * and tries to emit it as a batch
- */
+//TODO -- convert this into a watcher over the AsyncEmitterProcess
+//along the lines of AsyncWorker
+//then add emitters to the main loop in async processor
 public class AsyncEmitter implements Callable<Integer> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AsyncEmitter.class);
diff --git 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
 
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
new file mode 100644
index 0000000..58854cb
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.async;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.apache.tika.utils.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Child process that claims serialized emit data from the {@code emits} table
+ * so that it can be passed on to the configured emitters in batches.
+ * <p>
+ * Work in progress: {@code execute} prepares its statements, but the
+ * claim/deserialize/emit/delete loop has not been written yet.
+ */
+public class AsyncEmitterProcess {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncEmitterProcess.class);
+
+    //TODO -- batching thresholds; not read anywhere yet
+    private long emitWithinMs = 1000;
+    private long emitMaxBytes = 10_000_000;
+
+    private final LZ4FastDecompressor decompressor =
+            LZ4Factory.fastestInstance().fastDecompressor();
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Entry point for the emitter child process.
+     * <p>
+     * The jdbc connection string and the tika-config path are read from the
+     * environment variables named by {@link AsyncProcessor#TIKA_ASYNC_JDBC_KEY}
+     * and {@link AsyncProcessor#TIKA_ASYNC_CONFIG_FILE_KEY}.
+     *
+     * @param args {@code args[0]} is this process's worker id
+     */
+    public static void main(String[] args) throws Exception {
+        String db = System.getenv(AsyncProcessor.TIKA_ASYNC_JDBC_KEY);
+        TikaConfig tikaConfig = new TikaConfig(
+                Paths.get(System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY)));
+        int workerId = Integer.parseInt(args[0]);
+        LOG.debug("trying to get connection {} >{}<", workerId, db);
+        try (Connection connection = DriverManager.getConnection(db)) {
+            AsyncEmitterProcess asyncEmitter = new AsyncEmitterProcess();
+            asyncEmitter.execute(connection, workerId, tikaConfig);
+        }
+        System.exit(0);
+    }
+
+    /**
+     * Prepares the statements used to claim, read and delete this worker's
+     * rows in the {@code emits} table.
+     */
+    private void execute(Connection connection, int workerId,
+                         TikaConfig tikaConfig) throws SQLException {
+        int recordsPerPulse = 10;
+        //claim (at most recordsPerPulse of) the oldest READY rows for this worker
+        String markSql = "update emits set status=" +
+                AsyncWorkerProcess.EMIT_STATUS_CODES.EMITTING.ordinal() +
+                ", worker_id=" + workerId +
+                " where emit_id in " +
+                " (select emit_id from emits where status=" +
+                AsyncWorkerProcess.EMIT_STATUS_CODES.READY.ordinal() +
+                " order by time_stamp asc limit " + recordsPerPulse + " for update)";
+        String selectSql = "select emit_id, uncompressed_size, bytes from emits where status=" +
+                AsyncWorkerProcess.EMIT_STATUS_CODES.EMITTING.ordinal() +
+                " and worker_id=" + workerId +
+                " order by time_stamp asc";
+        String deleteSql = "delete from emits where emit_id=?";
+        //try-with-resources so the statements are closed instead of leaked
+        try (PreparedStatement markForSelecting = connection.prepareStatement(markSql);
+             PreparedStatement selectForProcessing = connection.prepareStatement(selectSql);
+             PreparedStatement deleteFromEmits = connection.prepareStatement(deleteSql)) {
+            //TODO -- loop: mark, select, deserialize, emit via EmitDataCache, delete
+        }
+    }
+
+    /**
+     * Decompresses an lz4 block and reads the json it contains as {@link AsyncData}.
+     *
+     * @param compressed         lz4-compressed bytes
+     * @param decompressedLength length of the original, uncompressed bytes
+     */
+    private AsyncData deserialize(byte[] compressed, int decompressedLength)
+            throws IOException {
+        byte[] restored = new byte[decompressedLength];
+        decompressor.decompress(compressed, 0, restored, 0, decompressedLength);
+        //read as the declared return type so the implicit cast can't fail at runtime
+        return objectMapper.readerFor(AsyncData.class).readValue(restored);
+    }
+
+    /**
+     * Groups {@link AsyncData} by emitter name and emits everything cached once
+     * adding another item would push the estimated size past {@code maxBytes}.
+     */
+    private static class EmitDataCache {
+        private final EmitterManager emitterManager;
+        private final long maxBytes;
+
+        long estimatedSize = 0;
+        int size = 0;
+        Map<String, List<AsyncData>> map = new HashMap<>();
+
+        public EmitDataCache(EmitterManager emitterManager,
+                             long maxBytes) {
+            this.emitterManager = emitterManager;
+            this.maxBytes = maxBytes;
+        }
+
+        void updateEstimatedSize(long newBytes) {
+            estimatedSize += newBytes;
+        }
+
+        void add(AsyncData data) {
+            long sz = AbstractEmitter.estimateSizeInBytes(data.getEmitKey().getKey(),
+                    data.getMetadataList());
+            if (estimatedSize + sz > maxBytes) {
+                LOG.debug("estimated size ({}) > maxBytes({}), going to emitAll",
+                        (estimatedSize + sz), maxBytes);
+                emitAll();
+            }
+            //count only after the possible flush: emitAll() resets size to 0
+            size++;
+            map.computeIfAbsent(data.getEmitKey().getEmitterName(), k -> new ArrayList<>())
+                    .add(data);
+            updateEstimatedSize(sz);
+        }
+
+        private void emitAll() {
+            int emitted = 0;
+            LOG.debug("about to emit {}", size);
+            for (Map.Entry<String, List<AsyncData>> e : map.entrySet()) {
+                Emitter emitter = emitterManager.getEmitter(e.getKey());
+                tryToEmit(emitter, e.getValue());
+                emitted += e.getValue().size();
+            }
+            LOG.debug("emitted: {}", emitted);
+            estimatedSize = 0;
+            size = 0;
+            map.clear();
+            //lastEmitted = Instant.now();
+        }
+
+        private long tryToEmit(Emitter emitter, List<AsyncData> cachedEmitData) {
+
+            try {
+                emitter.emit(cachedEmitData);
+            } catch (IOException | TikaEmitterException e) {
+                LOG.warn("emitter class ({}): {}", emitter.getClass(),
+                        ExceptionUtils.getStackTrace(e));
+            }
+            return 1;
+        }
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
 
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index cfe7455..e7dc0c1 100644
--- 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ 
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -1,109 +1,142 @@
 package org.apache.tika.pipes.async;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.tika.config.TikaConfig;
 import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.EmptyFetchIterator;
 import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.apache.tika.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-public class AsyncProcessor implements Closeable, Callable<Integer> {
+public class AsyncProcessor implements Closeable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AsyncProcessor.class);
-
+    protected static final String TIKA_ASYNC_JDBC_KEY = "TIKA_ASYNC_JDBC_KEY";
+    protected static final String TIKA_ASYNC_CONFIG_FILE_KEY = "TIKA_ASYNC_CONFIG_FILE_KEY";
     private final Path tikaConfigPath;
     private AsyncConfig asyncConfig;
-    private final ArrayBlockingQueue<FetchEmitTuple> queue;//tmp directory 
used if no jdbc string is configured
+    private final ArrayBlockingQueue<FetchEmitTuple> queue;
+    private final Connection connection;
+    private int finishedThreads = 0;
+    private final int totalThreads;
+    private ExecutorService executorService;
+    private ExecutorCompletionService<Integer> executorCompletionService;
+
+    public static AsyncProcessor build(Path tikaConfigPath) throws 
AsyncRuntimeException {
+        try {
+            AsyncProcessor processor = new AsyncProcessor(tikaConfigPath);
+
+            processor.init();
+            return processor;
+        } catch (SQLException|IOException e) {
+            throw new AsyncRuntimeException(e);
+        }
+    }
 
-    public AsyncProcessor (Path tikaConfigPath) {
+    private AsyncProcessor(Path tikaConfigPath) throws SQLException, 
IOException {
         this.tikaConfigPath = tikaConfigPath;
-        this.queue = new 
ArrayBlockingQueue<FetchEmitTuple>(asyncConfig.getQueueSize());
-
+        this.asyncConfig = AsyncConfig.load(tikaConfigPath);
+        this.queue = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
+        this.connection = 
DriverManager.getConnection(asyncConfig.getJdbcString());
+        this.totalThreads = asyncConfig.getMaxConsumers() + 2;
     }
 
-    public synchronized boolean offer(List<FetchEmitTuple> fetchEmitTuples, 
long offerMs) {
+    public synchronized boolean offer (
+            List<FetchEmitTuple> fetchEmitTuples, long offerMs) throws
+            AsyncRuntimeException, InterruptedException {
         if (queue == null) {
             throw new IllegalStateException("queue hasn't been initialized yet.");
         }
         long start = System.currentTimeMillis();
-        long elapsed = System.currentTimeMillis()-start;
+        long elapsed = System.currentTimeMillis() - start;
         while (elapsed < offerMs) {
+            checkActive();
-            if (queue.remainingCapacity() > fetchEmitTuples.size()) {
+            if (queue.remainingCapacity() >= fetchEmitTuples.size()) {
-                queue.addAll(fetchEmitTuples);
-                return true;
+                try {
+                    queue.addAll(fetchEmitTuples);
+                    return true;
+                } catch (IllegalStateException e) {
+                    //swallow
+                }
             }
+            Thread.sleep(100);
             elapsed = System.currentTimeMillis() - start;
         }
         return false;
     }
 
-    public synchronized boolean offer(FetchEmitTuple t, long offerMs) throws 
InterruptedException {
+    public synchronized boolean offer(FetchEmitTuple t, long offerMs)
+            throws AsyncRuntimeException, InterruptedException {
+        checkActive();
         return queue.offer(t, offerMs, TimeUnit.MILLISECONDS);
     }
 
+    /**
+     * Polls the {@link ExecutorCompletionService} (without blocking) for at most one completed
+     * task, surfacing a failed task's exception, and reports whether any task is still running.
+     *
+     * @return {@code false} once {@code totalThreads} tasks have completed, otherwise {@code true}
+     * @throws AsyncRuntimeException wrapping the {@link ExecutionException} of a failed task
+     * @throws InterruptedException as declared by {@link Future#get()}
+     */
+    public synchronized boolean checkActive()
+            throws AsyncRuntimeException, InterruptedException {
+        Future<Integer> future = executorCompletionService.poll();
+        if (future != null) {
+            try {
+                future.get();
+            } catch (ExecutionException e) {
+                throw new AsyncRuntimeException(e);
+            }
+            finishedThreads++;
+        }
+        if (finishedThreads == totalThreads) {
+            return false;
+        }
+        return true;
+    }
 
-    @Override
-    public Integer call() throws Exception {
-        this.asyncConfig = AsyncConfig.load(tikaConfigPath);
+    private void init() throws SQLException {
 
         setupTables();
 
-        ExecutorService executorService = Executors.newFixedThreadPool(
-                asyncConfig.getMaxConsumers() + 2);
-        ExecutorCompletionService<Integer> executorCompletionService =
+        executorService = Executors.newFixedThreadPool(
+                totalThreads);
+        executorCompletionService =
                 new ExecutorCompletionService<>(executorService);
 
-        try (Connection connection = 
DriverManager.getConnection(asyncConfig.getJdbcString())) {
-            AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(queue, 
connection);
-
-            executorCompletionService.submit(enqueuer);
-            executorCompletionService.submit(new AssignmentManager(connection, 
enqueuer));
+        AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(queue, connection);
 
-            for (int i = 0; i < asyncConfig.getMaxConsumers(); i++) {
-                executorCompletionService.submit(new AsyncWorker(connection,
-                        asyncConfig.getJdbcString(), i, tikaConfigPath));
-            }
-            int completed = 0;
-            while (completed < asyncConfig.getMaxConsumers()+2) {
-                Future<Integer> future = executorCompletionService.take();
-                if (future != null) {
-                    int val = future.get();
-                    completed++;
-                    LOG.debug("finished " + val);
-                }
-            }
-        } finally {
-            executorService.shutdownNow();
+        executorCompletionService.submit(enqueuer);
+        executorCompletionService.submit(new AssignmentManager(connection, 
enqueuer));
+        //executorCompletionService.submit(new )
+        for (int i = 0; i < asyncConfig.getMaxConsumers(); i++) {
+            executorCompletionService.submit(new AsyncWorker(connection,
+                    asyncConfig.getJdbcString(), i, tikaConfigPath));
         }
-        return 1;
     }
 
     private void setupTables() throws SQLException {
-        Connection connection = 
DriverManager.getConnection(asyncConfig.getJdbcString());
 
         String sql = "create table parse_queue " +
                 "(id bigint auto_increment primary key," +
@@ -112,31 +145,81 @@ public class AsyncProcessor implements Closeable, 
Callable<Integer> {
                 "retry smallint," + //short
                 "time_stamp timestamp," +
                 "json varchar(64000))";
-        connection.createStatement().execute(sql);
+        try (Statement st = connection.createStatement()) {
+            st.execute(sql);
+        }
         //no clear benefit to creating an index on timestamp
 //        sql = "CREATE INDEX IF NOT EXISTS status_timestamp on status 
(time_stamp)";
-        sql = "create table workers (worker_id int primary key)";
-        connection.createStatement().execute(sql);
-
-        sql = "create table workers_shutdown (worker_id int primary key)";
-        connection.createStatement().execute(sql);
+        sql = "create table workers (worker_id int primary key, status 
tinyint)";
+        try (Statement st = connection.createStatement()) {
+            st.execute(sql);
+        }
 
         sql = "create table error_log (task_id bigint, " +
                 "fetch_key varchar(10000)," +
                 "time_stamp timestamp," +
                 "retry integer," +
                 "error_code tinyint)";
-        connection.createStatement().execute(sql);
+        try (Statement st = connection.createStatement()) {
+            st.execute(sql);
+        }
 
+        sql = "create table emits (" +
+                "emit_id bigint auto_increment primary key, " +
+                "status tinyint, " +
+                "worker_id integer, " +
+                "time_stamp timestamp, " +
+                "uncompressed_size bigint, " +
+                "bytes blob)";
+        try (Statement st = connection.createStatement()) {
+            st.execute(sql);
+        }
     }
 
+    public void shutdownNow() throws IOException, AsyncRuntimeException {
+        try {
+            executorService.shutdownNow();
+        } finally {
+            //delete the temp db directory, if configured (NOTE: does not close the jdbc connection)
+            if (asyncConfig.getTempDBDir() != null) {
+                FileUtils.deleteDirectory(asyncConfig.getTempDBDir().toFile());
+            }
+        }
+    }
+
+    /**
+     * This is a blocking close.  If you need to shutdown immediately,
+     * try {@link #shutdownNow()}.
+     * @throws IOException if the temporary db directory can't be deleted
+     */
     @Override
     public void close() throws IOException {
-        //close down processes and db
-
-
-        if (asyncConfig.getTempDBDir() != null) {
-            FileUtils.deleteDirectory(asyncConfig.getTempDBDir().toFile());
+        try {
+            for (int i = 0; i < asyncConfig.getMaxConsumers(); i++) {
+                try {
+                    //blocking
+                    queue.put(FetchIterator.COMPLETED_SEMAPHORE);
+                } catch (InterruptedException e) {
+                    //swallow
+                }
+            }
+            //poll until every submitted task has finished, sleeping between
+            //polls instead of spinning on the synchronized checkActive()
+            try {
+                boolean isActive = checkActive();
+                while (isActive) {
+                    Thread.sleep(100);
+                    isActive = checkActive();
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        } finally {
+            executorService.shutdownNow();
+            //delete the temp db directory, if configured (NOTE: does not close the jdbc connection)
+            if (asyncConfig.getTempDBDir() != null) {
+                FileUtils.deleteDirectory(asyncConfig.getTempDBDir().toFile());
+            }
         }
     }
 
@@ -166,7 +249,7 @@ public class AsyncProcessor implements Closeable, 
Callable<Integer> {
             List<Integer> workers = new ArrayList<>();
             while (true) {
                 FetchEmitTuple t = queue.poll(1, TimeUnit.SECONDS);
-                LOG.debug("enqueing to db "+t);
+                LOG.debug("enqueing to db " + t);
                 if (t == null) {
                     //log.trace?
                 } else if (t == FetchIterator.COMPLETED_SEMAPHORE) {
@@ -179,7 +262,7 @@ public class AsyncProcessor implements Closeable, 
Callable<Integer> {
                     while (workers.size() == 0 && elapsed < 600000) {
                         workers = getActiveWorkers(connection);
                         Thread.sleep(100);
-                        elapsed = System.currentTimeMillis()-start;
+                        elapsed = System.currentTimeMillis() - start;
                     }
                     insert(t, workers);
                 }
@@ -189,11 +272,12 @@ public class AsyncProcessor implements Closeable, 
Callable<Integer> {
         boolean isComplete() {
             return isComplete;
         }
+
         private void insert(FetchEmitTuple t, List<Integer> workers) throws 
IOException, SQLException {
             int workerId = workers.size() == 1 ? workers.get(0) :
                     workers.get(random.nextInt(workers.size()));
             insert.clearParameters();
-            insert.setByte(1, (byte) 
AsyncWorkerProcess.STATUS_CODES.AVAILABLE.ordinal());
+            insert.setByte(1, (byte) 
AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
             insert.setInt(2, workerId);
             insert.setShort(3, (short) 0);
             insert.setString(4, JsonFetchEmitTuple.toJson(t));
@@ -210,7 +294,7 @@ public class AsyncProcessor implements Closeable, 
Callable<Integer> {
         private final PreparedStatement allocateNonworkersToWorkers;
         private final PreparedStatement reallocate;
         private final PreparedStatement countAvailableTasks;
-        private final PreparedStatement insertWorkersShutdown;
+        private final PreparedStatement shutdownWorker;
         private final Random random = new Random();
 
 
@@ -244,11 +328,13 @@ public class AsyncProcessor implements Closeable, 
Callable<Integer> {
             reallocate = connection.prepareStatement(sql);
 
             sql = "select count(1) from parse_queue where status="
-                    + AsyncWorkerProcess.STATUS_CODES.AVAILABLE.ordinal();
+                    + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal();
             countAvailableTasks = connection.prepareStatement(sql);
 
-            sql = "insert into workers_shutdown (worker_id) values (?)";
-            insertWorkersShutdown = connection.prepareStatement(sql);
+            sql = "update workers set status="+
+                    
AsyncWorkerProcess.WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal() +
+                " where worker_id = ?";
+            shutdownWorker = connection.prepareStatement(sql);
         }
 
         @Override
@@ -262,20 +348,20 @@ public class AsyncProcessor implements Closeable, 
Callable<Integer> {
                     notifyWorkers();
                     return 1;
                 }
-                Thread.sleep(100);
+                Thread.sleep(200);
             }
         }
 
         private void notifyWorkers() throws SQLException {
             for (int workerId : getActiveWorkers(connection)) {
-                insertWorkersShutdown.clearParameters();
-                insertWorkersShutdown.setInt(1, workerId);
-                insertWorkersShutdown.execute();
+                shutdownWorker.clearParameters();
+                shutdownWorker.setInt(1, workerId);
+                shutdownWorker.execute();
             }
         }
 
         private boolean isComplete() throws SQLException {
-            if (! enqueuer.isComplete) {
+            if (!enqueuer.isComplete) {
                 return false;
             }
             try (ResultSet rs = countAvailableTasks.executeQuery()) {
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java 
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
similarity index 62%
copy from 
tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
copy to 
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
index 5f83db7..875cc90 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
+++ 
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
@@ -14,27 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.emitter;
+package org.apache.tika.pipes.async;
 
-import org.apache.tika.metadata.Metadata;
-
-import java.io.IOException;
-import java.util.List;
-
-public class EmptyEmitter implements Emitter {
-
-    @Override
-    public String getName() {
-        return "empty";
-    }
-
-    @Override
-    public void emit(String emitKey, List<Metadata> metadataList) throws 
IOException, TikaEmitterException {
-
-    }
-
-    @Override
-    public void emit(List<EmitData> emitData) throws IOException, 
TikaEmitterException {
+/**
+ * Unchecked fatal exception signalling an unrecoverable failure in the async pipeline.
+ */
+public class AsyncRuntimeException extends RuntimeException {
 
+    public AsyncRuntimeException(Throwable t) {
+        super(t);
     }
 }
diff --git 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
 
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
index 2770abb..eaac09d 100644
--- 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
+++ 
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
@@ -8,7 +8,7 @@ public class AsyncTask extends FetchEmitTuple {
     public static final AsyncTask SHUTDOWN_SEMAPHORE
             = new AsyncTask(-1, (short)-1, new FetchEmitTuple(null, null, 
null));
 
-    private final long taskId;
+    private long taskId;
     private final short retry;
 
     public AsyncTask(long taskId, short retry,
@@ -26,6 +26,9 @@ public class AsyncTask extends FetchEmitTuple {
         return retry;
     }
 
+    public void setTaskId(long taskId) {
+        this.taskId = taskId;
+    }
     @Override
     public String toString() {
         return "AsyncTask{" +
diff --git 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
 
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
index 4f51e7a..ff4e5d4 100644
--- 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
+++ 
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
@@ -10,7 +10,6 @@ import java.io.IOException;
 import java.io.StringReader;
 import java.nio.file.Path;
 import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -19,6 +18,9 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
+import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
+
 /**
  * This controls monitoring of the AsyncWorkerProcess
  * and updates to the db on crashes etc.
@@ -32,7 +34,8 @@ public class AsyncWorker implements Callable<Integer> {
     private final int workerId;
     private final Path tikaConfigPath;
     private final Connection connection;
-    private final PreparedStatement delete;
+    private final PreparedStatement finished;
+    private final PreparedStatement restarting;
     private final PreparedStatement selectActiveTasks;
     private final PreparedStatement insertErrorLog;
     private final PreparedStatement resetStatus;
@@ -44,13 +47,19 @@ public class AsyncWorker implements Callable<Integer> {
         this.workerId = workerId;
         this.tikaConfigPath = tikaConfigPath;
         this.connection = connection;
-        String sql = "delete from workers where worker_id = (" + workerId + 
")";
-        delete = connection.prepareStatement(sql);
-
+        String sql = "update workers set status="+
+                AsyncWorkerProcess.WORKER_STATUS_CODES.SHUTDOWN.ordinal()+
+                " where worker_id = (" + workerId + ")";
+        finished = connection.prepareStatement(sql);
+
+        sql = "update workers set status="+
+                AsyncWorkerProcess.WORKER_STATUS_CODES.RESTARTING.ordinal()+
+                " where worker_id = (" + workerId + ")";
+        restarting = connection.prepareStatement(sql);
         //this checks if the process was able to reset the status
         sql = "select id, retry, json from parse_queue where worker_id="
                 + workerId +
-                " and status=" + 
AsyncWorkerProcess.STATUS_CODES.IN_PROCESS.ordinal();
+                " and status=" + 
AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS.ordinal();
         selectActiveTasks = connection.prepareStatement(sql);
 
         //if not, this is called to insert into the error log
@@ -66,7 +75,7 @@ public class AsyncWorker implements Callable<Integer> {
         try {
             p = start();
             int restarts = 0;
-            while (restarts++ < 2) {
+            while (true) {
                 boolean finished = p.waitFor(60, TimeUnit.SECONDS);
                 if (finished) {
                     int exitValue = p.exitValue();
@@ -74,7 +83,7 @@ public class AsyncWorker implements Callable<Integer> {
                         LOG.info("child process finished with exitValue=0");
                         return 1;
                     }
-                    reportCrash(exitValue);
+                    reportCrash(++restarts, exitValue);
                     p = start();
                 }
             }
@@ -82,9 +91,8 @@ public class AsyncWorker implements Callable<Integer> {
             if (p != null) {
                 p.destroyForcibly();
             }
-            delete.execute();
+            finished.execute();
         }
-        return -1 * workerId;
     }
 
     private Process start() throws IOException {
@@ -92,17 +100,20 @@ public class AsyncWorker implements Callable<Integer> {
                 "java", "-Djava.awt.headless=true",
                 "-cp", System.getProperty("java.class.path"),
                 "org.apache.tika.pipes.async.AsyncWorkerProcess",
-                connectionString, Integer.toString(workerId),
-                
ProcessUtils.escapeCommandLine(tikaConfigPath.toAbsolutePath().toString())
+                Integer.toString(workerId)
         };
         ProcessBuilder pb = new ProcessBuilder(args);
+        pb.environment().put(TIKA_ASYNC_JDBC_KEY, connectionString);
+        pb.environment().put(TIKA_ASYNC_CONFIG_FILE_KEY,
+                tikaConfigPath.toAbsolutePath().toString());
         pb.inheritIO();
         return pb.start();
     }
-    private void reportCrash(int exitValue) throws SQLException, IOException {
+
+    private void reportCrash(int numRestarts, int exitValue) throws 
SQLException, IOException {
         LOG.warn("worker id={} terminated, exitValue={}",
                 workerId, exitValue);
-        delete.execute();
+        restarting.execute();
         List<AsyncTask> activeTasks = new ArrayList<>();
         try (ResultSet rs = selectActiveTasks.executeQuery()) {
             long taskId = rs.getLong(1);
@@ -120,7 +131,7 @@ public class AsyncWorker implements Callable<Integer> {
         }
 
         for (AsyncTask t : activeTasks) {
-            reportAndReset(t, AsyncWorkerProcess.ERROR_CODES.UNKNOWN,
+            reportAndReset(t, AsyncWorkerProcess.ERROR_CODES.UNKNOWN_PARSE,
                     insertErrorLog, resetStatus, LOG);
         }
 
@@ -142,7 +153,7 @@ public class AsyncWorker implements Callable<Integer> {
 
         try {
             resetStatus.clearParameters();
-            resetStatus.setByte(1, (byte) 
AsyncWorkerProcess.STATUS_CODES.AVAILABLE.ordinal());
+            resetStatus.setByte(1, (byte) 
AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
             resetStatus.setShort(2, (short)(task.getRetry()+1));
             resetStatus.setLong(3, task.getTaskId());
             resetStatus.execute();
diff --git 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
 
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
index 8273c5c..d01e9a0 100644
--- 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
+++ 
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
@@ -1,10 +1,14 @@
 package org.apache.tika.pipes.async;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.jpountz.lz4.LZ4Factory;
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.EncryptedDocumentException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.metadata.serialization.JsonMetadataList;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.parser.Parser;
 import org.apache.tika.parser.RecursiveParserWrapper;
@@ -18,10 +22,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xml.sax.SAXException;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
 import java.io.StringReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -44,19 +51,37 @@ import static 
org.apache.tika.pipes.async.AsyncWorker.reportAndReset;
 
 public class AsyncWorkerProcess {
 
-    enum STATUS_CODES {
+    enum TASK_STATUS_CODES {
         AVAILABLE,
         SELECTED,
         IN_PROCESS,
     }
 
+    public enum WORKER_STATUS_CODES {
+        ACTIVE,
+        RESTARTING,
+        HIBERNATING,
+        SHOULD_SHUTDOWN,
+        SHUTDOWN
+    }
+
+    public enum EMIT_STATUS_CODES {
+        READY,
+        EMITTING,
+        EMITTED
+    }
     enum ERROR_CODES {
         TIMEOUT,
         SECURITY_EXCEPTION,
         OTHER_EXCEPTION,
         OOM,
         OTHER_ERROR,
-        UNKNOWN
+        UNKNOWN_PARSE,
+        EMIT_SERIALIZATION,
+        EMIT_SQL_INSERT_EXCEPTION,
+        EMIT_SQL_SELECT_EXCEPTION,
+        EMIT_DESERIALIZATION,
+        EMIT_EXCEPTION
     }
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AsyncWorkerProcess.class);
@@ -64,14 +89,13 @@ public class AsyncWorkerProcess {
     //make these all configurable
     private static final long SHUTDOWN_AFTER_MS = 120000;
     private static long PULSE_MS = 1000;
-    private long emitWithinMs = 1000;
-    private long emitMaxBytes = 10_000_000;
     private long parseTimeoutMs = 60000;
 
     public static void main(String[] args) throws Exception {
-        String db = args[0];
-        int workerId = Integer.parseInt(args[1]);
-        TikaConfig tikaConfig = new TikaConfig(Paths.get(args[2]));
+        Path tikaConfigPath = 
Paths.get(System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY));
+        String db = System.getenv(AsyncProcessor.TIKA_ASYNC_JDBC_KEY);
+        TikaConfig tikaConfig = new TikaConfig(tikaConfigPath);
+        int workerId = Integer.parseInt(args[0]);
         LOG.debug("trying to get connection {} >{}<", workerId, db);
         try (Connection connection = DriverManager.getConnection(db)) {
             AsyncWorkerProcess asyncWorker = new AsyncWorkerProcess();
@@ -83,18 +107,12 @@ public class AsyncWorkerProcess {
     private void execute(Connection connection,
                          int workerId, TikaConfig tikaConfig) throws 
SQLException {
 
-        AsyncEmitHook asyncEmitHook = new AsyncPipesEmitHook(connection);
-        AsyncEmitter asyncEmitter = new 
AsyncEmitter(tikaConfig.getEmitterManager(),
-                asyncEmitHook,
-                emitWithinMs, emitMaxBytes);
-
         ExecutorService service = Executors.newFixedThreadPool(3);
         ExecutorCompletionService<Integer> executorCompletionService =
                 new ExecutorCompletionService<>(service);
 
-        executorCompletionService.submit(new Worker(connection, workerId, 
asyncEmitter,
+        executorCompletionService.submit(new Worker(connection, workerId,
                 tikaConfig, parseTimeoutMs));
-        executorCompletionService.submit(asyncEmitter);
         executorCompletionService.submit(new ForkWatcher(System.in));
 
         int completed = 0;
@@ -113,7 +131,6 @@ public class AsyncWorkerProcess {
             LOG.error("worker " + workerId + " had a mainloop exception", e);
         } finally {
             service.shutdownNow();
-            asyncEmitter.emitAll();
         }
         return;
     }
@@ -127,29 +144,30 @@ public class AsyncWorkerProcess {
         private final PreparedStatement markForProcessing;
         private final PreparedStatement checkForShutdown;
 
+
         TaskQueue(Connection connection, int workerId) throws SQLException {
             this.connection = connection;
             this.workerId = workerId;
-
             String sql = "update parse_queue set status=" +
-                    STATUS_CODES.SELECTED.ordinal()+
+                    TASK_STATUS_CODES.SELECTED.ordinal()+
                     " where id = " +
                     " (select id from parse_queue where worker_id = " + 
workerId +
-                    " and status="+STATUS_CODES.AVAILABLE.ordinal()+
+                    " and status="+ TASK_STATUS_CODES.AVAILABLE.ordinal()+
                     " order by time_stamp asc limit 1 for update)";
             markForSelecting = connection.prepareStatement(sql);
             sql = "select id, retry, json from parse_queue where status=" +
-                    STATUS_CODES.SELECTED.ordinal() +
+                    TASK_STATUS_CODES.SELECTED.ordinal() +
                     " and " +
                     " worker_id=" + workerId +
                     " order by time_stamp asc limit 1";
             selectForProcessing = connection.prepareStatement(sql);
             sql = "update parse_queue set status="+
-                    STATUS_CODES.IN_PROCESS.ordinal()+
+                    TASK_STATUS_CODES.IN_PROCESS.ordinal()+
                     " where id=?";
             markForProcessing = connection.prepareStatement(sql);
 
-            sql = "select count(1) from workers_shutdown where worker_id=" + 
workerId;
+            sql = "select count(1) from workers where worker_id=" + workerId +
+            " and status="+WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
             checkForShutdown = connection.prepareStatement(sql);
         }
 
@@ -157,14 +175,12 @@ public class AsyncWorkerProcess {
             long start = System.currentTimeMillis();
             long elapsed = System.currentTimeMillis() - start;
             while (elapsed < pollMs) {
-
                 if (shouldShutdown()) {
                     return SHUTDOWN_SEMAPHORE;
                 }
-
                 int i = markForSelecting.executeUpdate();
                 if (i == 0) {
-                    //debugQueue();
+                    debugQueue();
                     Thread.sleep(PULSE_MS);
                 } else {
                     long taskId = -1;
@@ -200,7 +216,6 @@ public class AsyncWorkerProcess {
                     for (int i = 1; i <= rs.getMetaData().getColumnCount(); 
i++) {
                         System.out.print(rs.getString(i)+ " ");
                     }
-                    System.out.println("");
                 }
             }
         }
@@ -221,7 +236,6 @@ public class AsyncWorkerProcess {
 
         private final Connection connection;
         private final int workerId;
-        private final AsyncEmitter asyncEmitter;
         private final RecursiveParserWrapper parser;
         private final TikaConfig tikaConfig;
         private final long parseTimeoutMs;
@@ -229,24 +243,27 @@ public class AsyncWorkerProcess {
         private ExecutorCompletionService<AsyncData> executorCompletionService;
         private final PreparedStatement insertErrorLog;
         private final PreparedStatement resetStatus;
-
+        private final PreparedStatement insertEmitData;
+        private final ObjectMapper objectMapper = new ObjectMapper();
+        LZ4Factory factory = LZ4Factory.fastestInstance();
 
         public Worker(Connection connection,
                       int workerId,
-                      AsyncEmitter asyncEmitter,
                       TikaConfig tikaConfig, long parseTimeoutMs) throws 
SQLException {
             this.connection = connection;
             this.workerId = workerId;
-            this.asyncEmitter = asyncEmitter;
             this.parser = new RecursiveParserWrapper(tikaConfig.getParser());
             this.tikaConfig = tikaConfig;
             this.executorService = Executors.newFixedThreadPool(1);
             this.executorCompletionService = new 
ExecutorCompletionService<>(executorService);
             this.parseTimeoutMs = parseTimeoutMs;
-            String sql = "insert into workers (worker_id) values (" + workerId 
+ ")";
+            String sql = "insert into workers (worker_id, status) " +
+                    "values (" + workerId + ", "+
+                    WORKER_STATUS_CODES.ACTIVE.ordinal()+")";
             connection.createStatement().execute(sql);
             insertErrorLog = prepareInsertErrorLog(connection);
             resetStatus = prepareReset(connection);
+            insertEmitData = prepareInsertEmitData(connection);
         }
 
 
@@ -275,12 +292,10 @@ public class AsyncWorkerProcess {
                     }
                 }
             } catch (TimeoutException e) {
-                e.printStackTrace();
                 LOG.warn(task.getFetchKey().getKey(), e);
                 reportAndReset(task, ERROR_CODES.TIMEOUT,
                         insertErrorLog, resetStatus, LOG);
             } catch (SecurityException e) {
-                e.printStackTrace();
                 LOG.warn(task.getFetchKey().getKey(), e);
                 reportAndReset(task, ERROR_CODES.SECURITY_EXCEPTION,
                         insertErrorLog, resetStatus, LOG);
@@ -290,17 +305,14 @@ public class AsyncWorkerProcess {
                 reportAndReset(task, ERROR_CODES.OTHER_EXCEPTION,
                         insertErrorLog, resetStatus, LOG);
             } catch (OutOfMemoryError e) {
-                e.printStackTrace();
                 LOG.warn(task.getFetchKey().getKey(), e);
                 reportAndReset(task, ERROR_CODES.OOM,
                         insertErrorLog, resetStatus, LOG);
             } catch (Error e) {
-                e.printStackTrace();
                 LOG.warn(task.getFetchKey().getKey(), e);
                 reportAndReset(task, ERROR_CODES.OTHER_ERROR,
                         insertErrorLog, resetStatus, LOG);
             } finally {
-                asyncEmitter.emitAll();
                 executorService.shutdownNow();
                 return 1;
             }
@@ -324,18 +336,37 @@ public class AsyncWorkerProcess {
                     }
                     boolean shouldEmit = checkForParseException(asyncData);
                     if (shouldEmit) {
-                        boolean offered = asyncEmitter.emit(asyncData, 
600000);//parameterize this
-                        if (!offered) {
-                            //TODO: deal with this
-                            LOG.warn("Failed to add ({}) " +
-                                            "to emit queue after 10 minutes.",
-                                    task.getFetchKey().getKey());
+                        try {
+                            emit(asyncData);
+                        } catch (JsonProcessingException e) {
+                            recordBadEmit(task.getTaskId(),
+                                    task.getFetchKey().getKey(),
+                                    ERROR_CODES.EMIT_SERIALIZATION.ordinal());
+                        } catch (SQLException e) {
+                            recordBadEmit(task.getTaskId(),
+                                    task.getFetchKey().getKey(),
+                                    
ERROR_CODES.EMIT_SQL_INSERT_EXCEPTION.ordinal());
                         }
                     }
                 }
             }
         }
 
+        private void recordBadEmit(long taskId, String key, int ordinal) {
+            //stub
+        }
+
+        private void emit(AsyncData asyncData) throws SQLException,
+                JsonProcessingException {
+            insertEmitData.clearParameters();
+            insertEmitData.setLong(1, asyncData.getAsyncTask().getTaskId());
+            byte[] bytes = objectMapper.writeValueAsBytes(asyncData);
+            byte[] compressed = factory.fastCompressor().compress(bytes);
+            insertEmitData.setLong(2, bytes.length);
+            insertEmitData.setBlob(3, new ByteArrayInputStream(compressed));
+            insertEmitData.execute();
+        }
+
         private void handleTimeout(long taskId, String key) throws 
TimeoutException {
             LOG.warn("timeout taskid:{} fetchKey:{}", taskId, key);
             throw new TimeoutException(key);
@@ -371,6 +402,14 @@ public class AsyncWorkerProcess {
             }
             return shouldEmit;
         }
+
+        static PreparedStatement prepareInsertEmitData(Connection connection) 
throws SQLException {
+            return connection.prepareStatement(
+                    "insert into emits (status, time_stamp, uncompressed_size, 
bytes) " +
+                            " values 
("+AsyncWorkerProcess.EMIT_STATUS_CODES.READY.ordinal()+
+                            ",CURRENT_TIMESTAMP(),?,?)"
+            );
+        }
     }
 
     private static class TaskProcessor implements Callable<AsyncData> {
@@ -404,7 +443,6 @@ public class AsyncWorkerProcess {
             } catch (SecurityException e) {
                 throw e;
             }
-
             injectUserMetadata(userMetadata, metadataList);
             EmitKey emitKey = task.getEmitKey();
             if (StringUtils.isBlank(emitKey.getKey())) {
diff --git 
a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/AsyncCliTest.java
 
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
similarity index 93%
rename from 
tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/AsyncCliTest.java
rename to 
tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
index a84c13a..c653267 100644
--- 
a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/AsyncCliTest.java
+++ 
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
@@ -1,6 +1,5 @@
-package org.apache.tika.pipes.driver;
+package org.apache.tika.pipes.async;
 
-import org.apache.tika.pipes.async.AsyncCli;
 import org.junit.Test;
 
 public class AsyncCliTest {
diff --git 
a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
 
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
new file mode 100644
index 0000000..920634e
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
@@ -0,0 +1,88 @@
+package org.apache.tika.pipes.async;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class AsyncProcessorTest {
+
+    private Path dbDir;
+    private Path dbFile;
+    private Connection connection;
+    private Path tikaConfigPath;
+
+    @Before
+    public void setUp() throws SQLException, IOException {
+        dbDir = Files.createTempDirectory("async-db");
+        dbFile = dbDir.resolve("emitted-db");
+        String jdbc = 
"jdbc:h2:file:"+dbFile.toAbsolutePath().toString()+";AUTO_SERVER=TRUE";
+        String sql = "create table emitted (id int auto_increment primary key, 
json varchar(20000))";
+
+        connection = DriverManager.getConnection(jdbc);
+        connection.createStatement().execute(sql);
+        tikaConfigPath = dbDir.resolve("tika-config.xml");
+        String xml = "" +
+                "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" +
+                "<properties>" +
+                "  <emitters>"+
+                "  <emitter 
class=\"org.apache.tika.pipes.async.MockEmitter\">\n" +
+                "    <params>\n" +
+                "      <param name=\"name\" type=\"string\">mock</param>\n"+
+                "      <param name=\"jdbc\" 
type=\"string\">"+jdbc+"</param>\n"+
+                "    </params>" +
+                "  </emitter>" +
+                "  </emitters>"+
+                "  <fetchers>" +
+                "    <fetcher 
class=\"org.apache.tika.pipes.async.MockFetcher\">" +
+                "      <param name=\"name\" type=\"string\">mock</param>\n"+
+                "    </fetcher>" +
+                "  </fetchers>"+
+                "</properties>";
+        Files.write(tikaConfigPath, xml.getBytes(StandardCharsets.UTF_8));
+    }
+
+    @After
+    public void tearDown() throws SQLException, IOException {
+        connection.createStatement().execute("drop table emitted");
+        connection.close();
+        FileUtils.deleteDirectory(dbDir.toFile());
+    }
+
+    @Test
+    public void testBasic() throws Exception {
+
+
+        AsyncProcessor processor = AsyncProcessor.build(tikaConfigPath);
+        for (int i = 0 ; i < 100; i++) {
+            FetchEmitTuple t = new FetchEmitTuple(
+                    new FetchKey("mock", "key-"+i),
+                    new EmitKey("mock", "emit-"+i),
+                    new Metadata()
+            );
+            processor.offer(t, 1000);
+        }
+        processor.close();
+        String sql = "select * from emitted";
+        try (Statement st = connection.createStatement();
+             ResultSet rs = st.executeQuery(sql)) {
+            while (rs.next()) {
+                System.out.println(rs.getInt(1) + " : "+rs.getString(2));
+            }
+        }
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
 
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
new file mode 100644
index 0000000..9f07dec
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
@@ -0,0 +1,76 @@
+package org.apache.tika.pipes.async;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.tika.config.Field;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class MockEmitter implements Initializable, Emitter {
+    private final ObjectMapper objectMapper = new ObjectMapper();
+    private Connection connection;
+    private String jdbc;
+    private PreparedStatement insert;
+
+    @Field
+    public void setJdbc(String jdbc) {
+        this.jdbc = jdbc;
+    }
+
+    @Override
+    public String getName() {
+        return "mock";
+    }
+
+    @Override
+    public void emit(String emitKey, List<Metadata> metadataList) throws 
IOException, TikaEmitterException {
+        emit(Collections.singletonList(
+                new EmitData(
+                new EmitKey(getName(), emitKey), metadataList)));
+    }
+
+    @Override
+    public void emit(List<? extends EmitData> emitData) throws IOException, 
TikaEmitterException {
+        for (EmitData d : emitData) {
+            String json = objectMapper.writeValueAsString(d);
+            try {
+                insert.clearParameters();
+                insert.setString(1, json);
+                insert.execute();
+            } catch (SQLException e) {
+                throw new TikaEmitterException("problem inserting", e);
+            }
+        }
+    }
+
+    @Override
+    public void initialize(Map<String, Param> params) throws 
TikaConfigException {
+        try {
+            connection = DriverManager.getConnection(jdbc);
+            String sql = "insert into emitted (json) values (?)";
+            insert = connection.prepareStatement(sql);
+        } catch (SQLException e) {
+            throw new TikaConfigException("problem w connection", e);
+        }
+    }
+
+    @Override
+    public void checkInitialization(InitializableProblemHandler 
problemHandler) throws TikaConfigException {
+
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
 
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
new file mode 100644
index 0000000..8c99ecf
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
@@ -0,0 +1,29 @@
+package org.apache.tika.pipes.async;
+
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.fetcher.Fetcher;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+public class MockFetcher implements Fetcher {
+
+    private static byte[] BYTES = new String("<?xml version=\"1.0\" 
encoding=\"UTF-8\" ?>"+
+            "<mock>"+
+            "<metadata action=\"add\" name=\"dc:creator\">Nikolai 
Lobachevsky</metadata>"+
+            "<write element=\"p\">main_content</write>"+
+        "</mock>").getBytes(StandardCharsets.UTF_8);
+
+    @Override
+    public String getName() {
+        return "mock";
+    }
+
+    @Override
+    public InputStream fetch(String fetchKey, Metadata metadata) throws 
TikaException, IOException {
+        return new ByteArrayInputStream(BYTES);
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/TestPipesDriver.java
 
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
similarity index 91%
rename from 
tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/TestPipesDriver.java
rename to 
tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
index 2fb38df..394fb16 100644
--- 
a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/TestPipesDriver.java
+++ 
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
@@ -1,4 +1,4 @@
-package org.apache.tika.pipes.driver;
+package org.apache.tika.pipes.async;
 
 
 import org.apache.commons.io.FileUtils;
@@ -8,20 +8,11 @@ import org.junit.Test;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
index 3831f07..58b4ff4 100644
--- 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
+++ 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
@@ -92,12 +92,17 @@ public class PipeIntegrationTests {
         int numConsumers = 1;
         ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
         ExecutorCompletionService<Integer> completionService = new 
ExecutorCompletionService<>(es);
-        ArrayBlockingQueue<FetchEmitTuple> queue = it.init(numConsumers);
-        completionService.submit(it);
+        ArrayBlockingQueue<FetchEmitTuple> queue = new 
ArrayBlockingQueue<>(1000);
         for (int i = 0; i < numConsumers; i++) {
             completionService.submit(new FSFetcherEmitter(
                     queue, tikaConfig.getFetcherManager().getFetcher("s3"), 
null));
         }
+        for (FetchEmitTuple t : it) {
+            queue.offer(t);
+        }
+        for (int i = 0; i < numConsumers; i++) {
+            queue.offer(FetchIterator.COMPLETED_SEMAPHORE);
+        }
         int finished = 0;
         try {
             while (finished++ < numConsumers+1) {
@@ -112,17 +117,22 @@ public class PipeIntegrationTests {
     @Test
     public void testS3ToS3() throws Exception {
         TikaConfig tikaConfig = getConfig("tika-config-s3Tos3.xml");
-        FetchIterator it = tikaConfig.getFetchIterator();
         int numConsumers = 20;
         ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
         ExecutorCompletionService<Integer> completionService = new 
ExecutorCompletionService<>(es);
-        ArrayBlockingQueue<FetchEmitTuple> queue = it.init(numConsumers);
-        completionService.submit(it);
+        ArrayBlockingQueue<FetchEmitTuple> queue = new 
ArrayBlockingQueue<>(1000);
         for (int i = 0; i < numConsumers; i++) {
             completionService.submit(new S3FetcherEmitter(
                     queue, tikaConfig.getFetcherManager().getFetcher("s3f"),
                     
(S3Emitter)tikaConfig.getEmitterManager().getEmitter("s3e")));
         }
+        FetchIterator it = tikaConfig.getFetchIterator();
+        for (FetchEmitTuple t : it) {
+            queue.offer(t);
+        }
+        for (int i = 0; i < numConsumers; i++) {
+            queue.offer(FetchIterator.COMPLETED_SEMAPHORE);
+        }
         int finished = 0;
         try {
             while (finished++ < numConsumers+1) {
diff --git 
a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
 
b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
index 67bbbde..2c19afb 100644
--- 
a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
+++ 
b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
@@ -40,12 +40,12 @@ public class JsonFetchEmitTupleTest {
         m.add("m3", "v4");
 
         FetchEmitTuple t = new FetchEmitTuple(
-                new FetchKey("fetcher1", "fetchkey1"),
-                        new EmitKey("emitter1", "emitKey1"),
+                new FetchKey("my_fetcher", "fetchKey1"),
+                        new EmitKey("my_emitter", "emitKey1"),
                         m);
         StringWriter writer = new StringWriter();
         JsonFetchEmitTuple.toJson(t, writer);
-
+        System.out.println(writer.toString());
         Reader reader = new StringReader(writer.toString());
         FetchEmitTuple deserialized = JsonFetchEmitTuple.fromJson(reader);
         assertEquals(t, deserialized);
diff --git 
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
 
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
index 913465d..9594de4 100644
--- 
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
+++ 
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
@@ -264,6 +264,7 @@ public class TikaServerEmitterIntegrationTest extends 
IntegrationTestBase {
     private JsonNode testOne(String fileName, boolean shouldFileExist, 
FetchEmitTuple.ON_PARSE_EXCEPTION onParseException) throws Exception {
 
         awaitServerStartup();
+        System.out.println(getJsonString(fileName, onParseException));
         Response response = WebClient
                 .create(endPoint + "/emit")
                 .accept("application/json")

Reply via email to