This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch TIKA-3304
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/TIKA-3304 by this push:
new 025eded WIP -- do not merge...still a bunch to do
025eded is described below
commit 025eded4def96c9dfdd2923386bb38566b8eac26
Author: tballison <[email protected]>
AuthorDate: Fri Mar 5 17:48:39 2021 -0500
WIP -- do not merge...still a bunch to do
---
.../apache/tika/pipes/emitter/AbstractEmitter.java | 2 +-
.../org/apache/tika/pipes/emitter/Emitter.java | 2 +-
.../apache/tika/pipes/emitter/EmptyEmitter.java | 2 +-
.../tika/pipes/emitter/solr/SolrEmitter.java | 2 +-
.../src/test/resources/tika-config-s3.xml | 1 +
tika-pipes/tika-pipes-app/pom.xml | 21 +-
.../java/org/apache/tika/pipes/async/AsyncCli.java | 6 +-
.../org/apache/tika/pipes/async/AsyncEmitter.java | 11 +-
.../tika/pipes/async/AsyncEmitterProcess.java | 156 +++++++++++++++
.../apache/tika/pipes/async/AsyncProcessor.java | 222 ++++++++++++++-------
.../tika/pipes/async/AsyncRuntimeException.java | 27 +--
.../org/apache/tika/pipes/async/AsyncTask.java | 5 +-
.../org/apache/tika/pipes/async/AsyncWorker.java | 43 ++--
.../tika/pipes/async/AsyncWorkerProcess.java | 122 +++++++----
.../tika/pipes/{driver => async}/AsyncCliTest.java | 3 +-
.../tika/pipes/async/AsyncProcessorTest.java | 88 ++++++++
.../org/apache/tika/pipes/async/MockEmitter.java | 76 +++++++
.../org/apache/tika/pipes/async/MockFetcher.java | 29 +++
.../pipes/{driver => async}/TestPipesDriver.java | 11 +-
.../apache/tika/pipes/PipeIntegrationTests.java | 20 +-
.../serialization/JsonFetchEmitTupleTest.java | 6 +-
.../core/TikaServerEmitterIntegrationTest.java | 1 +
22 files changed, 676 insertions(+), 180 deletions(-)
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
index a54b708..3c340ce 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
@@ -46,7 +46,7 @@ public abstract class AbstractEmitter implements Emitter {
* @throws TikaEmitterException
*/
@Override
- public void emit(List<EmitData> emitData) throws IOException,
TikaEmitterException {
+ public void emit(List<? extends EmitData> emitData) throws IOException,
TikaEmitterException {
for (EmitData d : emitData) {
emit(d.getEmitKey().getKey(), d.getMetadataList());
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
index dbea669..8f88985 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
@@ -27,7 +27,7 @@ public interface Emitter {
void emit(String emitKey, List<Metadata> metadataList) throws IOException,
TikaEmitterException;
- void emit(List<EmitData> emitData) throws IOException,
TikaEmitterException;
+ void emit(List<? extends EmitData> emitData) throws IOException,
TikaEmitterException;
//TODO -- add this later for xhtml?
//void emit(String txt, Metadata metadata) throws IOException,
TikaException;
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
index 5f83db7..dfded00 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
@@ -34,7 +34,7 @@ public class EmptyEmitter implements Emitter {
}
@Override
- public void emit(List<EmitData> emitData) throws IOException,
TikaEmitterException {
+ public void emit(List<? extends EmitData> emitData) throws IOException,
TikaEmitterException {
}
}
diff --git
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index 0e09e9c..f299913 100644
---
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
+++
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -104,7 +104,7 @@ public class SolrEmitter extends AbstractEmitter implements
Initializable {
}
@Override
- public void emit(List<EmitData> batch) throws IOException,
+ public void emit(List<? extends EmitData> batch) throws IOException,
TikaEmitterException {
if (batch == null || batch.size() == 0) {
LOG.warn("batch is null or empty");
diff --git
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/resources/tika-config-s3.xml
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/resources/tika-config-s3.xml
index 740058a..176513b 100644
---
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/resources/tika-config-s3.xml
+++
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/resources/tika-config-s3.xml
@@ -22,6 +22,7 @@
<param name="name" type="string">s3</param>
<param name="region" type="string">us-east-1</param>
<param name="profile" type="string">my_profile</param>
+ <param name="credentialsProvider" type="string">profile</param>
</params>
</fetcher>
</fetchers>
diff --git a/tika-pipes/tika-pipes-app/pom.xml
b/tika-pipes/tika-pipes-app/pom.xml
index 1042d42..9e26eb2 100644
--- a/tika-pipes/tika-pipes-app/pom.xml
+++ b/tika-pipes/tika-pipes-app/pom.xml
@@ -61,6 +61,16 @@
<version>${h2.version}</version>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>lz4</artifactId>
+ <version>1.3.0</version>
+ </dependency>
+ <dependency>
<groupId>${project.groupId}</groupId>
<artifactId>tika-emitter-fs</artifactId>
<version>${project.version}</version>
@@ -68,15 +78,24 @@
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>tika-parsers-classic</artifactId>
+ <artifactId>tika-core</artifactId>
<version>${project.version}</version>
+ <type>test-jar</type>
<scope>test</scope>
</dependency>
+ <!--
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-parsers-classic-package</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+
</dependencies>
</project>
\ No newline at end of file
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
index 2804a7a..cdf8855 100644
---
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
+++
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
@@ -60,7 +60,7 @@ public class AsyncCli {
if (fetchIterator instanceof EmptyFetchIterator) {
throw new IllegalArgumentException("can't have empty fetch
iterator");
}
- ArrayBlockingQueue<FetchEmitTuple> q =
fetchIterator.init(maxConsumers);
+ ArrayBlockingQueue<FetchEmitTuple> q = new
ArrayBlockingQueue<>(10000);//fetchIterator.init(maxConsumers);
AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(q, connection);
executorCompletionService.submit(fetchIterator);
executorCompletionService.submit(enqueuer);
@@ -169,7 +169,7 @@ public class AsyncCli {
int workerId = workers.size() == 1 ? workers.get(0) :
workers.get(random.nextInt(workers.size()));
insert.clearParameters();
- insert.setByte(1, (byte)
AsyncWorkerProcess.STATUS_CODES.AVAILABLE.ordinal());
+ insert.setByte(1, (byte)
AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
insert.setInt(2, workerId);
insert.setShort(3, (short) 0);
insert.setString(4, JsonFetchEmitTuple.toJson(t));
@@ -220,7 +220,7 @@ public class AsyncCli {
reallocate = connection.prepareStatement(sql);
sql = "select count(1) from parse_queue where status="
- + AsyncWorkerProcess.STATUS_CODES.AVAILABLE.ordinal();
+ + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal();
countAvailableTasks = connection.prepareStatement(sql);
sql = "insert into workers_shutdown (worker_id) values (?)";
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
index 22ede76..bdc6cd3 100644
---
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
+++
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
@@ -1,4 +1,4 @@
-package org.apache.tika.pipes.async;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,6 +14,8 @@ package org.apache.tika.pipes.async;/*
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.tika.pipes.async;
+
import org.apache.tika.pipes.emitter.AbstractEmitter;
import org.apache.tika.pipes.emitter.EmitData;
@@ -37,10 +39,9 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
-/**
- * Worker thread that takes EmitData off the queue, batches it
- * and tries to emit it as a batch
- */
+//TODO -- convert this into a watcher over the AsyncEmitterProcess
+//along the lines of AsyncWorker
+//then add emitters to the main loop in async processor
public class AsyncEmitter implements Callable<Integer> {
private static final Logger LOG =
LoggerFactory.getLogger(AsyncEmitter.class);
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
new file mode 100644
index 0000000..58854cb
--- /dev/null
+++
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.async;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.apache.tika.utils.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AsyncEmitterProcess {
+
+ private long emitWithinMs = 1000;
+ private long emitMaxBytes = 10_000_000;
+ private static final Logger LOG =
LoggerFactory.getLogger(AsyncEmitterProcess.class);
+
+ private final LZ4FastDecompressor decompressor =
LZ4Factory.fastestInstance().fastDecompressor();
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ public static void main(String[] args) throws Exception {
+ String db = System.getenv(AsyncProcessor.TIKA_ASYNC_JDBC_KEY);
+ TikaConfig tikaConfig = new
TikaConfig(Paths.get(System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY)));
+ int workerId = Integer.parseInt(args[0]);
+ LOG.debug("trying to get connection {} >{}<", workerId, db);
+ try (Connection connection = DriverManager.getConnection(db)) {
+ AsyncEmitterProcess asyncEmitter = new AsyncEmitterProcess();
+ asyncEmitter.execute(connection, workerId, tikaConfig);
+ }
+ System.exit(0);
+ }
+
+ private void execute(Connection connection, int workerId,
+ TikaConfig tikaConfig) throws SQLException {
+ int recordsPerPulse = 10;
+ String sql = "update emits set status=" +
+ AsyncWorkerProcess.EMIT_STATUS_CODES.EMITTING.ordinal()+
+ ", worker_id="+workerId+
+ " where emit_id in " +
+ " (select emit_id from emits "+//where worker_id = " +
workerId +
+ " and status="+
AsyncWorkerProcess.EMIT_STATUS_CODES.READY.ordinal()+
+ " order by time_stamp asc limit "+recordsPerPulse+" for
update)";
+ PreparedStatement markForSelecting = connection.prepareStatement(sql);
+ sql = "select emit_id, uncompressed_size, bytes from emits where
status=" +
+ AsyncWorkerProcess.EMIT_STATUS_CODES.EMITTING.ordinal() +
+ " and worker_id=" + workerId +
+ " order by time_stamp asc";
+ PreparedStatement selectForProcessing =
connection.prepareStatement(sql);
+ sql = "delete from emits where emit_id=?";
+ PreparedStatement deleteFromEmits = connection.prepareStatement(sql);
+
+ }
+
+ private AsyncData deserialize(byte[] compressed, int decompressedLength)
+ throws IOException {
+ byte[] restored = new byte[decompressedLength];
+ int compressedLength2 = decompressor.decompress(compressed, 0,
restored,
+ 0, decompressedLength);
+
+ return objectMapper.readerFor(AsyncTask.class).readValue(restored);
+ }
+
+ private static class EmitDataCache {
+ private final EmitterManager emitterManager;
+ private final long maxBytes;
+
+ long estimatedSize = 0;
+ int size = 0;
+ Map<String, List<AsyncData>> map = new HashMap<>();
+
+ public EmitDataCache(EmitterManager emitterManager,
+ long maxBytes) {
+ this.emitterManager = emitterManager;
+ this.maxBytes = maxBytes;
+ }
+
+ void updateEstimatedSize(long newBytes) {
+ estimatedSize += newBytes;
+ }
+
+ void add(AsyncData data) {
+
+ size++;
+ long sz =
AbstractEmitter.estimateSizeInBytes(data.getEmitKey().getKey(),
data.getMetadataList());
+ if (estimatedSize + sz > maxBytes) {
+ LOG.debug("estimated size ({}) > maxBytes({}), going to
emitAll",
+ (estimatedSize+sz), maxBytes);
+ emitAll();
+ }
+ List<AsyncData> cached =
map.get(data.getEmitKey().getEmitterName());
+ if (cached == null) {
+ cached = new ArrayList<>();
+ map.put(data.getEmitKey().getEmitterName(), cached);
+ }
+ updateEstimatedSize(sz);
+ cached.add(data);
+ }
+
+ private void emitAll() {
+ int emitted = 0;
+ LOG.debug("about to emit {}", size);
+ for (Map.Entry<String, List<AsyncData>> e : map.entrySet()) {
+ Emitter emitter = emitterManager.getEmitter(e.getKey());
+ tryToEmit(emitter, e.getValue());
+ emitted += e.getValue().size();
+ }
+ LOG.debug("emitted: {}", emitted);
+ estimatedSize = 0;
+ size = 0;
+ map.clear();
+ //lastEmitted = Instant.now();
+ }
+
+ private long tryToEmit(Emitter emitter, List<AsyncData>
cachedEmitData) {
+
+ try {
+ emitter.emit(cachedEmitData);
+ } catch (IOException | TikaEmitterException e) {
+ LOG.warn("emitter class ({}): {}", emitter.getClass(),
+ ExceptionUtils.getStackTrace(e));
+ }
+ return 1;
+ }
+ }
+}
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index cfe7455..e7dc0c1 100644
---
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -1,109 +1,142 @@
package org.apache.tika.pipes.async;
import org.apache.commons.io.FileUtils;
-import org.apache.tika.config.TikaConfig;
import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.EmptyFetchIterator;
import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.apache.tika.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
-import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-public class AsyncProcessor implements Closeable, Callable<Integer> {
+public class AsyncProcessor implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(AsyncProcessor.class);
-
+ protected static String TIKA_ASYNC_JDBC_KEY = "TIKA_ASYC_JDBC_KEY";
+ protected static String TIKA_ASYNC_CONFIG_FILE_KEY =
"TIKA_ASYNC_CONFIG_FILE_KEY";
private final Path tikaConfigPath;
private AsyncConfig asyncConfig;
- private final ArrayBlockingQueue<FetchEmitTuple> queue;//tmp directory
used if no jdbc string is configured
+ private final ArrayBlockingQueue<FetchEmitTuple> queue;
+ private final Connection connection;
+ private int finishedThreads = 0;
+ private final int totalThreads;
+ private ExecutorService executorService;
+ private ExecutorCompletionService<Integer> executorCompletionService;
+
+ public static AsyncProcessor build(Path tikaConfigPath) throws
AsyncRuntimeException {
+ try {
+ AsyncProcessor processor = new AsyncProcessor(tikaConfigPath);
+
+ processor.init();
+ return processor;
+ } catch (SQLException|IOException e) {
+ throw new AsyncRuntimeException(e);
+ }
+ }
- public AsyncProcessor (Path tikaConfigPath) {
+ private AsyncProcessor(Path tikaConfigPath) throws SQLException,
IOException {
this.tikaConfigPath = tikaConfigPath;
- this.queue = new
ArrayBlockingQueue<FetchEmitTuple>(asyncConfig.getQueueSize());
-
+ this.asyncConfig = AsyncConfig.load(tikaConfigPath);
+ this.queue = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
+ this.connection =
DriverManager.getConnection(asyncConfig.getJdbcString());
+ this.totalThreads = asyncConfig.getMaxConsumers() + 2;
}
- public synchronized boolean offer(List<FetchEmitTuple> fetchEmitTuples,
long offerMs) {
+ public synchronized boolean offer (
+ List<FetchEmitTuple> fetchEmitTuples, long offerMs) throws
+ AsyncRuntimeException, InterruptedException {
if (queue == null) {
throw new IllegalStateException("queue hasn't been initialized
yet.");
}
long start = System.currentTimeMillis();
- long elapsed = System.currentTimeMillis()-start;
+ long elapsed = System.currentTimeMillis() - start;
while (elapsed < offerMs) {
+ checkActive();
if (queue.remainingCapacity() > fetchEmitTuples.size()) {
- queue.addAll(fetchEmitTuples);
- return true;
+ try {
+ queue.addAll(fetchEmitTuples);
+ return true;
+ } catch (IllegalStateException e) {
+ //swallow
+ }
}
+ Thread.sleep(100);
elapsed = System.currentTimeMillis() - start;
}
return false;
}
- public synchronized boolean offer(FetchEmitTuple t, long offerMs) throws
InterruptedException {
+ public synchronized boolean offer(FetchEmitTuple t, long offerMs)
+ throws AsyncRuntimeException, InterruptedException {
+ checkActive();
return queue.offer(t, offerMs, TimeUnit.MILLISECONDS);
}
+ /**
+ * This polls the ExecutorCompletionService to check for execution
exceptions
+ * and to make sure that some threads are still active.
+ *
+ * @return
+ * @throws AsyncRuntimeException
+ * @throws InterruptedException
+ */
+ public synchronized boolean checkActive()
+ throws AsyncRuntimeException, InterruptedException {
+ Future<Integer> future = executorCompletionService.poll();
+ if (future != null) {
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ throw new AsyncRuntimeException(e);
+ }
+ finishedThreads++;
+ }
+ if (finishedThreads == totalThreads) {
+ return false;
+ }
+ return true;
+ }
- @Override
- public Integer call() throws Exception {
- this.asyncConfig = AsyncConfig.load(tikaConfigPath);
+ private void init() throws SQLException {
setupTables();
- ExecutorService executorService = Executors.newFixedThreadPool(
- asyncConfig.getMaxConsumers() + 2);
- ExecutorCompletionService<Integer> executorCompletionService =
+ executorService = Executors.newFixedThreadPool(
+ totalThreads);
+ executorCompletionService =
new ExecutorCompletionService<>(executorService);
- try (Connection connection =
DriverManager.getConnection(asyncConfig.getJdbcString())) {
- AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(queue,
connection);
-
- executorCompletionService.submit(enqueuer);
- executorCompletionService.submit(new AssignmentManager(connection,
enqueuer));
+ AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(queue, connection);
- for (int i = 0; i < asyncConfig.getMaxConsumers(); i++) {
- executorCompletionService.submit(new AsyncWorker(connection,
- asyncConfig.getJdbcString(), i, tikaConfigPath));
- }
- int completed = 0;
- while (completed < asyncConfig.getMaxConsumers()+2) {
- Future<Integer> future = executorCompletionService.take();
- if (future != null) {
- int val = future.get();
- completed++;
- LOG.debug("finished " + val);
- }
- }
- } finally {
- executorService.shutdownNow();
+ executorCompletionService.submit(enqueuer);
+ executorCompletionService.submit(new AssignmentManager(connection,
enqueuer));
+ //executorCompletionService.submit(new )
+ for (int i = 0; i < asyncConfig.getMaxConsumers(); i++) {
+ executorCompletionService.submit(new AsyncWorker(connection,
+ asyncConfig.getJdbcString(), i, tikaConfigPath));
}
- return 1;
}
private void setupTables() throws SQLException {
- Connection connection =
DriverManager.getConnection(asyncConfig.getJdbcString());
String sql = "create table parse_queue " +
"(id bigint auto_increment primary key," +
@@ -112,31 +145,81 @@ public class AsyncProcessor implements Closeable,
Callable<Integer> {
"retry smallint," + //short
"time_stamp timestamp," +
"json varchar(64000))";
- connection.createStatement().execute(sql);
+ try (Statement st = connection.createStatement()) {
+ st.execute(sql);
+ }
//no clear benefit to creating an index on timestamp
// sql = "CREATE INDEX IF NOT EXISTS status_timestamp on status
(time_stamp)";
- sql = "create table workers (worker_id int primary key)";
- connection.createStatement().execute(sql);
-
- sql = "create table workers_shutdown (worker_id int primary key)";
- connection.createStatement().execute(sql);
+ sql = "create table workers (worker_id int primary key, status
tinyint)";
+ try (Statement st = connection.createStatement()) {
+ st.execute(sql);
+ }
sql = "create table error_log (task_id bigint, " +
"fetch_key varchar(10000)," +
"time_stamp timestamp," +
"retry integer," +
"error_code tinyint)";
- connection.createStatement().execute(sql);
+ try (Statement st = connection.createStatement()) {
+ st.execute(sql);
+ }
+ sql = "create table emits (" +
+ "emit_id bigint auto_increment primary key, " +
+ "status tinyint, " +
+ "worker_id integer, " +
+ "time_stamp timestamp, " +
+ "uncompressed_size bigint, " +
+ "bytes blob)";
+ try (Statement st = connection.createStatement()) {
+ st.execute(sql);
+ }
}
+ public void shutdownNow() throws IOException, AsyncRuntimeException {
+ try {
+ executorService.shutdownNow();
+ } finally {
+ //close down processes and db
+ if (asyncConfig.getTempDBDir() != null) {
+ FileUtils.deleteDirectory(asyncConfig.getTempDBDir().toFile());
+ }
+ }
+ }
+
+ /**
+ * This is a blocking close. If you need to shut down immediately,
+ * try {@link #shutdownNow()}.
+ * @throws IOException
+ */
@Override
public void close() throws IOException {
- //close down processes and db
-
-
- if (asyncConfig.getTempDBDir() != null) {
- FileUtils.deleteDirectory(asyncConfig.getTempDBDir().toFile());
+ try {
+ for (int i = 0; i < asyncConfig.getMaxConsumers(); i++) {
+ try {
+ //blocking
+ queue.put(FetchIterator.COMPLETED_SEMAPHORE);
+ } catch (InterruptedException e) {
+ //swallow
+ }
+ }
+ long start = System.currentTimeMillis();
+ long elapsed = System.currentTimeMillis() - start;
+ try {
+ boolean isActive = checkActive();
+ while (isActive) {
+ isActive = checkActive();
+ elapsed = System.currentTimeMillis();
+ }
+ } catch (InterruptedException e) {
+ return;
+ }
+ } finally {
+ executorService.shutdownNow();
+ //close down processes and db
+ if (asyncConfig.getTempDBDir() != null) {
+ FileUtils.deleteDirectory(asyncConfig.getTempDBDir().toFile());
+ }
}
}
@@ -166,7 +249,7 @@ public class AsyncProcessor implements Closeable,
Callable<Integer> {
List<Integer> workers = new ArrayList<>();
while (true) {
FetchEmitTuple t = queue.poll(1, TimeUnit.SECONDS);
- LOG.debug("enqueing to db "+t);
+ LOG.debug("enqueing to db " + t);
if (t == null) {
//log.trace?
} else if (t == FetchIterator.COMPLETED_SEMAPHORE) {
@@ -179,7 +262,7 @@ public class AsyncProcessor implements Closeable,
Callable<Integer> {
while (workers.size() == 0 && elapsed < 600000) {
workers = getActiveWorkers(connection);
Thread.sleep(100);
- elapsed = System.currentTimeMillis()-start;
+ elapsed = System.currentTimeMillis() - start;
}
insert(t, workers);
}
@@ -189,11 +272,12 @@ public class AsyncProcessor implements Closeable,
Callable<Integer> {
boolean isComplete() {
return isComplete;
}
+
private void insert(FetchEmitTuple t, List<Integer> workers) throws
IOException, SQLException {
int workerId = workers.size() == 1 ? workers.get(0) :
workers.get(random.nextInt(workers.size()));
insert.clearParameters();
- insert.setByte(1, (byte)
AsyncWorkerProcess.STATUS_CODES.AVAILABLE.ordinal());
+ insert.setByte(1, (byte)
AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
insert.setInt(2, workerId);
insert.setShort(3, (short) 0);
insert.setString(4, JsonFetchEmitTuple.toJson(t));
@@ -210,7 +294,7 @@ public class AsyncProcessor implements Closeable,
Callable<Integer> {
private final PreparedStatement allocateNonworkersToWorkers;
private final PreparedStatement reallocate;
private final PreparedStatement countAvailableTasks;
- private final PreparedStatement insertWorkersShutdown;
+ private final PreparedStatement shutdownWorker;
private final Random random = new Random();
@@ -244,11 +328,13 @@ public class AsyncProcessor implements Closeable,
Callable<Integer> {
reallocate = connection.prepareStatement(sql);
sql = "select count(1) from parse_queue where status="
- + AsyncWorkerProcess.STATUS_CODES.AVAILABLE.ordinal();
+ + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal();
countAvailableTasks = connection.prepareStatement(sql);
- sql = "insert into workers_shutdown (worker_id) values (?)";
- insertWorkersShutdown = connection.prepareStatement(sql);
+ sql = "update workers set status="+
+
AsyncWorkerProcess.WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal() +
+ " where worker_id = ?";
+ shutdownWorker = connection.prepareStatement(sql);
}
@Override
@@ -262,20 +348,20 @@ public class AsyncProcessor implements Closeable,
Callable<Integer> {
notifyWorkers();
return 1;
}
- Thread.sleep(100);
+ Thread.sleep(200);
}
}
private void notifyWorkers() throws SQLException {
for (int workerId : getActiveWorkers(connection)) {
- insertWorkersShutdown.clearParameters();
- insertWorkersShutdown.setInt(1, workerId);
- insertWorkersShutdown.execute();
+ shutdownWorker.clearParameters();
+ shutdownWorker.setInt(1, workerId);
+ shutdownWorker.execute();
}
}
private boolean isComplete() throws SQLException {
- if (! enqueuer.isComplete) {
+ if (!enqueuer.isComplete) {
return false;
}
try (ResultSet rs = countAvailableTasks.executeQuery()) {
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
similarity index 62%
copy from
tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
copy to
tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
index 5f83db7..875cc90 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
+++
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
@@ -14,27 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.pipes.emitter;
+package org.apache.tika.pipes.async;
-import org.apache.tika.metadata.Metadata;
-
-import java.io.IOException;
-import java.util.List;
-
-public class EmptyEmitter implements Emitter {
-
- @Override
- public String getName() {
- return "empty";
- }
-
- @Override
- public void emit(String emitKey, List<Metadata> metadataList) throws
IOException, TikaEmitterException {
-
- }
-
- @Override
- public void emit(List<EmitData> emitData) throws IOException,
TikaEmitterException {
+/**
+ * Fatal exception that means that something went seriously wrong.
+ */
+public class AsyncRuntimeException extends RuntimeException {
+ public AsyncRuntimeException(Throwable t) {
+ super(t);
}
}
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
index 2770abb..eaac09d 100644
---
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
+++
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
@@ -8,7 +8,7 @@ public class AsyncTask extends FetchEmitTuple {
public static final AsyncTask SHUTDOWN_SEMAPHORE
= new AsyncTask(-1, (short)-1, new FetchEmitTuple(null, null,
null));
- private final long taskId;
+ private long taskId;
private final short retry;
public AsyncTask(long taskId, short retry,
@@ -26,6 +26,9 @@ public class AsyncTask extends FetchEmitTuple {
return retry;
}
+ public void setTaskId(long taskId) {
+ this.taskId = taskId;
+ }
@Override
public String toString() {
return "AsyncTask{" +
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
index 4f51e7a..ff4e5d4 100644
---
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
+++
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
@@ -10,7 +10,6 @@ import java.io.IOException;
import java.io.StringReader;
import java.nio.file.Path;
import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -19,6 +18,9 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import static
org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
+import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
+
/**
* This controls monitoring of the AsyncWorkerProcess
* and updates to the db on crashes etc.
@@ -32,7 +34,8 @@ public class AsyncWorker implements Callable<Integer> {
private final int workerId;
private final Path tikaConfigPath;
private final Connection connection;
- private final PreparedStatement delete;
+ private final PreparedStatement finished;
+ private final PreparedStatement restarting;
private final PreparedStatement selectActiveTasks;
private final PreparedStatement insertErrorLog;
private final PreparedStatement resetStatus;
@@ -44,13 +47,19 @@ public class AsyncWorker implements Callable<Integer> {
this.workerId = workerId;
this.tikaConfigPath = tikaConfigPath;
this.connection = connection;
- String sql = "delete from workers where worker_id = (" + workerId +
")";
- delete = connection.prepareStatement(sql);
-
+ String sql = "update workers set status="+
+ AsyncWorkerProcess.WORKER_STATUS_CODES.SHUTDOWN.ordinal()+
+ " where worker_id = (" + workerId + ")";
+ finished = connection.prepareStatement(sql);
+
+ sql = "update workers set status="+
+ AsyncWorkerProcess.WORKER_STATUS_CODES.RESTARTING.ordinal()+
+ " where worker_id = (" + workerId + ")";
+ restarting = connection.prepareStatement(sql);
//this checks if the process was able to reset the status
sql = "select id, retry, json from parse_queue where worker_id="
+ workerId +
- " and status=" +
AsyncWorkerProcess.STATUS_CODES.IN_PROCESS.ordinal();
+ " and status=" +
AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS.ordinal();
selectActiveTasks = connection.prepareStatement(sql);
//if not, this is called to insert into the error log
@@ -66,7 +75,7 @@ public class AsyncWorker implements Callable<Integer> {
try {
p = start();
int restarts = 0;
- while (restarts++ < 2) {
+ while (true) {
boolean finished = p.waitFor(60, TimeUnit.SECONDS);
if (finished) {
int exitValue = p.exitValue();
@@ -74,7 +83,7 @@ public class AsyncWorker implements Callable<Integer> {
LOG.info("child process finished with exitValue=0");
return 1;
}
- reportCrash(exitValue);
+ reportCrash(++restarts, exitValue);
p = start();
}
}
@@ -82,9 +91,8 @@ public class AsyncWorker implements Callable<Integer> {
if (p != null) {
p.destroyForcibly();
}
- delete.execute();
+ finished.execute();
}
- return -1 * workerId;
}
private Process start() throws IOException {
@@ -92,17 +100,20 @@ public class AsyncWorker implements Callable<Integer> {
"java", "-Djava.awt.headless=true",
"-cp", System.getProperty("java.class.path"),
"org.apache.tika.pipes.async.AsyncWorkerProcess",
- connectionString, Integer.toString(workerId),
-
ProcessUtils.escapeCommandLine(tikaConfigPath.toAbsolutePath().toString())
+ Integer.toString(workerId)
};
ProcessBuilder pb = new ProcessBuilder(args);
+ pb.environment().put(TIKA_ASYNC_JDBC_KEY, connectionString);
+ pb.environment().put(TIKA_ASYNC_CONFIG_FILE_KEY,
+ tikaConfigPath.toAbsolutePath().toString());
pb.inheritIO();
return pb.start();
}
- private void reportCrash(int exitValue) throws SQLException, IOException {
+
+ private void reportCrash(int numRestarts, int exitValue) throws
SQLException, IOException {
LOG.warn("worker id={} terminated, exitValue={}",
workerId, exitValue);
- delete.execute();
+ restarting.execute();
List<AsyncTask> activeTasks = new ArrayList<>();
try (ResultSet rs = selectActiveTasks.executeQuery()) {
long taskId = rs.getLong(1);
@@ -120,7 +131,7 @@ public class AsyncWorker implements Callable<Integer> {
}
for (AsyncTask t : activeTasks) {
- reportAndReset(t, AsyncWorkerProcess.ERROR_CODES.UNKNOWN,
+ reportAndReset(t, AsyncWorkerProcess.ERROR_CODES.UNKNOWN_PARSE,
insertErrorLog, resetStatus, LOG);
}
@@ -142,7 +153,7 @@ public class AsyncWorker implements Callable<Integer> {
try {
resetStatus.clearParameters();
- resetStatus.setByte(1, (byte)
AsyncWorkerProcess.STATUS_CODES.AVAILABLE.ordinal());
+ resetStatus.setByte(1, (byte)
AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
resetStatus.setShort(2, (short)(task.getRetry()+1));
resetStatus.setLong(3, task.getTaskId());
resetStatus.execute();
diff --git
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
index 8273c5c..d01e9a0 100644
---
a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
+++
b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
@@ -1,10 +1,14 @@
package org.apache.tika.pipes.async;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.jpountz.lz4.LZ4Factory;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.EncryptedDocumentException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.metadata.serialization.JsonMetadataList;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.RecursiveParserWrapper;
@@ -18,10 +22,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -44,19 +51,37 @@ import static
org.apache.tika.pipes.async.AsyncWorker.reportAndReset;
public class AsyncWorkerProcess {
- enum STATUS_CODES {
+ enum TASK_STATUS_CODES {
AVAILABLE,
SELECTED,
IN_PROCESS,
}
+ /**
+  * Status values written to the status column of the workers table.
+  * The SQL built in AsyncWorker and in this class embeds ordinal() of these
+  * constants, so the declaration order is part of the persisted format:
+  * append new constants at the end rather than reordering or inserting.
+  */
+ public enum WORKER_STATUS_CODES {
+ ACTIVE,
+ RESTARTING,
+ HIBERNATING,
+ SHOULD_SHUTDOWN,
+ SHUTDOWN
+ }
+
+ /**
+  * Status values written to the status column of the emits table.
+  * The insert statement for emit data embeds READY.ordinal(), so the
+  * declaration order is part of the persisted format: append new constants
+  * at the end rather than reordering or inserting.
+  */
+ public enum EMIT_STATUS_CODES {
+ READY,
+ EMITTING,
+ EMITTED
+ }
enum ERROR_CODES {
TIMEOUT,
SECURITY_EXCEPTION,
OTHER_EXCEPTION,
OOM,
OTHER_ERROR,
- UNKNOWN
+ UNKNOWN_PARSE,
+ EMIT_SERIALIZATION,
+ EMIT_SQL_INSERT_EXCEPTION,
+ EMIT_SQL_SELECT_EXCEPTION,
+ EMIT_DESERIALIZATION,
+ EMIT_EXCEPTION
}
private static final Logger LOG =
LoggerFactory.getLogger(AsyncWorkerProcess.class);
@@ -64,14 +89,13 @@ public class AsyncWorkerProcess {
//make these all configurable
private static final long SHUTDOWN_AFTER_MS = 120000;
private static long PULSE_MS = 1000;
- private long emitWithinMs = 1000;
- private long emitMaxBytes = 10_000_000;
private long parseTimeoutMs = 60000;
public static void main(String[] args) throws Exception {
- String db = args[0];
- int workerId = Integer.parseInt(args[1]);
- TikaConfig tikaConfig = new TikaConfig(Paths.get(args[2]));
+ Path tikaConfigPath =
Paths.get(System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY));
+ String db = System.getenv(AsyncProcessor.TIKA_ASYNC_JDBC_KEY);
+ TikaConfig tikaConfig = new TikaConfig(tikaConfigPath);
+ int workerId = Integer.parseInt(args[0]);
LOG.debug("trying to get connection {} >{}<", workerId, db);
try (Connection connection = DriverManager.getConnection(db)) {
AsyncWorkerProcess asyncWorker = new AsyncWorkerProcess();
@@ -83,18 +107,12 @@ public class AsyncWorkerProcess {
private void execute(Connection connection,
int workerId, TikaConfig tikaConfig) throws
SQLException {
- AsyncEmitHook asyncEmitHook = new AsyncPipesEmitHook(connection);
- AsyncEmitter asyncEmitter = new
AsyncEmitter(tikaConfig.getEmitterManager(),
- asyncEmitHook,
- emitWithinMs, emitMaxBytes);
-
ExecutorService service = Executors.newFixedThreadPool(3);
ExecutorCompletionService<Integer> executorCompletionService =
new ExecutorCompletionService<>(service);
- executorCompletionService.submit(new Worker(connection, workerId,
asyncEmitter,
+ executorCompletionService.submit(new Worker(connection, workerId,
tikaConfig, parseTimeoutMs));
- executorCompletionService.submit(asyncEmitter);
executorCompletionService.submit(new ForkWatcher(System.in));
int completed = 0;
@@ -113,7 +131,6 @@ public class AsyncWorkerProcess {
LOG.error("worker " + workerId + " had a mainloop exception", e);
} finally {
service.shutdownNow();
- asyncEmitter.emitAll();
}
return;
}
@@ -127,29 +144,30 @@ public class AsyncWorkerProcess {
private final PreparedStatement markForProcessing;
private final PreparedStatement checkForShutdown;
+
TaskQueue(Connection connection, int workerId) throws SQLException {
this.connection = connection;
this.workerId = workerId;
-
String sql = "update parse_queue set status=" +
- STATUS_CODES.SELECTED.ordinal()+
+ TASK_STATUS_CODES.SELECTED.ordinal()+
" where id = " +
" (select id from parse_queue where worker_id = " +
workerId +
- " and status="+STATUS_CODES.AVAILABLE.ordinal()+
+ " and status="+ TASK_STATUS_CODES.AVAILABLE.ordinal()+
" order by time_stamp asc limit 1 for update)";
markForSelecting = connection.prepareStatement(sql);
sql = "select id, retry, json from parse_queue where status=" +
- STATUS_CODES.SELECTED.ordinal() +
+ TASK_STATUS_CODES.SELECTED.ordinal() +
" and " +
" worker_id=" + workerId +
" order by time_stamp asc limit 1";
selectForProcessing = connection.prepareStatement(sql);
sql = "update parse_queue set status="+
- STATUS_CODES.IN_PROCESS.ordinal()+
+ TASK_STATUS_CODES.IN_PROCESS.ordinal()+
" where id=?";
markForProcessing = connection.prepareStatement(sql);
- sql = "select count(1) from workers_shutdown where worker_id=" +
workerId;
+ sql = "select count(1) from workers where worker_id=" + workerId +
+ " and status="+WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
checkForShutdown = connection.prepareStatement(sql);
}
@@ -157,14 +175,12 @@ public class AsyncWorkerProcess {
long start = System.currentTimeMillis();
long elapsed = System.currentTimeMillis() - start;
while (elapsed < pollMs) {
-
if (shouldShutdown()) {
return SHUTDOWN_SEMAPHORE;
}
-
int i = markForSelecting.executeUpdate();
if (i == 0) {
- //debugQueue();
+ debugQueue();
Thread.sleep(PULSE_MS);
} else {
long taskId = -1;
@@ -200,7 +216,6 @@ public class AsyncWorkerProcess {
for (int i = 1; i <= rs.getMetaData().getColumnCount();
i++) {
System.out.print(rs.getString(i)+ " ");
}
- System.out.println("");
}
}
}
@@ -221,7 +236,6 @@ public class AsyncWorkerProcess {
private final Connection connection;
private final int workerId;
- private final AsyncEmitter asyncEmitter;
private final RecursiveParserWrapper parser;
private final TikaConfig tikaConfig;
private final long parseTimeoutMs;
@@ -229,24 +243,27 @@ public class AsyncWorkerProcess {
private ExecutorCompletionService<AsyncData> executorCompletionService;
private final PreparedStatement insertErrorLog;
private final PreparedStatement resetStatus;
-
+ private final PreparedStatement insertEmitData;
+ private final ObjectMapper objectMapper = new ObjectMapper();
+ LZ4Factory factory = LZ4Factory.fastestInstance();
public Worker(Connection connection,
int workerId,
- AsyncEmitter asyncEmitter,
TikaConfig tikaConfig, long parseTimeoutMs) throws
SQLException {
this.connection = connection;
this.workerId = workerId;
- this.asyncEmitter = asyncEmitter;
this.parser = new RecursiveParserWrapper(tikaConfig.getParser());
this.tikaConfig = tikaConfig;
this.executorService = Executors.newFixedThreadPool(1);
this.executorCompletionService = new
ExecutorCompletionService<>(executorService);
this.parseTimeoutMs = parseTimeoutMs;
- String sql = "insert into workers (worker_id) values (" + workerId
+ ")";
+ String sql = "insert into workers (worker_id, status) " +
+ "values (" + workerId + ", "+
+ WORKER_STATUS_CODES.ACTIVE.ordinal()+")";
connection.createStatement().execute(sql);
insertErrorLog = prepareInsertErrorLog(connection);
resetStatus = prepareReset(connection);
+ insertEmitData = prepareInsertEmitData(connection);
}
@@ -275,12 +292,10 @@ public class AsyncWorkerProcess {
}
}
} catch (TimeoutException e) {
- e.printStackTrace();
LOG.warn(task.getFetchKey().getKey(), e);
reportAndReset(task, ERROR_CODES.TIMEOUT,
insertErrorLog, resetStatus, LOG);
} catch (SecurityException e) {
- e.printStackTrace();
LOG.warn(task.getFetchKey().getKey(), e);
reportAndReset(task, ERROR_CODES.SECURITY_EXCEPTION,
insertErrorLog, resetStatus, LOG);
@@ -290,17 +305,14 @@ public class AsyncWorkerProcess {
reportAndReset(task, ERROR_CODES.OTHER_EXCEPTION,
insertErrorLog, resetStatus, LOG);
} catch (OutOfMemoryError e) {
- e.printStackTrace();
LOG.warn(task.getFetchKey().getKey(), e);
reportAndReset(task, ERROR_CODES.OOM,
insertErrorLog, resetStatus, LOG);
} catch (Error e) {
- e.printStackTrace();
LOG.warn(task.getFetchKey().getKey(), e);
reportAndReset(task, ERROR_CODES.OTHER_ERROR,
insertErrorLog, resetStatus, LOG);
} finally {
- asyncEmitter.emitAll();
executorService.shutdownNow();
return 1;
}
@@ -324,18 +336,37 @@ public class AsyncWorkerProcess {
}
boolean shouldEmit = checkForParseException(asyncData);
if (shouldEmit) {
- boolean offered = asyncEmitter.emit(asyncData,
600000);//parameterize this
- if (!offered) {
- //TODO: deal with this
- LOG.warn("Failed to add ({}) " +
- "to emit queue after 10 minutes.",
- task.getFetchKey().getKey());
+ try {
+ emit(asyncData);
+ } catch (JsonProcessingException e) {
+ recordBadEmit(task.getTaskId(),
+ task.getFetchKey().getKey(),
+ ERROR_CODES.EMIT_SERIALIZATION.ordinal());
+ } catch (SQLException e) {
+ recordBadEmit(task.getTaskId(),
+ task.getFetchKey().getKey(),
+
ERROR_CODES.EMIT_SQL_INSERT_EXCEPTION.ordinal());
}
}
}
}
}
+        /**
+         * Records that the parse result of a task could not be written to the
+         * emit table.
+         *
+         * @param taskId  id of the task whose result was lost
+         * @param key     fetch key of the task
+         * @param ordinal ordinal of the {@link ERROR_CODES} constant describing the failure
+         */
+        private void recordBadEmit(long taskId, String key, int ordinal) {
+            //TODO: persist this failure; until that exists, make sure the
+            //dropped result is at least visible in the logs instead of vanishing
+            LOG.warn("failed to store emit data taskid:{} fetchKey:{} errorCode:{}",
+                    taskId, key, ordinal);
+        }
+
+ /**
+  * Serializes the parse result to JSON bytes, LZ4-compresses them and inserts
+  * one row through the prepared insertEmitData statement.
+  *
+  * NOTE(review): three parameters are bound here, in this order: task id,
+  * uncompressed byte length, compressed blob. The prepared statement must
+  * expose exactly three placeholders in the same order.
+  *
+  * @param asyncData the parse result to store
+  * @throws SQLException if binding or executing the insert fails
+  * @throws JsonProcessingException if asyncData cannot be serialized
+  */
+ private void emit(AsyncData asyncData) throws SQLException,
JsonProcessingException {
+ insertEmitData.clearParameters();
+ insertEmitData.setLong(1, asyncData.getAsyncTask().getTaskId());
+ byte[] bytes = objectMapper.writeValueAsBytes(asyncData);
+ byte[] compressed = factory.fastCompressor().compress(bytes);
+ // the uncompressed length is stored next to the compressed bytes;
+ // presumably the reader needs it to size the decompression buffer -- confirm
+ insertEmitData.setLong(2, bytes.length);
+ insertEmitData.setBlob(3, new ByteArrayInputStream(compressed));
+ insertEmitData.execute();
+ }
+
private void handleTimeout(long taskId, String key) throws
TimeoutException {
LOG.warn("timeout taskid:{} fetchKey:{}", taskId, key);
throw new TimeoutException(key);
@@ -371,6 +402,14 @@ public class AsyncWorkerProcess {
}
return shouldEmit;
}
+
+        /**
+         * Prepares the statement used to store one compressed parse result in
+         * the emits table with status {@link EMIT_STATUS_CODES#READY}.
+         * <p>
+         * Placeholders, in order: (1) task id, (2) uncompressed size in bytes,
+         * (3) compressed payload. This matches the three parameters bound by
+         * {@code emit(AsyncData)}; with only two placeholders every insert
+         * failed when the third parameter was bound.
+         * <p>
+         * NOTE(review): assumes the key column of the emits table is named
+         * {@code id} -- confirm against the table DDL.
+         *
+         * @param connection connection to the shared database
+         * @return the prepared insert statement
+         * @throws SQLException if the statement cannot be prepared
+         */
+        static PreparedStatement prepareInsertEmitData(Connection connection)
+                throws SQLException {
+            return connection.prepareStatement(
+                    "insert into emits (id, status, time_stamp, uncompressed_size, bytes) " +
+                    " values (?," + AsyncWorkerProcess.EMIT_STATUS_CODES.READY.ordinal() +
+                    ",CURRENT_TIMESTAMP(),?,?)"
+            );
+        }
}
private static class TaskProcessor implements Callable<AsyncData> {
@@ -404,7 +443,6 @@ public class AsyncWorkerProcess {
} catch (SecurityException e) {
throw e;
}
-
injectUserMetadata(userMetadata, metadataList);
EmitKey emitKey = task.getEmitKey();
if (StringUtils.isBlank(emitKey.getKey())) {
diff --git
a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/AsyncCliTest.java
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
similarity index 93%
rename from
tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/AsyncCliTest.java
rename to
tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
index a84c13a..c653267 100644
---
a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/AsyncCliTest.java
+++
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
@@ -1,6 +1,5 @@
-package org.apache.tika.pipes.driver;
+package org.apache.tika.pipes.async;
-import org.apache.tika.pipes.async.AsyncCli;
import org.junit.Test;
public class AsyncCliTest {
diff --git
a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
new file mode 100644
index 0000000..920634e
--- /dev/null
+++
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
@@ -0,0 +1,88 @@
+package org.apache.tika.pipes.async;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/**
+ * Smoke test for {@link AsyncProcessor}: feeds mock fetch/emit tuples through
+ * the processor and dumps whatever the MockEmitter wrote to a file-based H2
+ * database.
+ */
+public class AsyncProcessorTest {
+
+    private Path dbDir;
+    private Path dbFile;
+    private Connection connection;
+    private Path tikaConfigPath;
+
+    /**
+     * Creates a temp directory holding the H2 database with the "emitted"
+     * table and a tika-config.xml that wires up MockEmitter and MockFetcher.
+     */
+    @Before
+    public void setUp() throws SQLException, IOException {
+        dbDir = Files.createTempDirectory("async-db");
+        dbFile = dbDir.resolve("emitted-db");
+        String jdbc = "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE";
+        String sql = "create table emitted (id int auto_increment primary key, json varchar(20000))";
+
+        connection = DriverManager.getConnection(jdbc);
+        //close the statement once the DDL has run instead of leaking it
+        try (Statement st = connection.createStatement()) {
+            st.execute(sql);
+        }
+        tikaConfigPath = dbDir.resolve("tika-config.xml");
+        String xml = "" +
+                "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" +
+                "<properties>" +
+                " <emitters>" +
+                " <emitter class=\"org.apache.tika.pipes.async.MockEmitter\">\n" +
+                " <params>\n" +
+                " <param name=\"name\" type=\"string\">mock</param>\n" +
+                " <param name=\"jdbc\" type=\"string\">" + jdbc + "</param>\n" +
+                " </params>" +
+                " </emitter>" +
+                " </emitters>" +
+                " <fetchers>" +
+                " <fetcher class=\"org.apache.tika.pipes.async.MockFetcher\">" +
+                " <param name=\"name\" type=\"string\">mock</param>\n" +
+                " </fetcher>" +
+                " </fetchers>" +
+                "</properties>";
+        Files.write(tikaConfigPath, xml.getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Drops the table, closes the connection and removes the temp directory.
+     * Each step runs even if an earlier one fails so that no database files
+     * or open connections are left behind.
+     */
+    @After
+    public void tearDown() throws SQLException, IOException {
+        try {
+            if (connection != null) {
+                try (Statement st = connection.createStatement()) {
+                    st.execute("drop table emitted");
+                } finally {
+                    connection.close();
+                }
+            }
+        } finally {
+            if (dbDir != null) {
+                FileUtils.deleteDirectory(dbDir.toFile());
+            }
+        }
+    }
+
+    /**
+     * Offers 100 tuples to the processor, closes it and prints the emitted rows.
+     * NOTE(review): nothing is asserted yet, so this only fails on an exception.
+     */
+    @Test
+    public void testBasic() throws Exception {
+
+        AsyncProcessor processor = AsyncProcessor.build(tikaConfigPath);
+        for (int i = 0; i < 100; i++) {
+            FetchEmitTuple t = new FetchEmitTuple(
+                    new FetchKey("mock", "key-" + i),
+                    new EmitKey("mock", "emit-" + i),
+                    new Metadata()
+            );
+            processor.offer(t, 1000);
+        }
+        processor.close();
+        String sql = "select * from emitted";
+        try (Statement st = connection.createStatement();
+             ResultSet rs = st.executeQuery(sql)) {
+            while (rs.next()) {
+                System.out.println(rs.getInt(1) + " : " + rs.getString(2));
+            }
+        }
+    }
+}
diff --git
a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
new file mode 100644
index 0000000..9f07dec
--- /dev/null
+++
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
@@ -0,0 +1,76 @@
+package org.apache.tika.pipes.async;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.tika.config.Field;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test emitter that serializes each EmitData to JSON and inserts it as one row
+ * into the "emitted" table of the database reached through the configured JDBC
+ * url. The connection opened in initialize() is never closed by this class.
+ */
+public class MockEmitter implements Initializable, Emitter {
+ private final ObjectMapper objectMapper = new ObjectMapper();
+ private Connection connection;
+ private String jdbc;
+ private PreparedStatement insert;
+
+ /**
+  * Sets the JDBC url that initialize() connects to.
+  */
+ @Field
+ public void setJdbc(String jdbc) {
+ this.jdbc = jdbc;
+ }
+
+ /**
+  * @return the fixed emitter name "mock"
+  */
+ @Override
+ public String getName() {
+ return "mock";
+ }
+
+ /**
+  * Wraps the key and metadata list in a single EmitData and delegates to
+  * the list-based emit.
+  */
+ @Override
+ public void emit(String emitKey, List<Metadata> metadataList) throws
IOException, TikaEmitterException {
+ emit(Collections.singletonList(
+ new EmitData(
+ new EmitKey(getName(), emitKey), metadataList)));
+ }
+
+ /**
+  * Writes every element as a JSON string into the "emitted" table, one
+  * insert per element.
+  *
+  * @throws TikaEmitterException if an insert fails with a SQLException
+  */
+ @Override
+ public void emit(List<? extends EmitData> emitData) throws IOException,
TikaEmitterException {
+ for (EmitData d : emitData) {
+ String json = objectMapper.writeValueAsString(d);
+ try {
+ insert.clearParameters();
+ insert.setString(1, json);
+ insert.execute();
+ } catch (SQLException e) {
+ throw new TikaEmitterException("problem inserting", e);
+ }
+ }
+ }
+
+ /**
+  * Opens the JDBC connection and prepares the insert statement reused by
+  * emit.
+  *
+  * @throws TikaConfigException if connecting or preparing the statement fails
+  */
+ @Override
+ public void initialize(Map<String, Param> params) throws
TikaConfigException {
+ try {
+ connection = DriverManager.getConnection(jdbc);
+ String sql = "insert into emitted (json) values (?)";
+ insert = connection.prepareStatement(sql);
+ } catch (SQLException e) {
+ throw new TikaConfigException("problem w connection", e);
+ }
+ }
+
+ /**
+  * Intentionally empty: this mock performs no initialization checks.
+  */
+ @Override
+ public void checkInitialization(InitializableProblemHandler
problemHandler) throws TikaConfigException {
+
+ }
+}
diff --git
a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
new file mode 100644
index 0000000..8c99ecf
--- /dev/null
+++
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
@@ -0,0 +1,29 @@
+package org.apache.tika.pipes.async;
+
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.fetcher.Fetcher;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Test fetcher that ignores the fetch key and always returns the same small
+ * mock XML document.
+ */
+public class MockFetcher implements Fetcher {
+
+    //shared, never modified payload; final so it cannot be reassigned by accident
+    private static final byte[] BYTES = ("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" +
+            "<mock>" +
+            "<metadata action=\"add\" name=\"dc:creator\">Nikolai Lobachevsky</metadata>" +
+            "<write element=\"p\">main_content</write>" +
+            "</mock>").getBytes(StandardCharsets.UTF_8);
+
+    /**
+     * @return the fixed fetcher name "mock"
+     */
+    @Override
+    public String getName() {
+        return "mock";
+    }
+
+    /**
+     * Returns a fresh stream over the mock document.
+     *
+     * @param fetchKey ignored
+     * @param metadata ignored
+     * @return a new stream over the shared mock bytes
+     */
+    @Override
+    public InputStream fetch(String fetchKey, Metadata metadata)
+            throws TikaException, IOException {
+        return new ByteArrayInputStream(BYTES);
+    }
+}
diff --git
a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/TestPipesDriver.java
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
similarity index 91%
rename from
tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/TestPipesDriver.java
rename to
tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
index 2fb38df..394fb16 100644
---
a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/TestPipesDriver.java
+++
b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
@@ -1,4 +1,4 @@
-package org.apache.tika.pipes.driver;
+package org.apache.tika.pipes.async;
import org.apache.commons.io.FileUtils;
@@ -8,20 +8,11 @@ import org.junit.Test;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
index 3831f07..58b4ff4 100644
---
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
@@ -92,12 +92,17 @@ public class PipeIntegrationTests {
int numConsumers = 1;
ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
ExecutorCompletionService<Integer> completionService = new
ExecutorCompletionService<>(es);
- ArrayBlockingQueue<FetchEmitTuple> queue = it.init(numConsumers);
- completionService.submit(it);
+ ArrayBlockingQueue<FetchEmitTuple> queue = new
ArrayBlockingQueue<>(1000);
for (int i = 0; i < numConsumers; i++) {
completionService.submit(new FSFetcherEmitter(
queue, tikaConfig.getFetcherManager().getFetcher("s3"),
null));
}
+ for (FetchEmitTuple t : it) {
+ queue.offer(t);
+ }
+ for (int i = 0; i < numConsumers; i++) {
+ queue.offer(FetchIterator.COMPLETED_SEMAPHORE);
+ }
int finished = 0;
try {
while (finished++ < numConsumers+1) {
@@ -112,17 +117,22 @@ public class PipeIntegrationTests {
@Test
public void testS3ToS3() throws Exception {
TikaConfig tikaConfig = getConfig("tika-config-s3Tos3.xml");
- FetchIterator it = tikaConfig.getFetchIterator();
int numConsumers = 20;
ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
ExecutorCompletionService<Integer> completionService = new
ExecutorCompletionService<>(es);
- ArrayBlockingQueue<FetchEmitTuple> queue = it.init(numConsumers);
- completionService.submit(it);
+ ArrayBlockingQueue<FetchEmitTuple> queue = new
ArrayBlockingQueue<>(1000);
for (int i = 0; i < numConsumers; i++) {
completionService.submit(new S3FetcherEmitter(
queue, tikaConfig.getFetcherManager().getFetcher("s3f"),
(S3Emitter)tikaConfig.getEmitterManager().getEmitter("s3e")));
}
+ FetchIterator it = tikaConfig.getFetchIterator();
+ for (FetchEmitTuple t : it) {
+ queue.offer(t);
+ }
+ for (int i = 0; i < numConsumers; i++) {
+ queue.offer(FetchIterator.COMPLETED_SEMAPHORE);
+ }
int finished = 0;
try {
while (finished++ < numConsumers+1) {
diff --git
a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
index 67bbbde..2c19afb 100644
---
a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
+++
b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
@@ -40,12 +40,12 @@ public class JsonFetchEmitTupleTest {
m.add("m3", "v4");
FetchEmitTuple t = new FetchEmitTuple(
- new FetchKey("fetcher1", "fetchkey1"),
- new EmitKey("emitter1", "emitKey1"),
+ new FetchKey("my_fetcher", "fetchKey1"),
+ new EmitKey("my_emitter", "emitKey1"),
m);
StringWriter writer = new StringWriter();
JsonFetchEmitTuple.toJson(t, writer);
-
+ System.out.println(writer.toString());
Reader reader = new StringReader(writer.toString());
FetchEmitTuple deserialized = JsonFetchEmitTuple.fromJson(reader);
assertEquals(t, deserialized);
diff --git
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
index 913465d..9594de4 100644
---
a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
+++
b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
@@ -264,6 +264,7 @@ public class TikaServerEmitterIntegrationTest extends
IntegrationTestBase {
private JsonNode testOne(String fileName, boolean shouldFileExist,
FetchEmitTuple.ON_PARSE_EXCEPTION onParseException) throws Exception {
awaitServerStartup();
+ System.out.println(getJsonString(fileName, onParseException));
Response response = WebClient
.create(endPoint + "/emit")
.accept("application/json")