This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-3304
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/TIKA-3304 by this push:
     new 26ff633  WIP -- do not merge...still a bunch to do
26ff633 is described below

commit 26ff633408fd6afe3995acbf6e6cca5b9592c7a1
Author: tballison <[email protected]>
AuthorDate: Fri Mar 12 17:24:08 2021 -0500

    WIP -- do not merge...still a bunch to do
---
 .../apache/tika/pipes/emitter/AbstractEmitter.java |   2 +-
 .../org/apache/tika/pipes/emitter/EmitKey.java     |  10 +-
 .../org/apache/tika/pipes/fetcher/FetchKey.java    |   9 +-
 tika-pipes/pom.xml                                 |   2 +-
 .../tika/pipes/emitter/solr/SolrEmitter.java       |   4 +-
 .../org/apache/tika/pipes/async/AsyncEmitter.java  | 167 -------------
 .../tika/pipes/async/AsyncEmitterProcess.java      | 156 ------------
 .../{tika-pipes-app => tika-pipes-async}/pom.xml   |   2 +-
 .../java/org/apache/tika/pipes/async/AsyncCli.java |  16 +-
 .../org/apache/tika/pipes/async/AsyncConfig.java   |   0
 .../org/apache/tika/pipes/async/AsyncData.java     |   0
 .../org/apache/tika/pipes/async/AsyncEmitHook.java |   0
 .../org/apache/tika/pipes/async/AsyncEmitter.java} |  55 ++--
 .../tika/pipes/async/AsyncEmitterProcess.java      | 276 +++++++++++++++++++++
 .../tika/pipes/async/AsyncPipesEmitHook.java       |   2 +-
 .../apache/tika/pipes/async/AsyncProcessor.java    |  38 +--
 .../tika/pipes/async/AsyncRuntimeException.java    |   0
 .../org/apache/tika/pipes/async/AsyncTask.java     |   5 +-
 .../org/apache/tika/pipes/async/AsyncWorker.java   |   6 +-
 .../tika/pipes/async/AsyncWorkerProcess.java       |  51 ++--
 .../src/main/resources/log4j.properties            |   0
 .../org/apache/tika/pipes/async/AsyncCliTest.java  |   0
 .../tika/pipes/async/AsyncProcessorTest.java       |   0
 .../org/apache/tika/pipes/async/MockEmitter.java   |   0
 .../org/apache/tika/pipes/async/MockFetcher.java   |   0
 .../apache/tika/pipes/async/TestPipesDriver.java   |   0
 .../apache/tika/pipes/PipeIntegrationTests.java    |   2 +-
 tika-serialization/pom.xml                         |   5 +
 .../tika/metadata/serialization/JsonEmitData.java  |   2 +-
 .../metadata/serialization/JsonFetchEmitTuple.java |   4 +-
 .../tika/metadata/serialization/JsonMetadata.java  |  14 +-
 .../tika/server/core/resource/AsyncEmitter.java    |   2 +-
 .../tika/server/core/resource/AsyncParser.java     |   2 +-
 .../tika/server/core/resource/EmitterResource.java |  13 +-
 34 files changed, 436 insertions(+), 409 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
