This is an automated email from the ASF dual-hosted git repository. ndipiazza pushed a commit to branch TIKA-4181-grpc in repository https://gitbox.apache.org/repos/asf/tika.git
commit 657067b86733205feb225543c4cc6d40c143425e Author: Nicholas DiPiazza <[email protected]> AuthorDate: Sun Feb 11 13:34:26 2024 -0600 TIKA-4181 - grpc server and client --- .../java/org/apache/tika/pipes/PipesConfig.java | 7 + .../org/apache/tika/pipes/PipesClientTest.java | 49 +++++ .../org/apache/tika/pipes/tika-sample-config.xml | 41 ++++ tika-pipes/pom.xml | 1 + tika-pipes/tika-grpc/README.md | 13 ++ tika-pipes/tika-grpc/pom.xml | 187 ++++++++++++++++++ .../org/apache/tika/pipes/grpc/TikaClient.java | 112 +++++++++++ .../org/apache/tika/pipes/grpc/TikaServer.java | 217 +++++++++++++++++++++ tika-pipes/tika-grpc/src/main/proto/tika.proto | 47 +++++ .../org/apache/tika/pipes/grpc/TikaServerTest.java | 59 ++++++ tika-pipes/tika-grpc/tika-config.xml | 35 ++++ 11 files changed, 768 insertions(+) diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java index 06783d67c..b0e8649f9 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java @@ -25,6 +25,7 @@ import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.tika.config.TikaConfig; import org.apache.tika.exception.TikaConfigException; public class PipesConfig extends PipesConfigBase { @@ -46,6 +47,12 @@ public class PipesConfig extends PipesConfigBase { return pipesConfig; } + public static PipesConfig load(InputStream tikaConfigInputStream) throws IOException, TikaConfigException { + PipesConfig pipesConfig = new PipesConfig(); + pipesConfig.configure("pipes", tikaConfigInputStream); + return pipesConfig; + } + private PipesConfig() { } diff --git a/tika-core/src/test/java/org/apache/tika/pipes/PipesClientTest.java b/tika-core/src/test/java/org/apache/tika/pipes/PipesClientTest.java new file mode 100644 index 000000000..46c475546 --- /dev/null +++ 
b/tika-core/src/test/java/org/apache/tika/pipes/PipesClientTest.java
@@ -0,0 +1,49 @@
+package org.apache.tika.pipes;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.w3c.dom.Document;
+import org.xml.sax.SAXException;
+
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
+
+/**
+ * Runs one document through a {@link PipesClient} configured from
+ * {@code tika-sample-config.xml} and checks the metadata that comes back.
+ */
+class PipesClientTest {
+    String fetcherName = "fs";
+    String testPdfFile = "testOverlappingText.pdf";
+
+    private PipesClient pipesClient;
+
+    /** Builds a fresh client from the sample config before every test. */
+    @BeforeEach
+    public void init()
+            throws TikaConfigException, IOException, ParserConfigurationException, SAXException {
+        Path tikaConfigPath = Paths.get("src", "test", "resources", "org", "apache", "tika",
+                "pipes", "tika-sample-config.xml");
+        PipesConfig pipesConfig = PipesConfig.load(tikaConfigPath);
+        pipesClient = new PipesClient(pipesConfig);
+    }
+
+    /**
+     * Closes the client created in {@link #init()} after every test, so that whatever it
+     * holds open is released instead of piling up for the rest of the test run.
+     */
+    @AfterEach
+    public void tearDown() throws IOException {
+        if (pipesClient != null) {
+            pipesClient.close();
+        }
+    }
+
+    /**
+     * Processing the sample PDF must yield exactly one metadata object whose
+     * {@code resourceName} is the name of the file that was fetched.
+     */
+    @Test
+    void process() throws IOException, InterruptedException {
+        PipesResult pipesResult = pipesClient.process(new FetchEmitTuple(testPdfFile,
+                new FetchKey(fetcherName,
+                testPdfFile), new EmitKey(), FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+        Assertions.assertNotNull(pipesResult.getEmitData().getMetadataList());
+        Assertions.assertEquals(1, pipesResult.getEmitData().getMetadataList().size());
+        Metadata metadata = pipesResult.getEmitData().getMetadataList().get(0);
+        Assertions.assertEquals("testOverlappingText.pdf", metadata.get("resourceName"));
+    }
+}
\ No newline at end of file
diff --git 
a/tika-core/src/test/resources/org/apache/tika/pipes/tika-sample-config.xml b/tika-core/src/test/resources/org/apache/tika/pipes/tika-sample-config.xml new file mode 100644 index 000000000..c936852d9 --- /dev/null +++ b/tika-core/src/test/resources/org/apache/tika/pipes/tika-sample-config.xml @@ -0,0 +1,41 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<properties> + <pipes> + <params> + <numClients>2</numClients> + <forkedJvmArgs> + <arg>-Xmx1g</arg> + <arg>-XX:ParallelGCThreads=2</arg> + </forkedJvmArgs> + <timeoutMillis>60000</timeoutMillis> + <maxForEmitBatchBytes>-1</maxForEmitBatchBytes> <!-- disable emit --> + </params> + </pipes> + <autoDetectParserConfig> + <digesterFactory class="org.apache.tika.pipes.async.MockDigesterFactory"> + <skipContainerDocument>false</skipContainerDocument> + </digesterFactory> + </autoDetectParserConfig> + <fetchers> + <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher"> + <name>fs</name> + <basePath>src/test/resources/test-documents</basePath> + </fetcher> + </fetchers> +</properties> \ No newline at end of file diff --git a/tika-pipes/pom.xml b/tika-pipes/pom.xml index 4ef27a191..61738fcd9 100644 --- a/tika-pipes/pom.xml +++ b/tika-pipes/pom.xml @@ -36,6 +36,7 @@ <module>tika-pipes-iterators</module> <module>tika-pipes-reporters</module> <module>tika-async-cli</module> + <module>tika-grpc</module> </modules> <dependencies> <dependency> diff --git a/tika-pipes/tika-grpc/README.md b/tika-pipes/tika-grpc/README.md new file mode 100644 index 000000000..7b0d4ccd6 --- /dev/null +++ b/tika-pipes/tika-grpc/README.md @@ -0,0 +1,13 @@ +# Tika Pipes GRPC Server + +The following is the Tika Pipes GRPC Server. + +This server will manage a pool of Tika Pipes clients. 
+ +* Tika Pipes Fetcher CRUD operations + * Create + * Read + * Update + * Delete +* Fetch + Parse a given Fetch Item + diff --git a/tika-pipes/tika-grpc/pom.xml b/tika-pipes/tika-grpc/pom.xml new file mode 100644 index 000000000..121baff41 --- /dev/null +++ b/tika-pipes/tika-grpc/pom.xml @@ -0,0 +1,187 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <artifactId>tika-grpc</artifactId> + <packaging>jar</packaging> + <!-- Feel free to delete the comment at the end of these lines. It is just + for safely updating the version in our release process. --> + <version>1.60.0</version><!-- CURRENT_GRPC_VERSION --> + <name>Apache Tika Pipes GRPC Server</name> + <url>https://tika.apache.org/</url> + + <parent> + <groupId>org.apache.tika</groupId> + <artifactId>tika-pipes</artifactId> + <version>3.0.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <grpc.version>1.60.0</grpc.version><!-- CURRENT_GRPC_VERSION --> + <protobuf.version>3.24.0</protobuf.version> + <protoc.version>3.24.0</protoc.version> + <!-- required for JDK 8 --> + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + </properties> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-bom</artifactId> + <version>${grpc.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + </dependencies> + </dependencyManagement> + + <dependencies> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty-shaded</artifactId> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + </dependency> + <dependency> + 
<groupId>io.grpc</groupId> + <artifactId>grpc-services</artifactId> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java-util</artifactId> + <version>${protobuf.version}</version> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.10.1</version> <!-- prevent downgrade via protobuf-java-util --> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>32.0.1-jre</version> <!-- prevent downgrade of version in protobuf-java-util --> + </dependency> + <dependency> + <groupId>com.google.j2objc</groupId> + <artifactId>j2objc-annotations</artifactId> + <version>2.8</version> <!-- prevent downgrade of version in guava --> + </dependency> + <!-- https://mvnrepository.com/artifact/org.apache.tika/tika-pipes --> + <dependency> + <groupId>org.apache.tika</groupId> + <artifactId>tika-async-cli</artifactId> + <version>2.9.1</version> + </dependency> + <dependency> + <groupId>org.apache.tika</groupId> + <artifactId>tika-parsers-standard-package</artifactId> + <version>2.9.1</version> + </dependency> + <dependency> + <groupId>org.apache.tika</groupId> + <artifactId>tika-core</artifactId> + <version>2.9.1</version> + </dependency> + <dependency> + <groupId>org.apache.tomcat</groupId> + <artifactId>annotations-api</artifactId> + <version>6.0.53</version> + <scope>provided</scope> <!-- not needed at runtime --> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-testing</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.13.2</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>3.4.0</version> + <scope>test</scope> + 
</dependency>
+  </dependencies>
+
+  <build>
+    <extensions>
+      <extension>
+        <groupId>kr.motd.maven</groupId>
+        <artifactId>os-maven-plugin</artifactId>
+        <version>1.7.1</version>
+      </extension>
+    </extensions>
+    <plugins>
+      <plugin>
+        <groupId>org.xolstice.maven.plugins</groupId>
+        <artifactId>protobuf-maven-plugin</artifactId>
+        <version>0.6.1</version>
+        <configuration>
+          <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
+          <pluginId>grpc-java</pluginId>
+          <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>compile-custom</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>1.4.1</version>
+        <executions>
+          <execution>
+            <id>enforce</id>
+            <goals>
+              <goal>enforce</goal>
+            </goals>
+            <configuration>
+              <rules>
+                <requireUpperBoundDeps/>
+              </rules>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>3.5.0</version>
+        <executions>
+          <execution>
+            <id>test</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <!-- Use the build directory property with an explicit separator; the
+                     previous "${basedir}target/..." form glued the module path and
+                     "target" together and so named directories that never exist. -->
+                <source>${project.build.directory}/generated-sources/protobuf/grpc-java</source>
+                <source>${project.build.directory}/generated-sources/protobuf/java</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaClient.java b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaClient.java
new file mode 100644
index 000000000..17efe060f
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaClient.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2015 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.pipes.grpc;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import io.grpc.Channel;
+import io.grpc.Grpc;
+import io.grpc.InsecureChannelCredentials;
+import io.grpc.ManagedChannel;
+import io.grpc.StatusRuntimeException;
+
+import org.apache.tika.CreateFetcherReply;
+import org.apache.tika.CreateFetcherRequest;
+import org.apache.tika.FetchReply;
+import org.apache.tika.FetchRequest;
+import org.apache.tika.TikaGrpc;
+import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
+
+/**
+ * Small command line client for the Tika gRPC service: it registers a file system fetcher
+ * and then asks the server to fetch and parse one key through it. Both calls are blocking
+ * and only log their outcome.
+ */
+public class TikaClient {
+    private static final Logger logger = Logger.getLogger(TikaClient.class.getName());
+
+    /** Fetch key requested by {@link #main} when none is given on the command line. */
+    private static final String DEFAULT_FETCH_KEY = "000164.pdf";
+
+    private final TikaGrpc.TikaBlockingStub blockingStub;
+
+    /**
+     * Creates a client that issues blocking calls over the given channel.
+     *
+     * @param channel channel to the Tika gRPC server; it is not shut down by this class
+     */
+    public TikaClient(Channel channel) {
+        // 'channel' here is a Channel, not a ManagedChannel, so it is not this code's responsibility to
+        // shut it down.
+
+        // Passing Channels to code makes code easier to test and makes it easier to reuse Channels.
+        blockingStub = TikaGrpc.newBlockingStub(channel);
+    }
+
+    /**
+     * Sends a create-fetcher request and logs the reply message. A failed RPC is logged at
+     * WARNING level with its status and is not rethrown.
+     *
+     * @param createFileSystemFetcherRequest the fetcher definition to send
+     */
+    public void createFetcher(CreateFetcherRequest createFileSystemFetcherRequest) {
+        CreateFetcherReply response;
+        try {
+            response = blockingStub.createFetcher(createFileSystemFetcherRequest);
+        } catch (StatusRuntimeException e) {
+            logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
+            return;
+        }
+        logger.info("Create fetcher: " + response.getMessage());
+    }
+
+    /**
+     * Sends a fetch request and logs the field map of the reply. A failed RPC is logged at
+     * WARNING level with its status and is not rethrown.
+     *
+     * @param fetchRequest names the fetcher and the key to fetch
+     */
+    public void fetch(FetchRequest fetchRequest) {
+        FetchReply fetchReply;
+        try {
+            fetchReply = blockingStub.fetch(fetchRequest);
+        } catch (StatusRuntimeException e) {
+            logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
+            return;
+        }
+        logger.info("Fetch reply - tika parsed metadata: " + fetchReply.getFieldsMap());
+    }
+
+    /**
+     * Registers a file system fetcher rooted at {@code args[0]} on the server at
+     * {@code localhost:50051} and fetches one key through it.
+     *
+     * @param args the base path to crawl, optionally followed by the fetch key to request;
+     *             without a second argument the key {@code 000164.pdf} is used as before
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length < 1 || args.length > 2) {
+            System.err.println("Expects the base path to use for the crawl and, optionally, "
+                    + "the fetch key to request.");
+            System.exit(1);
+            return;
+        }
+        String crawlPath = args[0];
+        // The key used to be fixed in code, which only worked for one particular directory.
+        String fetchKey = args.length == 2 ? args[1] : DEFAULT_FETCH_KEY;
+        String target = "localhost:50051";
+        // Create a communication channel to the server, known as a Channel. Channels are thread-safe
+        // and reusable. It is common to create channels at the beginning of your application and reuse
+        // them until the application shuts down.
+        //
+        // For the example we use plaintext insecure credentials to avoid needing TLS certificates. To
+        // use TLS, use TlsChannelCredentials instead.
+        ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
+                .build();
+        try {
+            TikaClient client = new TikaClient(channel);
+            String fetcherId = "file-system-fetcher-" + UUID.randomUUID();
+
+            client.createFetcher(CreateFetcherRequest.newBuilder()
+                    .setName(fetcherId)
+                    .setFetcherClass(FileSystemFetcher.class.getName())
+                    .putParams("basePath", crawlPath)
+                    .putParams("extractFileSystemMetadata", "true")
+                    .build());
+
+            client.fetch(FetchRequest.newBuilder()
+                    .setFetcherName(fetcherId)
+                    .setFetchKey(fetchKey)
+                    .build());
+        } finally {
+            // ManagedChannels use resources like threads and TCP connections. To prevent leaking these
+            // resources the channel should be shut down when it will no longer be used. If it may be used
+            // again leave it running.
+            channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
+        }
+    }
+}
diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaServer.java b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaServer.java
new file mode 100644
index 000000000..218f81d2c
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaServer.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2015 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.tika.pipes.grpc; + +import java.io.FileWriter; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; + +import io.grpc.Grpc; +import io.grpc.InsecureServerCredentials; +import io.grpc.Server; +import io.grpc.stub.StreamObserver; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.xml.sax.SAXException; + +import org.apache.tika.CreateFetcherReply; +import org.apache.tika.CreateFetcherRequest; +import org.apache.tika.FetchReply; +import org.apache.tika.FetchRequest; +import org.apache.tika.TikaGrpc; +import org.apache.tika.config.Param; +import org.apache.tika.exception.TikaConfigException; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.pipes.FetchEmitTuple; +import org.apache.tika.pipes.PipesClient; +import org.apache.tika.pipes.PipesConfig; +import org.apache.tika.pipes.PipesResult; +import org.apache.tika.pipes.emitter.EmitKey; +import org.apache.tika.pipes.fetcher.AbstractFetcher; +import org.apache.tika.pipes.fetcher.FetchKey; +import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher; + +/** + * Server that manages startup/shutdown of a server. 
+ */
+public class TikaServer {
+    private static final Logger logger = Logger.getLogger(TikaServer.class.getName());
+
+    /** Config file used when no path has been supplied, e.g. when the service is built in a test. */
+    private static final String DEFAULT_TIKA_CONFIG_PATH = "tika-config.xml";
+
+    private Server server;
+
+    /** Tika config file given on the command line; stays {@code null} until {@link #main} runs. */
+    private static String tikaConfigPath;
+
+    /**
+     * Starts the gRPC server on port 50051 with insecure server credentials and registers a
+     * JVM shutdown hook that stops it again.
+     */
+    private void start() throws Exception {
+        /* The port on which the server should run */
+        int port = 50051;
+        server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
+                .addService(new TikaServerImpl()).build().start();
+        logger.info("Server started, listening on " + port);
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            // Use stderr here since the logger may have been reset by its JVM shutdown hook.
+            System.err.println("*** shutting down gRPC server since JVM is shutting down");
+            try {
+                TikaServer.this.stop();
+            } catch (InterruptedException e) {
+                e.printStackTrace(System.err);
+            }
+            System.err.println("*** server shut down");
+        }));
+    }
+
+    /** Requests an orderly shutdown and waits up to 30 seconds for it to finish. */
+    private void stop() throws InterruptedException {
+        if (server != null) {
+            server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Await termination on the main thread since the grpc library uses daemon threads.
+     */
+    private void blockUntilShutdown() throws InterruptedException {
+        if (server != null) {
+            server.awaitTermination();
+        }
+    }
+
+    /**
+     * Main launches the server from the command line.
+     *
+     * @param args exactly one element: the path of the tika config file to load and maintain
+     */
+    public static void main(String[] args) throws Exception {
+        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
+        if (args.length != 1) {
+            System.err.println("Expects one command line argument for the path to the tika config file.");
+            System.exit(1);
+            return;
+        }
+        tikaConfigPath = args[0];
+        final TikaServer server = new TikaServer();
+        server.start();
+        server.blockUntilShutdown();
+    }
+
+    /**
+     * gRPC service implementation: keeps the fetchers created over the wire, mirrors them into
+     * the tika config file and runs fetch requests through a {@link PipesClient}.
+     */
+    static class TikaServerImpl extends TikaGrpc.TikaImplBase {
+        Map<String, AbstractFetcher> fetchers = Collections.synchronizedMap(new HashMap<>());
+        // Load the pipes configuration from the same file that updateTikaConfig() rewrites;
+        // previously a fixed "tika-config.xml" was read here while a different path was written.
+        PipesConfig pipesConfig = PipesConfig.load(Paths.get(configPath()));
+        PipesClient pipesClient = new PipesClient(pipesConfig);
+
+        TikaServerImpl() throws TikaConfigException, IOException {
+        }
+
+        /** Returns the config path from the command line, or the default when none was set. */
+        private static String configPath() {
+            return tikaConfigPath != null ? tikaConfigPath : DEFAULT_TIKA_CONFIG_PATH;
+        }
+
+        /**
+         * Rewrites the {@code <fetchers>} element of the tika config file so that it lists
+         * exactly the fetchers currently held in {@link #fetchers}.
+         */
+        private void updateTikaConfig()
+                throws ParserConfigurationException, IOException, SAXException,
+                TransformerException {
+            // Collections.synchronizedMap requires holding the map's own lock while iterating;
+            // holding it for the whole rewrite also keeps two RPCs from writing the file at once.
+            synchronized (fetchers) {
+                String configPath = configPath();
+                // Parse via a File so the path is not interpreted as a URI string.
+                Document tikaConfigDoc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
+                        .parse(Paths.get(configPath).toFile());
+                Element fetchersElement =
+                        (Element) tikaConfigDoc.getElementsByTagName("fetchers").item(0);
+                if (fetchersElement == null) {
+                    // A config without a <fetchers> section gets one instead of a NullPointerException.
+                    fetchersElement = tikaConfigDoc.createElement("fetchers");
+                    tikaConfigDoc.getDocumentElement().appendChild(fetchersElement);
+                }
+                // Always remove the first child. Removing by increasing index while the child
+                // list shrinks skipped every other node, so old <fetcher> entries survived and
+                // were then written again as duplicates.
+                while (fetchersElement.hasChildNodes()) {
+                    fetchersElement.removeChild(fetchersElement.getFirstChild());
+                }
+                for (Map.Entry<String, AbstractFetcher> fetcherEntry : fetchers.entrySet()) {
+                    Element fetcher = tikaConfigDoc.createElement("fetcher");
+                    fetcher.setAttribute("class", fetcherEntry.getValue().getClass().getName());
+                    if (fetcherEntry.getValue() instanceof FileSystemFetcher) {
+                        FileSystemFetcher fileSystemFetcher = (FileSystemFetcher) fetcherEntry.getValue();
+                        Element fetcherName = tikaConfigDoc.createElement("name");
+                        fetcherName.setTextContent(fileSystemFetcher.getName());
+                        fetcher.appendChild(fetcherName);
+                        Element basePath = tikaConfigDoc.createElement("basePath");
+                        fetcher.appendChild(basePath);
+                        basePath.setTextContent(fileSystemFetcher.getBasePath().toAbsolutePath().toString());
+                    }
+                    fetchersElement.appendChild(fetcher);
+                }
+                // Build the transformer before opening (and thereby truncating) the file.
+                Transformer transformer = TransformerFactory.newInstance().newTransformer();
+                try (FileWriter writer = new FileWriter(configPath, StandardCharsets.UTF_8)) {
+                    transformer.transform(new DOMSource(tikaConfigDoc), new StreamResult(writer));
+                }
+            }
+        }
+
+        /** Builds and initializes a file system fetcher from the name and params of the request. */
+        private static FileSystemFetcher newFileSystemFetcher(CreateFetcherRequest request)
+                throws TikaConfigException {
+            FileSystemFetcher fileSystemFetcher = new FileSystemFetcher();
+            fileSystemFetcher.setName(request.getName());
+            fileSystemFetcher.setBasePath(request.getParamsOrDefault("basePath", "."));
+            fileSystemFetcher.setExtractFileSystemMetadata(
+                    Boolean.parseBoolean(request.getParamsOrDefault("extractFileSystemMetadata", "false")));
+            Map<String, Param> tikaParamsMap = new HashMap<>();
+            for (Map.Entry<String, String> entry : request.getParamsMap().entrySet()) {
+                tikaParamsMap.put(entry.getKey(),
+                        new Param<>(entry.getKey(), entry.getValue()));
+            }
+            fileSystemFetcher.initialize(tikaParamsMap);
+            return fileSystemFetcher;
+        }
+
+        /**
+         * Registers a fetcher under the requested name, rewrites the tika config file and
+         * replies with the fetcher name. Only {@link FileSystemFetcher} is supported; for any
+         * other class nothing is registered, but the reply is still sent as before.
+         * Failures are reported to the caller as an {@code INTERNAL} status.
+         */
+        @Override
+        public void createFetcher(CreateFetcherRequest request,
+                                  StreamObserver<CreateFetcherReply> responseObserver) {
+            try {
+                if (FileSystemFetcher.class.getName().equals(request.getFetcherClass())) {
+                    fetchers.put(request.getName(), newFileSystemFetcher(request));
+                } else {
+                    logger.warning("Unsupported fetcher class '" + request.getFetcherClass()
+                            + "'; no fetcher was registered under name " + request.getName());
+                }
+                updateTikaConfig();
+            } catch (Exception e) {
+                // Report through the observer so the client receives a status with a description
+                // rather than an exception escaping the handler.
+                responseObserver.onError(io.grpc.Status.INTERNAL
+                        .withDescription("Could not create fetcher " + request.getName() + ": "
+                                + e.getMessage())
+                        .withCause(e)
+                        .asRuntimeException());
+                return;
+            }
+            responseObserver.onNext(
+                    CreateFetcherReply.newBuilder().setMessage(request.getName()).build());
+            responseObserver.onCompleted();
+        }
+
+        /**
+         * Fetches and parses the requested key with the named fetcher and replies with one
+         * {@link FetchReply} carrying the parsed metadata as string fields. An unknown fetcher
+         * is reported as {@code NOT_FOUND}, an I/O failure as {@code INTERNAL} and an
+         * interruption as {@code CANCELLED}.
+         */
+        @Override
+        public void fetch(FetchRequest request, StreamObserver<FetchReply> responseObserver) {
+            AbstractFetcher fetcher = fetchers.get(request.getFetcherName());
+            if (fetcher == null) {
+                responseObserver.onError(io.grpc.Status.NOT_FOUND
+                        .withDescription("Could not find fetcher with name " + request.getFetcherName())
+                        .asRuntimeException());
+                return;
+            }
+            // NOTE(review): this metadata is copied from the request but is not passed on to the
+            // FetchEmitTuple below, so it has no effect yet - confirm how it should be used.
+            Metadata tikaMetadata = new Metadata();
+            for (Map.Entry<String, String> entry : request.getMetadataMap().entrySet()) {
+                tikaMetadata.add(entry.getKey(), entry.getValue());
+            }
+            try {
+                PipesResult pipesResult = pipesClient.process(new FetchEmitTuple(request.getFetchKey(),
+                        new FetchKey(fetcher.getName(), request.getFetchKey()), new EmitKey(), FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+                if (pipesResult.getEmitData() == null) {
+                    // Presumably the case when the parse did not succeed - confirm against
+                    // PipesResult; either way answer with a status instead of a NullPointerException.
+                    responseObserver.onError(io.grpc.Status.INTERNAL
+                            .withDescription("No parse result for fetch key " + request.getFetchKey())
+                            .asRuntimeException());
+                    return;
+                }
+                // Fetch is declared as a unary RPC, so exactly one reply may be sent. All
+                // metadata objects are folded into it; on a name clash the earlier object wins.
+                FetchReply.Builder fetchReplyBuilder = FetchReply.newBuilder()
+                        .setFetchKey(request.getFetchKey());
+                for (Metadata metadata : pipesResult.getEmitData().getMetadataList()) {
+                    for (String name : metadata.names()) {
+                        String value = metadata.get(name);
+                        if (value != null && !fetchReplyBuilder.containsFields(name)) {
+                            fetchReplyBuilder.putFields(name, value);
+                        }
+                    }
+                }
+                responseObserver.onNext(fetchReplyBuilder.build());
+                responseObserver.onCompleted();
+            } catch (IOException e) {
+                responseObserver.onError(io.grpc.Status.INTERNAL
+                        .withDescription("Could not fetch " + request.getFetchKey() + ": " + e.getMessage())
+                        .withCause(e)
+                        .asRuntimeException());
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                // Close the call as well; otherwise the client waits for a reply that never comes.
+                responseObserver.onError(io.grpc.Status.CANCELLED
+                        .withDescription("Interrupted while fetching " + request.getFetchKey())
+                        .withCause(e)
+                        .asRuntimeException());
+            }
+        }
+    }
+}
diff --git a/tika-pipes/tika-grpc/src/main/proto/tika.proto b/tika-pipes/tika-grpc/src/main/proto/tika.proto
new file mode 100644
index 000000000..f2b350f5e
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/main/proto/tika.proto
@@ -0,0 +1,47 @@
+// Copyright 2015 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "org.apache.tika"; +option java_outer_classname = "TikaProto"; +option objc_class_prefix = "HLW"; + +package tika; + +service Tika { + rpc CreateFetcher (CreateFetcherRequest) returns (CreateFetcherReply) {} + rpc Fetch (FetchRequest) returns (FetchReply) {} +} + +message CreateFetcherRequest { + string name = 1; + string fetcherClass = 2; + map<string,string> params = 3; +} + +message CreateFetcherReply { + string message = 1; +} + +message FetchRequest { + string fetcherName = 1; + string fetchKey = 2; + map<string,string> metadata = 3; +} + +message FetchReply { + string fetchKey = 1; + map<string,string> fields = 2; +} diff --git a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaServerTest.java b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaServerTest.java new file mode 100644 index 000000000..f100f676f --- /dev/null +++ b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaServerTest.java @@ -0,0 +1,59 @@ +/* + * Copyright 2016 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tika.pipes.grpc;
+
+import static org.junit.Assert.assertEquals;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.testing.GrpcCleanupRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import org.apache.tika.CreateFetcherReply;
+import org.apache.tika.CreateFetcherRequest;
+import org.apache.tika.TikaGrpc;
+import org.apache.tika.pipes.grpc.TikaServer;
+
+/**
+ * Calls {@link TikaServer.TikaServerImpl} through an in-process gRPC server and channel, so
+ * no network port is involved.
+ */
+@RunWith(JUnit4.class)
+public class TikaServerTest {
+    /** Shuts down every server and channel registered during a test. */
+    @Rule
+    public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+    /**
+     * A create-fetcher call must be answered with the fetcher name it was sent. The method
+     * was previously called {@code greeterImpl_replyMessage}, a name that described a
+     * different service.
+     */
+    @Test
+    public void createFetcher_repliesWithFetcherName() throws Exception {
+        // Generate a unique in-process server name.
+        String serverName = InProcessServerBuilder.generateName();
+
+        // Create a server, add service, start, and register for automatic graceful shutdown.
+        grpcCleanup.register(InProcessServerBuilder
+                .forName(serverName).directExecutor().addService(new TikaServer.TikaServerImpl()).build().start());
+
+        TikaGrpc.TikaBlockingStub blockingStub = TikaGrpc.newBlockingStub(
+                // Create a client channel and register for automatic graceful shutdown.
+                grpcCleanup.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()));
+
+        String testName = "test name";
+        CreateFetcherReply reply =
+                blockingStub.createFetcher(CreateFetcherRequest.newBuilder().setName(testName).build());
+
+        assertEquals(testName, reply.getMessage());
+    }
+}
diff --git a/tika-pipes/tika-grpc/tika-config.xml b/tika-pipes/tika-grpc/tika-config.xml
new file mode 100644
index 000000000..b7f8c535c
--- /dev/null
+++ b/tika-pipes/tika-grpc/tika-config.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<properties> + <pipes> + <params> + <numClients>2</numClients> + <forkedJvmArgs> + <arg>-Xmx1g</arg> + <arg>-XX:ParallelGCThreads=2</arg> + </forkedJvmArgs> + <timeoutMillis>60000</timeoutMillis> + <maxForEmitBatchBytes>-1</maxForEmitBatchBytes> <!-- disable emit --> + </params> + </pipes> + <fetchers> + <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher"> + <name>file-system-fetcher-fabd51ef-51c1-447c-818c-96af18b2a893</name> + <basePath>C:\Users\nicho\Downloads\000</basePath> + </fetcher> + </fetchers> +</properties> \ No newline at end of file
