This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch TIKA-3304
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/TIKA-3304 by this push:
new 26ff633 WIP -- do not merge...still a bunch to do
26ff633 is described below
commit 26ff633408fd6afe3995acbf6e6cca5b9592c7a1
Author: tballison <[email protected]>
AuthorDate: Fri Mar 12 17:24:08 2021 -0500
WIP -- do not merge...still a bunch to do
---
.../apache/tika/pipes/emitter/AbstractEmitter.java | 2 +-
.../org/apache/tika/pipes/emitter/EmitKey.java | 10 +-
.../org/apache/tika/pipes/fetcher/FetchKey.java | 9 +-
tika-pipes/pom.xml | 2 +-
.../tika/pipes/emitter/solr/SolrEmitter.java | 4 +-
.../org/apache/tika/pipes/async/AsyncEmitter.java | 167 -------------
.../tika/pipes/async/AsyncEmitterProcess.java | 156 ------------
.../{tika-pipes-app => tika-pipes-async}/pom.xml | 2 +-
.../java/org/apache/tika/pipes/async/AsyncCli.java | 16 +-
.../org/apache/tika/pipes/async/AsyncConfig.java | 0
.../org/apache/tika/pipes/async/AsyncData.java | 0
.../org/apache/tika/pipes/async/AsyncEmitHook.java | 0
.../org/apache/tika/pipes/async/AsyncEmitter.java} | 55 ++--
.../tika/pipes/async/AsyncEmitterProcess.java | 276 +++++++++++++++++++++
.../tika/pipes/async/AsyncPipesEmitHook.java | 2 +-
.../apache/tika/pipes/async/AsyncProcessor.java | 38 +--
.../tika/pipes/async/AsyncRuntimeException.java | 0
.../org/apache/tika/pipes/async/AsyncTask.java | 5 +-
.../org/apache/tika/pipes/async/AsyncWorker.java | 6 +-
.../tika/pipes/async/AsyncWorkerProcess.java | 51 ++--
.../src/main/resources/log4j.properties | 0
.../org/apache/tika/pipes/async/AsyncCliTest.java | 0
.../tika/pipes/async/AsyncProcessorTest.java | 0
.../org/apache/tika/pipes/async/MockEmitter.java | 0
.../org/apache/tika/pipes/async/MockFetcher.java | 0
.../apache/tika/pipes/async/TestPipesDriver.java | 0
.../apache/tika/pipes/PipeIntegrationTests.java | 2 +-
tika-serialization/pom.xml | 5 +
.../tika/metadata/serialization/JsonEmitData.java | 2 +-
.../metadata/serialization/JsonFetchEmitTuple.java | 4 +-
.../tika/metadata/serialization/JsonMetadata.java | 14 +-
.../tika/server/core/resource/AsyncEmitter.java | 2 +-
.../tika/server/core/resource/AsyncParser.java | 2 +-
.../tika/server/core/resource/EmitterResource.java | 13 +-
34 files changed, 436 insertions(+), 409 deletions(-)
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
index 3c340ce..39643b2 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
@@ -48,7 +48,7 @@ public abstract class AbstractEmitter implements Emitter {
@Override
public void emit(List<? extends EmitData> emitData) throws IOException,
TikaEmitterException {
for (EmitData d : emitData) {
- emit(d.getEmitKey().getKey(), d.getMetadataList());
+ emit(d.getEmitKey().getEmitKey(), d.getMetadataList());
}
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
index eb4c2c2..db5cf4e 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
@@ -18,9 +18,13 @@ package org.apache.tika.pipes.emitter;
public class EmitKey {
- private final String emitterName;
- private final String emitKey;
+ private String emitterName;
+ private String emitKey;
+ //for serialization only...yuck.
+ public EmitKey() {
+
+ }
public EmitKey(String emitterName, String emitKey) {
this.emitterName = emitterName;
this.emitKey = emitKey;
@@ -30,7 +34,7 @@ public class EmitKey {
return emitterName;
}
- public String getKey() {
+ public String getEmitKey() {
return emitKey;
}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
index 6e86bcf..93b82b2 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
@@ -21,8 +21,13 @@ package org.apache.tika.pipes.fetcher;
* to send to that fetcher to retrieve a specific file.
*/
public class FetchKey {
- private final String fetcherName;
- private final String fetchKey;
+ private String fetcherName;
+ private String fetchKey;
+
+ //this is for serialization...yuck
+ public FetchKey(){
+
+ }
public FetchKey(String fetcherName, String fetchKey) {
this.fetcherName = fetcherName;
diff --git a/tika-pipes/pom.xml b/tika-pipes/pom.xml
index dec23a1..560ed2d 100644
--- a/tika-pipes/pom.xml
+++ b/tika-pipes/pom.xml
@@ -35,7 +35,7 @@
<module>tika-fetchers</module>
<module>tika-emitters</module>
<module>tika-fetch-iterators</module>
- <module>tika-pipes-app</module>
+ <module>tika-pipes-async</module>
<module>tika-pipes-integration-tests</module>
</modules>
diff --git
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index f299913..7fb7f1b 100644
---
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
+++
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -34,12 +34,10 @@ import org.apache.tika.pipes.emitter.TikaEmitterException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
-import java.io.StringWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.List;
@@ -120,7 +118,7 @@ public class SolrEmitter extends AbstractEmitter implements
Initializable {
try (JsonGenerator jsonGenerator = new
JsonFactory().createGenerator(writer)) {
jsonGenerator.writeStartArray();
for (EmitData d : batch) {
- jsonify(jsonGenerator, d.getEmitKey().getKey(),
d.getMetadataList());
+ jsonify(jsonGenerator, d.getEmitKey().getEmitKey(),
d.getMetadataList());
}
jsonGenerator.writeEndArray();
}
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
deleted file mode 100644
index bdc6cd3..0000000
---
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-
-import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.EmitterManager;
-import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.utils.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-//TODO -- convert this into a watcher over the AsyncEmitterProcess
-//along the lines of AsyncWorker
-//then add emitters to the main loop in async processor
-public class AsyncEmitter implements Callable<Integer> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(AsyncEmitter.class);
-
- private final EmitterManager emitterManager;
- private final AsyncEmitHook asyncEmitHook;
- private final long emitWithinMs;
- private final long emitMaxBytes;
- private final EmitDataCache cache;
- ArrayBlockingQueue<AsyncData> dataQueue = new ArrayBlockingQueue<>(1000);
-
- Instant lastEmitted = Instant.now();
-
- public AsyncEmitter(EmitterManager emitterManager, AsyncEmitHook
asyncEmitHook,
- long emitWithinMs, long emitMaxBytes) {
- this.emitterManager = emitterManager;
- this.asyncEmitHook = asyncEmitHook;
- this.emitWithinMs = emitWithinMs;
- this.emitMaxBytes = emitMaxBytes;
- this.cache = new EmitDataCache();
- }
-
- public boolean emit(AsyncData asyncData, long pollMs)
- throws InterruptedException {
- if (asyncData == null) {
- return true;
- }
- return dataQueue.offer(asyncData, pollMs, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public Integer call() throws Exception {
- while (true) {
- AsyncData asyncData = dataQueue.poll(100, TimeUnit.MILLISECONDS);
- if (asyncData != null) {
- cache.add(asyncData);
- }
- long elapsed = ChronoUnit.MILLIS.between(lastEmitted,
Instant.now());
- if (elapsed > emitWithinMs) {
- LOG.debug("{} elapsed > {}, going to emitAll",
- elapsed, emitWithinMs);
- //this can block for a bit
- emitAll();
- }
- }
- }
-
- public void emitAll() {
- cache.emitAll();
- }
-
- private class EmitDataCache {
-
- long estimatedSize = 0;
- int size = 0;
- Map<String, List<AsyncData>> map = new HashMap<>();
-
-
- void updateEstimatedSize(long newBytes) {
- estimatedSize += newBytes;
- }
-
- synchronized void add(AsyncData data) {
- size++;
- long sz =
AbstractEmitter.estimateSizeInBytes(data.getEmitKey().getKey(),
data.getMetadataList());
- if (estimatedSize + sz > emitMaxBytes) {
- LOG.debug("estimated size ({}) > maxBytes({}), going to
emitAll",
- (estimatedSize + sz), emitMaxBytes);
- emitAll();
- }
- List<AsyncData> cached =
map.get(data.getEmitKey().getEmitterName());
- if (cached == null) {
- cached = new ArrayList<>();
- map.put(data.getEmitKey().getEmitterName(), cached);
- }
- updateEstimatedSize(sz);
- cached.add(data);
- }
-
- private synchronized void emitAll() {
- int emitted = 0;
- LOG.debug("about to emit all {}", size);
- for (Map.Entry<String, List<AsyncData>> e : map.entrySet()) {
- Emitter emitter = emitterManager.getEmitter(e.getKey());
- tryToEmit(emitter, e.getKey(), e.getValue());
- emitted += e.getValue().size();
- }
- LOG.debug("emitted: {}", emitted);
- estimatedSize = 0;
- size = 0;
- map.clear();
- lastEmitted = Instant.now();
- }
-
- private void tryToEmit(Emitter emitter, String emitterName,
List<AsyncData> cachedEmitData) {
- if (emitter == null) {
- LOG.error("Can't find emitter '{}' in TikaConfig!",
emitterName);
- }
- List<EmitData> emitData = new ArrayList<>();
- Set<AsyncTask> asyncTasks = new HashSet<>();
- for (AsyncData d : cachedEmitData) {
- emitData.add(new EmitData(d.getAsyncTask().getEmitKey(),
d.getMetadataList()));
- asyncTasks.add(d.getAsyncTask());
- }
- try {
- emitter.emit(emitData);
- for (AsyncData d : cachedEmitData) {
- asyncEmitHook.onSuccess(d.getAsyncTask());
- }
- } catch (IOException | TikaEmitterException e) {
- e.printStackTrace();
- for (AsyncData d : cachedEmitData) {
- asyncEmitHook.onFail(d.getAsyncTask());
- }
- e.printStackTrace();
- LOG.warn("emitter class ({}): {}", emitter.getClass(),
- ExceptionUtils.getStackTrace(e));
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-}
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
deleted file mode 100644
index 58854cb..0000000
---
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4FastDecompressor;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.EmitterManager;
-import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.utils.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class AsyncEmitterProcess {
-
- private long emitWithinMs = 1000;
- private long emitMaxBytes = 10_000_000;
- private static final Logger LOG =
LoggerFactory.getLogger(AsyncEmitterProcess.class);
-
- private final LZ4FastDecompressor decompressor =
LZ4Factory.fastestInstance().fastDecompressor();
- private final ObjectMapper objectMapper = new ObjectMapper();
-
- public static void main(String[] args) throws Exception {
- String db = System.getenv(AsyncProcessor.TIKA_ASYNC_JDBC_KEY);
- TikaConfig tikaConfig = new
TikaConfig(Paths.get(System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY)));
- int workerId = Integer.parseInt(args[0]);
- LOG.debug("trying to get connection {} >{}<", workerId, db);
- try (Connection connection = DriverManager.getConnection(db)) {
- AsyncEmitterProcess asyncEmitter = new AsyncEmitterProcess();
- asyncEmitter.execute(connection, workerId, tikaConfig);
- }
- System.exit(0);
- }
-
- private void execute(Connection connection, int workerId,
- TikaConfig tikaConfig) throws SQLException {
- int recordsPerPulse = 10;
- String sql = "update emits set status=" +
- AsyncWorkerProcess.EMIT_STATUS_CODES.EMITTING.ordinal()+
- ", worker_id="+workerId+
- " where emit_id in " +
- " (select emit_id from emits "+//where worker_id = " +
workerId +
- " and status="+
AsyncWorkerProcess.EMIT_STATUS_CODES.READY.ordinal()+
- " order by time_stamp asc limit "+recordsPerPulse+" for
update)";
- PreparedStatement markForSelecting = connection.prepareStatement(sql);
- sql = "select emit_id, uncompressed_size, bytes from emits where
status=" +
- AsyncWorkerProcess.EMIT_STATUS_CODES.EMITTING.ordinal() +
- " and worker_id=" + workerId +
- " order by time_stamp asc";
- PreparedStatement selectForProcessing =
connection.prepareStatement(sql);
- sql = "delete from emits where emit_id=?";
- PreparedStatement deleteFromEmits = connection.prepareStatement(sql);
-
- }
-
- private AsyncData deserialize(byte[] compressed, int decompressedLength)
- throws IOException {
- byte[] restored = new byte[decompressedLength];
- int compressedLength2 = decompressor.decompress(compressed, 0,
restored,
- 0, decompressedLength);
-
- return objectMapper.readerFor(AsyncTask.class).readValue(restored);
- }
-
- private static class EmitDataCache {
- private final EmitterManager emitterManager;
- private final long maxBytes;
-
- long estimatedSize = 0;
- int size = 0;
- Map<String, List<AsyncData>> map = new HashMap<>();
-
- public EmitDataCache(EmitterManager emitterManager,
- long maxBytes) {
- this.emitterManager = emitterManager;
- this.maxBytes = maxBytes;
- }
-
- void updateEstimatedSize(long newBytes) {
- estimatedSize += newBytes;
- }
-
- void add(AsyncData data) {
-
- size++;
- long sz =
AbstractEmitter.estimateSizeInBytes(data.getEmitKey().getKey(),
data.getMetadataList());
- if (estimatedSize + sz > maxBytes) {
- LOG.debug("estimated size ({}) > maxBytes({}), going to
emitAll",
- (estimatedSize+sz), maxBytes);
- emitAll();
- }
- List<AsyncData> cached =
map.get(data.getEmitKey().getEmitterName());
- if (cached == null) {
- cached = new ArrayList<>();
- map.put(data.getEmitKey().getEmitterName(), cached);
- }
- updateEstimatedSize(sz);
- cached.add(data);
- }
-
- private void emitAll() {
- int emitted = 0;
- LOG.debug("about to emit {}", size);
- for (Map.Entry<String, List<AsyncData>> e : map.entrySet()) {
- Emitter emitter = emitterManager.getEmitter(e.getKey());
- tryToEmit(emitter, e.getValue());
- emitted += e.getValue().size();
- }
- LOG.debug("emitted: {}", emitted);
- estimatedSize = 0;
- size = 0;
- map.clear();
- //lastEmitted = Instant.now();
- }
-
- private long tryToEmit(Emitter emitter, List<AsyncData>
cachedEmitData) {
-
- try {
- emitter.emit(cachedEmitData);
- } catch (IOException | TikaEmitterException e) {
- LOG.warn("emitter class ({}): {}", emitter.getClass(),
- ExceptionUtils.getStackTrace(e));
- }
- return 1;
- }
- }
-}
diff --git a/tika-pipes/tika-pipes-app/pom.xml
b/tika-pipes/tika-pipes-async/pom.xml
similarity index 98%
rename from tika-pipes/tika-pipes-app/pom.xml
rename to tika-pipes/tika-pipes-async/pom.xml
index 9e26eb2..ded8a0a 100644
--- a/tika-pipes/tika-pipes-app/pom.xml
+++ b/tika-pipes/tika-pipes-async/pom.xml
@@ -28,7 +28,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>tika-pipes-app</artifactId>
+ <artifactId>tika-pipes-async</artifactId>
<packaging>pom</packaging>
<name>Apache Tika emitters</name>
<url>http://tika.apache.org/</url>
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
similarity index 96%
rename from
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
rename to
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
index cdf8855..4860299 100644
---
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
+++
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
@@ -90,7 +90,7 @@ public class AsyncCli {
";AUTO_SERVER=TRUE";
Connection connection = DriverManager.getConnection(url);
- String sql = "create table parse_queue " +
+ String sql = "create table task_queue " +
"(id bigint auto_increment primary key," +
"status tinyint," +//byte
"worker_id integer," +
@@ -132,7 +132,7 @@ public class AsyncCli {
Connection connection) throws SQLException {
this.queue = queue;
this.connection = connection;
- String sql = "insert into parse_queue (status, time_stamp,
worker_id, retry, json) " +
+ String sql = "insert into task_queue (status, time_stamp,
worker_id, retry, json) " +
"values (?,CURRENT_TIMESTAMP(),?,?,?)";
insert = connection.prepareStatement(sql);
}
@@ -196,30 +196,30 @@ public class AsyncCli {
//this gets workers and # of tasks in desc order of number of tasks
String sql = "select w.worker_id, p.cnt " +
"from workers w " +
- "left join (select worker_id, count(1) as cnt from
parse_queue " +
+ "left join (select worker_id, count(1) as cnt from
task_queue " +
"where status=0 group by worker_id)" +
" p on p.worker_id=w.worker_id order by p.cnt desc";
getQueueDistribution = connection.prepareStatement(sql);
//find workers that have assigned tasks but are not in the
//workers table
- sql = "select p.worker_id, count(1) as cnt from parse_queue p " +
+ sql = "select p.worker_id, count(1) as cnt from task_queue p " +
"left join workers w on p.worker_id=w.worker_id " +
"where w.worker_id is null group by p.worker_id";
findMissingWorkers = connection.prepareStatement(sql);
- sql = "update parse_queue set worker_id=? where worker_id=?";
+ sql = "update task_queue set worker_id=? where worker_id=?";
allocateNonworkersToWorkers = connection.prepareStatement(sql);
//current strategy reallocate tasks from longest queue to shortest
//TODO: might consider randomly shuffling or other algorithms
- sql = "update parse_queue set worker_id= ? where id in " +
- "(select id from parse_queue where " +
+ sql = "update task_queue set worker_id= ? where id in " +
+ "(select id from task_queue where " +
"worker_id = ? and " +
"rand() < 0.8 " +
"and status=0 for update)";
reallocate = connection.prepareStatement(sql);
- sql = "select count(1) from parse_queue where status="
+ sql = "select count(1) from task_queue where status="
+ AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal();
countAvailableTasks = connection.prepareStatement(sql);
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
similarity index 100%
rename from
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
rename to
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncData.java
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
similarity index 100%
rename from
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncData.java
rename to
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
similarity index 100%
rename from
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
rename to
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
similarity index 77%
copy from
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
copy to
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
index ff4e5d4..071a1fa 100644
---
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
+++
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
@@ -1,8 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.tika.pipes.async;
+
import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.utils.ProcessUtils;
+import org.apache.tika.utils.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -13,21 +35,24 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import static
org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
-/**
- * This controls monitoring of the AsyncWorkerProcess
- * and updates to the db on crashes etc.
- */
-public class AsyncWorker implements Callable<Integer> {
+public class AsyncEmitter implements Callable<Integer> {
- private static final Logger LOG =
LoggerFactory.getLogger(AsyncWorker.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(AsyncEmitter.class);
private final String connectionString;
@@ -40,7 +65,7 @@ public class AsyncWorker implements Callable<Integer> {
private final PreparedStatement insertErrorLog;
private final PreparedStatement resetStatus;
- public AsyncWorker(Connection connection,
+ public AsyncEmitter(Connection connection,
String connectionString, int workerId,
Path tikaConfigPath) throws SQLException {
this.connectionString = connectionString;
@@ -57,7 +82,7 @@ public class AsyncWorker implements Callable<Integer> {
" where worker_id = (" + workerId + ")";
restarting = connection.prepareStatement(sql);
//this checks if the process was able to reset the status
- sql = "select id, retry, json from parse_queue where worker_id="
+ sql = "select id, retry, json from task_queue where worker_id="
+ workerId +
" and status=" +
AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS.ordinal();
selectActiveTasks = connection.prepareStatement(sql);
@@ -80,7 +105,7 @@ public class AsyncWorker implements Callable<Integer> {
if (finished) {
int exitValue = p.exitValue();
if (exitValue == 0) {
- LOG.info("child process finished with exitValue=0");
+ LOG.info("forked emitter process finished with
exitValue=0");
return 1;
}
reportCrash(++restarts, exitValue);
@@ -99,7 +124,7 @@ public class AsyncWorker implements Callable<Integer> {
String[] args = new String[]{
"java", "-Djava.awt.headless=true",
"-cp", System.getProperty("java.class.path"),
- "org.apache.tika.pipes.async.AsyncWorkerProcess",
+ "org.apache.tika.pipes.async.AsyncEmitterProcess",
Integer.toString(workerId)
};
ProcessBuilder pb = new ProcessBuilder(args);
@@ -138,8 +163,8 @@ public class AsyncWorker implements Callable<Integer> {
}
static void reportAndReset(AsyncTask task, AsyncWorkerProcess.ERROR_CODES
errorCode,
- PreparedStatement insertErrorLog,
PreparedStatement resetStatus,
- Logger logger) {
+ PreparedStatement insertErrorLog,
PreparedStatement resetStatus,
+ Logger logger) {
try {
insertErrorLog.clearParameters();
insertErrorLog.setLong(1, task.getTaskId());
@@ -166,14 +191,14 @@ public class AsyncWorker implements Callable<Integer> {
//if not, this is called to insert into the error log
return connection.prepareStatement(
"insert into error_log (task_id, fetch_key, time_stamp, retry,
error_code) " +
- " values (?,?,CURRENT_TIMESTAMP(),?,?)"
+ " values (?,?,CURRENT_TIMESTAMP(),?,?)"
);
}
static PreparedStatement prepareReset(Connection connection) throws
SQLException {
//and this is called to reset the status on error
return connection.prepareStatement(
- "update parse_queue set " +
+ "update task_queue set " +
"status=?, " +
"time_stamp=CURRENT_TIMESTAMP(), " +
"retry=? " +
diff --git
a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
new file mode 100644
index 0000000..37c5f3a
--- /dev/null
+++
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.async;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import org.apache.commons.io.IOUtils;
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.apache.tika.utils.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AsyncEmitterProcess {
+
+ //TODO -- parameterize these
+ private long emitWithinMs = 10000;
+ private long emitMaxBytes = 10_000_000;
+ private static final Logger LOG =
LoggerFactory.getLogger(AsyncEmitterProcess.class);
+
+ private final LZ4FastDecompressor decompressor =
LZ4Factory.fastestInstance().fastDecompressor();
+ private final ObjectMapper objectMapper = new ObjectMapper();
+ int recordsPerPulse = 10;
+ private PreparedStatement markForSelecting;
+ private PreparedStatement selectForProcessing;
+ private PreparedStatement emitStatusUpdate;
+ private PreparedStatement checkForShutdown;
+
+    /**
+     * Entry point of the forked emitter process.
+     * <p>
+     * The JDBC connection string and the tika-config path are read from the
+     * environment variables named by {@link AsyncProcessor#TIKA_ASYNC_JDBC_KEY}
+     * and {@link AsyncProcessor#TIKA_ASYNC_CONFIG_FILE_KEY}; the worker id is
+     * the first command line argument. The JVM exits with status 0 once
+     * {@code execute} returns.
+     *
+     * @param args {@code args[0]} is the integer worker id
+     * @throws Exception if the config cannot be loaded, the connection cannot
+     *                   be opened or the emit loop fails
+     */
+    public static void main(String[] args) throws Exception {
+        final String jdbcString = System.getenv(AsyncProcessor.TIKA_ASYNC_JDBC_KEY);
+        final String configFile = System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY);
+        final TikaConfig config = new TikaConfig(Paths.get(configFile));
+        final int id = Integer.parseInt(args[0]);
+        LOG.debug("trying to get connection {} >{}<", id, jdbcString);
+        try (Connection jdbc = DriverManager.getConnection(jdbcString)) {
+            new AsyncEmitterProcess().execute(jdbc, id, config);
+        }
+        System.exit(0);
+    }
+
+    /**
+     * Polling loop of the emitter process.
+     * <p>
+     * Each pass runs {@code markForSelecting}; if it updated any rows, the rows
+     * returned by {@code selectForProcessing} are loaded and added to the
+     * {@link EmitDataCache}. The cache is flushed when nothing has been added
+     * for longer than {@code emitWithinMs}. The loop sleeps 500 ms per pass and
+     * returns, after a final flush, once {@link #shouldShutdown()} has answered
+     * {@code true} on two passes.
+     * <p>
+     * NOTE(review): cached rows keep the status set by {@code markForSelecting}
+     * until they are emitted, so a later {@code selectForProcessing} looks able
+     * to return them a second time -- confirm whether they should be moved to
+     * IN_PROCESS_EMIT once cached.
+     *
+     * @param connection open connection to the task database
+     * @param workerId   id of this emitter in the workers table
+     * @param tikaConfig config that supplies the {@link EmitterManager}
+     * @throws SQLException         if a queue query or status update fails
+     * @throws InterruptedException if the sleep between passes is interrupted
+     */
+    private void execute(Connection connection, int workerId,
+                         TikaConfig tikaConfig) throws SQLException,
+            InterruptedException {
+        prepareStatements(connection, workerId);
+        EmitterManager emitterManager = tikaConfig.getEmitterManager();
+        EmitDataCache emitDataCache = new EmitDataCache(emitterManager, emitMaxBytes,
+                emitStatusUpdate);
+        int shouldShutdown = 0;
+        while (true) {
+            int toEmit = markForSelecting.executeUpdate();
+            if (toEmit > 0) {
+                try (ResultSet rs = selectForProcessing.executeQuery()) {
+                    while (rs.next()) {
+                        long id = rs.getLong(1);
+                        Timestamp ts = rs.getTimestamp(2);
+                        int uncompressedSize = rs.getInt(3);
+                        Blob blob = rs.getBlob(4);
+                        try {
+                            tryToEmit(id, ts, uncompressedSize, blob,
+                                    emitDataCache);
+                        } catch (SQLException | IOException e) {
+                            //don't lose the cause: the row is about to be
+                            //marked FAILED_EMIT and nothing else records why
+                            LOG.warn("couldn't load emit data for task id={}", id, e);
+                            reportEmitStatus(
+                                    Collections.singletonList(id),
+                                    AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT,
+                                    emitStatusUpdate
+                            );
+                        }
+                    }
+                }
+            }
+            if (emitDataCache.exceedsEmitWithin(emitWithinMs)) {
+                emitDataCache.emitAll();
+            }
+            Thread.sleep(500);
+            if (shouldShutdown()) {
+                shouldShutdown++;
+            }
+            //make sure to test twice
+            if (shouldShutdown > 1) {
+                emitDataCache.emitAll();
+                return;
+            }
+        }
+    }
+
+    /**
+     * Reads the compressed payload out of {@code blob}, deserializes it and
+     * adds the result to {@code emitDataCache}. Adding may itself trigger a
+     * flush of the cache.
+     *
+     * @param id                 task id of the row (currently unused here)
+     * @param ts                 time stamp of the row (currently unused here)
+     * @param decompressedLength size of the payload after decompression
+     * @param blob               compressed, serialized emit data
+     * @param emitDataCache      cache that receives the deserialized data
+     * @throws SQLException if the blob cannot be read
+     * @throws IOException  if copying or deserializing the payload fails
+     */
+    private void tryToEmit(long id, Timestamp ts,
+                           int decompressedLength,
+                           Blob blob, EmitDataCache emitDataCache)
+            throws SQLException, IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        //close the blob's stream once copied; the original left it open
+        try (java.io.InputStream is = blob.getBinaryStream()) {
+            IOUtils.copyLarge(is, bos);
+        }
+        AsyncData asyncData = deserialize(bos.toByteArray(), decompressedLength);
+        emitDataCache.add(asyncData);
+    }
+
+    /**
+     * Runs the {@code checkForShutdown} query.
+     *
+     * @return {@code true} if the query returns a row whose first column is
+     *         greater than zero, {@code false} otherwise
+     * @throws SQLException if the query fails
+     */
+    boolean shouldShutdown() throws SQLException {
+        try (ResultSet counts = checkForShutdown.executeQuery()) {
+            return counts.next() && counts.getInt(1) > 0;
+        }
+    }
+
+    /**
+     * Prepares the four statements used by the emit loop:
+     * <ul>
+     *   <li>{@code markForSelecting} -- moves up to {@code recordsPerPulse}
+     *       AVAILABLE_EMIT rows, oldest first, to SELECTED_EMIT and stamps
+     *       them with this worker's id</li>
+     *   <li>{@code selectForProcessing} -- reads the emit payloads of the
+     *       rows this worker has in SELECTED_EMIT</li>
+     *   <li>{@code emitStatusUpdate} -- sets the status of a single task</li>
+     *   <li>{@code checkForShutdown} -- counts rows in workers for this
+     *       worker id with status SHOULD_SHUTDOWN</li>
+     * </ul>
+     *
+     * @param connection connection the statements are prepared on
+     * @param workerId   id of this emitter; concatenated into the SQL
+     * @throws SQLException if a statement cannot be prepared
+     */
+    private void prepareStatements(Connection connection, int workerId) throws SQLException {
+        //the inner select is not restricted by worker_id: any
+        //AVAILABLE_EMIT row may be claimed by this emitter
+        String sql = "update task_queue set status=" +
+                AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal() +
+                ", worker_id=" + workerId + ", time_stamp=CURRENT_TIMESTAMP()" +
+                " where id in " +
+                " (select id from task_queue " +
+                " where status=" +
+                AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal() +
+                " order by time_stamp asc limit " + recordsPerPulse + " for update)";
+        markForSelecting = connection.prepareStatement(sql);
+
+        //emits.id carries the task id, so join on it; without the "on"
+        //clause every emit row is paired with every selected task.
+        //Columns are qualified because both tables have a time_stamp.
+        sql = "select q.id, q.time_stamp, e.uncompressed_size, e.bytes from emits e " +
+                "join task_queue q on e.id=q.id " +
+                "where q.status=" +
+                AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal() +
+                " and q.worker_id=" + workerId +
+                " order by q.time_stamp asc";
+        selectForProcessing = connection.prepareStatement(sql);
+
+        sql = "update task_queue set status=?" +
+                ", time_stamp=CURRENT_TIMESTAMP()" +
+                " where id=?";
+        emitStatusUpdate = connection.prepareStatement(sql);
+
+        sql = "select count(1) from workers where worker_id=" + workerId +
+                " and status=" +
+                AsyncWorkerProcess.WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
+        checkForShutdown = connection.prepareStatement(sql);
+    }
+
+
+    /**
+     * Decompresses an LZ4 block and deserializes the resulting JSON.
+     *
+     * @param compressed         LZ4-compressed bytes
+     * @param decompressedLength exact length of the data once decompressed
+     * @return the deserialized emit data
+     * @throws IOException if the bytes cannot be decompressed or parsed
+     */
+    private AsyncData deserialize(byte[] compressed, int decompressedLength)
+            throws IOException {
+        byte[] restored = new byte[decompressedLength];
+        try {
+            decompressor.decompress(compressed, 0, restored,
+                    0, decompressedLength);
+        } catch (net.jpountz.lz4.LZ4Exception e) {
+            //unchecked; rethrow as IOException so the caller's existing
+            //handling marks this task as failed
+            throw new IOException("couldn't decompress emit data", e);
+        }
+        //read the type this method returns; reading an AsyncTask and
+        //returning it as AsyncData fails with a ClassCastException
+        return objectMapper.readerFor(AsyncData.class).readValue(restored);
+    }
+
+    /**
+     * Buffers emit data per emitter name and sends it in batches.
+     * <p>
+     * A batch is flushed when adding an item would push the estimated size
+     * over {@code maxBytes}, or when the owner calls {@link #emitAll()}.
+     * After each batch the status of every task in it is written through
+     * {@code emitStatusUpdate}: EMITTED on success, FAILED_EMIT if the
+     * emitter threw.
+     */
+    private static class EmitDataCache {
+        private final EmitterManager emitterManager;
+        private final long maxBytes;
+        private final PreparedStatement emitStatusUpdate;
+        private Instant lastAdded = Instant.now();
+
+        long estimatedSize = 0;
+        int size = 0;
+        Map<String, List<AsyncData>> map = new HashMap<>();
+
+        /**
+         * @param emitterManager   resolves emitter names to emitters
+         * @param maxBytes         estimated size at which the cache is flushed
+         * @param emitStatusUpdate statement used to record each task's outcome
+         */
+        public EmitDataCache(EmitterManager emitterManager,
+                             long maxBytes, PreparedStatement emitStatusUpdate) {
+            this.emitterManager = emitterManager;
+            this.maxBytes = maxBytes;
+            this.emitStatusUpdate = emitStatusUpdate;
+        }
+
+        /** Adds {@code newBytes} to the running size estimate. */
+        void updateEstimatedSize(long newBytes) {
+            estimatedSize += newBytes;
+        }
+
+        /**
+         * Caches {@code data} under its emitter name, flushing the current
+         * contents first if the new item would exceed {@code maxBytes}.
+         */
+        void add(AsyncData data) {
+            long sz = AbstractEmitter.estimateSizeInBytes(
+                    data.getEmitKey().getEmitKey(), data.getMetadataList());
+            if (estimatedSize + sz > maxBytes) {
+                LOG.debug("estimated size ({}) > maxBytes({}), going to emitAll",
+                        (estimatedSize + sz), maxBytes);
+                emitAll();
+            }
+            map.computeIfAbsent(data.getEmitKey().getEmitterName(),
+                    k -> new ArrayList<>()).add(data);
+            updateEstimatedSize(sz);
+            //count after the possible flush above, which resets size to 0
+            size++;
+            lastAdded = Instant.now();
+        }
+
+        /**
+         * Sends everything in the cache, one batch per emitter, then empties
+         * the cache. Does nothing if the cache is empty.
+         */
+        private void emitAll() {
+            if (map.isEmpty()) {
+                return;
+            }
+            long emitted = 0;
+            LOG.debug("about to emit {}", size);
+            for (Map.Entry<String, List<AsyncData>> e : map.entrySet()) {
+                Emitter emitter = emitterManager.getEmitter(e.getKey());
+                try {
+                    emitted += tryToEmit(emitter, e.getValue());
+                } catch (SQLException ex) {
+                    LOG.warn("couldn't update emit status for emitter '{}'",
+                            e.getKey(), ex);
+                }
+            }
+            LOG.debug("emitted: {}", emitted);
+            estimatedSize = 0;
+            size = 0;
+            map.clear();
+        }
+
+        /**
+         * Emits one batch and records the outcome for every task in it.
+         *
+         * @return the number of items emitted; 0 if the emitter failed
+         * @throws SQLException if the status update fails
+         */
+        private long tryToEmit(Emitter emitter, List<AsyncData> cachedEmitData)
+                throws SQLException {
+            List<Long> ids = new ArrayList<>(cachedEmitData.size());
+            for (AsyncData d : cachedEmitData) {
+                ids.add(d.getAsyncTask().getTaskId());
+            }
+            try {
+                emitter.emit(cachedEmitData);
+            } catch (IOException | TikaEmitterException e) {
+                LOG.warn("emitter class ({}): {}", emitter.getClass(),
+                        ExceptionUtils.getStackTrace(e));
+                reportEmitStatus(ids, AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT,
+                        emitStatusUpdate);
+                //stop here so the failure is not overwritten with EMITTED
+                return 0;
+            }
+            reportEmitStatus(ids, AsyncWorkerProcess.TASK_STATUS_CODES.EMITTED,
+                    emitStatusUpdate);
+            return cachedEmitData.size();
+        }
+
+        /**
+         * @return {@code true} if more than {@code emitWithinMs} milliseconds
+         *         have passed since the last item was added
+         */
+        public boolean exceedsEmitWithin(long emitWithinMs) {
+            return ChronoUnit.MILLIS.between(lastAdded, Instant.now())
+                    > emitWithinMs;
+        }
+    }
+
+    /**
+     * Writes the given status, as the byte value of its ordinal, to every
+     * task id in {@code ids}, running {@code emitStatusUpdate} once per id.
+     *
+     * @param ids              task ids to update
+     * @param emitted          status to record for each id
+     * @param emitStatusUpdate statement taking (status, id) as parameters
+     * @throws SQLException if an update fails
+     */
+    private static void reportEmitStatus(List<Long> ids,
+                                         AsyncWorkerProcess.TASK_STATUS_CODES emitted,
+                                         PreparedStatement emitStatusUpdate)
+            throws SQLException {
+        for (Long boxedId : ids) {
+            final long taskId = boxedId;
+            emitStatusUpdate.clearParameters();
+            final byte statusCode = (byte) emitted.ordinal();
+            emitStatusUpdate.setByte(1, statusCode);
+            emitStatusUpdate.setLong(2, taskId);
+            emitStatusUpdate.executeUpdate();
+        }
+    }
+}
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
similarity index 95%
rename from
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
rename to
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
index f8665d7..3828b24 100644
---
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
+++
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
@@ -15,7 +15,7 @@ public class AsyncPipesEmitHook implements AsyncEmitHook {
private final PreparedStatement markFailure;
public AsyncPipesEmitHook(Connection connection) throws SQLException {
- String sql = "delete from parse_queue where id=?";
+ String sql = "delete from task_queue where id=?";
markSuccess = connection.prepareStatement(sql);
//TODO --fix this
markFailure = connection.prepareStatement(sql);
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
similarity index 92%
rename from
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
rename to
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index e7dc0c1..b4d8a8a 100644
---
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -58,7 +58,7 @@ public class AsyncProcessor implements Closeable {
this.asyncConfig = AsyncConfig.load(tikaConfigPath);
this.queue = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
this.connection =
DriverManager.getConnection(asyncConfig.getJdbcString());
- this.totalThreads = asyncConfig.getMaxConsumers() + 2;
+ this.totalThreads = asyncConfig.getMaxConsumers() + 2 + 1;
}
public synchronized boolean offer (
@@ -134,17 +134,20 @@ public class AsyncProcessor implements Closeable {
executorCompletionService.submit(new AsyncWorker(connection,
asyncConfig.getJdbcString(), i, tikaConfigPath));
}
+ executorCompletionService.submit(new AsyncEmitter(connection,
+ asyncConfig.getJdbcString(), asyncConfig.getMaxConsumers(),
+ tikaConfigPath));
}
private void setupTables() throws SQLException {
- String sql = "create table parse_queue " +
+ String sql = "create table task_queue " +
"(id bigint auto_increment primary key," +
"status tinyint," +//byte
"worker_id integer," +
"retry smallint," + //short
"time_stamp timestamp," +
- "json varchar(64000))";
+ "json varchar(64000))";//this is the AsyncTask ... not the
emit data!
try (Statement st = connection.createStatement()) {
st.execute(sql);
}
@@ -165,10 +168,8 @@ public class AsyncProcessor implements Closeable {
}
sql = "create table emits (" +
- "emit_id bigint auto_increment primary key, " +
- "status tinyint, " +
- "worker_id integer, " +
- "time_stamp timestamp, " +
+ "id bigint primary key, " +
+ "time_stamp timestamp, "+
"uncompressed_size bigint, " +
"bytes blob)";
try (Statement st = connection.createStatement()) {
@@ -203,6 +204,15 @@ public class AsyncProcessor implements Closeable {
//swallow
}
}
+ //TODO: clean this up
+ String sql = "update workers set status="+
+ AsyncWorkerProcess.WORKER_STATUS_CODES.SHUTDOWN.ordinal()+
+ " where worker_id = (" + asyncConfig.getMaxConsumers() +
")";
+ try {
+ connection.prepareStatement(sql).execute();
+ } catch (SQLException throwables) {
+ throwables.printStackTrace();
+ }
long start = System.currentTimeMillis();
long elapsed = System.currentTimeMillis() - start;
try {
@@ -239,7 +249,7 @@ public class AsyncProcessor implements Closeable {
Connection connection) throws SQLException {
this.queue = queue;
this.connection = connection;
- String sql = "insert into parse_queue (status, time_stamp,
worker_id, retry, json) " +
+ String sql = "insert into task_queue (status, time_stamp,
worker_id, retry, json) " +
"values (?,CURRENT_TIMESTAMP(),?,?,?)";
insert = connection.prepareStatement(sql);
}
@@ -304,30 +314,30 @@ public class AsyncProcessor implements Closeable {
//this gets workers and # of tasks in desc order of number of tasks
String sql = "select w.worker_id, p.cnt " +
"from workers w " +
- "left join (select worker_id, count(1) as cnt from
parse_queue " +
+ "left join (select worker_id, count(1) as cnt from
task_queue " +
"where status=0 group by worker_id)" +
" p on p.worker_id=w.worker_id order by p.cnt desc";
getQueueDistribution = connection.prepareStatement(sql);
//find workers that have assigned tasks but are not in the
//workers table
- sql = "select p.worker_id, count(1) as cnt from parse_queue p " +
+ sql = "select p.worker_id, count(1) as cnt from task_queue p " +
"left join workers w on p.worker_id=w.worker_id " +
"where w.worker_id is null group by p.worker_id";
findMissingWorkers = connection.prepareStatement(sql);
- sql = "update parse_queue set worker_id=? where worker_id=?";
+ sql = "update task_queue set worker_id=? where worker_id=?";
allocateNonworkersToWorkers = connection.prepareStatement(sql);
//current strategy reallocate tasks from longest queue to shortest
//TODO: might consider randomly shuffling or other algorithms
- sql = "update parse_queue set worker_id= ? where id in " +
- "(select id from parse_queue where " +
+ sql = "update task_queue set worker_id= ? where id in " +
+ "(select id from task_queue where " +
"worker_id = ? and " +
"rand() < 0.8 " +
"and status=0 for update)";
reallocate = connection.prepareStatement(sql);
- sql = "select count(1) from parse_queue where status="
+ sql = "select count(1) from task_queue where status="
+ AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal();
countAvailableTasks = connection.prepareStatement(sql);
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
similarity index 100%
rename from
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
rename to
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
similarity index 79%
rename from
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
rename to
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
index eaac09d..24acdf8 100644
---
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
+++
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
@@ -1,5 +1,6 @@
package org.apache.tika.pipes.async;
+import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
@@ -11,8 +12,8 @@ public class AsyncTask extends FetchEmitTuple {
private long taskId;
private final short retry;
- public AsyncTask(long taskId, short retry,
- FetchEmitTuple fetchEmitTuple) {
+ public AsyncTask(@JsonProperty("taskId") long taskId,
@JsonProperty("retry") short retry,
+ @JsonProperty("fetchEmitTuple") FetchEmitTuple
fetchEmitTuple) {
super(fetchEmitTuple.getFetchKey(), fetchEmitTuple.getEmitKey(),
fetchEmitTuple.getMetadata());
this.taskId = taskId;
this.retry = retry;
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
similarity index 97%
rename from
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
rename to
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
index ff4e5d4..0b685a2 100644
---
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
+++
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
@@ -57,7 +57,7 @@ public class AsyncWorker implements Callable<Integer> {
" where worker_id = (" + workerId + ")";
restarting = connection.prepareStatement(sql);
//this checks if the process was able to reset the status
- sql = "select id, retry, json from parse_queue where worker_id="
+ sql = "select id, retry, json from task_queue where worker_id="
+ workerId +
" and status=" +
AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS.ordinal();
selectActiveTasks = connection.prepareStatement(sql);
@@ -80,7 +80,7 @@ public class AsyncWorker implements Callable<Integer> {
if (finished) {
int exitValue = p.exitValue();
if (exitValue == 0) {
- LOG.info("child process finished with exitValue=0");
+ LOG.info("forked worker process finished with
exitValue=0");
return 1;
}
reportCrash(++restarts, exitValue);
@@ -173,7 +173,7 @@ public class AsyncWorker implements Callable<Integer> {
static PreparedStatement prepareReset(Connection connection) throws
SQLException {
//and this is called to reset the status on error
return connection.prepareStatement(
- "update parse_queue set " +
+ "update task_queue set " +
"status=?, " +
"time_stamp=CURRENT_TIMESTAMP(), " +
"retry=? " +
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
similarity index 92%
rename from
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
rename to
tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
index d01e9a0..8675f22 100644
---
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
+++
b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
@@ -2,13 +2,14 @@ package org.apache.tika.pipes.async;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
import net.jpountz.lz4.LZ4Factory;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.EncryptedDocumentException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.metadata.serialization.JsonMetadata;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.RecursiveParserWrapper;
@@ -27,7 +28,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
@@ -55,6 +55,11 @@ public class AsyncWorkerProcess {
AVAILABLE,
SELECTED,
IN_PROCESS,
+ AVAILABLE_EMIT,
+ SELECTED_EMIT,
+ IN_PROCESS_EMIT,
+ FAILED_EMIT,
+ EMITTED
}
public enum WORKER_STATUS_CODES {
@@ -65,11 +70,6 @@ public class AsyncWorkerProcess {
SHUTDOWN
}
- public enum EMIT_STATUS_CODES {
- READY,
- EMITTING,
- EMITTED
- }
enum ERROR_CODES {
TIMEOUT,
SECURITY_EXCEPTION,
@@ -148,21 +148,24 @@ public class AsyncWorkerProcess {
TaskQueue(Connection connection, int workerId) throws SQLException {
this.connection = connection;
this.workerId = workerId;
- String sql = "update parse_queue set status=" +
+ //TODO -- need to update timestamp
+ String sql = "update task_queue set status=" +
TASK_STATUS_CODES.SELECTED.ordinal()+
+ ", time_stamp=CURRENT_TIMESTAMP()"+
" where id = " +
- " (select id from parse_queue where worker_id = " +
workerId +
+ " (select id from task_queue where worker_id = " +
workerId +
" and status="+ TASK_STATUS_CODES.AVAILABLE.ordinal()+
" order by time_stamp asc limit 1 for update)";
markForSelecting = connection.prepareStatement(sql);
- sql = "select id, retry, json from parse_queue where status=" +
+ sql = "select id, retry, json from task_queue where status=" +
TASK_STATUS_CODES.SELECTED.ordinal() +
" and " +
" worker_id=" + workerId +
" order by time_stamp asc limit 1";
selectForProcessing = connection.prepareStatement(sql);
- sql = "update parse_queue set status="+
+ sql = "update task_queue set status="+
TASK_STATUS_CODES.IN_PROCESS.ordinal()+
+ ", time_stamp=CURRENT_TIMESTAMP()"+
" where id=?";
markForProcessing = connection.prepareStatement(sql);
@@ -211,11 +214,12 @@ public class AsyncWorkerProcess {
private void debugQueue() throws SQLException {
try (ResultSet rs = connection.createStatement().executeQuery(
- "select * from parse_queue limit 10")) {
+ "select * from task_queue limit 10")) {
while (rs.next()) {
for (int i = 1; i <= rs.getMetaData().getColumnCount();
i++) {
System.out.print(rs.getString(i)+ " ");
}
+ System.out.println("");
}
}
}
@@ -244,6 +248,7 @@ public class AsyncWorkerProcess {
private final PreparedStatement insertErrorLog;
private final PreparedStatement resetStatus;
private final PreparedStatement insertEmitData;
+ private final PreparedStatement updateStatusForEmit;
private final ObjectMapper objectMapper = new ObjectMapper();
LZ4Factory factory = LZ4Factory.fastestInstance();
@@ -257,6 +262,10 @@ public class AsyncWorkerProcess {
this.executorService = Executors.newFixedThreadPool(1);
this.executorCompletionService = new
ExecutorCompletionService<>(executorService);
this.parseTimeoutMs = parseTimeoutMs;
+
+ SimpleModule module = new SimpleModule();
+ module.addSerializer(Metadata.class, new JsonMetadata());
+ objectMapper.registerModule(module);
String sql = "insert into workers (worker_id, status) " +
"values (" + workerId + ", "+
WORKER_STATUS_CODES.ACTIVE.ordinal()+")";
@@ -264,6 +273,11 @@ public class AsyncWorkerProcess {
insertErrorLog = prepareInsertErrorLog(connection);
resetStatus = prepareReset(connection);
insertEmitData = prepareInsertEmitData(connection);
+ sql = "update task_queue set status="+
+ TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal()+
+ ", time_stamp=CURRENT_TIMESTAMP()" +
+ " where id=?";
+ updateStatusForEmit = connection.prepareStatement(sql);
}
@@ -339,10 +353,12 @@ public class AsyncWorkerProcess {
try {
emit(asyncData);
} catch (JsonProcessingException e) {
+ e.printStackTrace();
recordBadEmit(task.getTaskId(),
task.getFetchKey().getKey(),
ERROR_CODES.EMIT_SERIALIZATION.ordinal());
} catch (SQLException e) {
+ e.printStackTrace();
recordBadEmit(task.getTaskId(),
task.getFetchKey().getKey(),
ERROR_CODES.EMIT_SQL_INSERT_EXCEPTION.ordinal());
@@ -365,6 +381,10 @@ public class AsyncWorkerProcess {
insertEmitData.setLong(2, bytes.length);
insertEmitData.setBlob(3, new ByteArrayInputStream(compressed));
insertEmitData.execute();
+ updateStatusForEmit.clearParameters();
+ updateStatusForEmit.setLong(1,
+ asyncData.getAsyncTask().getTaskId());
+ updateStatusForEmit.execute();
}
private void handleTimeout(long taskId, String key) throws
TimeoutException {
@@ -405,9 +425,8 @@ public class AsyncWorkerProcess {
static PreparedStatement prepareInsertEmitData(Connection connection)
throws SQLException {
return connection.prepareStatement(
- "insert into emits (status, time_stamp, uncompressed_size,
bytes) " +
- " values
("+AsyncWorkerProcess.EMIT_STATUS_CODES.READY.ordinal()+
- ",CURRENT_TIMESTAMP(),?,?)"
+ "insert into emits (id, time_stamp, uncompressed_size,
bytes) " +
+ " values (?,CURRENT_TIMESTAMP(),?,?)"
);
}
}
@@ -445,7 +464,7 @@ public class AsyncWorkerProcess {
}
injectUserMetadata(userMetadata, metadataList);
EmitKey emitKey = task.getEmitKey();
- if (StringUtils.isBlank(emitKey.getKey())) {
+ if (StringUtils.isBlank(emitKey.getEmitKey())) {
emitKey = new EmitKey(emitKey.getEmitterName(), fetchKey);
task.setEmitKey(emitKey);
}
diff --git a/tika-pipes/tika-pipes-app/src/main/resources/log4j.properties
b/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties
similarity index 100%
rename from tika-pipes/tika-pipes-app/src/main/resources/log4j.properties
rename to tika-pipes/tika-pipes-async/src/main/resources/log4j.properties
diff --git
a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
similarity index 100%
rename from
tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
rename to
tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
diff --git
a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
similarity index 100%
rename from
tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
rename to
tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
diff --git
a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
similarity index 100%
rename from
tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
rename to
tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
diff --git
a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
similarity index 100%
rename from
tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
rename to
tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
diff --git
a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
similarity index 100%
rename from
tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
rename to
tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
index 58b4ff4..332813f 100644
---
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
@@ -228,7 +228,7 @@ public class PipeIntegrationTests {
userMetadata.set("project", "my-project");
try (InputStream is = fetcher.fetch(t.getFetchKey().getKey(),
t.getMetadata())) {
- emitter.emit(t.getEmitKey().getKey(), is, userMetadata);
+ emitter.emit(t.getEmitKey().getEmitKey(), is, userMetadata);
}
}
}
diff --git a/tika-serialization/pom.xml b/tika-serialization/pom.xml
index 64e3aa2..2ad8035 100644
--- a/tika-serialization/pom.xml
+++ b/tika-serialization/pom.xml
@@ -56,6 +56,11 @@
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
<!-- Test dependencies -->
<dependency>
diff --git
a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonEmitData.java
b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonEmitData.java
index 21b6377..e256d7d 100644
---
a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonEmitData.java
+++
b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonEmitData.java
@@ -32,7 +32,7 @@ public class JsonEmitData {
jsonGenerator.writeStartObject();
EmitKey key = emitData.getEmitKey();
jsonGenerator.writeStringField(JsonFetchEmitTuple.EMITTER,
key.getEmitterName());
- jsonGenerator.writeStringField(JsonFetchEmitTuple.EMITKEY,
key.getKey());
+ jsonGenerator.writeStringField(JsonFetchEmitTuple.EMITKEY,
key.getEmitKey());
jsonGenerator.writeFieldName("data");
jsonGenerator.writeStartArray();
for (Metadata m : emitData.getMetadataList()) {
diff --git
a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
index 55d10ef..3024f6a 100644
---
a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
+++
b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
@@ -135,8 +135,8 @@ public class JsonFetchEmitTuple {
jsonGenerator.writeStringField(FETCHER,
t.getFetchKey().getFetcherName());
jsonGenerator.writeStringField(FETCHKEY, t.getFetchKey().getKey());
jsonGenerator.writeStringField(EMITTER,
t.getEmitKey().getEmitterName());
- if (!StringUtils.isBlank(t.getEmitKey().getKey())) {
- jsonGenerator.writeStringField(EMITKEY, t.getEmitKey().getKey());
+ if (!StringUtils.isBlank(t.getEmitKey().getEmitKey())) {
+ jsonGenerator.writeStringField(EMITKEY,
t.getEmitKey().getEmitKey());
}
if (t.getMetadata().size() > 0) {
jsonGenerator.writeFieldName(METADATAKEY);
diff --git
a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
index b33c2a9..06612a1 100644
---
a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
+++
b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
@@ -26,13 +26,19 @@ import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
-public class JsonMetadata {
+public class JsonMetadata extends StdSerializer<Metadata> {
static volatile boolean PRETTY_PRINT = false;
+ /**
+ * Creates a serializer and passes {@code Metadata.class} to the
+ * {@code StdSerializer} superclass as the handled type.
+ */
+ public JsonMetadata() {
+ super(Metadata.class);
+ }
+
/**
* Serializes a Metadata object to Json. This does not flush or close the
writer.
*
@@ -140,4 +146,10 @@ public class JsonMetadata {
PRETTY_PRINT = prettyPrint;
}
+ @Override // overrides the serialize callback inherited from StdSerializer<Metadata>
+ public void serialize(Metadata metadata,
+ JsonGenerator jsonGenerator,
+ SerializerProvider serializerProvider) throws // serializerProvider is not used by this implementation
IOException {
+ writeMetadataObject(metadata, jsonGenerator, false); // delegates all writing; NOTE(review): meaning of the trailing false is not visible in this hunk -- confirm
+ }
}
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
index 315927a..3589f4d 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
@@ -97,7 +97,7 @@ public class AsyncEmitter implements Callable<Integer> {
void add(EmitData data) {
size++;
- long sz =
AbstractEmitter.estimateSizeInBytes(data.getEmitKey().getKey(),
data.getMetadataList());
+ long sz =
AbstractEmitter.estimateSizeInBytes(data.getEmitKey().getEmitKey(),
data.getMetadataList());
if (estimatedSize + sz > maxBytes) {
LOG.debug("estimated size ({}) > maxBytes({}), going to
emitAll",
(estimatedSize+sz), maxBytes);
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
index c20fc75..1c8629c 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
@@ -124,7 +124,7 @@ public class AsyncParser implements Callable<Integer> {
injectUserMetadata(userMetadata, metadataList);
EmitKey emitKey = t.getEmitKey();
- if (StringUtils.isBlank(emitKey.getKey())) {
+ if (StringUtils.isBlank(emitKey.getEmitKey())) {
emitKey = new EmitKey(emitKey.getEmitterName(), fetchKey);
}
return new EmitData(emitKey, metadataList);
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
index ce437d7..047fa13 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
@@ -17,15 +17,11 @@
package org.apache.tika.server.core.resource;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import org.apache.tika.Tika;
import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.emitter.Emitter;
import org.apache.tika.pipes.emitter.TikaEmitterException;
import org.apache.tika.exception.TikaException;
-import org.apache.tika.pipes.fetcher.FetchKey;
import org.apache.tika.pipes.fetcher.Fetcher;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
@@ -50,7 +46,6 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -197,7 +192,7 @@ public class EmitterResource {
//use fetch key if emitter key is not specified
//TODO: clean this up?
EmitKey emitKey = t.getEmitKey();
- if (StringUtils.isBlank(emitKey.getKey())) {
+ if (StringUtils.isBlank(emitKey.getEmitKey())) {
emitKey = new EmitKey(emitKey.getEmitterName(),
t.getFetchKey().getKey());
}
return emitKey;
@@ -207,7 +202,7 @@ public class EmitterResource {
Map<String, String> statusMap = new HashMap<>();
statusMap.put("status", "ok");
statusMap.put("emitter", t.getEmitKey().getEmitterName());
- statusMap.put("emitKey", t.getEmitKey().getKey());
+ statusMap.put("emitKey", t.getEmitKey().getEmitKey());
String msg =
metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
statusMap.put("parse_exception", msg);
return statusMap;
@@ -263,9 +258,9 @@ public class EmitterResource {
String status = "ok";
String exceptionMsg = "";
try {
- emitter.emit(emitKey.getKey(), metadataList);
+ emitter.emit(emitKey.getEmitKey(), metadataList);
} catch (IOException | TikaEmitterException e) {
- LOG.warn("problem emitting ("+emitKey.getKey()+")", e);
+ LOG.warn("problem emitting ("+emitKey.getEmitKey()+")", e);
status = "emitter_exception";
exceptionMsg = ExceptionUtils.getStackTrace(e);
}