This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch TIKA-3288 in repository https://gitbox.apache.org/repos/asf/tika.git
commit 73b0287742c54f21d3a2a6fa442b058d363de68a Author: tballison <[email protected]> AuthorDate: Mon Feb 1 17:46:32 2021 -0500 TIKA-3288 -- WIP do not merge. --- .../apache/tika/pipes/emitter/AbstractEmitter.java | 33 ++++++ .../org/apache/tika/pipes/emitter/EmitData.java | 32 ++++++ .../org/apache/tika/pipes/emitter/EmitKey.java | 2 +- .../org/apache/tika/pipes/emitter/Emitter.java | 10 +- .../apache/tika/pipes/emitter/EmptyEmitter.java | 7 +- tika-pipes/tika-emitters/tika-emitter-solr/pom.xml | 5 + .../tika/pipes/emitter/solr/SolrEmitter.java | 92 +++++++++------ tika-pipes/tika-httpclient-commons/pom.xml | 5 - .../apache/tika/pipes/PipeIntegrationTests.java | 2 +- .../org/apache/tika/server/client/TikaClient.java | 2 +- .../org/apache/tika/server/core/ServerStatus.java | 2 +- .../tika/server/core/ServerStatusWatcher.java | 2 +- .../org/apache/tika/server/core/TikaServerCli.java | 5 + .../apache/tika/server/core/TikaServerProcess.java | 80 ++++++++++++-- .../tika/server/core/resource/AsyncEmitter.java | 123 +++++++++++++++++++++ .../tika/server/core/resource/AsyncRequest.java | 23 ++++ .../tika/server/core/resource/AsyncResource.java | 123 +++++++++++++++++++++ .../server/core/TikaServerIntegrationTest.java | 10 +- 18 files changed, 489 insertions(+), 69 deletions(-) diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java index 1117537..a54b708 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java @@ -17,6 +17,10 @@ package org.apache.tika.pipes.emitter; import org.apache.tika.config.Field; +import org.apache.tika.metadata.Metadata; + +import java.io.IOException; +import java.util.List; public abstract class AbstractEmitter implements Emitter { @@ -31,4 +35,33 @@ public abstract class AbstractEmitter implements Emitter { public String getName() { return name; } + + /** + * The default behavior is to call {@link #emit(String, List)} on each item. + * Some implementations, e.g. Solr/ES/vespa, can benefit from subclassing this and + * emitting a bunch of docs at once. + * + * @param emitData + * @throws IOException + * @throws TikaEmitterException + */ + @Override + public void emit(List<EmitData> emitData) throws IOException, TikaEmitterException { + for (EmitData d : emitData) { + emit(d.getEmitKey().getKey(), d.getMetadataList()); + } + } + + public static long estimateSizeInBytes(String id, List<Metadata> metadataList) { + long sz = 36 + id.length() * 2; + for (Metadata m : metadataList) { + for (String n : m.names()) { + sz += 36 + n.length() * 2; + for (String v : m.getValues(n)) { + sz += 36 + v.length() * 2; + } + } + } + return sz; + } } diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java new file mode 100644 index 0000000..0063c6a --- /dev/null +++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java @@ -0,0 +1,32 @@ +package org.apache.tika.pipes.emitter; + +import org.apache.tika.metadata.Metadata; + +import java.util.List; + +public class EmitData { + + private final EmitKey emitKey; + private final List<Metadata> metadataList; + + public EmitData(EmitKey emitKey, List<Metadata> metadataList) { + this.emitKey = emitKey; + this.metadataList = metadataList; + } + + public EmitKey getEmitKey() { + return emitKey; + } + + public List<Metadata> getMetadataList() { + return metadataList; + } + + @Override + public String toString() { + return "EmitData{" + + "emitKey=" + emitKey + + ", metadataList=" + metadataList + + '}'; + } +} diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java index 47a8ee7..aa53dfd 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java @@ -30,7 +30,7 @@ public class EmitKey { return emitterName; } - public String getEmitKey() { + public String getKey() { return emitKey; } diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java index 4f6bdae..dbea669 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java @@ -27,14 +27,8 @@ public interface Emitter { void emit(String emitKey, List<Metadata> metadataList) throws IOException, TikaEmitterException; + void emit(List<EmitData> emitData) throws IOException, TikaEmitterException; + //TODO -- add this later for xhtml? //void emit(String txt, Metadata metadata) throws IOException, TikaException; - /* - TODO we can add this later? - void addBatch(String emitKey, List<Metadata> metadataList) throws IOException, TikaEmitterException; - - void executeBatch() throws IOException, TikaEmitterException; - - */ - } diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java index 8c0ebda..5f83db7 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java @@ -16,13 +16,13 @@ */ package org.apache.tika.pipes.emitter; -import org.apache.tika.exception.TikaException; import org.apache.tika.metadata.Metadata; import java.io.IOException; import java.util.List; public class EmptyEmitter implements Emitter { + @Override public String getName() { return "empty"; @@ -32,4 +32,9 @@ public class EmptyEmitter implements Emitter { public void emit(String emitKey, List<Metadata> metadataList) throws IOException, TikaEmitterException { } + + @Override + public void emit(List<EmitData> emitData) throws IOException, TikaEmitterException { + + } } diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml b/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml index 1edfff0..e36d1c8 100644 --- a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml +++ b/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml @@ -42,6 +42,11 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </dependency> diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java index 5830b14..4c15889 100644 --- a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java +++ b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java @@ -16,9 +16,8 @@ */ package org.apache.tika.pipes.emitter.solr; -import com.google.gson.Gson; -import com.google.gson.JsonArray; -import com.google.gson.JsonObject; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; import org.apache.http.client.HttpClient; import org.apache.tika.client.HttpClientFactory; import org.apache.tika.client.HttpClientUtil; @@ -28,7 +27,7 @@ import org.apache.tika.config.Initializable; import org.apache.tika.config.InitializableProblemHandler; import org.apache.tika.config.Param; import org.apache.tika.pipes.emitter.AbstractEmitter; -import org.apache.tika.pipes.emitter.Emitter; +import org.apache.tika.pipes.emitter.EmitData; import org.apache.tika.exception.TikaConfigException; import org.apache.tika.metadata.Metadata; import org.apache.tika.pipes.emitter.TikaEmitterException; @@ -36,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.StringWriter; import java.util.List; import java.util.Map; import java.util.UUID; @@ -48,7 +48,6 @@ public class SolrEmitter extends AbstractEmitter implements Initializable { PARENT_CHILD, //anything else? } - private static final Gson GSON = new Gson(); private static final String ATTACHMENTS = "attachments"; private static final String UPDATE_PATH = "/update"; private static final Logger LOG = LoggerFactory.getLogger(SolrEmitter.class); @@ -61,14 +60,23 @@ public class SolrEmitter extends AbstractEmitter implements Initializable { private HttpClientFactory httpClientFactory; private HttpClient httpClient; + public SolrEmitter() throws TikaConfigException { + httpClientFactory = new HttpClientFactory(); + } @Override public void emit(String emitKey, List<Metadata> metadataList) throws IOException, TikaEmitterException { + if (metadataList == null || metadataList.size() == 0) { LOG.warn("metadataList is null or empty"); return; } - String json = jsonify(emitKey, metadataList); + StringWriter writer = new StringWriter(); + JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer); + jsonGenerator.writeStartArray(); + jsonify(jsonGenerator, emitKey, metadataList); + jsonGenerator.writeEndArray(); + String json = writer.toString(); LOG.debug("emitting json:"+json); try { HttpClientUtil.postJson(httpClient, @@ -78,10 +86,35 @@ public class SolrEmitter extends AbstractEmitter implements Initializable { } } - private String jsonify(String emitKey, List<Metadata> metadataList) { + @Override + public void emit(List<EmitData> batch) throws IOException, + TikaEmitterException { + if (batch == null || batch.size() == 0) { + LOG.warn("batch is null or empty"); + return; + } + StringWriter writer = new StringWriter(); + JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer); + jsonGenerator.writeStartArray(); + for (EmitData d : batch) { + jsonify(jsonGenerator, d.getKey(), d.getMetadataList()); + } + jsonGenerator.writeEndArray(); + String json = writer.toString(); + LOG.debug("emitting json:"+json); + try { + HttpClientUtil.postJson(httpClient, + url+UPDATE_PATH+"?commitWithin="+getCommitWithin(), json); + } catch (TikaClientException e) { + throw new TikaEmitterException("can't post", e); + } + } + + private void jsonify(JsonGenerator jsonGenerator, String emitKey, List<Metadata> metadataList) throws IOException { metadataList.get(0).set(idField, emitKey); - if (attachmentStrategy == AttachmentStrategy.SKIP) { - return toJsonString(jsonify(metadataList.get(0))); + if (attachmentStrategy == AttachmentStrategy.SKIP || + metadataList.size() == 1) { + jsonify(metadataList.get(0), jsonGenerator); } else if (attachmentStrategy == AttachmentStrategy.CONCATENATE_CONTENT) { //this only handles text for now, not xhtml StringBuilder sb = new StringBuilder(); @@ -93,53 +126,40 @@ public class SolrEmitter extends AbstractEmitter implements Initializable { } Metadata parent = metadataList.get(0); parent.set(getContentField(), sb.toString()); - return toJsonString(jsonify(parent)); + jsonify(parent, jsonGenerator); } else if (attachmentStrategy == AttachmentStrategy.PARENT_CHILD) { - if (metadataList.size() == 1) { - JsonObject obj = jsonify(metadataList.get(0)); - return toJsonString(obj); - } - JsonObject parent = jsonify(metadataList.get(0)); - JsonArray children = new JsonArray(); + jsonify(metadataList.get(0), jsonGenerator); + jsonGenerator.writeArrayFieldStart(ATTACHMENTS); + for (int i = 1; i < metadataList.size(); i++) { Metadata m = metadataList.get(i); m.set(idField, UUID.randomUUID().toString()); - children.add(jsonify(m)); + jsonify(m, jsonGenerator); } - parent.add(ATTACHMENTS, children); - return toJsonString(parent); + jsonGenerator.writeEndArray(); } else { throw new IllegalArgumentException("I don't yet support this attachment strategy: " + attachmentStrategy); } } - private String toJsonString(JsonObject obj) { - //wrap the document into an array - //so that Solr correctly interprets this as - //upload docs vs a command. - JsonArray docs = new JsonArray(); - docs.add(obj); - return GSON.toJson(docs); - } - private JsonObject jsonify(Metadata metadata) { - JsonObject obj = new JsonObject(); + private void jsonify(Metadata metadata, JsonGenerator jsonGenerator) throws IOException { + jsonGenerator.writeStartObject(); for (String n : metadata.names()) { + String[] vals = metadata.getValues(n); if (vals.length == 0) { continue; } else if (vals.length == 1) { - obj.addProperty(n, vals[0]); + jsonGenerator.writeStringField(n, vals[0]); } else if (vals.length > 1) { - JsonArray valArr = new JsonArray(); - for (int i = 0; i < vals.length; i++) { - valArr.add(vals[i]); - } - obj.add(n, valArr); + jsonGenerator.writeArrayFieldStart(n); + jsonGenerator.writeArray(vals, 0, vals.length); + jsonGenerator.writeEndArray(); } } - return obj; + jsonGenerator.writeEndObject(); } /** diff --git a/tika-pipes/tika-httpclient-commons/pom.xml b/tika-pipes/tika-httpclient-commons/pom.xml index ef7797a..c33d3fb 100644 --- a/tika-pipes/tika-httpclient-commons/pom.xml +++ b/tika-pipes/tika-httpclient-commons/pom.xml @@ -42,11 +42,6 @@ <artifactId>httpclient</artifactId> <version>${httpcomponents.version}</version> </dependency> - <dependency> - <groupId>com.google.code.gson</groupId> - <artifactId>gson</artifactId> - <version>${gson.version}</version> - </dependency> </dependencies> </project> \ No newline at end of file diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java index 7df2bb4..3831f07 100644 --- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java +++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java @@ -218,7 +218,7 @@ public class PipeIntegrationTests { userMetadata.set("project", "my-project"); try (InputStream is = fetcher.fetch(t.getFetchKey().getKey(), t.getMetadata())) { - emitter.emit(t.getEmitKey().getEmitKey(), is, userMetadata); + emitter.emit(t.getEmitKey().getKey(), is, userMetadata); } } } diff --git a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java index 6e0e26e..73ff3f4 100644 --- a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java +++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java @@ -70,7 +70,7 @@ public class TikaClient { root.add("fetcher", new JsonPrimitive(fetchEmit.getFetchKey().getFetcherName())); root.add("fetchKey", new JsonPrimitive(fetchEmit.getFetchKey().getKey())); root.add("emitter", new JsonPrimitive(fetchEmit.getEmitKey().getEmitterName())); - root.add("emitKey", new JsonPrimitive(fetchEmit.getEmitKey().getEmitKey())); + root.add("emitKey", new JsonPrimitive(fetchEmit.getEmitKey().getKey())); if (metadata.size() > 0) { JsonObject m = new JsonObject(); for (String n : metadata.names()) { diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatus.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatus.java index a15f7a5..92068f5 100644 --- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatus.java +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatus.java @@ -41,7 +41,7 @@ public class ServerStatus { public enum STATUS { INITIALIZING(0), OPERATING(1), - HIT_MAX(2), + HIT_MAX_FILES(2), TIMEOUT(3), ERROR(4), PARENT_REQUESTED_SHUTDOWN(5), diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatusWatcher.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatusWatcher.java index a2ad313..7ac0ffd 100644 --- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatusWatcher.java +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatusWatcher.java @@ -136,7 +136,7 @@ public class ServerStatusWatcher implements Runnable { } long filesProcessed = serverStatus.getFilesProcessed(); if (filesProcessed >= maxFiles) { - serverStatus.setStatus(ServerStatus.STATUS.HIT_MAX); + serverStatus.setStatus(ServerStatus.STATUS.HIT_MAX_FILES); } } diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java index dbcbeb1..4ca9bb4 100644 --- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java @@ -140,7 +140,10 @@ public class TikaServerCli { try { mainLoop(line, newArgs); } catch (InterruptedException e) { + e.printStackTrace(); //swallow + } catch (Exception e) { + e.printStackTrace(); } } } @@ -173,11 +176,13 @@ public class TikaServerCli { WatchDogResult result = future.get(); LOG.debug("main loop future: ({}); about to restart", result); if (maxRestarts < 0 || result.getNumRestarts() < maxRestarts) { + System.err.println("starting up again"); executorCompletionService.submit( new TikaServerWatchDog(args, result.getPort(), result.getId(), result.getNumRestarts(), serverTimeoutConfig)); } else { + System.err.println("finished!"); LOG.warn("id {} with port {} has exceeded maxRestarts {}. Shutting down and not restarting.", result.getId(), result.getPort(), maxRestarts); finished++; diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java index 0a8fd91..94f2190 100644 --- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java @@ -23,6 +23,7 @@ import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.cxf.binding.BindingFactoryManager; +import org.apache.cxf.endpoint.Server; import org.apache.cxf.jaxrs.JAXRSBindingFactory; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.lifecycle.ResourceProvider; @@ -36,6 +37,8 @@ import org.apache.tika.config.TikaConfig; import org.apache.tika.parser.DigestingParser; import org.apache.tika.parser.digestutils.BouncyCastleDigester; import org.apache.tika.parser.digestutils.CommonsDigester; +import org.apache.tika.server.core.resource.AsyncEmitter; +import org.apache.tika.server.core.resource.AsyncResource; import org.apache.tika.server.core.resource.DetectorResource; import org.apache.tika.server.core.resource.EmitterResource; import org.apache.tika.server.core.resource.LanguageResource; @@ -72,13 +75,18 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; public class TikaServerProcess { //used in fork mode -- restart after processing this many files private static final long DEFAULT_MAX_FILES = 100000; - + private static String ENABLE_UNSECURE_FEATURES = "enableUnsecureFeatures"; private static final int DEFAULT_DIGEST_MARK_LIMIT = 20 * 1024 * 1024; public static final Set<String> LOG_LEVELS = new HashSet<>(Arrays.asList("debug", "info")); @@ -132,7 +140,7 @@ public class TikaServerProcess { CommandLineParser cliParser = new DefaultParser(); CommandLine line = cliParser.parse(options, args); - runServer(line, options); + mainLoop(line, options); } catch (Exception e) { e.printStackTrace(); LOG.error("Can't start: ", e); @@ -140,10 +148,35 @@ public class TikaServerProcess { } } + private static void mainLoop(CommandLine commandLine, Options options) throws Exception { + AsyncResource asyncResource = null; + ArrayBlockingQueue asyncQueue = null; + int numAsyncThreads = 10; + if (commandLine.hasOption("unsecureFeatures")) { + asyncResource = new AsyncResource(); + asyncQueue = asyncResource.getQueue(numAsyncThreads); + } + + ServerDetails serverDetails = initServer(commandLine, asyncResource); + ExecutorService executorService = Executors.newFixedThreadPool(numAsyncThreads+1); + ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService); + + executorCompletionService.submit(new ServerThread(serverDetails)); + if (asyncQueue != null) { + for (int i = 0; i < numAsyncThreads; i++) { + executorCompletionService.submit(new AsyncEmitter(asyncQueue)); + } + } + while (true) { + + } + } + //This starts the server in this process. This can //be either a direct call from -noFork or the process that is forked //in the 2.0 default mode. - private static void runServer(CommandLine line, Options options) throws Exception { + private static ServerDetails initServer(CommandLine line, + AsyncResource asyncResource) throws Exception { String host = null; @@ -224,12 +257,12 @@ public class TikaServerProcess { } InputStreamFactory inputStreamFactory = null; - if (line.hasOption("enableUnsecureFeatures")) { + if (line.hasOption(ENABLE_UNSECURE_FEATURES)) { inputStreamFactory = new FetcherStreamFactory(tika.getFetcherManager()); } else { inputStreamFactory = new DefaultInputStreamFactory(); } - logFetchersAndEmitters(line.hasOption("enableUnsecureFeatures"), tika); + logFetchersAndEmitters(line.hasOption(ENABLE_UNSECURE_FEATURES), tika); String serverId = line.hasOption("i") ? line.getOptionValue("i") : UUID.randomUUID().toString(); LOG.debug("SERVER ID:" + serverId); ServerStatus serverStatus; @@ -272,8 +305,9 @@ public class TikaServerProcess { rCoreProviders.add(new SingletonResourceProvider(new TikaDetectors())); rCoreProviders.add(new SingletonResourceProvider(new TikaParsers())); rCoreProviders.add(new SingletonResourceProvider(new TikaVersion())); - if (line.hasOption("enableUnsecureFeatures")) { + if (line.hasOption(ENABLE_UNSECURE_FEATURES)) { rCoreProviders.add(new SingletonResourceProvider(new EmitterResource())); + rCoreProviders.add(new SingletonResourceProvider(asyncResource)); } rCoreProviders.addAll(loadResourceServices()); if (line.hasOption("status")) { @@ -315,11 +349,11 @@ public class TikaServerProcess { JAXRSBindingFactory factory = new JAXRSBindingFactory(); factory.setBus(sf.getBus()); manager.registerBindingFactory(JAXRSBindingFactory.JAXRS_BINDING_ID, factory); - sf.create(); - LOG.info("Started Apache Tika server {} at {}", - serverId, - url); - + ServerDetails details = new ServerDetails(); + details.sf = sf; + details.url = url; + details.serverId = serverId; + return details; } private static void logFetchersAndEmitters(boolean enableUnsecureFeatures, TikaConfig tika) { @@ -421,4 +455,28 @@ public class TikaServerProcess { return serverTimeouts; } + private static class ServerThread implements Callable<Integer> { + private final ServerDetails serverDetails; + public ServerThread(ServerDetails serverDetails) { + this.serverDetails = serverDetails; + } + + @Override + public Integer call() throws Exception { + + Server server = serverDetails.sf.create(); + System.err.println("started : "+serverDetails.serverId); + LOG.info("Started Apache Tika server {} at {}", + serverDetails.serverId, + serverDetails.url); + System.err.println("returning : "+serverDetails.serverId); + return 2; + } + } + + private static class ServerDetails { + JAXRSServerFactoryBean sf; + String serverId; + String url; + } } diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java new file mode 100644 index 0000000..f473699 --- /dev/null +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java @@ -0,0 +1,123 @@ +package org.apache.tika.server.core.resource; + +import org.apache.cxf.jaxrs.impl.UriInfoImpl; +import org.apache.cxf.message.MessageImpl; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.pipes.emitter.AbstractEmitter; +import org.apache.tika.pipes.emitter.EmitData; +import org.apache.tika.pipes.emitter.Emitter; +import org.apache.tika.pipes.emitter.TikaEmitterException; +import org.apache.tika.pipes.fetchiterator.FetchEmitTuple; +import org.apache.tika.utils.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.MultivaluedHashMap; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * Worker thread that takes ASyncRequests off the queue and + * processes them. + */ +public class AsyncEmitter implements Callable<Integer> { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncEmitter.class); + + + private long maxCacheSizeBytes = 10_000_000; + + private final ArrayBlockingQueue<AsyncRequest> queue; + + public AsyncEmitter(ArrayBlockingQueue<AsyncRequest> queue) { + this.queue = queue; + } + + @Override + public Integer call() throws Exception { + while (true) { + AsyncRequest request = queue.poll(1, TimeUnit.MINUTES); + if (request != null) { + processTuple(request); + } else { + LOG.trace("Nothing on the async queue"); + } + } + } + + private void processTuple(AsyncRequest request) { + LOG.debug("Starting request id ({}) of size ({})", + request.getId(), request.getTuples().size()); + List<EmitData> cachedEmitData = new ArrayList<>(); + Emitter emitter = TikaResource.getConfig() + .getEmitterManager() + .getEmitter( + request.getTuples().get(0).getEmitKey().getEmitterName()); + long currSize = 0; + for (FetchEmitTuple t : request.getTuples()) { + EmitData emitData = processTuple(t); + long estimated = AbstractEmitter.estimateSizeInBytes( + emitData.getEmitKey().getKey(), emitData.getMetadataList()); + if (estimated + currSize > maxCacheSizeBytes) { + tryToEmit(emitter, cachedEmitData, request); + cachedEmitData.clear(); + } + cachedEmitData.add(emitData); + currSize += estimated; + } + tryToEmit(emitter, cachedEmitData, request); + cachedEmitData.clear(); + LOG.debug("Completed request id ({})", + request.getId(), request.getTuples().size()); + } + + private void tryToEmit(Emitter emitter, List<EmitData> cachedEmitData, + AsyncRequest request) { + try { + emitter.emit(cachedEmitData); + } catch (IOException|TikaEmitterException e) { + LOG.warn("async id ({}) emitter class ({}): {}", + request.getId(), emitter.getClass(), + ExceptionUtils.getStackTrace(e)); + } + } + + private EmitData processTuple(FetchEmitTuple t) { + Metadata userMetadata = t.getMetadata(); + Metadata metadata = new Metadata(); + String fetcherName = t.getFetchKey().getFetcherName(); + String fetchKey = t.getFetchKey().getKey(); + List<Metadata> metadataList = null; + try (InputStream stream = + TikaResource.getConfig().getFetcherManager() + .getFetcher(fetcherName).fetch(fetchKey, metadata)) { + + metadataList = RecursiveMetadataResource.parseMetadata( + stream, + metadata, + new MultivaluedHashMap<>(), + new UriInfoImpl(new MessageImpl()), "text"); + } catch (SecurityException e) { + throw e; + } catch (Exception e) { + LOG.warn(t.toString(), e); + } + injectUserMetadata(userMetadata, metadataList); + return new EmitData(t.getEmitKey(), metadataList); + } + + private void injectUserMetadata(Metadata userMetadata, List<Metadata> metadataList) { + for (String n : userMetadata.names()) { + //overwrite whatever was there + metadataList.get(0).set(n, null); + for (String val : userMetadata.getValues(n)) { + metadataList.get(0).add(n, val); + } + } + } +} diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java new file mode 100644 index 0000000..f46105c --- /dev/null +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java @@ -0,0 +1,23 @@ +package org.apache.tika.server.core.resource; + +import org.apache.tika.pipes.fetchiterator.FetchEmitTuple; + +import java.util.List; + +public class AsyncRequest { + private final String id; + private final List<FetchEmitTuple> tuples; + + public AsyncRequest(String id, List<FetchEmitTuple> tuples) { + this.id = id; + this.tuples = tuples; + } + + public String getId() { + return id; + } + + public List<FetchEmitTuple> getTuples() { + return tuples; + } +} diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java new file mode 100644 index 0000000..240933f --- /dev/null +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tika.server.core.resource; + +import org.apache.tika.config.TikaConfig; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.metadata.TikaCoreProperties; +import org.apache.tika.pipes.emitter.EmitKey; +import org.apache.tika.pipes.emitter.EmitterManager; +import org.apache.tika.pipes.fetcher.FetchKey; +import org.apache.tika.pipes.fetcher.FetcherManager; +import org.apache.tika.pipes.fetchiterator.FetchEmitTuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; + +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.UriInfo; +import java.io.InputStream; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; + +@Path("/async") +public class AsyncResource { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncResource.class); + + private static final int DEFAULT_QUEUE_SIZE = 100; + private int queueSize = DEFAULT_QUEUE_SIZE; + private ArrayBlockingQueue<AsyncRequest> queue; + + public ArrayBlockingQueue<AsyncRequest> getQueue(int numThreads) { + this.queue = new ArrayBlockingQueue<>(queueSize+numThreads); + return queue; + } + /** + * The client posts a json request. At a minimum, this must be a + * json object that contains an emitter and a fetcherString key with + * the key to fetch the inputStream. Optionally, it may contain a metadata + * object that will be used to populate the metadata key for pass + * through of metadata from the client. + * <p> + * The extracted text content is stored with the key + * {@link TikaCoreProperties#TIKA_CONTENT} + * <p> + * Must specify a fetcherString and an emitter in the posted json. + * + * @param info uri info + * @return InputStream that can be deserialized as a list of {@link Metadata} objects + * @throws Exception + */ + @POST + @Produces("application/json") + public Map<String, String> post(InputStream is, + @Context HttpHeaders httpHeaders, + @Context UriInfo info + ) throws Exception { + + AsyncRequest request = deserializeASyncRequest(is); + FetcherManager fetcherManager = TikaConfig.getDefaultConfig().getFetcherManager(); + EmitterManager emitterManager = TikaConfig.getDefaultConfig().getEmitterManager(); + for (FetchEmitTuple t : request.getTuples()) { + if (! fetcherManager.getSupported().contains(t.getFetchKey().getFetcherName())) { + return badFetcher(t.getFetchKey()); + } + if (! emitterManager.getSupported().contains(t.getEmitKey().getEmitterName())) { + return badEmitter(t.getEmitKey()); + } + } + + //parameterize + boolean offered = queue.offer(request, 60, TimeUnit.SECONDS); + if (! offered) { + return throttleResponse(); + } + return ok(request.getId(), request.getTuples().size()); + } + + private Map<String, String> ok(String id, int size) { + return null; + } + + private Map<String, String> throttleResponse() { + Map<String, String> map = new HashMap<>(); + return map; + } + + private Map<String, String> badEmitter(EmitKey emitKey) { + return null; + } + + private Map<String, String> badFetcher(FetchKey fetchKey) { + return null; + } + + private AsyncRequest deserializeASyncRequest(InputStream is) { + return new AsyncRequest("", Collections.EMPTY_LIST); + } + +} diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java index 8d25d83..16699ed 100644 --- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java +++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java @@ -64,7 +64,7 @@ public class TikaServerIntegrationTest extends IntegrationTestBase { public void run() { TikaServerCli.main( new String[]{ - "-maxFiles", "2000", + "-maxFiles", "100", "-p", INTEGRATION_TEST_PORT, "-tmpFilePrefix", "basic-" }); @@ -72,9 +72,12 @@ public class TikaServerIntegrationTest extends IntegrationTestBase { }; serverThread.start(); try { - testBaseline(); + for (int i = 0; i < 500; i++) { + System.out.println("base "+i); + testBaseline(); + } } finally { - serverThread.interrupt(); + //serverThread.interrupt(); } } @@ -563,6 +566,7 @@ public class TikaServerIntegrationTest extends IntegrationTestBase { .put(ClassLoader .getSystemResourceAsStream(TEST_HELLO_WORLD)); Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8); + System.out.println(response.getStatus()); List<Metadata> metadataList = JsonMetadataList.fromJson(reader); assertEquals(1, metadataList.size()); assertEquals("Nikolai Lobachevsky", metadataList.get(0).get("author"));
