This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch TIKA-4519 in repository https://gitbox.apache.org/repos/asf/tika.git
commit 18f0d77a33349bc1a5437503ff9af8f0e34f881e Author: tallison <[email protected]> AuthorDate: Wed Oct 29 08:29:02 2025 -0400 TIKA-4519 - checkpoint --- .../pipes/opensearch/tests/OpenSearchTest.java | 1 - .../tika/pipes/s3/tests/PipeIntegrationTests.java | 1 - tika-pipes/tika-emitters/pom.xml | 18 ++- .../tika/pipes/emitter/azblob/AZBlobEmitter.java | 1 - .../pipes/emitter/azblob/TestAZBlobEmitter.java | 1 - .../tika-emitters/tika-emitter-file-system/pom.xml | 141 +++++++++++++++++++++ .../src/main/assembly/assembly.xml | 50 ++++++++ .../tika/pipes/emitter/fs/FileSystemEmitter.java | 14 +- .../apache/tika/pipes/emitter/gcs/GCSEmitter.java | 1 - .../tika/pipes/emitter/gcs/TestGCSEmitter.java | 1 - .../tika/pipes/emitter/jdbc/JDBCEmitter.java | 11 +- .../tika/pipes/emitter/jdbc/JDBCEmitterTest.java | 1 - .../apache/tika/pipes/emitter/s3/S3Emitter.java | 1 - .../apache/tika/pipes/api}/emitter/Emitter.java | 10 +- .../tika/pipes/api/emitter/EmitterConfig.java} | 16 +-- .../tika/pipes/api}/emitter/StreamEmitter.java | 4 +- .../tika/pipes/core/TikaPipesConfigTest.java | 1 - .../apache/tika/pipes/core/async/MockEmitter.java | 1 - .../org/apache/tika/pipes/core/PipesClient.java | 4 +- .../apache/tika/pipes/core/PipesPluginsConfig.java | 28 +++- .../org/apache/tika/pipes/core/PipesResult.java | 26 ++-- .../org/apache/tika/pipes/core/PipesServer.java | 24 ++-- .../apache/tika/pipes/core/async/AsyncEmitter.java | 10 +- .../tika/pipes/core/async/AsyncProcessor.java | 18 +-- .../tika/pipes/core/emitter/AbstractEmitter.java | 52 -------- .../apache/tika/pipes/core/emitter/EmitKey.java | 16 +-- .../tika/pipes/core/emitter/EmitterManager.java | 62 ++++++--- .../pipes/core/emitter/TikaEmitterException.java | 4 +- .../config/DefaultEmitterConfig.java} | 12 +- .../EmittingEmbeddedDocumentBytesHandler.java | 2 - ...erConfigImpl.java => DefaultFetcherConfig.java} | 4 +- .../serialization/FetchEmitTupleSerializer.java | 2 +- .../pipes/core/serialization/JsonEmitData.java | 4 +- .../tika/pipes/emitter/fs/FileSystemEmitter.java | 2 - .../tika/pipes/core/TikaPipesConfigTest.java | 1 - .../filelist/FileListPipesIteratorTest.java | 4 +- .../tika/server/core/resource/AsyncResource.java | 4 +- .../tika/server/core/resource/PipesResource.java | 2 +- 38 files changed, 369 insertions(+), 186 deletions(-) diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/OpenSearchTest.java b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/OpenSearchTest.java index 9923a320a..84b1120e3 100644 --- a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/OpenSearchTest.java +++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/OpenSearchTest.java @@ -51,7 +51,6 @@ import org.apache.tika.exception.TikaConfigException; import org.apache.tika.metadata.Metadata; import org.apache.tika.parser.ParseContext; import org.apache.tika.pipes.core.HandlerConfig; -import org.apache.tika.pipes.core.emitter.Emitter; import org.apache.tika.pipes.core.emitter.EmitterManager; import org.apache.tika.pipes.emitter.opensearch.JsonResponse; import org.apache.tika.pipes.emitter.opensearch.OpenSearchEmitter; diff --git a/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/java/org/apache/tika/pipes/s3/tests/PipeIntegrationTests.java b/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/java/org/apache/tika/pipes/s3/tests/PipeIntegrationTests.java index 3aafbcfb2..ab7f18228 100644 --- a/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/java/org/apache/tika/pipes/s3/tests/PipeIntegrationTests.java +++ b/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/java/org/apache/tika/pipes/s3/tests/PipeIntegrationTests.java @@ -46,7 +46,6 @@ import software.amazon.awssdk.services.s3.model.S3Object; import org.apache.tika.exception.TikaException; import org.apache.tika.metadata.Metadata; import org.apache.tika.pipes.core.FetchEmitTuple; -import org.apache.tika.pipes.core.emitter.Emitter; import org.apache.tika.pipes.core.emitter.EmitterManager; import org.apache.tika.pipes.core.fetcher.FetcherManager; import org.apache.tika.pipes.core.pipesiterator.CallablePipesIterator; diff --git a/tika-pipes/tika-emitters/pom.xml b/tika-pipes/tika-emitters/pom.xml index 9dd094708..3f870b8cb 100644 --- a/tika-pipes/tika-emitters/pom.xml +++ b/tika-pipes/tika-emitters/pom.xml @@ -32,15 +32,31 @@ <packaging>pom</packaging> <modules> + <module>tika-emitter-file-system</module> + <!-- <module>tika-emitter-s3</module> <module>tika-emitter-kafka</module> <module>tika-emitter-solr</module> <module>tika-emitter-opensearch</module> <module>tika-emitter-gcs</module> <module>tika-emitter-az-blob</module> - <module>tika-emitter-jdbc</module> + <module>tika-emitter-jdbc</module>--> </modules> + <dependencies> + <dependency> + <groupId>org.pf4j</groupId> + <artifactId>pf4j</artifactId> + <!-- !!! VERY IMPORTANT --> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tika</groupId> + <artifactId>tika-pipes-api</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + <build> <plugins> <plugin> diff --git a/tika-pipes/tika-emitters/tika-emitter-az-blob/src/main/java/org/apache/tika/pipes/emitter/azblob/AZBlobEmitter.java b/tika-pipes/tika-emitters/tika-emitter-az-blob/src/main/java/org/apache/tika/pipes/emitter/azblob/AZBlobEmitter.java index a4e81f2cc..eaad93cf0 100644 --- a/tika-pipes/tika-emitters/tika-emitter-az-blob/src/main/java/org/apache/tika/pipes/emitter/azblob/AZBlobEmitter.java +++ b/tika-pipes/tika-emitters/tika-emitter-az-blob/src/main/java/org/apache/tika/pipes/emitter/azblob/AZBlobEmitter.java @@ -48,7 +48,6 @@ import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.TikaCoreProperties; import org.apache.tika.parser.ParseContext; import org.apache.tika.pipes.core.emitter.AbstractEmitter; -import org.apache.tika.pipes.core.emitter.StreamEmitter; import org.apache.tika.pipes.core.emitter.TikaEmitterException; import org.apache.tika.serialization.JsonMetadataList; import org.apache.tika.utils.StringUtils; diff --git a/tika-pipes/tika-emitters/tika-emitter-az-blob/src/test/java/org/apache/tika/pipes/emitter/azblob/TestAZBlobEmitter.java b/tika-pipes/tika-emitters/tika-emitter-az-blob/src/test/java/org/apache/tika/pipes/emitter/azblob/TestAZBlobEmitter.java index bed2367a0..673af3f32 100644 --- a/tika-pipes/tika-emitters/tika-emitter-az-blob/src/test/java/org/apache/tika/pipes/emitter/azblob/TestAZBlobEmitter.java +++ b/tika-pipes/tika-emitters/tika-emitter-az-blob/src/test/java/org/apache/tika/pipes/emitter/azblob/TestAZBlobEmitter.java @@ -27,7 +27,6 @@ import org.junit.jupiter.api.Test; import org.apache.tika.metadata.Metadata; import org.apache.tika.parser.ParseContext; -import org.apache.tika.pipes.core.emitter.Emitter; import org.apache.tika.pipes.core.emitter.EmitterManager; @Disabled("turn into an actual test") diff --git a/tika-pipes/tika-emitters/tika-emitter-file-system/pom.xml b/tika-pipes/tika-emitters/tika-emitter-file-system/pom.xml new file mode 100644 index 000000000..a6bcab706 --- /dev/null +++ b/tika-pipes/tika-emitters/tika-emitter-file-system/pom.xml @@ -0,0 +1,141 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>tika-emitters</artifactId> + <groupId>org.apache.tika</groupId> + <version>4.0.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>tika-emitter-file-system</artifactId> + <name>Apache Tika file system emitter</name> + <properties> + <plugin.id>file-system-emitter</plugin.id> + <plugin.class>org.apache.tika.pipes.emitter.fs.FileSystemEmitterPlugin</plugin.class> + <plugin.version>4.0.0-SNAPSHOT</plugin.version> + <plugin.provider>Local File System Emitter</plugin.provider> + <plugin.dependencies /> + <!-- Never include the core artifacts in your plugin lib directory. If you do, it will cause the classloading + to get messed up when finding your plugins. --> + <plugin.excluded.artifactIds>tika-core,tika-pipes-api,tika-pipes-core</plugin.excluded.artifactIds> + + </properties> + <dependencies> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tika</groupId> + <artifactId>tika-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tika</groupId> + <artifactId>tika-serialization</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j2-impl</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>3.6.1</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/lib</outputDirectory> + <includeScope>runtime</includeScope> + <excludeArtifactIds>${plugin.excluded.artifactIds}</excludeArtifactIds> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>3.4.2</version> + <configuration> + <archive> + <manifest> + <addDefaultImplementationEntries>true</addDefaultImplementationEntries> + <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries> + </manifest> + <manifestEntries> + <Plugin-Id>file-system-emitter</Plugin-Id> + <Plugin-Version>${project.version}</Plugin-Version> + <Plugin-Class>org.apache.tika.pipes.emitter.fs.FileSystemEmitterPlugin</Plugin-Class> + <Plugin-Provider>Local File System Emitter</Plugin-Provider> + <Plugin-Description>Capable of fetching the local file system</Plugin-Description> +<!-- <Plugin-Requires>1.0.0</Plugin-Requires> --> + <Plugin-Dependencies></Plugin-Dependencies> + </manifestEntries> + </archive> + </configuration> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptors> + <descriptor>src/main/assembly/assembly.xml</descriptor> + </descriptors> + <appendAssemblyId>false</appendAssemblyId> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <annotationProcessors> + <annotationProcessor>org.pf4j.processor.ExtensionAnnotationProcessor</annotationProcessor> + </annotationProcessors> + </configuration> + </plugin> + </plugins> + </build> + <scm> + <tag>3.0.0-rc1</tag> + </scm> +</project> diff --git a/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/assembly/assembly.xml b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/assembly/assembly.xml new file mode 100644 index 000000000..bac95e714 --- /dev/null +++ b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/assembly/assembly.xml @@ -0,0 +1,50 @@ +<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/ASSEMBLY/2.0.0" + xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 + http://maven.apache.org/xsd/assembly-2.0.0.xsd"> + <id>dependencies-zip</id> + <formats> + <format>zip</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${project.build.directory}/lib</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${project.build.directory}</directory> + <outputDirectory>/lib</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${project.build.directory}</directory> + <outputDirectory>/</outputDirectory> + <includes> + <include>classes/META-INF/extensions.idx</include> + <include>classes/META-INF/MANIFEST.MF</include> + </includes> + </fileSet> + <!-- this is a bit of a hack. There has to be a better solution. --> + <!-- this one is for pf4j --> + <fileSet> + <directory>${project.basedir}/src/main/resources</directory> + <outputDirectory>/</outputDirectory> + <includes> + <include>plugin.properties</include> + </includes> + </fileSet> + <!-- this one is for the application so that the extension can read its pluginId, when in zip mode + we don't need this for jar mode + --> + <fileSet> + <directory>${project.basedir}/src/main/resources</directory> + <outputDirectory>/classes</outputDirectory> + <includes> + <include>plugin.properties</include> + </includes> + </fileSet> + </fileSets> +</assembly> diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java similarity index 90% copy from tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java copy to tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java index 2643e34a1..efc4a4fdc 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java +++ b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java @@ -31,9 +31,7 @@ import org.apache.tika.config.Field; import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.TikaCoreProperties; import org.apache.tika.parser.ParseContext; -import org.apache.tika.pipes.core.emitter.AbstractEmitter; -import org.apache.tika.pipes.core.emitter.StreamEmitter; -import org.apache.tika.pipes.core.emitter.TikaEmitterException; +import org.apache.tika.pipes.api.emitter.StreamEmitter; import org.apache.tika.serialization.JsonMetadataList; /** @@ -65,7 +63,7 @@ import org.apache.tika.serialization.JsonMetadataList; * </emitters> * </properties></pre> */ -public class FileSystemEmitter extends AbstractEmitter implements StreamEmitter { +public class FileSystemEmitter implements StreamEmitter { private Path basePath = null; private String fileExtension = "json"; @@ -74,10 +72,10 @@ public class FileSystemEmitter extends AbstractEmitter implements StreamEmitter private boolean prettyPrint = false; @Override - public void emit(String emitKey, List<Metadata> metadataList, ParseContext parseContext) throws IOException, TikaEmitterException { + public void emit(String emitKey, List<Metadata> metadataList, ParseContext parseContext) throws IOException { Path output; if (metadataList == null || metadataList.isEmpty()) { - throw new TikaEmitterException("metadata list must not be null or of size 0"); + throw new IOException("metadata list must not be null or of size 0"); } if (fileExtension != null && ! fileExtension.isEmpty()) { @@ -86,7 +84,7 @@ public class FileSystemEmitter extends AbstractEmitter implements StreamEmitter if (basePath != null) { output = basePath.resolve(emitKey); if (!output.toAbsolutePath().normalize().startsWith(basePath.toAbsolutePath().normalize())) { - throw new TikaEmitterException("path traversal?! " + output.toAbsolutePath()); + throw new IOException("path traversal?! " + output.toAbsolutePath()); } } else { output = Paths.get(emitKey); @@ -146,7 +144,7 @@ public class FileSystemEmitter extends AbstractEmitter implements StreamEmitter } @Override - public void emit(String path, InputStream inputStream, Metadata userMetadata, ParseContext parseContext) throws IOException, TikaEmitterException { + public void emit(String path, InputStream inputStream, Metadata userMetadata, ParseContext parseContext) throws IOException { Path target = basePath.resolve(path); if (!Files.isDirectory(target.getParent())) { diff --git a/tika-pipes/tika-emitters/tika-emitter-gcs/src/main/java/org/apache/tika/pipes/emitter/gcs/GCSEmitter.java b/tika-pipes/tika-emitters/tika-emitter-gcs/src/main/java/org/apache/tika/pipes/emitter/gcs/GCSEmitter.java index fd0f0ce7c..1474e9752 100644 --- a/tika-pipes/tika-emitters/tika-emitter-gcs/src/main/java/org/apache/tika/pipes/emitter/gcs/GCSEmitter.java +++ b/tika-pipes/tika-emitters/tika-emitter-gcs/src/main/java/org/apache/tika/pipes/emitter/gcs/GCSEmitter.java @@ -47,7 +47,6 @@ import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.TikaCoreProperties; import org.apache.tika.parser.ParseContext; import org.apache.tika.pipes.core.emitter.AbstractEmitter; -import org.apache.tika.pipes.core.emitter.StreamEmitter; import org.apache.tika.pipes.core.emitter.TikaEmitterException; import org.apache.tika.serialization.JsonMetadataList; import org.apache.tika.utils.StringUtils; diff --git a/tika-pipes/tika-emitters/tika-emitter-gcs/src/test/java/org/apache/tika/pipes/emitter/gcs/TestGCSEmitter.java b/tika-pipes/tika-emitters/tika-emitter-gcs/src/test/java/org/apache/tika/pipes/emitter/gcs/TestGCSEmitter.java index 23c42aa0f..fe81cc142 100644 --- a/tika-pipes/tika-emitters/tika-emitter-gcs/src/test/java/org/apache/tika/pipes/emitter/gcs/TestGCSEmitter.java +++ b/tika-pipes/tika-emitters/tika-emitter-gcs/src/test/java/org/apache/tika/pipes/emitter/gcs/TestGCSEmitter.java @@ -27,7 +27,6 @@ import org.junit.jupiter.api.Test; import org.apache.tika.metadata.Metadata; import org.apache.tika.parser.ParseContext; -import org.apache.tika.pipes.core.emitter.Emitter; import org.apache.tika.pipes.core.emitter.EmitterManager; @Disabled("turn into an actual test") diff --git a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java index ced0a92ff..d230754ad 100644 --- a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java +++ b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java @@ -261,21 +261,20 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close * @param emitKey emit key * @param metadataList list of metadata per file * @throws IOException - * @throws TikaEmitterException */ @Override public void emit(String emitKey, List<Metadata> metadataList, ParseContext parseContext) - throws IOException, TikaEmitterException { + throws IOException { if (metadataList == null || metadataList.size() < 1) { return; } - List<EmitData> emitDataList = new ArrayList<>(); - emitDataList.add(new EmitData(new EmitKey("", emitKey), metadataList)); - emit(emitDataList); + List<EmitData> emitDataTupleList = new ArrayList<>(); + emitDataTupleList.add(new EmitData(new EmitKey("", emitKey), metadataList)); + emit(emitDataTupleList); } @Override - public void emit(List<? extends EmitData> emitData) throws IOException, TikaEmitterException { + public void emit(List<? extends EmitData> emitData) throws IOException { int tries = 0; Exception ex = null; while (tries++ <= maxRetries) { diff --git a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java index 448484197..8ecbaf5d1 100644 --- a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java +++ b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java @@ -41,7 +41,6 @@ import org.junit.jupiter.api.io.TempDir; import org.apache.tika.metadata.Metadata; import org.apache.tika.parser.ParseContext; -import org.apache.tika.pipes.core.emitter.Emitter; import org.apache.tika.pipes.core.emitter.EmitterManager; public class JDBCEmitterTest { diff --git a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java index edb7fff1f..df05dcf7a 100644 --- a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java +++ b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java @@ -65,7 +65,6 @@ import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.TikaCoreProperties; import org.apache.tika.parser.ParseContext; import org.apache.tika.pipes.core.emitter.AbstractEmitter; -import org.apache.tika.pipes.core.emitter.StreamEmitter; import org.apache.tika.pipes.core.emitter.TikaEmitterException; import org.apache.tika.serialization.JsonMetadataList; import org.apache.tika.utils.StringUtils; diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/Emitter.java b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/Emitter.java similarity index 81% rename from tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/Emitter.java rename to tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/Emitter.java index f3450330e..aa7122a50 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/Emitter.java +++ b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/Emitter.java @@ -14,21 +14,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.tika.pipes.core.emitter; +package org.apache.tika.pipes.api.emitter; import java.io.IOException; import java.util.List; +import org.apache.tika.exception.TikaConfigException; import org.apache.tika.metadata.Metadata; import org.apache.tika.parser.ParseContext; public interface Emitter { - String getName(); + void configure(EmitterConfig emitterConfig) throws TikaConfigException, IOException; - void emit(String emitKey, List<Metadata> metadataList, ParseContext parseContext) throws IOException, TikaEmitterException; + String getPluginId(); + + void emit(String emitKey, List<Metadata> metadataList, ParseContext parseContext) throws IOException; - void emit(List<? extends EmitData> emitData) throws IOException, TikaEmitterException; //TODO -- add this later for xhtml? //void emit(String txt, Metadata metadata) throws IOException, TikaException; diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/TikaEmitterException.java b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitterConfig.java similarity index 72% copy from tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/TikaEmitterException.java copy to tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitterConfig.java index 8b07e7698..8c944ee58 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/TikaEmitterException.java +++ b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitterConfig.java @@ -14,16 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.tika.pipes.core.emitter; +package org.apache.tika.pipes.api.emitter; -import org.apache.tika.exception.TikaException; +import java.io.Serializable; -public class TikaEmitterException extends TikaException { - public TikaEmitterException(String msg) { - super(msg); - } +public interface EmitterConfig extends Serializable { - public TikaEmitterException(String msg, Throwable t) { - super(msg, t); - } + String getPluginId(); + EmitterConfig setPluginId(String pluginId); + String getConfigJson(); + EmitterConfig setConfigJson(String config); } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/StreamEmitter.java b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/StreamEmitter.java similarity index 91% rename from tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/StreamEmitter.java rename to tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/StreamEmitter.java index 93a0505c2..d713ba623 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/StreamEmitter.java +++ b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/StreamEmitter.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.tika.pipes.core.emitter; +package org.apache.tika.pipes.api.emitter; import java.io.IOException; import java.io.InputStream; @@ -24,5 +24,5 @@ import org.apache.tika.parser.ParseContext; public interface StreamEmitter extends Emitter { void emit(String emitKey, InputStream inputStream, Metadata userMetadata, ParseContext parseContext) - throws IOException, TikaEmitterException; + throws IOException; } diff --git a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java index 65a744a55..02bb42c06 100644 --- a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java +++ b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java @@ -24,7 +24,6 @@ import org.junit.jupiter.api.Test; import org.apache.tika.config.AbstractTikaConfigTest; import org.apache.tika.exception.TikaConfigException; -import org.apache.tika.pipes.core.emitter.Emitter; import org.apache.tika.pipes.core.emitter.EmitterManager; import org.apache.tika.pipes.core.pipesiterator.PipesIterator; diff --git a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/MockEmitter.java b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/MockEmitter.java index a4464fa75..fab797ed1 100644 --- a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/MockEmitter.java +++ b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/MockEmitter.java @@ -24,7 +24,6 @@ import java.util.concurrent.ArrayBlockingQueue; import org.apache.tika.metadata.Metadata; import org.apache.tika.parser.ParseContext; -import org.apache.tika.pipes.core.emitter.AbstractEmitter; import org.apache.tika.pipes.core.emitter.EmitData; import org.apache.tika.pipes.core.emitter.EmitKey; import org.apache.tika.pipes.core.emitter.TikaEmitterException; diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java index 10d837f00..11fffa7fb 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java @@ -405,8 +405,8 @@ public class PipesClient implements Closeable { try (ObjectInputStream objectInputStream = new ObjectInputStream( UnsynchronizedByteArrayInputStream.builder().setByteArray(bytes).get())) { Metadata metadata = (Metadata) objectInputStream.readObject(); - EmitData emitData = new EmitData(emitKey, Collections.singletonList(metadata)); - return new PipesResult(PipesResult.STATUS.INTERMEDIATE_RESULT, emitData, true); + EmitData emitDataTuple = new EmitData(emitKey, Collections.singletonList(metadata)); + return new PipesResult(PipesResult.STATUS.INTERMEDIATE_RESULT, emitDataTuple, true); } catch (ClassNotFoundException e) { LOG.error("class not found exception deserializing data", e); //this should be catastrophic diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesPluginsConfig.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesPluginsConfig.java index 2c26be7fd..3ef3a9ec0 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesPluginsConfig.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesPluginsConfig.java @@ -31,8 +31,9 @@ import java.util.Optional; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.tika.pipes.api.emitter.EmitterConfig; import org.apache.tika.pipes.api.fetcher.FetcherConfig; -import org.apache.tika.pipes.core.fetcher.config.FetcherConfigImpl; +import org.apache.tika.pipes.core.fetcher.config.DefaultFetcherConfig; public class PipesPluginsConfig { @@ -46,20 +47,34 @@ public class PipesPluginsConfig { while (it.hasNext()) { String pluginId = it.next(); JsonNode fetcherConfig = fetchers.get(pluginId); - fetcherMap.put(pluginId, new FetcherConfigImpl(pluginId, fetcherConfig.toString())); + fetcherMap.put(pluginId, new DefaultFetcherConfig(pluginId, fetcherConfig.toString())); + } + } + Map<String, FetcherConfig> emitterMap = new HashMap<>(); + if (plugins.has("emitters")) { + JsonNode emitters = plugins.get("emitters"); + Iterator<String> it = emitters.fieldNames(); + while (it.hasNext()) { + String pluginId = it.next(); + JsonNode emitterConfig = emitters.get(pluginId); + emitterMap.put(pluginId, new EmitterConfigImpl(pluginId, emitterConfig.toString())); } } Path pluginsDir = null; if (plugins.has("pf4j.pluginsDir")) { pluginsDir = Paths.get(plugins.get("pf4j.pluginsDir").asText()); } - return new PipesPluginsConfig(fetcherMap, pluginsDir); + return new PipesPluginsConfig(fetcherMap, emitterMap, pluginsDir); } private final Map<String, FetcherConfig> fetcherMap; + private final Map<String, EmitterConfig> emitterMap; + + private final Path pluginsDir; - private PipesPluginsConfig(Map<String, FetcherConfig> fetcherMap, Path pluginsDir) { + private PipesPluginsConfig(Map<String, FetcherConfig> fetcherMap, Map<String, EmitterConfig> emitterMap, Path pluginsDir) { this.fetcherMap = fetcherMap; + this.emitterMap = emitterMap; this.pluginsDir = pluginsDir; } @@ -67,6 +82,11 @@ public class PipesPluginsConfig { return Optional.ofNullable(fetcherMap.get(pluginId)); } + public Optional<EmitterConfig> getEmitterConfig(String pluginId) { + return Optional.ofNullable(emitterMap.get(pluginId)); + } + + public Optional<Path> getPluginsDir() { return Optional.ofNullable(pluginsDir); } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java index 961c00a75..3f391ae28 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java @@ -49,12 +49,12 @@ public class PipesResult { public static final PipesResult EMPTY_OUTPUT = new PipesResult(STATUS.EMPTY_OUTPUT); private final STATUS status; - private final EmitData emitData; + private final EmitData emitDataTuple; private final String message; - public PipesResult(STATUS status, EmitData emitData, String message, boolean intermediate) { + public PipesResult(STATUS status, EmitData emitDataTuple, String message, boolean intermediate) { this.status = status; - this.emitData = emitData; + this.emitDataTuple = emitDataTuple; this.message = message; this.intermediate = intermediate; } @@ -70,25 +70,25 @@ public class PipesResult { /** * This assumes parse success with no parse exception * - * @param emitData + * @param emitDataTuple */ - public PipesResult(EmitData emitData) { - this(STATUS.PARSE_SUCCESS, emitData, null, false); + public PipesResult(EmitData emitDataTuple) { + this(STATUS.PARSE_SUCCESS, emitDataTuple, null, false); } - public PipesResult(STATUS status, EmitData emitData, boolean intermediate) { - this(status, emitData, null, intermediate); + public PipesResult(STATUS status, EmitData emitDataTuple, boolean intermediate) { + this(status, emitDataTuple, null, intermediate); } /** * This assumes that the message is a stack trace (container * parse exception). * - * @param emitData + * @param emitDataTuple * @param message */ - public PipesResult(EmitData emitData, String message) { - this(STATUS.PARSE_SUCCESS_WITH_EXCEPTION, emitData, message, false); + public PipesResult(EmitData emitDataTuple, String message) { + this(STATUS.PARSE_SUCCESS_WITH_EXCEPTION, emitDataTuple, message, false); } public STATUS getStatus() { @@ -96,7 +96,7 @@ public class PipesResult { } public EmitData getEmitData() { - return emitData; + return emitDataTuple; } public String getMessage() { @@ -110,6 +110,6 @@ public class PipesResult { @Override public String toString() { return "PipesResult{" + "intermediate=" + intermediate + ", status=" + status + - ", emitData=" + emitData + ", message='" + message + '\'' + '}'; + ", emitData=" + emitDataTuple + ", message='" + message + '\'' + '}'; } } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java index d4c040727..b7429b9a5 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java @@ -66,12 +66,12 @@ import org.apache.tika.parser.DigestingParser; import org.apache.tika.parser.ParseContext; import org.apache.tika.parser.Parser; import org.apache.tika.parser.RecursiveParserWrapper; +import org.apache.tika.pipes.api.emitter.Emitter; +import org.apache.tika.pipes.api.emitter.StreamEmitter; import org.apache.tika.pipes.api.fetcher.Fetcher; import org.apache.tika.pipes.core.emitter.EmitData; import org.apache.tika.pipes.core.emitter.EmitKey; -import org.apache.tika.pipes.core.emitter.Emitter; import org.apache.tika.pipes.core.emitter.EmitterManager; -import org.apache.tika.pipes.core.emitter.StreamEmitter; import org.apache.tika.pipes.core.emitter.TikaEmitterException; import org.apache.tika.pipes.core.extractor.BasicEmbeddedDocumentBytesHandler; import org.apache.tika.pipes.core.extractor.EmbeddedDocumentBytesConfig; @@ -307,7 +307,7 @@ public class PipesServer implements Runnable { Emitter emitter = null; try { - emitter = emitterManager.getEmitter(emitKey.getEmitterName()); + emitter = emitterManager.getEmitter(emitKey.getEmitterPluginId()); } catch (IllegalArgumentException e) { String noEmitterMsg = getNoEmitterMsg(taskId); LOG.warn(noEmitterMsg); @@ -321,7 +321,7 @@ public class PipesServer implements Runnable { } else { emitter.emit(emitKey.getEmitKey(), parseData.getMetadataList(), parseContext); } - } catch (IOException | TikaEmitterException e) { + } catch (IOException e) { LOG.warn("emit exception", e); String msg = ExceptionUtils.getStackTrace(e); byte[] bytes = msg.getBytes(StandardCharsets.UTF_8); @@ -354,12 +354,12 @@ public class PipesServer implements Runnable { exit(1); } - EmitData filteredEmitData = new EmitData(emitKey, filtered, parseExceptionStack); + EmitData filteredEmitDataTuple = new EmitData(emitKey, filtered, parseExceptionStack); try { UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get(); try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) { - objectOutputStream.writeObject(filteredEmitData); + objectOutputStream.writeObject(filteredEmitDataTuple); } write(STATUS.EMIT_SUCCESS_PASS_BACK, bos.toByteArray()); } catch (IOException e) { @@ -487,16 +487,16 @@ public class PipesServer implements Runnable { injectUserMetadata(t.getMetadata(), parseData.getMetadataList()); EmitKey emitKey = t.getEmitKey(); if (StringUtils.isBlank(emitKey.getEmitKey())) { - emitKey = new EmitKey(emitKey.getEmitterName(), t.getFetchKey().getFetchKey()); + emitKey = new EmitKey(emitKey.getEmitterPluginId(), t.getFetchKey().getFetchKey()); t.setEmitKey(emitKey); } - EmitData emitData = new EmitData(t.getEmitKey(), parseData.getMetadataList(), stack); - if (shouldEmit(embeddedDocumentBytesConfig, parseData, emitData)) { + EmitData emitDataTuple = new EmitData(t.getEmitKey(), parseData.getMetadataList(), stack); + if (shouldEmit(embeddedDocumentBytesConfig, parseData, emitDataTuple)) { emit(t.getId(), emitKey, embeddedDocumentBytesConfig.isExtractEmbeddedDocumentBytes(), parseData, stack, parseContext); } else { //send back to the client - write(emitData); + write(emitDataTuple); } if (LOG.isTraceEnabled()) { LOG.trace("timer -- emitted: {} ms", System.currentTimeMillis() - start); @@ -506,7 +506,7 @@ public class PipesServer implements Runnable { } } - private boolean shouldEmit(EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig, MetadataListAndEmbeddedBytes parseData, EmitData emitData) { + private boolean shouldEmit(EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig, MetadataListAndEmbeddedBytes parseData, EmitData emitDataTuple) { if (emitStrategy == EMIT_STRATEGY.EMIT_ALL) { return true; } else if (embeddedDocumentBytesConfig.isExtractEmbeddedDocumentBytes() && @@ -515,7 +515,7 @@ public class PipesServer implements Runnable { } else if (emitStrategy == EMIT_STRATEGY.PASSBACK_ALL) { return false; } else if (emitStrategy == EMIT_STRATEGY.DYNAMIC) { - if (emitData.getEstimatedSizeBytes() >= maxForEmitBatchBytes) { + if (emitDataTuple.getEstimatedSizeBytes() >= maxForEmitBatchBytes) { return true; } } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java index 7f2857740..63e1f8fc3 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java @@ -30,8 +30,8 @@ import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.tika.pipes.api.emitter.Emitter; import org.apache.tika.pipes.core.emitter.EmitData; -import org.apache.tika.pipes.core.emitter.Emitter; import org.apache.tika.pipes.core.emitter.EmitterManager; import org.apache.tika.pipes.core.emitter.TikaEmitterException; import org.apache.tika.utils.ExceptionUtils; @@ -110,7 +110,7 @@ public class AsyncEmitter implements Callable<Integer> { (estimatedSize + sz), maxBytes); emitAll(); } - List<EmitData> cached = map.computeIfAbsent(data.getEmitKey().getEmitterName(), k -> new ArrayList<>()); + List<EmitData> cached = map.computeIfAbsent(data.getEmitKey().getEmitterPluginId(), k -> new ArrayList<>()); updateEstimatedSize(sz); cached.add(data); } @@ -131,11 +131,11 @@ public class AsyncEmitter implements Callable<Integer> { lastEmitted = Instant.now(); } - private void tryToEmit(Emitter emitter, List<EmitData> cachedEmitData) { + private void tryToEmit(Emitter emitter, List<EmitData> cachedEmitDatumTuples) { try { - emitter.emit(cachedEmitData); - } catch (IOException | TikaEmitterException e) { + emitter.emit(cachedEmitDatumTuples); + } catch (IOException e) { LOG.warn("emitter class ({}): {}", emitter.getClass(), ExceptionUtils.getStackTrace(e)); } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java index 31e57aac9..e57be5621 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java @@ -58,7 +58,7 @@ public class AsyncProcessor implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(AsyncProcessor.class); private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples; - private final ArrayBlockingQueue<EmitData> emitData; + private final ArrayBlockingQueue<EmitData> emitDatumTuples; private final ExecutorCompletionService<Integer> executorCompletionService; private final ExecutorService executorService; private final AsyncConfig asyncConfig; @@ -76,7 +76,7 @@ public class AsyncProcessor implements Closeable { public AsyncProcessor(Path tikaConfigPath, Path pipesPluginsConfigPath, PipesIterator pipesIterator) throws TikaException, IOException { this.asyncConfig = AsyncConfig.load(tikaConfigPath, pipesPluginsConfigPath); this.fetchEmitTuples = new ArrayBlockingQueue<>(asyncConfig.getQueueSize()); - this.emitData = new ArrayBlockingQueue<>(100); + this.emitDatumTuples = new ArrayBlockingQueue<>(100); //+1 is the watcher thread this.executorService = Executors.newFixedThreadPool( asyncConfig.getNumClients() + asyncConfig.getNumEmitters() + 1); @@ -107,13 +107,13 @@ public class AsyncProcessor implements Closeable { for (int i = 0; i < asyncConfig.getNumClients(); i++) { executorCompletionService.submit( - new FetchEmitWorker(asyncConfig, fetchEmitTuples, emitData)); + new FetchEmitWorker(asyncConfig, fetchEmitTuples, emitDatumTuples)); } EmitterManager emitterManager = EmitterManager.load(asyncConfig.getTikaConfig()); for (int i = 0; i < asyncConfig.getNumEmitters(); i++) { executorCompletionService.submit( - new AsyncEmitter(asyncConfig, emitData, emitterManager)); + new AsyncEmitter(asyncConfig, emitDatumTuples, emitterManager)); } } catch (Exception e) { LOG.error("problem initializing AsyncProcessor", e); @@ -231,7 +231,7 @@ public class AsyncProcessor implements Closeable { if (numParserThreadsFinished == asyncConfig.getNumClients() && ! addedEmitterSemaphores) { for (int i = 0; i < asyncConfig.getNumEmitters(); i++) { try { - boolean offered = emitData.offer(AsyncEmitter.EMIT_DATA_STOP_SEMAPHORE, + boolean offered = emitDatumTuples.offer(AsyncEmitter.EMIT_DATA_STOP_SEMAPHORE, MAX_OFFER_WAIT_MS, TimeUnit.MILLISECONDS); if (! offered) { @@ -262,14 +262,14 @@ public class AsyncProcessor implements Closeable { private final AsyncConfig asyncConfig; private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples; - private final ArrayBlockingQueue<EmitData> emitDataQueue; + private final ArrayBlockingQueue<EmitData> emitDataTupleQueue; private FetchEmitWorker(AsyncConfig asyncConfig, ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples, - ArrayBlockingQueue<EmitData> emitDataQueue) { + ArrayBlockingQueue<EmitData> emitDataTupleQueue) { this.asyncConfig = asyncConfig; this.fetchEmitTuples = fetchEmitTuples; - this.emitDataQueue = emitDataQueue; + this.emitDataTupleQueue = emitDataTupleQueue; } @Override @@ -305,7 +305,7 @@ public class AsyncProcessor implements Closeable { if (shouldEmit(result)) { LOG.trace("adding result to emitter queue: " + result.getEmitData()); - boolean offered = emitDataQueue.offer(result.getEmitData(), + boolean offered = emitDataTupleQueue.offer(result.getEmitData(), MAX_OFFER_WAIT_MS, TimeUnit.MILLISECONDS); if (! offered) { diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/AbstractEmitter.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/AbstractEmitter.java deleted file mode 100644 index 56e8e8129..000000000 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/AbstractEmitter.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tika.pipes.core.emitter; - -import java.io.IOException; -import java.util.List; - -import org.apache.tika.parser.ParseContext; - -public abstract class AbstractEmitter implements Emitter { - - private String name; - - @Override - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - /** - * The default behavior is to call {@link #emit(String, List, ParseContext)} on each item. - * Some implementations, e.g. Solr/ES/vespa, can benefit from subclassing this and - * emitting a bunch of docs at once. - * - * @param emitData - * @throws IOException - * @throws TikaEmitterException - */ - @Override - public void emit(List<? extends EmitData> emitData) throws IOException, TikaEmitterException { - for (EmitData d : emitData) { - emit(d.getEmitKey().getEmitKey(), d.getMetadataList(), d.getParseContext()); - } - } -} diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitKey.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitKey.java index 9274d8f74..64de8ff33 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitKey.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitKey.java @@ -28,20 +28,20 @@ public class EmitKey implements Serializable { */ private static final long serialVersionUID = -3861669115439125268L; - private String emitterName; + private String emitterPluginId; private String emitKey; //for serialization only...yuck. public EmitKey() { } - public EmitKey(String emitterName, String emitKey) { - this.emitterName = emitterName; + public EmitKey(String emitterPluginId, String emitKey) { + this.emitterPluginId = emitterPluginId; this.emitKey = emitKey; } - public String getEmitterName() { - return emitterName; + public String getEmitterPluginId() { + return emitterPluginId; } public String getEmitKey() { @@ -50,7 +50,7 @@ public class EmitKey implements Serializable { @Override public String toString() { - return "EmitterKey{" + "emitterName='" + emitterName + '\'' + ", emitterKey='" + emitKey + + return "EmitterKey{" + "emitterPluginId='" + emitterPluginId + '\'' + ", emitterKey='" + emitKey + '\'' + '}'; } @@ -65,7 +65,7 @@ public class EmitKey implements Serializable { EmitKey emitKey1 = (EmitKey) o; - if (!Objects.equals(emitterName, emitKey1.emitterName)) { + if (!Objects.equals(emitterPluginId, emitKey1.emitterPluginId)) { return false; } return Objects.equals(emitKey, emitKey1.emitKey); @@ -73,7 +73,7 @@ public class EmitKey implements Serializable { @Override public int hashCode() { - int result = emitterName != null ? emitterName.hashCode() : 0; + int result = emitterPluginId != null ? emitterPluginId.hashCode() : 0; result = 31 * result + (emitKey != null ? emitKey.hashCode() : 0); return result; } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java index 236fc6e65..4dcfcc8fa 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java @@ -20,46 +20,72 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.pf4j.DefaultPluginManager; +import org.pf4j.PluginManager; + import org.apache.tika.config.ConfigBase; +import org.apache.tika.config.Initializable; +import org.apache.tika.config.InitializableProblemHandler; import org.apache.tika.exception.TikaConfigException; +import org.apache.tika.pipes.api.emitter.Emitter; +import org.apache.tika.pipes.api.emitter.EmitterConfig; +import org.apache.tika.pipes.core.PipesPluginsConfig; /** - * Utility class that will apply the appropriate fetcher - * to the fetcherString based on the prefix. + * Utility class that will apply the appropriate emitter + * to the emitterString based on the prefix. * <p> - * This does not allow multiple fetchers supporting the same prefix. + * This does not allow multiple emitters supporting the same prefix. */ public class EmitterManager extends ConfigBase { private final Map<String, Emitter> emitterMap = new ConcurrentHashMap<>(); - public static EmitterManager load(Path tikaConfigPath) throws IOException, TikaConfigException { - try (InputStream is = Files.newInputStream(tikaConfigPath) ) { - return EmitterManager.buildComposite( - "emitters", EmitterManager.class, - "emitter", - Emitter.class, is); + public static EmitterManager load(Path path) throws IOException, TikaConfigException { + try (InputStream is = Files.newInputStream(path)) { + return load(is); } } + public static EmitterManager load(InputStream pipesPluginsConfigIs) throws IOException, TikaConfigException { + PipesPluginsConfig pluginsConfig = PipesPluginsConfig.load(pipesPluginsConfigIs); + PluginManager pluginManager = null; + if (pluginsConfig.getPluginsDir().isPresent()) { + pluginManager = new DefaultPluginManager(pluginsConfig.getPluginsDir().get()); + } else { + pluginManager = new DefaultPluginManager(); + } + pluginManager.loadPlugins(); + pluginManager.startPlugins(); + Map<String, Emitter> emitterMap = new HashMap<>(); + for (Emitter emitter : pluginManager.getExtensions(Emitter.class)) { + Optional<EmitterConfig> emitterConfig = pluginsConfig.getEmitterConfig(emitter.getPluginId()); + if (emitterConfig.isPresent()) { + emitter.configure(emitterConfig.get()); + if (emitter instanceof Initializable) { + ((Initializable) emitter).checkInitialization(InitializableProblemHandler.THROW); + } + } else { + LOG.warn("no configuration found for emitter pluginId={}", emitter.getPluginId()); + } + emitterMap.put(emitter.getPluginId(), emitter); + } + return new EmitterManager(emitterMap); + } + private EmitterManager() { } - public EmitterManager(List<Emitter> emitters) { - for (Emitter emitter : emitters) { - if (emitterMap.containsKey(emitter.getName())) { - throw new IllegalArgumentException( - "Multiple emitters cannot support the same name: " + emitter.getName()); - } - emitterMap.put(emitter.getName(), emitter); - - } + private EmitterManager(Map<String, Emitter> emitters) { + emitterMap.putAll(emitters); } public Set<String> getSupported() { diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/TikaEmitterException.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/TikaEmitterException.java index 8b07e7698..55f001b8e 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/TikaEmitterException.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/TikaEmitterException.java @@ -16,9 +16,9 @@ */ package org.apache.tika.pipes.core.emitter; -import org.apache.tika.exception.TikaException; +import java.io.IOException; -public class TikaEmitterException extends TikaException { +public class TikaEmitterException extends IOException { public TikaEmitterException(String msg) { super(msg); } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/config/FetcherConfigImpl.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/config/DefaultEmitterConfig.java similarity index 78% copy from tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/config/FetcherConfigImpl.java copy to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/config/DefaultEmitterConfig.java index dfc5528da..4d41cd7d6 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/config/FetcherConfigImpl.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/config/DefaultEmitterConfig.java @@ -14,16 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.tika.pipes.core.fetcher.config; +package org.apache.tika.pipes.core.emitter.config; -import org.apache.tika.pipes.api.fetcher.FetcherConfig; +import org.apache.tika.pipes.api.emitter.EmitterConfig; -public class FetcherConfigImpl implements FetcherConfig { +public class DefaultEmitterConfig implements EmitterConfig { private String plugId; private String configJson; - public FetcherConfigImpl(String plugId, String configJson) { + public DefaultEmitterConfig(String plugId, String configJson) { this.plugId = plugId; this.configJson = configJson; } @@ -33,7 +33,7 @@ public class FetcherConfigImpl implements FetcherConfig { } @Override - public FetcherConfig setPluginId(String pluginId) { + public EmitterConfig setPluginId(String pluginId) { this.plugId = pluginId; return this; } @@ -44,7 +44,7 @@ public class FetcherConfigImpl implements FetcherConfig { } @Override - public FetcherConfig setConfigJson(String configJson) { + public EmitterConfig setConfigJson(String configJson) { this.configJson = configJson; return this; } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java index 22854c4ce..ac9413583 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java @@ -25,9 +25,7 @@ import org.apache.tika.metadata.Metadata; import org.apache.tika.parser.ParseContext; import org.apache.tika.pipes.core.FetchEmitTuple; import org.apache.tika.pipes.core.emitter.EmitKey; -import org.apache.tika.pipes.core.emitter.Emitter; import org.apache.tika.pipes.core.emitter.EmitterManager; -import org.apache.tika.pipes.core.emitter.StreamEmitter; import org.apache.tika.pipes.core.emitter.TikaEmitterException; public class EmittingEmbeddedDocumentBytesHandler extends AbstractEmbeddedDocumentBytesHandler { diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/config/FetcherConfigImpl.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/config/DefaultFetcherConfig.java similarity index 91% rename from tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/config/FetcherConfigImpl.java rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/config/DefaultFetcherConfig.java index dfc5528da..986d015ee 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/config/FetcherConfigImpl.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/config/DefaultFetcherConfig.java @@ -18,12 +18,12 @@ package org.apache.tika.pipes.core.fetcher.config; import org.apache.tika.pipes.api.fetcher.FetcherConfig; -public class FetcherConfigImpl implements FetcherConfig { +public class DefaultFetcherConfig implements FetcherConfig { private String plugId; private String configJson; - public FetcherConfigImpl(String plugId, String configJson) { + public DefaultFetcherConfig(String plugId, String configJson) { this.plugId = plugId; this.configJson = configJson; } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/FetchEmitTupleSerializer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/FetchEmitTupleSerializer.java index 368b6199b..45eb74257 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/FetchEmitTupleSerializer.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/FetchEmitTupleSerializer.java @@ -49,7 +49,7 @@ public class FetchEmitTupleSerializer extends JsonSerializer<FetchEmitTuple> { jsonGenerator.writeNumberField(FETCH_RANGE_START, t.getFetchKey().getRangeStart()); jsonGenerator.writeNumberField(FETCH_RANGE_END, t.getFetchKey().getRangeEnd()); } - jsonGenerator.writeStringField(EMITTER, t.getEmitKey().getEmitterName()); + jsonGenerator.writeStringField(EMITTER, t.getEmitKey().getEmitterPluginId()); if (!StringUtils.isBlank(t.getEmitKey().getEmitKey())) { jsonGenerator.writeStringField(EMIT_KEY, t.getEmitKey().getEmitKey()); } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/JsonEmitData.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/JsonEmitData.java index 2ec5f9343..cf86bf80c 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/JsonEmitData.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/JsonEmitData.java @@ -40,7 +40,7 @@ public class JsonEmitData { OBJECT_MAPPER.registerModule(module); } - public static void toJson(EmitData emitData, Writer writer) throws IOException { - OBJECT_MAPPER.writeValue(writer, emitData); + public static void toJson(EmitData emitDataTuple, Writer writer) throws IOException { + OBJECT_MAPPER.writeValue(writer, emitDataTuple); } } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java index 2643e34a1..e126ef66e 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java @@ -31,8 +31,6 @@ import org.apache.tika.config.Field; import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.TikaCoreProperties; import org.apache.tika.parser.ParseContext; -import org.apache.tika.pipes.core.emitter.AbstractEmitter; -import org.apache.tika.pipes.core.emitter.StreamEmitter; import org.apache.tika.pipes.core.emitter.TikaEmitterException; import org.apache.tika.serialization.JsonMetadataList; diff --git a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java index aefadae04..f241b79d5 100644 --- a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java +++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java @@ -24,7 +24,6 @@ import org.junit.jupiter.api.Test; import org.apache.tika.config.AbstractTikaConfigTest; import org.apache.tika.exception.TikaConfigException; -import org.apache.tika.pipes.core.emitter.Emitter; import org.apache.tika.pipes.core.emitter.EmitterManager; import org.apache.tika.pipes.core.pipesiterator.PipesIterator; diff --git a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/pipesiterator/filelist/FileListPipesIteratorTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/pipesiterator/filelist/FileListPipesIteratorTest.java index 77c1332c8..42606b0d7 100644 --- a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/pipesiterator/filelist/FileListPipesIteratorTest.java +++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/pipesiterator/filelist/FileListPipesIteratorTest.java @@ -47,7 +47,7 @@ public class FileListPipesIteratorTest { assertEquals(t.getFetchKey().getFetchKey(), t.getEmitKey().getEmitKey()); assertEquals(t.getId(), t.getEmitKey().getEmitKey()); assertEquals("f", t.getFetchKey().getFetcherPluginId()); - assertEquals("e", t.getEmitKey().getEmitterName()); + assertEquals("e", t.getEmitKey().getEmitterPluginId()); lines.add(t.getId()); } assertEquals("the", lines.get(0)); @@ -70,7 +70,7 @@ public class FileListPipesIteratorTest { assertEquals(t.getFetchKey().getFetchKey(), t.getEmitKey().getEmitKey()); assertEquals(t.getId(), t.getEmitKey().getEmitKey()); assertEquals("f", t.getFetchKey().getFetcherPluginId()); - assertEquals("e", t.getEmitKey().getEmitterName()); + assertEquals("e", t.getEmitKey().getEmitterPluginId()); lines.add(t.getId()); } assertEquals("brown", lines.get(0)); diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java index 541458a65..efc5120fc 100644 --- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java @@ -112,10 +112,10 @@ public class AsyncResource { .getSupported() .contains(t .getEmitKey() - .getEmitterName())) { + .getEmitterPluginId())) { return badEmitter(t .getEmitKey() - .getEmitterName()); + .getEmitterPluginId()); } ParseContext parseContext = t.getParseContext(); EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig = parseContext.get(EmbeddedDocumentBytesConfig.class); diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java index 7673d6d8e..abbd4d8b4 100644 --- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java @@ -120,7 +120,7 @@ public class PipesResource { case NO_EMITTER_FOUND: { throw new IllegalArgumentException("Couldn't find emitter that matched: " + fetchEmitTuple .getEmitKey() - .getEmitterName()); + .getEmitterPluginId()); } default: throw new IllegalArgumentException("I'm sorry, I don't yet handle a status of " + "this type: " + pipesResult.getStatus());