index 3c340ce..39643b2 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
@@ -48,7 +48,7 @@ public abstract class AbstractEmitter implements Emitter {
     @Override
     public void emit(List<? extends EmitData> emitData) throws IOException, TikaEmitterException {
         for (EmitData d : emitData) {
-            emit(d.getEmitKey().getKey(), d.getMetadataList());
+            emit(d.getEmitKey().getEmitKey(), d.getMetadataList());
         }
     }
 
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
index eb4c2c2..db5cf4e 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
@@ -18,9 +18,13 @@ package org.apache.tika.pipes.emitter;
 
 public class EmitKey {
 
-    private final String emitterName;
-    private final String emitKey;
+    private String emitterName;
+    private String emitKey;
 
+    //for serialization only...yuck.
+    public EmitKey() {
+
+    }
     public EmitKey(String emitterName, String emitKey) {
         this.emitterName = emitterName;
         this.emitKey = emitKey;
@@ -30,7 +34,7 @@ public class EmitKey {
         return emitterName;
     }
 
-    public String getKey() {
+    public String getEmitKey() {
         return emitKey;
     }
 
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
index 6e86bcf..93b82b2 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
@@ -21,8 +21,13 @@ package org.apache.tika.pipes.fetcher;
  * to send to that fetcher to retrieve a specific file.
  */
 public class FetchKey {
-    private final String fetcherName;
-    private final String fetchKey;
+    private String fetcherName;
+    private String fetchKey;
+
+    //this is for serialization...yuck
+    public FetchKey(){
+
+    }
 
     public FetchKey(String fetcherName, String fetchKey) {
         this.fetcherName = fetcherName;
diff --git a/tika-pipes/pom.xml b/tika-pipes/pom.xml
index dec23a1..560ed2d 100644
--- a/tika-pipes/pom.xml
+++ b/tika-pipes/pom.xml
@@ -35,7 +35,7 @@
         <module>tika-fetchers</module>
         <module>tika-emitters</module>
         <module>tika-fetch-iterators</module>
-        <module>tika-pipes-app</module>
+        <module>tika-pipes-async</module>
         <module>tika-pipes-integration-tests</module>
     </modules>
 
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index f299913..7fb7f1b 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -34,12 +34,10 @@ import org.apache.tika.pipes.emitter.TikaEmitterException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedOutputStream;
 import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
-import java.io.StringWriter;
 import java.io.Writer;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
@@ -120,7 +118,7 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
         try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
             jsonGenerator.writeStartArray();
             for (EmitData d : batch) {
-                jsonify(jsonGenerator, d.getEmitKey().getKey(), d.getMetadataList());
+                jsonify(jsonGenerator, d.getEmitKey().getEmitKey(), d.getMetadataList());
             }
             jsonGenerator.writeEndArray();
         }
diff --git 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
 
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
deleted file mode 100644
index bdc6cd3..0000000
--- 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-
-import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.EmitterManager;
-import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.utils.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-//TODO -- convert this into a watcher over the AsyncEmitterProcess
-//along the lines of AsyncWorker
-//then add emitters to the main loop in async processor
-public class AsyncEmitter implements Callable<Integer> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncEmitter.class);
-
-    private final EmitterManager emitterManager;
-    private final AsyncEmitHook asyncEmitHook;
-    private final long emitWithinMs;
-    private final long emitMaxBytes;
-    private final EmitDataCache cache;
-    ArrayBlockingQueue<AsyncData> dataQueue = new ArrayBlockingQueue<>(1000);
-
-    Instant lastEmitted = Instant.now();
-
-    public AsyncEmitter(EmitterManager emitterManager, AsyncEmitHook 
asyncEmitHook,
-                        long emitWithinMs, long emitMaxBytes) {
-        this.emitterManager = emitterManager;
-        this.asyncEmitHook = asyncEmitHook;
-        this.emitWithinMs = emitWithinMs;
-        this.emitMaxBytes = emitMaxBytes;
-        this.cache = new EmitDataCache();
-    }
-
-    public boolean emit(AsyncData asyncData, long pollMs)
-            throws InterruptedException {
-        if (asyncData == null) {
-            return true;
-        }
-        return dataQueue.offer(asyncData, pollMs, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public Integer call() throws Exception {
-        while (true) {
-            AsyncData asyncData = dataQueue.poll(100, TimeUnit.MILLISECONDS);
-            if (asyncData != null) {
-                cache.add(asyncData);
-            }
-            long elapsed = ChronoUnit.MILLIS.between(lastEmitted, 
Instant.now());
-            if (elapsed > emitWithinMs) {
-                LOG.debug("{} elapsed > {}, going to emitAll",
-                        elapsed, emitWithinMs);
-                //this can block for a bit
-                emitAll();
-            }
-        }
-    }
-
-    public void emitAll() {
-        cache.emitAll();
-    }
-
-    private class EmitDataCache {
-
-        long estimatedSize = 0;
-        int size = 0;
-        Map<String, List<AsyncData>> map = new HashMap<>();
-
-
-        void updateEstimatedSize(long newBytes) {
-            estimatedSize += newBytes;
-        }
-
-        synchronized void add(AsyncData data) {
-            size++;
-            long sz = 
AbstractEmitter.estimateSizeInBytes(data.getEmitKey().getKey(), 
data.getMetadataList());
-            if (estimatedSize + sz > emitMaxBytes) {
-                LOG.debug("estimated size ({}) > maxBytes({}), going to 
emitAll",
-                        (estimatedSize + sz), emitMaxBytes);
-                emitAll();
-            }
-            List<AsyncData> cached = 
map.get(data.getEmitKey().getEmitterName());
-            if (cached == null) {
-                cached = new ArrayList<>();
-                map.put(data.getEmitKey().getEmitterName(), cached);
-            }
-            updateEstimatedSize(sz);
-            cached.add(data);
-        }
-
-        private synchronized void emitAll() {
-            int emitted = 0;
-            LOG.debug("about to emit all {}", size);
-            for (Map.Entry<String, List<AsyncData>> e : map.entrySet()) {
-                Emitter emitter = emitterManager.getEmitter(e.getKey());
-                tryToEmit(emitter, e.getKey(), e.getValue());
-                emitted += e.getValue().size();
-            }
-            LOG.debug("emitted: {}", emitted);
-            estimatedSize = 0;
-            size = 0;
-            map.clear();
-            lastEmitted = Instant.now();
-        }
-
-        private void tryToEmit(Emitter emitter, String emitterName, 
List<AsyncData> cachedEmitData) {
-            if (emitter == null) {
-                LOG.error("Can't find emitter '{}' in TikaConfig!", 
emitterName);
-            }
-            List<EmitData> emitData = new ArrayList<>();
-            Set<AsyncTask> asyncTasks = new HashSet<>();
-            for (AsyncData d : cachedEmitData) {
-                emitData.add(new EmitData(d.getAsyncTask().getEmitKey(), 
d.getMetadataList()));
-                asyncTasks.add(d.getAsyncTask());
-            }
-            try {
-                emitter.emit(emitData);
-                for (AsyncData d : cachedEmitData) {
-                    asyncEmitHook.onSuccess(d.getAsyncTask());
-                }
-            } catch (IOException | TikaEmitterException e) {
-                e.printStackTrace();
-                for (AsyncData d : cachedEmitData) {
-                    asyncEmitHook.onFail(d.getAsyncTask());
-                }
-                e.printStackTrace();
-                LOG.warn("emitter class ({}): {}", emitter.getClass(),
-                        ExceptionUtils.getStackTrace(e));
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-}
diff --git 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
 
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
deleted file mode 100644
index 58854cb..0000000
--- 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4FastDecompressor;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.EmitterManager;
-import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.utils.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class AsyncEmitterProcess {
-
-    private long emitWithinMs = 1000;
-    private long emitMaxBytes = 10_000_000;
-    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncEmitterProcess.class);
-
-    private final LZ4FastDecompressor decompressor = 
LZ4Factory.fastestInstance().fastDecompressor();
-    private final ObjectMapper objectMapper = new ObjectMapper();
-
-    public static void main(String[] args) throws Exception {
-        String db = System.getenv(AsyncProcessor.TIKA_ASYNC_JDBC_KEY);
-        TikaConfig tikaConfig = new 
TikaConfig(Paths.get(System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY)));
-        int workerId = Integer.parseInt(args[0]);
-        LOG.debug("trying to get connection {} >{}<", workerId, db);
-        try (Connection connection = DriverManager.getConnection(db)) {
-            AsyncEmitterProcess asyncEmitter = new AsyncEmitterProcess();
-            asyncEmitter.execute(connection, workerId, tikaConfig);
-        }
-        System.exit(0);
-    }
-
-    private void execute(Connection connection, int workerId,
-                         TikaConfig tikaConfig) throws SQLException {
-        int recordsPerPulse = 10;
-        String sql = "update emits set status=" +
-                AsyncWorkerProcess.EMIT_STATUS_CODES.EMITTING.ordinal()+
-                ", worker_id="+workerId+
-                " where emit_id in " +
-                " (select emit_id from emits "+//where worker_id = " + 
workerId +
-                " and status="+ 
AsyncWorkerProcess.EMIT_STATUS_CODES.READY.ordinal()+
-                " order by time_stamp asc limit "+recordsPerPulse+" for 
update)";
-        PreparedStatement markForSelecting = connection.prepareStatement(sql);
-        sql = "select emit_id, uncompressed_size, bytes from emits where 
status=" +
-                AsyncWorkerProcess.EMIT_STATUS_CODES.EMITTING.ordinal() +
-                " and worker_id=" + workerId +
-                " order by time_stamp asc";
-        PreparedStatement selectForProcessing = 
connection.prepareStatement(sql);
-        sql = "delete from emits where emit_id=?";
-        PreparedStatement deleteFromEmits = connection.prepareStatement(sql);
-
-    }
-
-    private AsyncData deserialize(byte[] compressed, int decompressedLength)
-            throws IOException {
-        byte[] restored = new byte[decompressedLength];
-        int compressedLength2 = decompressor.decompress(compressed, 0, 
restored,
-                0, decompressedLength);
-
-        return objectMapper.readerFor(AsyncTask.class).readValue(restored);
-    }
-
-    private static class EmitDataCache {
-        private final EmitterManager emitterManager;
-        private final long maxBytes;
-
-        long estimatedSize = 0;
-        int size = 0;
-        Map<String, List<AsyncData>> map = new HashMap<>();
-
-        public EmitDataCache(EmitterManager emitterManager,
-                             long maxBytes) {
-            this.emitterManager = emitterManager;
-            this.maxBytes = maxBytes;
-        }
-
-        void updateEstimatedSize(long newBytes) {
-            estimatedSize += newBytes;
-        }
-
-        void add(AsyncData data) {
-
-            size++;
-            long sz = 
AbstractEmitter.estimateSizeInBytes(data.getEmitKey().getKey(), 
data.getMetadataList());
-            if (estimatedSize + sz > maxBytes) {
-                LOG.debug("estimated size ({}) > maxBytes({}), going to 
emitAll",
-                        (estimatedSize+sz), maxBytes);
-                emitAll();
-            }
-            List<AsyncData> cached = 
map.get(data.getEmitKey().getEmitterName());
-            if (cached == null) {
-                cached = new ArrayList<>();
-                map.put(data.getEmitKey().getEmitterName(), cached);
-            }
-            updateEstimatedSize(sz);
-            cached.add(data);
-        }
-
-        private void emitAll() {
-            int emitted = 0;
-            LOG.debug("about to emit {}", size);
-            for (Map.Entry<String, List<AsyncData>> e : map.entrySet()) {
-                Emitter emitter = emitterManager.getEmitter(e.getKey());
-                tryToEmit(emitter, e.getValue());
-                emitted += e.getValue().size();
-            }
-            LOG.debug("emitted: {}", emitted);
-            estimatedSize = 0;
-            size = 0;
-            map.clear();
-            //lastEmitted = Instant.now();
-        }
-
-        private long tryToEmit(Emitter emitter, List<AsyncData> 
cachedEmitData) {
-
-            try {
-                emitter.emit(cachedEmitData);
-            } catch (IOException | TikaEmitterException e) {
-                LOG.warn("emitter class ({}): {}", emitter.getClass(),
-                        ExceptionUtils.getStackTrace(e));
-            }
-            return 1;
-        }
-    }
-}
diff --git a/tika-pipes/tika-pipes-app/pom.xml 
b/tika-pipes/tika-pipes-async/pom.xml
similarity index 98%
rename from tika-pipes/tika-pipes-app/pom.xml
rename to tika-pipes/tika-pipes-async/pom.xml
index 9e26eb2..ded8a0a 100644
--- a/tika-pipes/tika-pipes-app/pom.xml
+++ b/tika-pipes/tika-pipes-async/pom.xml
@@ -28,7 +28,7 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>tika-pipes-app</artifactId>
+    <artifactId>tika-pipes-async</artifactId>
     <packaging>pom</packaging>
     <name>Apache Tika emitters</name>
     <url>http://tika.apache.org/</url>
diff --git a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
similarity index 96%
rename from tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
rename to tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
index cdf8855..4860299 100644
--- a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
@@ -90,7 +90,7 @@ public class AsyncCli {
                 ";AUTO_SERVER=TRUE";
         Connection connection = DriverManager.getConnection(url);
 
-        String sql = "create table parse_queue " +
+        String sql = "create table task_queue " +
                 "(id bigint auto_increment primary key," +
                 "status tinyint," +//byte
                 "worker_id integer," +
@@ -132,7 +132,7 @@ public class AsyncCli {
                           Connection connection) throws SQLException {
             this.queue = queue;
             this.connection = connection;
-            String sql = "insert into parse_queue (status, time_stamp, 
worker_id, retry, json) " +
+            String sql = "insert into task_queue (status, time_stamp, 
worker_id, retry, json) " +
                     "values (?,CURRENT_TIMESTAMP(),?,?,?)";
             insert = connection.prepareStatement(sql);
         }
@@ -196,30 +196,30 @@ public class AsyncCli {
             //this gets workers and # of tasks in desc order of number of tasks
             String sql = "select w.worker_id, p.cnt " +
                     "from workers w " +
-                    "left join (select worker_id, count(1) as cnt from 
parse_queue " +
+                    "left join (select worker_id, count(1) as cnt from 
task_queue " +
                     "where status=0 group by worker_id)" +
                     " p on p.worker_id=w.worker_id order by p.cnt desc";
             getQueueDistribution = connection.prepareStatement(sql);
             //find workers that have assigned tasks but are not in the
             //workers table
-            sql = "select p.worker_id, count(1) as cnt from parse_queue p " +
+            sql = "select p.worker_id, count(1) as cnt from task_queue p " +
                     "left join workers w on p.worker_id=w.worker_id " +
                     "where w.worker_id is null group by p.worker_id";
             findMissingWorkers = connection.prepareStatement(sql);
 
-            sql = "update parse_queue set worker_id=? where worker_id=?";
+            sql = "update task_queue set worker_id=? where worker_id=?";
             allocateNonworkersToWorkers = connection.prepareStatement(sql);
 
             //current strategy reallocate tasks from longest queue to shortest
             //TODO: might consider randomly shuffling or other algorithms
-            sql = "update parse_queue set worker_id= ? where id in " +
-                    "(select id from parse_queue where " +
+            sql = "update task_queue set worker_id= ? where id in " +
+                    "(select id from task_queue where " +
                     "worker_id = ? and " +
                     "rand() < 0.8 " +
                     "and status=0 for update)";
             reallocate = connection.prepareStatement(sql);
 
-            sql = "select count(1) from parse_queue where status="
+            sql = "select count(1) from task_queue where status="
                     + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal();
             countAvailableTasks = connection.prepareStatement(sql);
 
diff --git 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
 
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
similarity index 100%
rename from 
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
rename to 
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
diff --git 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncData.java
 
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
similarity index 100%
rename from 
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncData.java
rename to 
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
diff --git 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
 
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
similarity index 100%
rename from 
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
rename to 
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
diff --git 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
 
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
similarity index 77%
copy from 
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
copy to 
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
index ff4e5d4..071a1fa 100644
--- 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
+++ 
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
@@ -1,8 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
+
 import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
 import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.utils.ProcessUtils;
+import org.apache.tika.utils.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -13,21 +35,24 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
 import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
 
-/**
- * This controls monitoring of the AsyncWorkerProcess
- * and updates to the db on crashes etc.
- */
-public class AsyncWorker implements Callable<Integer> {
+public class AsyncEmitter implements Callable<Integer> {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncWorker.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncEmitter.class);
 
 
     private final String connectionString;
@@ -40,7 +65,7 @@ public class AsyncWorker implements Callable<Integer> {
     private final PreparedStatement insertErrorLog;
     private final PreparedStatement resetStatus;
 
-    public AsyncWorker(Connection connection,
+    public AsyncEmitter(Connection connection,
                        String connectionString, int workerId,
                        Path tikaConfigPath) throws SQLException {
         this.connectionString = connectionString;
@@ -57,7 +82,7 @@ public class AsyncWorker implements Callable<Integer> {
                 " where worker_id = (" + workerId + ")";
         restarting = connection.prepareStatement(sql);
         //this checks if the process was able to reset the status
-        sql = "select id, retry, json from parse_queue where worker_id="
+        sql = "select id, retry, json from task_queue where worker_id="
                 + workerId +
                 " and status=" + 
AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS.ordinal();
         selectActiveTasks = connection.prepareStatement(sql);
@@ -80,7 +105,7 @@ public class AsyncWorker implements Callable<Integer> {
                 if (finished) {
                     int exitValue = p.exitValue();
                     if (exitValue == 0) {
-                        LOG.info("child process finished with exitValue=0");
+                        LOG.info("forked emitter process finished with 
exitValue=0");
                         return 1;
                     }
                     reportCrash(++restarts, exitValue);
@@ -99,7 +124,7 @@ public class AsyncWorker implements Callable<Integer> {
         String[] args = new String[]{
                 "java", "-Djava.awt.headless=true",
                 "-cp", System.getProperty("java.class.path"),
-                "org.apache.tika.pipes.async.AsyncWorkerProcess",
+                "org.apache.tika.pipes.async.AsyncEmitterProcess",
                 Integer.toString(workerId)
         };
         ProcessBuilder pb = new ProcessBuilder(args);
@@ -138,8 +163,8 @@ public class AsyncWorker implements Callable<Integer> {
     }
 
     static void reportAndReset(AsyncTask task, AsyncWorkerProcess.ERROR_CODES 
errorCode,
-                             PreparedStatement insertErrorLog, 
PreparedStatement resetStatus,
-                             Logger logger) {
+                               PreparedStatement insertErrorLog, 
PreparedStatement resetStatus,
+                               Logger logger) {
         try {
             insertErrorLog.clearParameters();
             insertErrorLog.setLong(1, task.getTaskId());
@@ -166,14 +191,14 @@ public class AsyncWorker implements Callable<Integer> {
         //if not, this is called to insert into the error log
         return connection.prepareStatement(
                 "insert into error_log (task_id, fetch_key, time_stamp, retry, 
error_code) " +
-                " values (?,?,CURRENT_TIMESTAMP(),?,?)"
+                        " values (?,?,CURRENT_TIMESTAMP(),?,?)"
         );
     }
 
     static PreparedStatement prepareReset(Connection connection) throws 
SQLException {
         //and this is called to reset the status on error
         return connection.prepareStatement(
-                "update parse_queue set " +
+                "update task_queue set " +
                         "status=?, " +
                         "time_stamp=CURRENT_TIMESTAMP(), " +
                         "retry=? " +
diff --git 
a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
 
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
new file mode 100644
index 0000000..37c5f3a
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.async;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import org.apache.commons.io.IOUtils;
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.apache.tika.utils.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AsyncEmitterProcess {
+
+    //TODO -- parameterize these
+    private long emitWithinMs = 10000;
+    private long emitMaxBytes = 10_000_000;
+    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncEmitterProcess.class);
+
+    private final LZ4FastDecompressor decompressor = 
LZ4Factory.fastestInstance().fastDecompressor();
+    private final ObjectMapper objectMapper = new ObjectMapper();
+    int recordsPerPulse = 10;
+    private PreparedStatement markForSelecting;
+    private PreparedStatement selectForProcessing;
+    private PreparedStatement emitStatusUpdate;
+    private PreparedStatement checkForShutdown;
+
+    /**
+     * Entry point of the forked emitter process.
+     * <p>
+     * The JDBC connection string and the path to the tika-config file are
+     * read from the environment; the worker id is taken from
+     * {@code args[0]}. The emit loop runs over a single connection and the
+     * process exits with status 0 once the loop returns.
+     *
+     * @param args {@code args[0]} must be the integer id of this emitter
+     * @throws Exception if the config cannot be loaded, the connection
+     *                   cannot be opened or the emit loop fails
+     */
+    public static void main(String[] args) throws Exception {
+        final String jdbcString =
+                System.getenv(AsyncProcessor.TIKA_ASYNC_JDBC_KEY);
+        final String configFile =
+                System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY);
+        final TikaConfig config = new TikaConfig(Paths.get(configFile));
+        final int id = Integer.parseInt(args[0]);
+        LOG.debug("trying to get connection {} >{}<", id, jdbcString);
+        try (Connection conn = DriverManager.getConnection(jdbcString)) {
+            new AsyncEmitterProcess().execute(conn, id, config);
+        }
+        System.exit(0);
+    }
+
+    /**
+     * Polls the task queue until a shutdown is requested.
+     * <p>
+     * Each pass claims a batch of emit-ready tasks for this worker, loads
+     * their compressed emit data into the cache, flushes the cache when
+     * nothing has been added for {@code emitWithinMs}, sleeps for 500 ms
+     * and then checks the shutdown flag. The flag has to be seen on two
+     * passes before the remaining cache is flushed and the method returns.
+     *
+     * @param connection connection used to prepare all statements
+     * @param workerId id of this emitter in the workers/task_queue tables
+     * @param tikaConfig config that supplies the emitter manager
+     * @throws SQLException if a statement fails outside of the per-row
+     *                      handling, including the failure-status update
+     * @throws InterruptedException if the sleep between passes is interrupted
+     */
+    private void execute(Connection connection, int workerId,
+                         TikaConfig tikaConfig) throws SQLException,
+            InterruptedException {
+        prepareStatements(connection, workerId);
+        EmitterManager emitterManager = tikaConfig.getEmitterManager();
+        EmitDataCache emitDataCache = new EmitDataCache(emitterManager,
+                emitMaxBytes, emitStatusUpdate);
+        int shouldShutdown = 0;
+        while (true) {
+            int toEmit = markForSelecting.executeUpdate();
+            if (toEmit > 0) {
+                try (ResultSet rs = selectForProcessing.executeQuery()) {
+                    while (rs.next()) {
+                        long id = rs.getLong(1);
+                        Timestamp ts = rs.getTimestamp(2);
+                        int uncompressedSize = rs.getInt(3);
+                        Blob blob = rs.getBlob(4);
+                        try {
+                            tryToEmit(id, ts, uncompressedSize, blob,
+                                    emitDataCache);
+                        } catch (SQLException | IOException e) {
+                            //record why the row failed before marking it;
+                            //otherwise the cause is lost
+                            LOG.warn("failed to load emit data for task id={}",
+                                    id, e);
+                            reportEmitStatus(
+                                    Collections.singletonList(id),
+                                    AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT,
+                                    emitStatusUpdate
+                            );
+                        }
+                    }
+                }
+            }
+            if (emitDataCache.exceedsEmitWithin(emitWithinMs)) {
+                emitDataCache.emitAll();
+            }
+            Thread.sleep(500);
+            if (shouldShutdown()) {
+                shouldShutdown++;
+            }
+            //make sure to test twice
+            if (shouldShutdown > 1) {
+                emitDataCache.emitAll();
+                return;
+            }
+        }
+    }
+
+    /**
+     * Copies the compressed emit data out of the blob, deserializes it and
+     * adds it to the cache. Adding to the cache may trigger a flush.
+     *
+     * @param id task id of the row (not used by this method)
+     * @param ts time stamp of the row (not used by this method)
+     * @param decompressedLength size of the data once decompressed
+     * @param blob compressed, serialized emit data
+     * @param emitDataCache cache that receives the deserialized data
+     * @throws SQLException if the blob cannot be read
+     * @throws IOException if copying or deserializing the bytes fails
+     */
+    private void tryToEmit(long id, Timestamp ts,
+                           int decompressedLength,
+                           Blob blob, EmitDataCache emitDataCache)
+            throws SQLException, IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        //close the blob's stream as soon as the bytes have been copied;
+        //the type is fully qualified so that no new import is needed
+        try (java.io.InputStream is = blob.getBinaryStream()) {
+            IOUtils.copyLarge(is, bos);
+        }
+        AsyncData asyncData = deserialize(bos.toByteArray(),
+                decompressedLength);
+        emitDataCache.add(asyncData);
+    }
+
+    /**
+     * Runs the prepared shutdown query for this worker.
+     *
+     * @return {@code true} if the query yields a row whose first column is
+     *         greater than zero; {@code false} otherwise
+     * @throws SQLException if the query fails
+     */
+    boolean shouldShutdown() throws SQLException {
+        try (ResultSet result = checkForShutdown.executeQuery()) {
+            return result.next() && result.getInt(1) > 0;
+        }
+    }
+
+    /**
+     * Prepares the four statements used by the emit loop: claiming a batch
+     * of emit-ready tasks, selecting the claimed tasks together with their
+     * emit data, updating the status of a task and checking the shutdown
+     * flag of this worker.
+     *
+     * @param connection connection the statements are prepared on
+     * @param workerId id of this emitter; concatenated into the SQL
+     * @throws SQLException if a statement cannot be prepared
+     */
+    private void prepareStatements(Connection connection, int workerId)
+            throws SQLException {
+        //the subquery is not restricted by worker_id: any task that is
+        //ready to be emitted may be claimed by this worker
+        String sql = "update task_queue set status=" +
+                AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal() +
+                ", worker_id=" + workerId +
+                ", time_stamp=CURRENT_TIMESTAMP()" +
+                " where id in " +
+                " (select id from task_queue " +
+                " where status=" +
+                AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal() +
+                " order by time_stamp asc limit " + recordsPerPulse +
+                " for update)";
+        markForSelecting = connection.prepareStatement(sql);
+
+        //join on the shared id; without the condition every emit row is
+        //paired with every selected task. Columns are qualified because
+        //both tables have a time_stamp column.
+        sql = "select q.id, q.time_stamp, e.uncompressed_size, e.bytes " +
+                "from emits e " +
+                "join task_queue q on e.id=q.id " +
+                "where q.status=" +
+                AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal() +
+                " and q.worker_id=" + workerId +
+                " order by q.time_stamp asc";
+        selectForProcessing = connection.prepareStatement(sql);
+
+        sql = "update task_queue set status=?" +
+                ", time_stamp=CURRENT_TIMESTAMP()" +
+                " where id=?";
+        emitStatusUpdate = connection.prepareStatement(sql);
+
+        sql = "select count(1) from workers where worker_id=" + workerId +
+                " and status=" +
+                AsyncWorkerProcess.WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
+        checkForShutdown = connection.prepareStatement(sql);
+    }
+
+
+    /**
+     * Decompresses the LZ4 block and deserializes the JSON it contains.
+     *
+     * @param compressed LZ4-compressed bytes
+     * @param decompressedLength exact length of the decompressed data
+     * @return the deserialized emit data
+     * @throws IOException if the JSON cannot be read
+     */
+    private AsyncData deserialize(byte[] compressed, int decompressedLength)
+            throws IOException {
+        byte[] restored = new byte[decompressedLength];
+        decompressor.decompress(compressed, 0, restored,
+                0, decompressedLength);
+        //bind the reader to the declared return type; a reader bound to
+        //AsyncTask yields an object that cannot be cast to AsyncData
+        return objectMapper.readerFor(AsyncData.class).readValue(restored);
+    }
+
+    /**
+     * Buffers emit data per emitter name and sends each buffer to its
+     * emitter in one call. The status of every task in a buffer is written
+     * to the task queue after the emit attempt.
+     */
+    private static class EmitDataCache {
+        private final EmitterManager emitterManager;
+        private final long maxBytes;
+        private final PreparedStatement emitStatusUpdate;
+        private Instant lastAdded = Instant.now();
+
+        //running estimate of the bytes currently held in the cache
+        long estimatedSize = 0;
+        //number of items currently held in the cache
+        int size = 0;
+        //emitter name -> data waiting to be sent to that emitter
+        Map<String, List<AsyncData>> map = new HashMap<>();
+
+        /**
+         * @param emitterManager used to look up emitters by name
+         * @param maxBytes estimated size above which the cache is flushed
+         * @param emitStatusUpdate statement that sets the status of a task
+         */
+        public EmitDataCache(EmitterManager emitterManager,
+                             long maxBytes,
+                             PreparedStatement emitStatusUpdate) {
+            this.emitterManager = emitterManager;
+            this.maxBytes = maxBytes;
+            this.emitStatusUpdate = emitStatusUpdate;
+        }
+
+        void updateEstimatedSize(long newBytes) {
+            estimatedSize += newBytes;
+        }
+
+        /**
+         * Adds the data to the buffer of its emitter. If the new item would
+         * push the estimated size above {@code maxBytes}, the current
+         * content is emitted first.
+         */
+        void add(AsyncData data) {
+            long sz = AbstractEmitter.estimateSizeInBytes(
+                    data.getEmitKey().getEmitKey(), data.getMetadataList());
+            if (estimatedSize + sz > maxBytes) {
+                LOG.debug("estimated size ({}) > maxBytes({}), going to emitAll",
+                        (estimatedSize + sz), maxBytes);
+                emitAll();
+            }
+            map.computeIfAbsent(data.getEmitKey().getEmitterName(),
+                    k -> new ArrayList<>()).add(data);
+            updateEstimatedSize(sz);
+            //count the item only after a possible flush so that the
+            //counter matches the content of the map
+            size++;
+            lastAdded = Instant.now();
+        }
+
+        /**
+         * Sends every buffer to its emitter and empties the cache. A failed
+         * status update for one emitter is logged and does not stop the
+         * remaining emitters from being processed.
+         */
+        private void emitAll() {
+            if (map.isEmpty()) {
+                return;
+            }
+            long emitted = 0;
+            LOG.debug("about to emit {}", size);
+            for (Map.Entry<String, List<AsyncData>> e : map.entrySet()) {
+                Emitter emitter = emitterManager.getEmitter(e.getKey());
+                try {
+                    emitted += tryToEmit(emitter, e.getValue());
+                } catch (SQLException ex) {
+                    LOG.warn("couldn't update emit status for emitter '{}'",
+                            e.getKey(), ex);
+                }
+            }
+            LOG.debug("emitted: {}", emitted);
+            estimatedSize = 0;
+            size = 0;
+            map.clear();
+        }
+
+        /**
+         * Emits one buffer and records the outcome for each of its tasks:
+         * FAILED_EMIT if the emitter throws, EMITTED otherwise.
+         *
+         * @return number of items handed to the emitter successfully
+         * @throws SQLException if the status update fails
+         */
+        private long tryToEmit(Emitter emitter,
+                               List<AsyncData> cachedEmitData)
+                throws SQLException {
+            List<Long> ids = new ArrayList<>(cachedEmitData.size());
+            for (AsyncData d : cachedEmitData) {
+                ids.add(d.getAsyncTask().getTaskId());
+            }
+            try {
+                emitter.emit(cachedEmitData);
+            } catch (IOException | TikaEmitterException e) {
+                LOG.warn("emitter class ({}): {}", emitter.getClass(),
+                        ExceptionUtils.getStackTrace(e));
+                reportEmitStatus(ids,
+                        AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT,
+                        emitStatusUpdate);
+                //stop here; otherwise the failure status would be
+                //overwritten with EMITTED below
+                return 0;
+            }
+            reportEmitStatus(ids,
+                    AsyncWorkerProcess.TASK_STATUS_CODES.EMITTED,
+                    emitStatusUpdate);
+            return cachedEmitData.size();
+        }
+
+        /**
+         * @return true if more than {@code emitWithinMs} milliseconds have
+         *         passed since the last item was added (or, if nothing has
+         *         been added yet, since the cache was created)
+         */
+        public boolean exceedsEmitWithin(long emitWithinMs) {
+            return ChronoUnit.MILLIS.between(lastAdded, Instant.now())
+                    > emitWithinMs;
+        }
+    }
+
+    /**
+     * Writes the given status to the task queue for every id in the list,
+     * one update per id.
+     *
+     * @param ids ids of the tasks to update
+     * @param emitted status to store; its ordinal is written as a byte
+     * @param emitStatusUpdate statement with the status as first and the
+     *                         task id as second parameter
+     * @throws SQLException if an update fails
+     */
+    private static void reportEmitStatus(
+            List<Long> ids,
+            AsyncWorkerProcess.TASK_STATUS_CODES emitted,
+            PreparedStatement emitStatusUpdate) throws SQLException {
+        for (long taskId : ids) {
+            emitStatusUpdate.clearParameters();
+            final byte statusCode = (byte) emitted.ordinal();
+            emitStatusUpdate.setByte(1, statusCode);
+            emitStatusUpdate.setLong(2, taskId);
+            emitStatusUpdate.executeUpdate();
+        }
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
 
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
similarity index 95%
rename from 
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
rename to 
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
index f8665d7..3828b24 100644
--- 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
+++ 
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
@@ -15,7 +15,7 @@ public class AsyncPipesEmitHook implements AsyncEmitHook {
     private final PreparedStatement markFailure;
 
     public AsyncPipesEmitHook(Connection connection) throws SQLException  {
-        String sql = "delete from parse_queue where id=?";
+        String sql = "delete from task_queue where id=?";
         markSuccess = connection.prepareStatement(sql);
         //TODO --fix this
         markFailure = connection.prepareStatement(sql);
diff --git 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
 
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
similarity index 92%
rename from 
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
rename to 
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index e7dc0c1..b4d8a8a 100644
--- 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ 
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -58,7 +58,7 @@ public class AsyncProcessor implements Closeable {
         this.asyncConfig = AsyncConfig.load(tikaConfigPath);
         this.queue = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
         this.connection = 
DriverManager.getConnection(asyncConfig.getJdbcString());
-        this.totalThreads = asyncConfig.getMaxConsumers() + 2;
+        this.totalThreads = asyncConfig.getMaxConsumers() + 2 + 1;
     }
 
     public synchronized boolean offer (
@@ -134,17 +134,20 @@ public class AsyncProcessor implements Closeable {
             executorCompletionService.submit(new AsyncWorker(connection,
                     asyncConfig.getJdbcString(), i, tikaConfigPath));
         }
+        executorCompletionService.submit(new AsyncEmitter(connection,
+                asyncConfig.getJdbcString(), asyncConfig.getMaxConsumers(),
+                tikaConfigPath));
     }
 
     private void setupTables() throws SQLException {
 
-        String sql = "create table parse_queue " +
+        String sql = "create table task_queue " +
                 "(id bigint auto_increment primary key," +
                 "status tinyint," +//byte
                 "worker_id integer," +
                 "retry smallint," + //short
                 "time_stamp timestamp," +
-                "json varchar(64000))";
+                "json varchar(64000))";//this is the AsyncTask ... not the 
emit data!
         try (Statement st = connection.createStatement()) {
             st.execute(sql);
         }
@@ -165,10 +168,8 @@ public class AsyncProcessor implements Closeable {
         }
 
         sql = "create table emits (" +
-                "emit_id bigint auto_increment primary key, " +
-                "status tinyint, " +
-                "worker_id integer, " +
-                "time_stamp timestamp, " +
+                "id bigint primary key, " +
+                "time_stamp timestamp, "+
                 "uncompressed_size bigint, " +
                 "bytes blob)";
         try (Statement st = connection.createStatement()) {
@@ -203,6 +204,15 @@ public class AsyncProcessor implements Closeable {
                     //swallow
                 }
             }
+            //TODO: clean this up
+            String sql = "update workers set status="+
+                    AsyncWorkerProcess.WORKER_STATUS_CODES.SHUTDOWN.ordinal()+
+                    " where worker_id = (" + asyncConfig.getMaxConsumers() + 
")";
+            try {
+                connection.prepareStatement(sql).execute();
+            } catch (SQLException throwables) {
+                throwables.printStackTrace();
+            }
             long start = System.currentTimeMillis();
             long elapsed = System.currentTimeMillis() - start;
             try {
@@ -239,7 +249,7 @@ public class AsyncProcessor implements Closeable {
                           Connection connection) throws SQLException {
             this.queue = queue;
             this.connection = connection;
-            String sql = "insert into parse_queue (status, time_stamp, 
worker_id, retry, json) " +
+            String sql = "insert into task_queue (status, time_stamp, 
worker_id, retry, json) " +
                     "values (?,CURRENT_TIMESTAMP(),?,?,?)";
             insert = connection.prepareStatement(sql);
         }
@@ -304,30 +314,30 @@ public class AsyncProcessor implements Closeable {
             //this gets workers and # of tasks in desc order of number of tasks
             String sql = "select w.worker_id, p.cnt " +
                     "from workers w " +
-                    "left join (select worker_id, count(1) as cnt from 
parse_queue " +
+                    "left join (select worker_id, count(1) as cnt from 
task_queue " +
                     "where status=0 group by worker_id)" +
                     " p on p.worker_id=w.worker_id order by p.cnt desc";
             getQueueDistribution = connection.prepareStatement(sql);
             //find workers that have assigned tasks but are not in the
             //workers table
-            sql = "select p.worker_id, count(1) as cnt from parse_queue p " +
+            sql = "select p.worker_id, count(1) as cnt from task_queue p " +
                     "left join workers w on p.worker_id=w.worker_id " +
                     "where w.worker_id is null group by p.worker_id";
             findMissingWorkers = connection.prepareStatement(sql);
 
-            sql = "update parse_queue set worker_id=? where worker_id=?";
+            sql = "update task_queue set worker_id=? where worker_id=?";
             allocateNonworkersToWorkers = connection.prepareStatement(sql);
 
             //current strategy reallocate tasks from longest queue to shortest
             //TODO: might consider randomly shuffling or other algorithms
-            sql = "update parse_queue set worker_id= ? where id in " +
-                    "(select id from parse_queue where " +
+            sql = "update task_queue set worker_id= ? where id in " +
+                    "(select id from task_queue where " +
                     "worker_id = ? and " +
                     "rand() < 0.8 " +
                     "and status=0 for update)";
             reallocate = connection.prepareStatement(sql);
 
-            sql = "select count(1) from parse_queue where status="
+            sql = "select count(1) from task_queue where status="
                     + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal();
             countAvailableTasks = connection.prepareStatement(sql);
 
diff --git 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
 
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
similarity index 100%
rename from 
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
rename to 
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
diff --git 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
 
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
similarity index 79%
rename from 
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
rename to 
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
index eaac09d..24acdf8 100644
--- 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
+++ 
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
@@ -1,5 +1,6 @@
 package org.apache.tika.pipes.async;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 
@@ -11,8 +12,8 @@ public class AsyncTask extends FetchEmitTuple {
     private long taskId;
     private final short retry;
 
-    public AsyncTask(long taskId, short retry,
-                     FetchEmitTuple fetchEmitTuple) {
+    public AsyncTask(@JsonProperty("taskId") long taskId, 
@JsonProperty("retry") short retry,
+                     @JsonProperty("fetchEmitTuple") FetchEmitTuple 
fetchEmitTuple) {
         super(fetchEmitTuple.getFetchKey(), fetchEmitTuple.getEmitKey(), 
fetchEmitTuple.getMetadata());
         this.taskId = taskId;
         this.retry = retry;
diff --git 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
 
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
similarity index 97%
rename from 
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
rename to 
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
index ff4e5d4..0b685a2 100644
--- 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
+++ 
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
@@ -57,7 +57,7 @@ public class AsyncWorker implements Callable<Integer> {
                 " where worker_id = (" + workerId + ")";
         restarting = connection.prepareStatement(sql);
         //this checks if the process was able to reset the status
-        sql = "select id, retry, json from parse_queue where worker_id="
+        sql = "select id, retry, json from task_queue where worker_id="
                 + workerId +
                 " and status=" + 
AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS.ordinal();
         selectActiveTasks = connection.prepareStatement(sql);
@@ -80,7 +80,7 @@ public class AsyncWorker implements Callable<Integer> {
                 if (finished) {
                     int exitValue = p.exitValue();
                     if (exitValue == 0) {
-                        LOG.info("child process finished with exitValue=0");
+                        LOG.info("forked worker process finished with 
exitValue=0");
                         return 1;
                     }
                     reportCrash(++restarts, exitValue);
@@ -173,7 +173,7 @@ public class AsyncWorker implements Callable<Integer> {
     static PreparedStatement prepareReset(Connection connection) throws 
SQLException {
         //and this is called to reset the status on error
         return connection.prepareStatement(
-                "update parse_queue set " +
+                "update task_queue set " +
                         "status=?, " +
                         "time_stamp=CURRENT_TIMESTAMP(), " +
                         "retry=? " +
diff --git 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
 
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
similarity index 92%
rename from 
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
rename to 
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
index d01e9a0..8675f22 100644
--- 
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
+++ 
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
@@ -2,13 +2,14 @@ package org.apache.tika.pipes.async;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
 import net.jpountz.lz4.LZ4Factory;
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.EncryptedDocumentException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.metadata.serialization.JsonMetadata;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.parser.Parser;
 import org.apache.tika.parser.RecursiveParserWrapper;
@@ -27,7 +28,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
 import java.io.StringReader;
-import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.Connection;
@@ -55,6 +55,11 @@ public class AsyncWorkerProcess {
         AVAILABLE,
         SELECTED,
         IN_PROCESS,
+        AVAILABLE_EMIT,
+        SELECTED_EMIT,
+        IN_PROCESS_EMIT,
+        FAILED_EMIT,
+        EMITTED
     }
 
     public enum WORKER_STATUS_CODES {
@@ -65,11 +70,6 @@ public class AsyncWorkerProcess {
         SHUTDOWN
     }
 
-    public enum EMIT_STATUS_CODES {
-        READY,
-        EMITTING,
-        EMITTED
-    }
     enum ERROR_CODES {
         TIMEOUT,
         SECURITY_EXCEPTION,
@@ -148,21 +148,24 @@ public class AsyncWorkerProcess {
         TaskQueue(Connection connection, int workerId) throws SQLException {
             this.connection = connection;
             this.workerId = workerId;
-            String sql = "update parse_queue set status=" +
+            //TODO -- need to update timestamp
+            String sql = "update task_queue set status=" +
                     TASK_STATUS_CODES.SELECTED.ordinal()+
+                    ", time_stamp=CURRENT_TIMESTAMP()"+
                     " where id = " +
-                    " (select id from parse_queue where worker_id = " + 
workerId +
+                    " (select id from task_queue where worker_id = " + 
workerId +
                     " and status="+ TASK_STATUS_CODES.AVAILABLE.ordinal()+
                     " order by time_stamp asc limit 1 for update)";
             markForSelecting = connection.prepareStatement(sql);
-            sql = "select id, retry, json from parse_queue where status=" +
+            sql = "select id, retry, json from task_queue where status=" +
                     TASK_STATUS_CODES.SELECTED.ordinal() +
                     " and " +
                     " worker_id=" + workerId +
                     " order by time_stamp asc limit 1";
             selectForProcessing = connection.prepareStatement(sql);
-            sql = "update parse_queue set status="+
+            sql = "update task_queue set status="+
                     TASK_STATUS_CODES.IN_PROCESS.ordinal()+
+                    ", time_stamp=CURRENT_TIMESTAMP()"+
                     " where id=?";
             markForProcessing = connection.prepareStatement(sql);
 
@@ -211,11 +214,12 @@ public class AsyncWorkerProcess {
 
         private void debugQueue() throws SQLException {
             try (ResultSet rs = connection.createStatement().executeQuery(
-                    "select * from parse_queue limit 10")) {
+                    "select * from task_queue limit 10")) {
                 while (rs.next()) {
                     for (int i = 1; i <= rs.getMetaData().getColumnCount(); 
i++) {
                         System.out.print(rs.getString(i)+ " ");
                     }
+                    System.out.println("");
                 }
             }
         }
@@ -244,6 +248,7 @@ public class AsyncWorkerProcess {
         private final PreparedStatement insertErrorLog;
         private final PreparedStatement resetStatus;
         private final PreparedStatement insertEmitData;
+        private final PreparedStatement updateStatusForEmit;
         private final ObjectMapper objectMapper = new ObjectMapper();
         LZ4Factory factory = LZ4Factory.fastestInstance();
 
@@ -257,6 +262,10 @@ public class AsyncWorkerProcess {
             this.executorService = Executors.newFixedThreadPool(1);
             this.executorCompletionService = new 
ExecutorCompletionService<>(executorService);
             this.parseTimeoutMs = parseTimeoutMs;
+
+            SimpleModule module = new SimpleModule();
+            module.addSerializer(Metadata.class, new JsonMetadata());
+            objectMapper.registerModule(module);
             String sql = "insert into workers (worker_id, status) " +
                     "values (" + workerId + ", "+
                     WORKER_STATUS_CODES.ACTIVE.ordinal()+")";
@@ -264,6 +273,11 @@ public class AsyncWorkerProcess {
             insertErrorLog = prepareInsertErrorLog(connection);
             resetStatus = prepareReset(connection);
             insertEmitData = prepareInsertEmitData(connection);
+            sql = "update task_queue set status="+
+                    TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal()+
+                    ", time_stamp=CURRENT_TIMESTAMP()" +
+                    " where id=?";
+            updateStatusForEmit = connection.prepareStatement(sql);
         }
 
 
@@ -339,10 +353,12 @@ public class AsyncWorkerProcess {
                         try {
                             emit(asyncData);
                         } catch (JsonProcessingException e) {
+                            e.printStackTrace();
                             recordBadEmit(task.getTaskId(),
                                     task.getFetchKey().getKey(),
                                     ERROR_CODES.EMIT_SERIALIZATION.ordinal());
                         } catch (SQLException e) {
+                            e.printStackTrace();
                             recordBadEmit(task.getTaskId(),
                                     task.getFetchKey().getKey(),
                                     
ERROR_CODES.EMIT_SQL_INSERT_EXCEPTION.ordinal());
@@ -365,6 +381,10 @@ public class AsyncWorkerProcess {
             insertEmitData.setLong(2, bytes.length);
             insertEmitData.setBlob(3, new ByteArrayInputStream(compressed));
             insertEmitData.execute();
+            updateStatusForEmit.clearParameters();
+            updateStatusForEmit.setLong(1,
+                    asyncData.getAsyncTask().getTaskId());
+            updateStatusForEmit.execute();
         }
 
         private void handleTimeout(long taskId, String key) throws 
TimeoutException {
@@ -405,9 +425,8 @@ public class AsyncWorkerProcess {
 
         static PreparedStatement prepareInsertEmitData(Connection connection) 
throws SQLException {
             return connection.prepareStatement(
-                    "insert into emits (status, time_stamp, uncompressed_size, 
bytes) " +
-                            " values 
("+AsyncWorkerProcess.EMIT_STATUS_CODES.READY.ordinal()+
-                            ",CURRENT_TIMESTAMP(),?,?)"
+                    "insert into emits (id, time_stamp, uncompressed_size, 
bytes) " +
+                            " values (?,CURRENT_TIMESTAMP(),?,?)"
             );
         }
     }
@@ -445,7 +464,7 @@ public class AsyncWorkerProcess {
             }
             injectUserMetadata(userMetadata, metadataList);
             EmitKey emitKey = task.getEmitKey();
-            if (StringUtils.isBlank(emitKey.getKey())) {
+            if (StringUtils.isBlank(emitKey.getEmitKey())) {
                 emitKey = new EmitKey(emitKey.getEmitterName(), fetchKey);
                 task.setEmitKey(emitKey);
             }
diff --git a/tika-pipes/tika-pipes-app/src/main/resources/log4j.properties b/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties
similarity index 100%
rename from tika-pipes/tika-pipes-app/src/main/resources/log4j.properties
rename to tika-pipes/tika-pipes-async/src/main/resources/log4j.properties
diff --git a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
similarity index 100%
rename from tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
rename to tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
diff --git a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
similarity index 100%
rename from tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
rename to tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
diff --git a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockEmitter.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
similarity index 100%
rename from tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
rename to tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
diff --git a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockFetcher.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
similarity index 100%
rename from tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
rename to tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
diff --git a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
similarity index 100%
rename from tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
rename to tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
index 58b4ff4..332813f 100644
--- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
@@ -228,7 +228,7 @@ public class PipeIntegrationTests {
             userMetadata.set("project", "my-project");
 
             try (InputStream is = fetcher.fetch(t.getFetchKey().getKey(), t.getMetadata())) {
-                emitter.emit(t.getEmitKey().getKey(), is, userMetadata);
+                emitter.emit(t.getEmitKey().getEmitKey(), is, userMetadata);
             }
         }
     }
diff --git a/tika-serialization/pom.xml b/tika-serialization/pom.xml
index 64e3aa2..2ad8035 100644
--- a/tika-serialization/pom.xml
+++ b/tika-serialization/pom.xml
@@ -56,6 +56,11 @@
       <artifactId>jackson-core</artifactId>
       <version>${jackson.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
 
     <!-- Test dependencies -->
     <dependency>
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonEmitData.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonEmitData.java
index 21b6377..e256d7d 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonEmitData.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonEmitData.java
@@ -32,7 +32,7 @@ public class JsonEmitData {
             jsonGenerator.writeStartObject();
             EmitKey key = emitData.getEmitKey();
             jsonGenerator.writeStringField(JsonFetchEmitTuple.EMITTER, key.getEmitterName());
-            jsonGenerator.writeStringField(JsonFetchEmitTuple.EMITKEY, key.getKey());
+            jsonGenerator.writeStringField(JsonFetchEmitTuple.EMITKEY, key.getEmitKey());
             jsonGenerator.writeFieldName("data");
             jsonGenerator.writeStartArray();
             for (Metadata m : emitData.getMetadataList()) {
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
index 55d10ef..3024f6a 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
@@ -135,8 +135,8 @@ public class JsonFetchEmitTuple {
         jsonGenerator.writeStringField(FETCHER, t.getFetchKey().getFetcherName());
         jsonGenerator.writeStringField(FETCHKEY, t.getFetchKey().getKey());
         jsonGenerator.writeStringField(EMITTER, t.getEmitKey().getEmitterName());
-        if (!StringUtils.isBlank(t.getEmitKey().getKey())) {
-            jsonGenerator.writeStringField(EMITKEY, t.getEmitKey().getKey());
+        if (!StringUtils.isBlank(t.getEmitKey().getEmitKey())) {
+            jsonGenerator.writeStringField(EMITKEY, t.getEmitKey().getEmitKey());
         }
         if (t.getMetadata().size() > 0) {
             jsonGenerator.writeFieldName(METADATAKEY);
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
index b33c2a9..06612a1 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
@@ -26,13 +26,19 @@ import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 
-public class JsonMetadata {
+public class JsonMetadata extends StdSerializer<Metadata> {
 
     static volatile boolean PRETTY_PRINT = false;
 
+    public JsonMetadata() {
+        super(Metadata.class);
+    }
+
     /**
      * Serializes a Metadata object to Json.  This does not flush or close the writer.
      *
@@ -140,4 +146,10 @@ public class JsonMetadata {
         PRETTY_PRINT = prettyPrint;
     }
 
+    @Override
+    public void serialize(Metadata metadata,
+                          JsonGenerator jsonGenerator,
+                          SerializerProvider serializerProvider) throws IOException {
+        writeMetadataObject(metadata, jsonGenerator, false);
+    }
 }
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
index 315927a..3589f4d 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
@@ -97,7 +97,7 @@ public class AsyncEmitter implements Callable<Integer> {
 
         void add(EmitData data) {
             size++;
-            long sz = AbstractEmitter.estimateSizeInBytes(data.getEmitKey().getKey(), data.getMetadataList());
+            long sz = AbstractEmitter.estimateSizeInBytes(data.getEmitKey().getEmitKey(), data.getMetadataList());
             if (estimatedSize + sz > maxBytes) {
                 LOG.debug("estimated size ({}) > maxBytes({}), going to emitAll",
                         (estimatedSize+sz), maxBytes);
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
index c20fc75..1c8629c 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
@@ -124,7 +124,7 @@ public class AsyncParser implements Callable<Integer> {
 
         injectUserMetadata(userMetadata, metadataList);
         EmitKey emitKey = t.getEmitKey();
-        if (StringUtils.isBlank(emitKey.getKey())) {
+        if (StringUtils.isBlank(emitKey.getEmitKey())) {
             emitKey = new EmitKey(emitKey.getEmitterName(), fetchKey);
         }
         return new EmitData(emitKey, metadataList);
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
index ce437d7..047fa13 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
@@ -17,15 +17,11 @@
 
 package org.apache.tika.server.core.resource;
 
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import org.apache.tika.Tika;
 import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.emitter.Emitter;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
 import org.apache.tika.exception.TikaException;
-import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.pipes.fetcher.Fetcher;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
@@ -50,7 +46,6 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -197,7 +192,7 @@ public class EmitterResource {
         //use fetch key if emitter key is not specified
         //TODO: clean this up?
         EmitKey emitKey = t.getEmitKey();
-        if (StringUtils.isBlank(emitKey.getKey())) {
+        if (StringUtils.isBlank(emitKey.getEmitKey())) {
             emitKey = new EmitKey(emitKey.getEmitterName(), t.getFetchKey().getKey());
         }
         return emitKey;
@@ -207,7 +202,7 @@ public class EmitterResource {
             Map<String, String> statusMap = new HashMap<>();
             statusMap.put("status", "ok");
             statusMap.put("emitter", t.getEmitKey().getEmitterName());
-            statusMap.put("emitKey", t.getEmitKey().getKey());
+            statusMap.put("emitKey", t.getEmitKey().getEmitKey());
             String msg = metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
             statusMap.put("parse_exception", msg);
             return statusMap;
@@ -263,9 +258,9 @@ public class EmitterResource {
         String status = "ok";
         String exceptionMsg = "";
         try {
-            emitter.emit(emitKey.getKey(), metadataList);
+            emitter.emit(emitKey.getEmitKey(), metadataList);
         } catch (IOException | TikaEmitterException e) {
-            LOG.warn("problem emitting ("+emitKey.getKey()+")", e);
+            LOG.warn("problem emitting ("+emitKey.getEmitKey()+")", e);
             status = "emitter_exception";
             exceptionMsg = ExceptionUtils.getStackTrace(e);
         }

Reply via email to