This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch TIKA-4519 in repository https://gitbox.apache.org/repos/asf/tika.git
commit 77682b95b8f6bad786db746a0f489275ddd80ed8 Author: tallison <[email protected]> AuthorDate: Mon Nov 3 13:24:01 2025 -0500 TIKA-4519 -- checkpoint commit --- .../java/org/apache/tika/plugins/PluginConfig.java | 10 +- .../org/apache/tika/plugins/PluginConfigs.java | 41 +++-- .../tika/eval/app/ExtractComparerRunner.java | 2 +- .../apache/tika/eval/app/ExtractProfileRunner.java | 2 +- .../tika/pipes/emitter/fs/FileSystemEmitter.java | 156 ++++++++----------- .../pipes/emitter/fs/FileSystemEmitterConfig.java | 20 +++ .../pipes/emitter/fs/FileSystemEmitterPlugin.java} | 40 +++-- .../src/main/resources/emitter-plugin.properties} | 8 +- .../tika/pipes/fetcher/fs/FileSystemFetcher.java | 24 +-- ...plugin.properties => fetcher-plugin.properties} | 0 .../pipes/fetcher/fs/FileSystemFetcherTest.java | 24 +-- .../AbstractEmitter.java} | 19 ++- .../AbstractStreamEmitter.java} | 17 +- .../apache/tika/pipes/api/emitter/EmitData.java | 19 +++ .../org/apache/tika/pipes/api/emitter/Emitter.java | 9 +- .../tika/pipes/api/fetcher/AbstractFetcher.java | 4 +- .../org/apache/tika/pipes/api/fetcher/Fetcher.java | 3 +- .../tika/pipes/api/fetcher/FetcherConfig.java | 27 ---- tika-pipes/tika-pipes-core-tests/pom.xml | 6 + .../apache/tika/pipes/core/PassbackFilterTest.java | 16 +- .../apache/tika/pipes/core/PluginsTestHelper.java | 14 +- .../tika/pipes/core/TikaPipesConfigTest.java | 9 +- .../pipes/core/async/AsyncChaosMonkeyTest.java | 18 +-- .../apache/tika/pipes/core/async/MockEmitter.java | 58 ------- .../apache/tika/pipes/core/async/MockFetcher.java | 4 +- .../tika/pipes/core/emitter/MockEmitter.java | 54 +++++-- .../tika/pipes/core/fetcher/MockFetcher.java | 4 +- .../test/resources/configs/fetchers-emitters.json | 16 ++ .../src/test/resources/configs/fetchers.json | 10 -- .../apache/tika/pipes/core/tika-emit-config.xml | 12 -- .../org/apache/tika/pipes/core/PipesClient.java | 6 +- .../apache/tika/pipes/core/PipesPluginsConfig.java | 97 ++++++++---- .../org/apache/tika/pipes/core/PipesResult.java | 14 +- .../org/apache/tika/pipes/core/PipesServer.java | 19 ++- .../apache/tika/pipes/core/async/AsyncEmitter.java | 30 ++-- .../tika/pipes/core/async/AsyncProcessor.java | 14 +- .../apache/tika/pipes/core/async/EmitDataPair.java | 6 + .../emitter/{EmitData.java => EmitDataImpl.java} | 15 +- .../tika/pipes/core/emitter/EmitterManager.java | 13 +- .../tika/pipes/core/emitter/EmptyEmitter.java | 23 ++- .../EmittingEmbeddedDocumentBytesHandler.java | 2 + .../tika/pipes/core/fetcher/EmptyFetcher.java | 4 +- .../tika/pipes/core/fetcher/FetcherManager.java | 4 +- .../pipes/core/pipesiterator/PipesIterator.java | 14 +- .../pipes/core/serialization/JsonEmitData.java | 4 +- .../tika/pipes/emitter/fs/FileSystemEmitter.java | 171 --------------------- .../filelist/FileListPipesIterator.java | 8 +- .../pipesiterator/fs/FileSystemPipesIterator.java | 6 +- .../tika/pipes/core/TikaPipesConfigTest.java | 8 +- .../tika/pipes/core/emitter/MockEmitter.java | 60 -------- .../filelist/FileListPipesIteratorTest.java | 8 +- .../fs/FileSystemPipesIteratorTest.java | 4 +- .../src/test/resources/configs/fetchers.json | 2 +- .../apache/tika/config/pipes-iterator-config.xml | 3 +- .../tika/config/pipes-iterator-multiple-config.xml | 6 +- .../pipesiterator/azblob/AZBlobPipesIterator.java | 2 +- .../pipes/pipesiterator/csv/CSVPipesIterator.java | 2 +- .../src/test/java/TestCSVPipesIterator.java | 2 +- .../pipes/pipesiterator/gcs/GCSPipesIterator.java | 2 +- .../pipesiterator/jdbc/JDBCPipesIterator.java | 6 +- .../pipesiterator/kafka/KafkaPipesIterator.java | 2 +- .../pipes/pipesiterator/s3/S3PipesIterator.java | 2 +- .../pipesiterator/solr/SolrPipesIterator.java | 2 +- .../tika/serialization/PluginConfigLoader.java | 34 ++++ .../serialization/PluginsConfigDeserializer.java | 27 ++++ .../serialization/PluginsConfigSerializer.java | 21 +++ .../tika/serialization/PluginsConfigTest.java | 69 +++++++++ .../tika/server/core/resource/AsyncResource.java | 4 +- 68 files changed, 652 insertions(+), 710 deletions(-) diff --git a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitterConfig.java b/tika-core/src/main/java/org/apache/tika/plugins/PluginConfig.java similarity index 74% rename from tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitterConfig.java rename to tika-core/src/main/java/org/apache/tika/plugins/PluginConfig.java index 8c944ee58..532ee9d38 100644 --- a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitterConfig.java +++ b/tika-core/src/main/java/org/apache/tika/plugins/PluginConfig.java @@ -14,14 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.tika.pipes.api.emitter; +package org.apache.tika.plugins; -import java.io.Serializable; +public record PluginConfig(String pluginId, String jsonConfig) { -public interface EmitterConfig extends Serializable { - - String getPluginId(); - EmitterConfig setPluginId(String pluginId); - String getConfigJson(); - EmitterConfig setConfigJson(String config); } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/config/DefaultFetcherConfig.java b/tika-core/src/main/java/org/apache/tika/plugins/PluginConfigs.java similarity index 51% rename from tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/config/DefaultFetcherConfig.java rename to tika-core/src/main/java/org/apache/tika/plugins/PluginConfigs.java index 986d015ee..fbed67343 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/config/DefaultFetcherConfig.java +++ b/tika-core/src/main/java/org/apache/tika/plugins/PluginConfigs.java @@ -14,38 +14,33 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.tika.pipes.core.fetcher.config; +package org.apache.tika.plugins; -import org.apache.tika.pipes.api.fetcher.FetcherConfig; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; -public class DefaultFetcherConfig implements FetcherConfig { +public class PluginConfigs { - private String plugId; - private String configJson; + Map<String, PluginConfig> pluginConfigs = new HashMap<>(); + + public PluginConfigs() { - public DefaultFetcherConfig(String plugId, String configJson) { - this.plugId = plugId; - this.configJson = configJson; - } - @Override - public String getPluginId() { - return plugId; } - @Override - public FetcherConfig setPluginId(String pluginId) { - this.plugId = pluginId; - return this; + public PluginConfigs(Map<String, PluginConfig> map) { + pluginConfigs.putAll(map); } - @Override - public String getConfigJson() { - return configJson; + public void add(PluginConfig pluginConfig) { + if (pluginConfigs.containsKey(pluginConfig.pluginId())) { + throw new IllegalArgumentException("Can't overwrite existing plugin for id: " + pluginConfig.pluginId()); + } + pluginConfigs.put(pluginConfig.pluginId(), pluginConfig); } - @Override - public FetcherConfig setConfigJson(String configJson) { - this.configJson = configJson; - return this; + public Optional<PluginConfig> get(String pluginId) { + return Optional.ofNullable(pluginConfigs.get(pluginId)); } + } diff --git a/tika-eval/tika-eval-app/src/main/java/org/apache/tika/eval/app/ExtractComparerRunner.java b/tika-eval/tika-eval-app/src/main/java/org/apache/tika/eval/app/ExtractComparerRunner.java index 0ab120c81..629fba23e 100644 --- a/tika-eval/tika-eval-app/src/main/java/org/apache/tika/eval/app/ExtractComparerRunner.java +++ b/tika-eval/tika-eval-app/src/main/java/org/apache/tika/eval/app/ExtractComparerRunner.java @@ -180,7 +180,7 @@ public class ExtractComparerRunner { private static PipesIterator createIterator(Path inputDir) { FileSystemPipesIterator fs = new FileSystemPipesIterator(inputDir); fs.setFetcherName(""); - fs.setEmitterName(""); + fs.setEmitterPluginId(""); return fs; } diff --git a/tika-eval/tika-eval-app/src/main/java/org/apache/tika/eval/app/ExtractProfileRunner.java b/tika-eval/tika-eval-app/src/main/java/org/apache/tika/eval/app/ExtractProfileRunner.java index b618bf0af..2790bb9f0 100644 --- a/tika-eval/tika-eval-app/src/main/java/org/apache/tika/eval/app/ExtractProfileRunner.java +++ b/tika-eval/tika-eval-app/src/main/java/org/apache/tika/eval/app/ExtractProfileRunner.java @@ -174,7 +174,7 @@ public class ExtractProfileRunner { private static PipesIterator createIterator(Path inputDir) { FileSystemPipesIterator fs = new FileSystemPipesIterator(inputDir); fs.setFetcherName(""); - fs.setEmitterName(""); + fs.setEmitterPluginId(""); return fs; } diff --git a/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java index efc4a4fdc..cb03a1e26 100644 --- a/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java +++ b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java @@ -26,62 +26,62 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.List; +import java.util.Optional; -import org.apache.tika.config.Field; +import org.pf4j.Extension; + +import org.apache.tika.exception.TikaConfigException; import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.TikaCoreProperties; import org.apache.tika.parser.ParseContext; -import org.apache.tika.pipes.api.emitter.StreamEmitter; +import org.apache.tika.pipes.api.emitter.AbstractStreamEmitter; +import org.apache.tika.plugins.PluginConfig; +import org.apache.tika.plugins.PluginConfigs; import org.apache.tika.serialization.JsonMetadataList; +import org.apache.tika.utils.StringUtils; /** * Emitter to write to a file system. * <p> - * This calculates the path to write to based on the {@link #basePath} + * This calculates the path to write to based on the {@link FileSystemEmitterConfig#basePath()} * and the value of the {@link TikaCoreProperties#SOURCE_PATH} value. * * <pre class="prettyprint"> - * <properties> - * <emitters> - * <emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter> - * <params> - * <!-- required --> - * <param name="name" type="string">fs</param> - * <!-- required --> - * <param name="basePath" type="string">/path/to/output</param> - * <!-- optional; default is 'json' --> - * <param name="fileExtension" type="string">json</param> - * <!-- optional; if the file already exists, - * options ('skip', 'replace', 'exception') - * default is 'exception' --> - * <param name="onExists" type="string">skip</param> - * <!-- optional; whether or not to pretty print the output - * default is false --> - * <param name="prettyPrint" type="boolean">true</param> - * </params> - * </emitter> - * </emitters> - * </properties></pre> + * </pre> */ -public class FileSystemEmitter implements StreamEmitter { +@Extension +public class FileSystemEmitter extends AbstractStreamEmitter { + + private FileSystemEmitterConfig fileSystemEmitterConfig; - private Path basePath = null; - private String fileExtension = "json"; - private ON_EXISTS onExists = ON_EXISTS.EXCEPTION; + public FileSystemEmitter() throws IOException { + super(); + } - private boolean prettyPrint = false; + @Override + public void configure(PluginConfig pluginConfig) throws TikaConfigException, IOException { + checkPluginId(pluginConfig.pluginId()); + fileSystemEmitterConfig = FileSystemEmitterConfig.load(pluginConfig.jsonConfig()); + //checkConfig(fileSystemEmitterConfig); + } @Override public void emit(String emitKey, List<Metadata> metadataList, ParseContext parseContext) throws IOException { - Path output; + if (metadataList == null || metadataList.isEmpty()) { throw new IOException("metadata list must not be null or of size 0"); } - if (fileExtension != null && ! fileExtension.isEmpty()) { - emitKey += "." + fileExtension; + FileSystemEmitterConfig config = getConfig(parseContext); + + Path output; + + if (!StringUtils.isBlank(config.fileExtension())) { + emitKey += "." + config.fileExtension(); } - if (basePath != null) { + + if (config.basePath() != null) { + Path basePath = Paths.get(config.basePath()); output = basePath.resolve(emitKey); if (!output.toAbsolutePath().normalize().startsWith(basePath.toAbsolutePath().normalize())) { throw new IOException("path traversal?! " + output.toAbsolutePath()); @@ -94,70 +94,36 @@ public class FileSystemEmitter implements StreamEmitter { Files.createDirectories(output.getParent()); } try (Writer writer = Files.newBufferedWriter(output, StandardCharsets.UTF_8)) { - JsonMetadataList.toJson(metadataList, writer, prettyPrint); + JsonMetadataList.toJson(metadataList, writer, config.prettyPrint()); } } - @Field - public void setBasePath(String basePath) { - this.basePath = Paths.get(basePath); - } - - /** - * If you want to customize the output file's file extension. - * Do not include the "." - * - * @param fileExtension - */ - @Field - public void setFileExtension(String fileExtension) { - this.fileExtension = fileExtension; - } + @Override + public void emit(String emitKey, InputStream inputStream, Metadata userMetadata, ParseContext parseContext) throws IOException { + FileSystemEmitterConfig config = getConfig(parseContext); - /** - * What to do if the target file already exists. NOTE: if more than one - * thread is trying write to the same file and {@link ON_EXISTS#REPLACE} is chosen, - * you still might get a {@link FileAlreadyExistsException}. - * - * @param onExists - */ - @Field - public void setOnExists(String onExists) { - switch (onExists) { - case "skip": - this.onExists = ON_EXISTS.SKIP; - break; - case "replace": - this.onExists = ON_EXISTS.REPLACE; - break; - case "exception": - this.onExists = ON_EXISTS.EXCEPTION; - break; - default: - throw new IllegalArgumentException("Don't understand '" + onExists + "'; must be one of: 'skip', 'replace', 'exception'"); + Path output; + if (config.basePath() != null) { + Path basePath = Paths.get(config.basePath()); + output = basePath.resolve(emitKey); + if (!output.toAbsolutePath().normalize().startsWith(basePath.toAbsolutePath().normalize())) { + throw new IOException("path traversal?! " + output.toAbsolutePath()); + } + } else { + output = Paths.get(emitKey); } - } - @Field - public void setPrettyPrint(boolean prettyPrint) { - this.prettyPrint = prettyPrint; - } - - @Override - public void emit(String path, InputStream inputStream, Metadata userMetadata, ParseContext parseContext) throws IOException { - Path target = basePath.resolve(path); - - if (!Files.isDirectory(target.getParent())) { - Files.createDirectories(target.getParent()); + if (!Files.isDirectory(output.getParent())) { + Files.createDirectories(output.getParent()); } - if (onExists == ON_EXISTS.REPLACE) { - Files.copy(inputStream, target, StandardCopyOption.REPLACE_EXISTING); - } else if (onExists == ON_EXISTS.EXCEPTION) { - Files.copy(inputStream, target); - } else if (onExists == ON_EXISTS.SKIP) { - if (!Files.isRegularFile(target)) { + if (config.onExists() == ON_EXISTS.REPLACE) { + Files.copy(inputStream, output, StandardCopyOption.REPLACE_EXISTING); + } else if (config.onExists() == ON_EXISTS.EXCEPTION) { + Files.copy(inputStream, output); + } else if (config.onExists() == ON_EXISTS.SKIP) { + if (!Files.isRegularFile(output)) { try { - Files.copy(inputStream, target); + Files.copy(inputStream, output); } catch (FileAlreadyExistsException e) { //swallow } @@ -165,7 +131,15 @@ public class FileSystemEmitter implements StreamEmitter { } } - enum ON_EXISTS { - SKIP, EXCEPTION, REPLACE + private FileSystemEmitterConfig getConfig(ParseContext parseContext) throws IOException { + FileSystemEmitterConfig config = fileSystemEmitterConfig; + PluginConfigs pluginConfigs = parseContext.get(PluginConfigs.class); + if (pluginConfigs != null) { + Optional<PluginConfig> pluginConfigOpt = pluginConfigs.get(getPluginId()); + if (pluginConfigOpt.isPresent()) { + config = FileSystemEmitterConfig.load(pluginConfigOpt.get().jsonConfig()); + } + } + return config; } } diff --git a/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitterConfig.java b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitterConfig.java new file mode 100644 index 000000000..9106d8b0c --- /dev/null +++ b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitterConfig.java @@ -0,0 +1,20 @@ +package org.apache.tika.pipes.emitter.fs; + +import java.io.IOException; + +import com.fasterxml.jackson.databind.ObjectMapper; + +enum ON_EXISTS { + SKIP, EXCEPTION, REPLACE +} + +public record FileSystemEmitterConfig(String basePath, String fileExtension, ON_EXISTS onExists, boolean prettyPrint) { + + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static FileSystemEmitterConfig load(String json) throws IOException { + return OBJECT_MAPPER.readValue(json, FileSystemEmitterConfig.class); + } + +} diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/config/DefaultEmitterConfig.java b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitterPlugin.java similarity index 54% rename from tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/config/DefaultEmitterConfig.java rename to tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitterPlugin.java index 4d41cd7d6..e8bec02a5 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/config/DefaultEmitterConfig.java +++ b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitterPlugin.java @@ -14,38 +14,36 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.tika.pipes.core.emitter.config; +package org.apache.tika.pipes.emitter.fs; -import org.apache.tika.pipes.api.emitter.EmitterConfig; +import org.pf4j.Plugin; +import org.pf4j.PluginWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class DefaultEmitterConfig implements EmitterConfig { +public class FileSystemEmitterPlugin extends Plugin { + private static final Logger LOG = LoggerFactory.getLogger(FileSystemEmitterPlugin.class); - private String plugId; - private String configJson; - - public DefaultEmitterConfig(String plugId, String configJson) { - this.plugId = plugId; - this.configJson = configJson; - } - @Override - public String getPluginId() { - return plugId; + public FileSystemEmitterPlugin(PluginWrapper wrapper) { + super(wrapper); } @Override - public EmitterConfig setPluginId(String pluginId) { - this.plugId = pluginId; - return this; + public void start() { + LOG.info("Starting"); + super.start(); } @Override - public String getConfigJson() { - return configJson; + public void stop() { + LOG.info("Stopping"); + super.stop(); } @Override - public EmitterConfig setConfigJson(String configJson) { - this.configJson = configJson; - return this; + public void delete() { + LOG.info("Deleting"); + super.delete(); } + } diff --git a/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/resources/plugin.properties b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/resources/emitter-plugin.properties similarity index 80% copy from tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/resources/plugin.properties copy to tika-pipes/tika-emitters/tika-emitter-file-system/src/main/resources/emitter-plugin.properties index b2488f75d..a85876524 100644 --- a/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/resources/plugin.properties +++ b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/resources/emitter-plugin.properties @@ -14,8 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -plugin.id=file-system-fetcher -plugin.class=org.apache.tika.pipes.fetcher.fs.FileSystemFetcherPlugin +plugin.id=file-system-emitter +plugin.class=org.apache.tika.pipes.emitter.fs.FileSystemEmitterPlugin plugin.version=4.0.0-SNAPSHOT -plugin.provider=Local File System Fetcher -plugin.description=Capable of fetching the local file system +plugin.provider=Local File System Emitter +plugin.description=Capable of emitting the local file system diff --git a/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java index 3e3df6b73..3f6d0ffde 100644 --- a/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java +++ b/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java @@ -25,6 +25,7 @@ import java.nio.file.Paths; import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.FileTime; import java.util.Date; +import java.util.Optional; import org.pf4j.Extension; import org.slf4j.Logger; @@ -39,8 +40,9 @@ import org.apache.tika.metadata.Property; import org.apache.tika.metadata.TikaCoreProperties; import org.apache.tika.parser.ParseContext; import org.apache.tika.pipes.api.fetcher.AbstractFetcher; -import org.apache.tika.pipes.api.fetcher.FetcherConfig; import org.apache.tika.pipes.fetcher.fs.config.FileSystemFetcherConfig; +import org.apache.tika.plugins.PluginConfig; +import org.apache.tika.plugins.PluginConfigs; import org.apache.tika.utils.StringUtils; /** @@ -72,9 +74,9 @@ public class FileSystemFetcher extends AbstractFetcher { } @Override - public void configure(FetcherConfig fetcherConfig) throws IOException, TikaConfigException { - checkPluginId(fetcherConfig.getPluginId()); - defaultFileSystemFetcherConfig = FileSystemFetcherConfig.load(fetcherConfig.getConfigJson()); + public void configure(PluginConfig pluginConfig) throws IOException, TikaConfigException { + checkPluginId(pluginConfig.pluginId()); + defaultFileSystemFetcherConfig = FileSystemFetcherConfig.load(pluginConfig.jsonConfig()); checkConfig(defaultFileSystemFetcherConfig); } @@ -86,11 +88,15 @@ public class FileSystemFetcher extends AbstractFetcher { "a file name with this character in it."); } FileSystemFetcherConfig config = defaultFileSystemFetcherConfig; - FetcherConfig fetcherConfig = parseContext.get(FetcherConfig.class); - if (fetcherConfig != null) { - checkPluginId(fetcherConfig.getPluginId()); - config = FileSystemFetcherConfig.load(fetcherConfig.getConfigJson()); - checkConfig(config); + PluginConfigs pluginConfigManager = parseContext.get(PluginConfigs.class); + if (pluginConfigManager != null) { + Optional<PluginConfig> pluginConfigOpt = pluginConfigManager.get(getPluginId()); + if (pluginConfigOpt.isPresent()) { + PluginConfig pluginConfig = pluginConfigOpt.get(); + checkPluginId(pluginConfig.pluginId()); + config = FileSystemFetcherConfig.load(pluginConfig.jsonConfig()); + checkConfig(config); + } } Path p = null; if (! StringUtils.isBlank(config.getBasePath())) { diff --git a/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/resources/plugin.properties b/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/resources/fetcher-plugin.properties similarity index 100% rename from tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/resources/plugin.properties rename to tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/resources/fetcher-plugin.properties diff --git a/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherTest.java b/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherTest.java index 0abd34f48..e87372a23 100644 --- a/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherTest.java +++ b/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherTest.java @@ -26,7 +26,7 @@ import java.nio.file.Paths; import org.junit.jupiter.api.Test; import org.apache.tika.exception.TikaConfigException; -import org.apache.tika.pipes.api.fetcher.FetcherConfig; +import org.apache.tika.plugins.PluginConfig; public class FileSystemFetcherTest { @@ -49,27 +49,7 @@ public class FileSystemFetcherTest { public void testNullByte() throws Exception { FileSystemFetcher f = new FileSystemFetcher(); assertThrows(TikaConfigException.class, () -> { - f.configure(new FetcherConfig() { - @Override - public String getPluginId() { - return "blah"; - } - - @Override - public FetcherConfig setPluginId(String pluginId) { - return this; - } - - @Override - public String getConfigJson() { - return "{ \"basePath\":\"bad\u0000path\"}"; - } - - @Override - public FetcherConfig setConfigJson(String config) { - return this; - } - }); + f.configure(new PluginConfig("test", "{ \"basePath\":\"bad\u0000path\"}")); }); } } diff --git a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java similarity index 75% copy from tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java copy to tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java index f0554217d..4d5f73849 100644 --- a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java +++ b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java @@ -14,24 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.tika.pipes.api.fetcher; +package org.apache.tika.pipes.api.emitter; import java.io.IOException; import java.io.InputStream; +import java.util.List; import java.util.Properties; import org.apache.tika.exception.TikaConfigException; +import org.apache.tika.pipes.api.fetcher.Fetcher; -public abstract class AbstractFetcher implements Fetcher { +public abstract class AbstractEmitter implements Emitter { private final String pluginId; - public AbstractFetcher() throws IOException { + public AbstractEmitter() throws IOException { Properties properties = new Properties(); - try (InputStream is = this.getClass().getResourceAsStream("/plugin.properties")) { + try (InputStream is = this.getClass().getResourceAsStream("/emitter-plugin.properties")) { properties.load(is); } pluginId = (String) properties.get("plugin.id"); - } @Override @@ -39,6 +40,14 @@ public abstract class AbstractFetcher implements Fetcher { return pluginId; } + @Override + public void emit(List<? extends EmitData> emitData) throws IOException { + for (EmitData item : emitData) { + emit(item.getEmitKey(), item.getMetadataList(), item.getParseContext()); + } + } + + protected void checkPluginId(String pluginId) throws TikaConfigException { if (! getPluginId().equals(pluginId)) { throw new TikaConfigException("Plugin id mismatch: " + getPluginId() + " <> " + pluginId); diff --git a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java similarity index 76% copy from tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java copy to tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java index f0554217d..0f12535d3 100644 --- a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java +++ b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java @@ -14,24 +14,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.tika.pipes.api.fetcher; +package org.apache.tika.pipes.api.emitter; import java.io.IOException; import java.io.InputStream; +import java.util.List; import java.util.Properties; import org.apache.tika.exception.TikaConfigException; -public abstract class AbstractFetcher implements Fetcher { +public abstract class AbstractStreamEmitter implements StreamEmitter { private final String pluginId; - public AbstractFetcher() throws IOException { + public AbstractStreamEmitter() throws IOException { Properties properties = new Properties(); - try (InputStream is = this.getClass().getResourceAsStream("/plugin.properties")) { + try (InputStream is = this.getClass().getResourceAsStream("/emitter-plugin.properties")) { properties.load(is); } pluginId = (String) properties.get("plugin.id"); - } @Override @@ -44,4 +44,11 @@ public abstract class AbstractFetcher implements Fetcher { throw new TikaConfigException("Plugin id mismatch: " + getPluginId() + " <> " + pluginId); } } + + @Override + public void emit(List<? extends EmitData> emitData) throws IOException { + for (EmitData item : emitData) { + emit(item.getEmitKey(), item.getMetadataList(), item.getParseContext()); + } + } } diff --git a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java new file mode 100644 index 000000000..3c1b09dbb --- /dev/null +++ b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java @@ -0,0 +1,19 @@ +package org.apache.tika.pipes.api.emitter; + +import java.util.List; + +import org.apache.tika.metadata.Metadata; +import org.apache.tika.parser.ParseContext; + +public interface EmitData { + String getEmitKey(); + + List<Metadata> getMetadataList(); + + String getContainerStackTrace(); + + long getEstimatedSizeBytes(); + + ParseContext getParseContext(); + +} diff --git a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/Emitter.java b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/Emitter.java index aa7122a50..dad43467a 100644 --- a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/Emitter.java +++ b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/Emitter.java @@ -19,18 +19,23 @@ package org.apache.tika.pipes.api.emitter; import java.io.IOException; import java.util.List; +import org.pf4j.ExtensionPoint; + import org.apache.tika.exception.TikaConfigException; import org.apache.tika.metadata.Metadata; import org.apache.tika.parser.ParseContext; +import org.apache.tika.plugins.PluginConfig; -public interface Emitter { +public interface Emitter extends ExtensionPoint { - void configure(EmitterConfig emitterConfig) throws TikaConfigException, IOException; + void configure(PluginConfig pluginConfig) throws TikaConfigException, IOException; String getPluginId(); void emit(String emitKey, List<Metadata> metadataList, ParseContext parseContext) throws IOException; + void emit(List<? extends EmitData> emitData) throws IOException; + //TODO -- add this later for xhtml? //void emit(String txt, Metadata metadata) throws IOException, TikaException; diff --git a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java index f0554217d..6d796b584 100644 --- a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java +++ b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java @@ -25,13 +25,13 @@ import org.apache.tika.exception.TikaConfigException; public abstract class AbstractFetcher implements Fetcher { private final String pluginId; + public AbstractFetcher() throws IOException { Properties properties = new Properties(); - try (InputStream is = this.getClass().getResourceAsStream("/plugin.properties")) { + try (InputStream is = this.getClass().getResourceAsStream("/fetcher-plugin.properties")) { properties.load(is); } pluginId = (String) properties.get("plugin.id"); - } @Override diff --git a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/Fetcher.java b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/Fetcher.java index c4b523fd1..07469022e 100644 --- a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/Fetcher.java +++ b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/Fetcher.java @@ -25,6 +25,7 @@ import org.apache.tika.exception.TikaConfigException; import org.apache.tika.exception.TikaException; import org.apache.tika.metadata.Metadata; import org.apache.tika.parser.ParseContext; +import org.apache.tika.plugins.PluginConfig; /** * Interface for an object that will fetch an InputStream given @@ -35,7 +36,7 @@ import org.apache.tika.parser.ParseContext; */ public interface Fetcher extends ExtensionPoint { - void configure(FetcherConfig fetcherConfig) throws TikaConfigException, IOException; + void configure(PluginConfig fetcherConfig) throws TikaConfigException, IOException; String getPluginId(); diff --git a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/FetcherConfig.java b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/FetcherConfig.java deleted file mode 100644 index 2c5e75811..000000000 --- a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/FetcherConfig.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tika.pipes.api.fetcher; - -import java.io.Serializable; - -public interface FetcherConfig extends Serializable { - - String getPluginId(); - FetcherConfig setPluginId(String pluginId); - String getConfigJson(); - FetcherConfig setConfigJson(String config); -} diff --git a/tika-pipes/tika-pipes-core-tests/pom.xml b/tika-pipes/tika-pipes-core-tests/pom.xml index c8b531526..c6a8a91de 100644 --- a/tika-pipes/tika-pipes-core-tests/pom.xml +++ b/tika-pipes/tika-pipes-core-tests/pom.xml @@ -44,6 +44,12 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>tika-emitter-file-system</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>${project.groupId}</groupId> <artifactId>tika-fetcher-file-system</artifactId> diff --git a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java index 3bf377855..a4eea1ce5 100644 --- a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java +++ b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java @@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.List; import java.util.Locale; @@ -47,23 +48,19 @@ public class PassbackFilterTest { private Path tmpDir; String fetcherPluginId = "file-system-fetcher"; + String emitterPluginId = "file-system-emitter"; String testPdfFile = "testOverlappingText.pdf"; private PipesClient pipesClient; @BeforeEach public void init() throws Exception { - Path tikaConfigTemplate = Paths.get("src", "test", "resources", "org", "apache", "tika", "pipes", "core", "tika-emit-config.xml"); + Path tikaConfig = Paths.get("src", "test", "resources", "org", "apache", "tika", "pipes", "core", "tika-emit-config.xml"); tmpDir = Files.createTempDirectory("tika-pipes"); Path pipesConfigPath = PluginsTestHelper.getFileSystemFetcherConfig(tmpDir); Path tikaConfigPath = Files.createTempFile(tmpDir, "tika-pipes-", ".xml"); - String template = Files.readString(tikaConfigTemplate, StandardCharsets.UTF_8); - template = template.replace("EMITTER_BASE_PATH", tmpDir - .toAbsolutePath() - .toString()); - Files.writeString(tikaConfigPath, template); - + Files.copy(tikaConfig, tikaConfigPath, StandardCopyOption.REPLACE_EXISTING); PipesConfig pipesConfig = PipesConfig.load(tikaConfigPath, pipesConfigPath); PluginsTestHelper.copyTestFilesToTmpInput(tmpDir, testPdfFile); @@ -81,7 +78,8 @@ public class PassbackFilterTest { ParseContext parseContext = new ParseContext(); parseContext.set(PassbackFilter.class, new MyPassbackFilter()); PipesResult pipesResult = pipesClient.process( - new FetchEmitTuple(testPdfFile, new FetchKey(fetcherPluginId, testPdfFile), new EmitKey("fs", emitFileBase), new Metadata(), parseContext, + new FetchEmitTuple(testPdfFile, new FetchKey(fetcherPluginId, testPdfFile), + new EmitKey(emitterPluginId, emitFileBase), new Metadata(), parseContext, FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP)); assertEquals(PipesResult.STATUS.EMIT_SUCCESS_PASSBACK, pipesResult.getStatus()); Assertions.assertNotNull(pipesResult @@ -100,7 +98,7 @@ public class PassbackFilterTest { assertNull(metadata.get(Metadata.CONTENT_LENGTH)); assertEquals(1, metadata.names().length); - List<Metadata> metadataList = JsonMetadataList.fromJson(Files.newBufferedReader(tmpDir.resolve(emitFileBase + ".json"), StandardCharsets.UTF_8)); + List<Metadata> metadataList = JsonMetadataList.fromJson(Files.newBufferedReader(tmpDir.resolve("output").resolve(emitFileBase + ".json"), StandardCharsets.UTF_8)); assertEquals(1, metadataList.size()); assertEquals("application/pdf", metadataList .get(0) diff --git a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/PluginsTestHelper.java b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/PluginsTestHelper.java index 625710810..d89b479e1 100644 --- a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/PluginsTestHelper.java +++ b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/PluginsTestHelper.java @@ -25,19 +25,25 @@ import java.nio.file.Paths; public class PluginsTestHelper { public static Path getFileSystemFetcherConfig(Path configBase) throws Exception { - return getFileSystemFetcherConfig(configBase, configBase.resolve("input")); + return getFileSystemFetcherConfig(configBase, configBase.resolve("input"), configBase.resolve("output")); } - public static Path getFileSystemFetcherConfig(Path configBase, Path filesBase) throws Exception { + public static Path getFileSystemFetcherConfig(Path configBase, Path fetcherBase, Path emitterBase) throws Exception { Path pipesConfig = configBase.resolve("pipes-config.json"); - Path tikaPluginsTemplate = Paths.get("src", "test", "resources", "configs", "fetchers.json"); + Path tikaPluginsTemplate = Paths.get("src", "test", "resources", "configs", "fetchers-emitters.json"); String json = Files.readString(tikaPluginsTemplate, StandardCharsets.UTF_8); - json = json.replace("BASE_PATH", filesBase + json = json.replace("FETCHERS_BASE_PATH", fetcherBase .toAbsolutePath() .toString()); + if (emitterBase != null) { + json = json.replace("EMITTERS_BASE_PATH", emitterBase + .toAbsolutePath() + .toString()); + } + Files.write(pipesConfig, json.getBytes(StandardCharsets.UTF_8)); return pipesConfig; } diff --git a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java index 02bb42c06..6f049652e 100644 --- a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java +++ b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java @@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test; import org.apache.tika.config.AbstractTikaConfigTest; import org.apache.tika.exception.TikaConfigException; +import org.apache.tika.pipes.api.emitter.Emitter; import org.apache.tika.pipes.core.emitter.EmitterManager; import org.apache.tika.pipes.core.pipesiterator.PipesIterator; @@ -63,15 +64,15 @@ public class TikaPipesConfigTest extends AbstractTikaConfigTest { FetcherManager fetcherManager = FetcherManager.load( getConfigFilePath("fetchers-nobasepath-config.xml")); - }*/ + } @Test public void testEmitters() throws Exception { EmitterManager emitterManager = EmitterManager.load(getConfigFilePath("emitters-config.xml")); - Emitter em1 = emitterManager.getEmitter("em1"); + Emitter em1 = emitterManager.getEmitter("file-system-emitter-1"); assertNotNull(em1); - Emitter em2 = emitterManager.getEmitter("em2"); + Emitter em2 = emitterManager.getEmitter("file-system-emitter-2"); assertNotNull(em2); } @@ -81,7 +82,7 @@ public class TikaPipesConfigTest extends AbstractTikaConfigTest { EmitterManager.load(getConfigFilePath("emitters-duplicate-config.xml")); }); } - +*/ @Test public void testPipesIterator() throws Exception { PipesIterator it = diff --git a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/AsyncChaosMonkeyTest.java b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/AsyncChaosMonkeyTest.java index eb836c93d..81ee817b3 100644 --- a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/AsyncChaosMonkeyTest.java +++ b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/AsyncChaosMonkeyTest.java @@ -30,11 +30,12 @@ import org.junit.jupiter.api.io.TempDir; import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.TikaCoreProperties; +import org.apache.tika.pipes.api.emitter.EmitData; import org.apache.tika.pipes.core.FetchEmitTuple; import org.apache.tika.pipes.core.PipesResult; import org.apache.tika.pipes.core.PluginsTestHelper; -import org.apache.tika.pipes.core.emitter.EmitData; import org.apache.tika.pipes.core.emitter.EmitKey; +import org.apache.tika.pipes.core.emitter.MockEmitter; import org.apache.tika.pipes.core.fetcher.FetchKey; import org.apache.tika.pipes.core.pipesiterator.PipesIterator; @@ -82,10 +83,7 @@ public class AsyncChaosMonkeyTest { crash = 0; Path tikaConfigPath = Files.createTempFile(configDir, "tika-config-", ".xml"); String xml = - "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<properties>" + " <emitters>" + - " <emitter class=\"org.apache.tika.pipes.core.async.MockEmitter\">\n" + - " <name>mock</name>\n" + " </emitter>" + - " </emitters>" + + "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<properties>" + " <autoDetectParserConfig>\n" + " <digesterFactory\n" + " class=\"org.apache.tika.pipes.core.async.MockDigesterFactory\"/>\n" + @@ -120,7 +118,7 @@ public class AsyncChaosMonkeyTest { } MockEmitter.EMIT_DATA.clear(); MockReporter.RESULTS.clear(); - pipesPluginsConfigPath = PluginsTestHelper.getFileSystemFetcherConfig(configDir, inputDir); + pipesPluginsConfigPath = PluginsTestHelper.getFileSystemFetcherConfig(configDir, inputDir, null); return tikaConfigPath; } @@ -143,7 +141,7 @@ public class AsyncChaosMonkeyTest { for (int i = 0; i < totalFiles; i++) { FetchEmitTuple t = new FetchEmitTuple("myId-" + i, new FetchKey(fetcherPluginId, i + ".xml"), - new EmitKey("mock", "emit-" + i), new Metadata()); + new EmitKey("mock-emitter", "emit-" + i), new Metadata()); processor.offer(t, 1000); } for (int i = 0; i < 10; i++) { @@ -156,7 +154,7 @@ public class AsyncChaosMonkeyTest { processor.close(); Set<String> emitKeys = new HashSet<>(); for (EmitData d : MockEmitter.EMIT_DATA) { - emitKeys.add(d.getEmitKey().getEmitKey()); + emitKeys.add(d.getEmitKey()); } assertEquals(ok, emitKeys.size()); assertEquals(100, MockReporter.RESULTS.size()); @@ -171,7 +169,7 @@ public class AsyncChaosMonkeyTest { AsyncProcessor processor = new AsyncProcessor(setUp(true), pipesPluginsConfigPath); for (int i = 0; i < totalFiles; i++) { FetchEmitTuple t = new FetchEmitTuple("myId-" + i, new FetchKey(fetcherPluginId, i + ".xml"), - new EmitKey("mock", "emit-" + i), new Metadata()); + new EmitKey("mock-emitter", "emit-" + i), new Metadata()); processor.offer(t, 1000); } for (int i = 0; i < 10; i++) { @@ -185,7 +183,7 @@ public class AsyncChaosMonkeyTest { Set<String> emitKeys = new HashSet<>(); int observedOOM = 0; for (EmitData d : MockEmitter.EMIT_DATA) { - emitKeys.add(d.getEmitKey().getEmitKey()); + emitKeys.add(d.getEmitKey()); assertEquals(64, d.getMetadataList().get(0).get("X-TIKA:digest:SHA-256").trim().length()); assertEquals("application/mock+xml", diff --git a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/MockEmitter.java b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/MockEmitter.java deleted file mode 100644 index fab797ed1..000000000 --- a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/MockEmitter.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tika.pipes.core.async; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; - -import org.apache.tika.metadata.Metadata; -import org.apache.tika.parser.ParseContext; -import org.apache.tika.pipes.core.emitter.EmitData; -import org.apache.tika.pipes.core.emitter.EmitKey; -import org.apache.tika.pipes.core.emitter.TikaEmitterException; - -public class MockEmitter extends AbstractEmitter { - - static ArrayBlockingQueue<EmitData> EMIT_DATA = new ArrayBlockingQueue<>(10000); - - public MockEmitter() { - } - - public static List<EmitData> getData() { - return new ArrayList<>(EMIT_DATA); - } - - @Override - public void emit(String emitKey, List<Metadata> metadataList, ParseContext parseContext) - throws IOException, TikaEmitterException { - emit( - Collections.singletonList(new EmitData(new EmitKey(getName(), emitKey), - metadataList, null, parseContext))); - } - - @Override - public void emit(List<? extends EmitData> emitData) throws IOException, TikaEmitterException { - int inserted = 0; - for (EmitData d : emitData) { - EMIT_DATA.offer(d); - } - } - -} diff --git a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/MockFetcher.java b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/MockFetcher.java index 2b869374c..a6f05cbbd 100644 --- a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/MockFetcher.java +++ b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/MockFetcher.java @@ -26,7 +26,7 @@ import org.apache.tika.exception.TikaException; import org.apache.tika.metadata.Metadata; import org.apache.tika.parser.ParseContext; import org.apache.tika.pipes.api.fetcher.AbstractFetcher; -import org.apache.tika.pipes.api.fetcher.FetcherConfig; +import org.apache.tika.plugins.PluginConfig; public class MockFetcher extends AbstractFetcher { @@ -39,7 +39,7 @@ public class MockFetcher extends AbstractFetcher { } @Override - public void configure(FetcherConfig fetcherConfig) throws TikaConfigException, IOException { + public void configure(PluginConfig fetcherConfig) throws TikaConfigException, IOException { //no-op } diff --git a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/emitter/MockEmitter.java b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/emitter/MockEmitter.java index 6d32ea2c4..86fa68250 100644 --- a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/emitter/MockEmitter.java +++ b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/emitter/MockEmitter.java @@ -17,44 +17,80 @@ package org.apache.tika.pipes.core.emitter; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.pf4j.Extension; -import org.apache.tika.config.Field; import org.apache.tika.config.Initializable; import org.apache.tika.config.InitializableProblemHandler; import org.apache.tika.config.Param; import org.apache.tika.exception.TikaConfigException; import org.apache.tika.metadata.Metadata; import org.apache.tika.parser.ParseContext; +import org.apache.tika.pipes.api.emitter.EmitData; +import org.apache.tika.pipes.api.emitter.Emitter; +import org.apache.tika.plugins.PluginConfig; -public class MockEmitter extends AbstractEmitter implements Initializable { +@Extension +public class MockEmitter implements Initializable, Emitter { - @Field - private boolean throwOnCheck = false; + public static ArrayBlockingQueue<EmitData> EMIT_DATA = new ArrayBlockingQueue<>(10000); - @Override - public void initialize(Map<String, Param> params) throws TikaConfigException { + public static List<EmitData> getData() { + return new ArrayList<>(EMIT_DATA); + } + + public MockEmitter() throws IOException { + } + + private static record MockEmitterConfig(boolean throwOnCheck) { } - public void setThrowOnCheck(boolean throwOnCheck) { - this.throwOnCheck = throwOnCheck; + private MockEmitterConfig config = new MockEmitterConfig(true); + + @Override + public void initialize(Map<String, Param> params) throws TikaConfigException { + //no-op } @Override public void checkInitialization(InitializableProblemHandler problemHandler) throws TikaConfigException { - if (throwOnCheck) { + if (config.throwOnCheck()) { throw new TikaConfigException("throw on check"); } } + @Override + public void configure(PluginConfig pluginConfig) throws TikaConfigException, IOException { + config = new ObjectMapper().readValue(pluginConfig.jsonConfig(), MockEmitterConfig.class); + } + + @Override + public String getPluginId() { + return "mock-emitter"; + } + @Override public void emit(String emitKey, List<Metadata> metadataList, ParseContext parseContext) throws IOException, TikaEmitterException { + emit( + Collections.singletonList(new EmitDataImpl(emitKey, + metadataList, null, parseContext))); + } + @Override + public void emit(List<? extends EmitData> emitData) throws IOException, TikaEmitterException { + for (EmitData d : emitData) { + EMIT_DATA.offer(d); + } } } diff --git a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/fetcher/MockFetcher.java b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/fetcher/MockFetcher.java index 68955d69b..9741d35d5 100644 --- a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/fetcher/MockFetcher.java +++ b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/fetcher/MockFetcher.java @@ -29,7 +29,7 @@ import org.apache.tika.exception.TikaException; import org.apache.tika.metadata.Metadata; import org.apache.tika.parser.ParseContext; import org.apache.tika.pipes.api.fetcher.AbstractFetcher; -import org.apache.tika.pipes.api.fetcher.FetcherConfig; +import org.apache.tika.plugins.PluginConfig; public class MockFetcher extends AbstractFetcher { @@ -55,7 +55,7 @@ public class MockFetcher extends AbstractFetcher { } @Override - public void configure(FetcherConfig fetcherConfig) throws TikaConfigException, IOException { + public void configure(PluginConfig fetcherConfig) throws TikaConfigException, IOException { //no-op } diff --git a/tika-pipes/tika-pipes-core-tests/src/test/resources/configs/fetchers-emitters.json b/tika-pipes/tika-pipes-core-tests/src/test/resources/configs/fetchers-emitters.json new file mode 100644 index 000000000..3e2409761 --- /dev/null +++ b/tika-pipes/tika-pipes-core-tests/src/test/resources/configs/fetchers-emitters.json @@ -0,0 +1,16 @@ +{ + "plugins" : { + "fetchers": { + "file-system-fetcher": { + "basePath": "FETCHERS_BASE_PATH", + "extractFileSystemMetadata": false + } + }, + "emitters": { + "file-system-emitter": { + "basePath": "EMITTERS_BASE_PATH", + "fileExtension": "json" + } + } + } +} \ No newline at end of file diff --git a/tika-pipes/tika-pipes-core-tests/src/test/resources/configs/fetchers.json b/tika-pipes/tika-pipes-core-tests/src/test/resources/configs/fetchers.json deleted file mode 100644 index 5a46c0444..000000000 --- a/tika-pipes/tika-pipes-core-tests/src/test/resources/configs/fetchers.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "pipesPluginsConfig" : { - "fetchers": { - "file-system-fetcher": { - "basePath": "BASE_PATH", - "extractFileSystemMetadata": false - } - } - } -} \ No newline at end of file diff --git a/tika-pipes/tika-pipes-core-tests/src/test/resources/org/apache/tika/pipes/core/tika-emit-config.xml b/tika-pipes/tika-pipes-core-tests/src/test/resources/org/apache/tika/pipes/core/tika-emit-config.xml index 58e168233..69d72bf02 100644 --- a/tika-pipes/tika-pipes-core-tests/src/test/resources/org/apache/tika/pipes/core/tika-emit-config.xml +++ b/tika-pipes/tika-pipes-core-tests/src/test/resources/org/apache/tika/pipes/core/tika-emit-config.xml @@ -32,16 +32,4 @@ <skipContainerDocument>false</skipContainerDocument> </digesterFactory> </autoDetectParserConfig> -<!-- <fetchers> - <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher"> - <name>fs</name> - <basePath>src/test/resources/test-documents</basePath> - </fetcher> - </fetchers> --> - <emitters> - <emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter"> - <name>fs</name> - <basePath>EMITTER_BASE_PATH</basePath> - </emitter> - </emitters> </properties> \ No newline at end of file diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java index 11fffa7fb..cb64a4f09 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java @@ -51,7 +51,7 @@ import org.apache.tika.config.TikaTaskTimeout; import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.TikaCoreProperties; import org.apache.tika.parser.ParseContext; -import org.apache.tika.pipes.core.emitter.EmitData; +import org.apache.tika.pipes.core.emitter.EmitDataImpl; import org.apache.tika.pipes.core.emitter.EmitKey; import org.apache.tika.utils.ProcessUtils; import org.apache.tika.utils.StringUtils; @@ -376,7 +376,7 @@ public class PipesClient implements Closeable { input.readFully(bytes); try (ObjectInputStream objectInputStream = new ObjectInputStream( UnsynchronizedByteArrayInputStream.builder().setByteArray(bytes).get())) { - EmitData emitData = (EmitData) objectInputStream.readObject(); + EmitDataImpl emitData = (EmitDataImpl) objectInputStream.readObject(); String stack = emitData.getContainerStackTrace(); if (StringUtils.isBlank(stack)) { @@ -405,7 +405,7 @@ public class PipesClient implements Closeable { try (ObjectInputStream objectInputStream = new ObjectInputStream( UnsynchronizedByteArrayInputStream.builder().setByteArray(bytes).get())) { Metadata metadata = (Metadata) objectInputStream.readObject(); - EmitData emitDataTuple = new EmitData(emitKey, Collections.singletonList(metadata)); + EmitDataImpl emitDataTuple = new EmitDataImpl(emitKey.getEmitKey(), Collections.singletonList(metadata)); return new PipesResult(PipesResult.STATUS.INTERMEDIATE_RESULT, emitDataTuple, true); } catch (ClassNotFoundException e) { LOG.error("class not found exception deserializing data", e); diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesPluginsConfig.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesPluginsConfig.java index 3ef3a9ec0..6fa63b7a6 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesPluginsConfig.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesPluginsConfig.java @@ -23,69 +23,102 @@ import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.HashMap; import java.util.Iterator; -import java.util.Map; import java.util.Optional; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.tika.pipes.api.emitter.EmitterConfig; -import org.apache.tika.pipes.api.fetcher.FetcherConfig; -import org.apache.tika.pipes.core.fetcher.config.DefaultFetcherConfig; +import org.apache.tika.plugins.PluginConfig; +import org.apache.tika.plugins.PluginConfigs; + public class PipesPluginsConfig { public static PipesPluginsConfig load(InputStream is) throws IOException { JsonNode root = new ObjectMapper().readTree(new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))); - JsonNode plugins = root.get("pipesPluginsConfig"); - Map<String, FetcherConfig> fetcherMap = new HashMap<>(); + PluginConfigs fetchers = null; + PluginConfigs emitters = null; + PluginConfigs iterators = null; + PluginConfigs reporters = null; + + JsonNode plugins = root.get("plugins"); if (plugins.has("fetchers")) { - JsonNode fetchers = plugins.get("fetchers"); - Iterator<String> it = fetchers.fieldNames(); - while (it.hasNext()) { - String pluginId = it.next(); - JsonNode fetcherConfig = fetchers.get(pluginId); - fetcherMap.put(pluginId, new DefaultFetcherConfig(pluginId, fetcherConfig.toString())); - } + fetchers = load(plugins.get("fetchers")); } - Map<String, FetcherConfig> emitterMap = new HashMap<>(); if (plugins.has("emitters")) { - JsonNode emitters = plugins.get("emitters"); - Iterator<String> it = emitters.fieldNames(); - while (it.hasNext()) { - String pluginId = it.next(); - JsonNode emitterConfig = emitters.get(pluginId); - emitterMap.put(pluginId, new EmitterConfigImpl(pluginId, emitterConfig.toString())); - } + emitters = load(plugins.get("emitters")); + } + if (plugins.has("iterators")) { + iterators = load(plugins.get("iterators")); + } + if (plugins.has("reporters")) { + reporters = load(plugins.get("reporters")); } + Path pluginsDir = null; if (plugins.has("pf4j.pluginsDir")) { pluginsDir = Paths.get(plugins.get("pf4j.pluginsDir").asText()); } - return new PipesPluginsConfig(fetcherMap, emitterMap, pluginsDir); + return new PipesPluginsConfig(fetchers, emitters, iterators, reporters, pluginsDir); + } + + private static PluginConfigs load(JsonNode pluginsNode) { + PluginConfigs manager = new PluginConfigs(); + Iterator<String> it = pluginsNode.fieldNames(); + manager = new PluginConfigs(); + while (it.hasNext()) { + String pluginId = it.next(); + JsonNode jsonConfig = pluginsNode.get(pluginId); + manager.add(new PluginConfig(pluginId, jsonConfig.toString())); + } + return manager; } - private final Map<String, FetcherConfig> fetcherMap; - private final Map<String, EmitterConfig> emitterMap; + private final PluginConfigs fetchers; + private final PluginConfigs emitters; + private final PluginConfigs iterators; + private final PluginConfigs reporters; private final Path pluginsDir; - private PipesPluginsConfig(Map<String, FetcherConfig> fetcherMap, Map<String, EmitterConfig> emitterMap, Path pluginsDir) { - this.fetcherMap = fetcherMap; - this.emitterMap = emitterMap; + + public PipesPluginsConfig(PluginConfigs fetchers, PluginConfigs emitters, + PluginConfigs iterators, PluginConfigs reporters, Path pluginsDir) { + this.fetchers = fetchers; + this.emitters = emitters; + this.iterators = iterators; + this.reporters = reporters; this.pluginsDir = pluginsDir; } - public Optional<FetcherConfig> getFetcherConfig(String pluginId) { - return Optional.ofNullable(fetcherMap.get(pluginId)); + public Optional<PluginConfig> getFetcherConfig(String pluginId) { + if (fetchers == null) { + throw new IllegalArgumentException("fetchers element was not loaded"); + } + return fetchers.get(pluginId); + } + + public Optional<PluginConfig> getEmitterConfig(String pluginId) { + if (emitters == null) { + throw new IllegalArgumentException("emitters element was not loaded"); + } + return emitters.get(pluginId); } - public Optional<EmitterConfig> getEmitterConfig(String pluginId) { - return Optional.ofNullable(emitterMap.get(pluginId)); + public Optional<PluginConfig> getIteratorConfig(String pluginId) { + if (iterators == null) { + throw new IllegalArgumentException("iterators element was not loaded"); + } + return iterators.get(pluginId); } + public Optional<PluginConfig> getReporterConfig(String pluginId) { + if (reporters == null) { + throw new IllegalArgumentException("reporters element was not loaded"); + } + return reporters.get(pluginId); + } public Optional<Path> getPluginsDir() { return Optional.ofNullable(pluginsDir); diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java index 3f391ae28..d9ad9f0be 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java @@ -16,7 +16,7 @@ */ package org.apache.tika.pipes.core; -import org.apache.tika.pipes.core.emitter.EmitData; +import org.apache.tika.pipes.core.emitter.EmitDataImpl; public class PipesResult { @@ -49,10 +49,10 @@ public class PipesResult { public static final PipesResult EMPTY_OUTPUT = new PipesResult(STATUS.EMPTY_OUTPUT); private final STATUS status; - private final EmitData emitDataTuple; + private final EmitDataImpl emitDataTuple; private final String message; - public PipesResult(STATUS status, EmitData emitDataTuple, String message, boolean intermediate) { + public PipesResult(STATUS status, EmitDataImpl emitDataTuple, String message, boolean intermediate) { this.status = status; this.emitDataTuple = emitDataTuple; this.message = message; @@ -72,11 +72,11 @@ public class PipesResult { * * @param emitDataTuple */ - public PipesResult(EmitData emitDataTuple) { + public PipesResult(EmitDataImpl emitDataTuple) { this(STATUS.PARSE_SUCCESS, emitDataTuple, null, false); } - public PipesResult(STATUS status, EmitData emitDataTuple, boolean intermediate) { + public PipesResult(STATUS status, EmitDataImpl emitDataTuple, boolean intermediate) { this(status, emitDataTuple, null, intermediate); } @@ -87,7 +87,7 @@ public class PipesResult { * @param emitDataTuple * @param message */ - public PipesResult(EmitData emitDataTuple, String message) { + public PipesResult(EmitDataImpl emitDataTuple, String message) { this(STATUS.PARSE_SUCCESS_WITH_EXCEPTION, emitDataTuple, message, false); } @@ -95,7 +95,7 @@ public class PipesResult { return status; } - public EmitData getEmitData() { + public EmitDataImpl getEmitData() { return emitDataTuple; } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java index b7429b9a5..ff520877b 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java @@ -69,10 +69,9 @@ import org.apache.tika.parser.RecursiveParserWrapper; import org.apache.tika.pipes.api.emitter.Emitter; import org.apache.tika.pipes.api.emitter.StreamEmitter; import org.apache.tika.pipes.api.fetcher.Fetcher; -import org.apache.tika.pipes.core.emitter.EmitData; +import org.apache.tika.pipes.core.emitter.EmitDataImpl; import org.apache.tika.pipes.core.emitter.EmitKey; import org.apache.tika.pipes.core.emitter.EmitterManager; -import org.apache.tika.pipes.core.emitter.TikaEmitterException; import org.apache.tika.pipes.core.extractor.BasicEmbeddedDocumentBytesHandler; import org.apache.tika.pipes.core.extractor.EmbeddedDocumentBytesConfig; import org.apache.tika.pipes.core.extractor.EmittingEmbeddedDocumentBytesHandler; @@ -309,7 +308,7 @@ public class PipesServer implements Runnable { try { emitter = emitterManager.getEmitter(emitKey.getEmitterPluginId()); } catch (IllegalArgumentException e) { - String noEmitterMsg = getNoEmitterMsg(taskId); + String noEmitterMsg = getNoEmitterMsg(taskId, emitKey.getEmitterPluginId()); LOG.warn(noEmitterMsg); write(STATUS.EMITTER_NOT_FOUND, noEmitterMsg); return; @@ -354,7 +353,7 @@ public class PipesServer implements Runnable { exit(1); } - EmitData filteredEmitDataTuple = new EmitData(emitKey, filtered, parseExceptionStack); + EmitDataImpl filteredEmitDataTuple = new EmitDataImpl(emitKey.getEmitKey(), filtered, parseExceptionStack); try { UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get(); @@ -490,7 +489,7 @@ public class PipesServer implements Runnable { emitKey = new EmitKey(emitKey.getEmitterPluginId(), t.getFetchKey().getFetchKey()); t.setEmitKey(emitKey); } - EmitData emitDataTuple = new EmitData(t.getEmitKey(), parseData.getMetadataList(), stack); + EmitDataImpl emitDataTuple = new EmitDataImpl(t.getEmitKey().getEmitKey(), parseData.getMetadataList(), stack); if (shouldEmit(embeddedDocumentBytesConfig, parseData, emitDataTuple)) { emit(t.getId(), emitKey, embeddedDocumentBytesConfig.isExtractEmbeddedDocumentBytes(), parseData, stack, parseContext); @@ -506,7 +505,7 @@ public class PipesServer implements Runnable { } } - private boolean shouldEmit(EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig, MetadataListAndEmbeddedBytes parseData, EmitData emitDataTuple) { + private boolean shouldEmit(EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig, MetadataListAndEmbeddedBytes parseData, EmitDataImpl emitDataTuple) { if (emitStrategy == EMIT_STRATEGY.EMIT_ALL) { return true; } else if (embeddedDocumentBytesConfig.isExtractEmbeddedDocumentBytes() && @@ -597,9 +596,9 @@ public class PipesServer implements Runnable { return sb.toString(); } - private String getNoEmitterMsg(String emitterName) { + private String getNoEmitterMsg(String taskName, String emitterName) { StringBuilder sb = new StringBuilder(); - sb.append("Emitter '").append(emitterName).append("'"); + sb.append("Emitter for task='").append(taskName).append("' emitter='").append(emitterName).append("'"); sb.append(" not found."); sb.append("\nThe configured emitterManager supports:"); int i = 0; @@ -853,7 +852,7 @@ public class PipesServer implements Runnable { //skip initialization of the emitters if emitting //from the pipesserver is turned off. if (maxForEmitBatchBytes > -1) { - this.emitterManager = EmitterManager.load(tikaConfigPath); + this.emitterManager = EmitterManager.load(pipesConfigPath); } else { LOG.debug("'maxForEmitBatchBytes' < 0. Not initializing emitters in PipesServer"); this.emitterManager = null; @@ -893,7 +892,7 @@ public class PipesServer implements Runnable { } } - private void write(EmitData emitData) { + private void write(EmitDataImpl emitData) { try { UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get(); try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) { diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java index 63e1f8fc3..a985f6ee4 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java @@ -30,10 +30,10 @@ import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.tika.pipes.api.emitter.EmitData; import org.apache.tika.pipes.api.emitter.Emitter; -import org.apache.tika.pipes.core.emitter.EmitData; +import org.apache.tika.pipes.core.emitter.EmitDataImpl; import org.apache.tika.pipes.core.emitter.EmitterManager; -import org.apache.tika.pipes.core.emitter.TikaEmitterException; import org.apache.tika.utils.ExceptionUtils; /** @@ -42,18 +42,18 @@ import org.apache.tika.utils.ExceptionUtils; */ public class AsyncEmitter implements Callable<Integer> { - static final EmitData EMIT_DATA_STOP_SEMAPHORE = new EmitData(null, null, null); + static final EmitDataPair EMIT_DATA_STOP_SEMAPHORE = new EmitDataPair(null, null); static final int EMITTER_FUTURE_CODE = 2; private static final Logger LOG = LoggerFactory.getLogger(AsyncEmitter.class); private final AsyncConfig asyncConfig; private final EmitterManager emitterManager; - private final ArrayBlockingQueue<EmitData> emitDataQueue; + private final ArrayBlockingQueue<EmitDataPair> emitDataQueue; Instant lastEmitted = Instant.now(); - public AsyncEmitter(AsyncConfig asyncConfig, ArrayBlockingQueue<EmitData> emitData, + public AsyncEmitter(AsyncConfig asyncConfig, ArrayBlockingQueue<EmitDataPair> emitData, EmitterManager emitterManager) { this.asyncConfig = asyncConfig; this.emitDataQueue = emitData; @@ -65,14 +65,14 @@ public class AsyncEmitter implements Callable<Integer> { EmitDataCache cache = new EmitDataCache(asyncConfig.getEmitMaxEstimatedBytes()); while (true) { - EmitData emitData = emitDataQueue.poll(500, TimeUnit.MILLISECONDS); - if (emitData == EMIT_DATA_STOP_SEMAPHORE) { + EmitDataPair emitDataPair = emitDataQueue.poll(500, TimeUnit.MILLISECONDS); + if (emitDataPair == EMIT_DATA_STOP_SEMAPHORE) { cache.emitAll(); return EMITTER_FUTURE_CODE; } - if (emitData != null) { + if (emitDataPair != null) { //this can block on emitAll - cache.add(emitData); + cache.add(emitDataPair); } else { LOG.trace("Nothing on the async queue"); } @@ -102,17 +102,17 @@ public class AsyncEmitter implements Callable<Integer> { estimatedSize += newBytes; } - void add(EmitData data) { + void add(EmitDataPair emitDataPair) { size++; - long sz = data.getEstimatedSizeBytes(); + long sz = emitDataPair.emitData().getEstimatedSizeBytes(); if (estimatedSize + sz > maxBytes) { LOG.debug("estimated size ({}) > maxBytes({}), going to emitAll", (estimatedSize + sz), maxBytes); emitAll(); } - List<EmitData> cached = map.computeIfAbsent(data.getEmitKey().getEmitterPluginId(), k -> new ArrayList<>()); + List<EmitData> cached = map.computeIfAbsent(emitDataPair.emitterPluginId(), k -> new ArrayList<>()); updateEstimatedSize(sz); - cached.add(data); + cached.add(emitDataPair.emitData()); } private void emitAll() { @@ -131,10 +131,10 @@ public class AsyncEmitter implements Callable<Integer> { lastEmitted = Instant.now(); } - private void tryToEmit(Emitter emitter, List<EmitData> cachedEmitDatumTuples) { + private void tryToEmit(Emitter emitter, List<? extends EmitData> emitData) { try { - emitter.emit(cachedEmitDatumTuples); + emitter.emit(emitData); } catch (IOException e) { LOG.warn("emitter class ({}): {}", emitter.getClass(), ExceptionUtils.getStackTrace(e)); diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java index e57be5621..2c15d05ef 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java @@ -39,7 +39,7 @@ import org.apache.tika.pipes.core.PipesClient; import org.apache.tika.pipes.core.PipesException; import org.apache.tika.pipes.core.PipesReporter; import org.apache.tika.pipes.core.PipesResult; -import org.apache.tika.pipes.core.emitter.EmitData; +import org.apache.tika.pipes.core.emitter.EmitDataImpl; import org.apache.tika.pipes.core.emitter.EmitterManager; import org.apache.tika.pipes.core.pipesiterator.PipesIterator; import org.apache.tika.pipes.core.pipesiterator.TotalCountResult; @@ -58,7 +58,7 @@ public class AsyncProcessor implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(AsyncProcessor.class); private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples; - private final ArrayBlockingQueue<EmitData> emitDatumTuples; + private final ArrayBlockingQueue<EmitDataPair> emitDatumTuples; private final ExecutorCompletionService<Integer> executorCompletionService; private final ExecutorService executorService; private final AsyncConfig asyncConfig; @@ -110,7 +110,7 @@ public class AsyncProcessor implements Closeable { new FetchEmitWorker(asyncConfig, fetchEmitTuples, emitDatumTuples)); } - EmitterManager emitterManager = EmitterManager.load(asyncConfig.getTikaConfig()); + EmitterManager emitterManager = EmitterManager.load(asyncConfig.getPipesPluginsConfig()); for (int i = 0; i < asyncConfig.getNumEmitters(); i++) { executorCompletionService.submit( new AsyncEmitter(asyncConfig, emitDatumTuples, emitterManager)); @@ -262,11 +262,11 @@ public class AsyncProcessor implements Closeable { private final AsyncConfig asyncConfig; private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples; - private final ArrayBlockingQueue<EmitData> emitDataTupleQueue; + private final ArrayBlockingQueue<EmitDataPair> emitDataTupleQueue; private FetchEmitWorker(AsyncConfig asyncConfig, ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples, - ArrayBlockingQueue<EmitData> emitDataTupleQueue) { + ArrayBlockingQueue<EmitDataPair> emitDataTupleQueue) { this.asyncConfig = asyncConfig; this.fetchEmitTuples = fetchEmitTuples; this.emitDataTupleQueue = emitDataTupleQueue; @@ -305,8 +305,8 @@ public class AsyncProcessor implements Closeable { if (shouldEmit(result)) { LOG.trace("adding result to emitter queue: " + result.getEmitData()); - boolean offered = emitDataTupleQueue.offer(result.getEmitData(), - MAX_OFFER_WAIT_MS, + boolean offered = emitDataTupleQueue.offer( + new EmitDataPair(t.getEmitKey().getEmitterPluginId(), result.getEmitData()), MAX_OFFER_WAIT_MS, TimeUnit.MILLISECONDS); if (! offered) { throw new RuntimeException("Couldn't offer emit data to queue " + diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/EmitDataPair.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/EmitDataPair.java new file mode 100644 index 000000000..e51152058 --- /dev/null +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/EmitDataPair.java @@ -0,0 +1,6 @@ +package org.apache.tika.pipes.core.async; + +import org.apache.tika.pipes.api.emitter.EmitData; + +public record EmitDataPair(String emitterPluginId, EmitData emitData) { +} diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitData.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java similarity index 83% rename from tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitData.java rename to tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java index 4c9996d12..1aee991f1 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitData.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java @@ -21,28 +21,29 @@ import java.util.List; import org.apache.tika.metadata.Metadata; import org.apache.tika.parser.ParseContext; +import org.apache.tika.pipes.api.emitter.EmitData; import org.apache.tika.utils.StringUtils; -public class EmitData implements Serializable { +public class EmitDataImpl implements Serializable, EmitData { /** * Serial version UID */ private static final long serialVersionUID = -3861669115439125268L; - private final EmitKey emitKey; + private final String emitKey; private final List<Metadata> metadataList; private final String containerStackTrace; private ParseContext parseContext = null; - public EmitData(EmitKey emitKey, List<Metadata> metadataList) { + public EmitDataImpl(String emitKey, List<Metadata> metadataList) { this(emitKey, metadataList, StringUtils.EMPTY); } - public EmitData(EmitKey emitKey, List<Metadata> metadataList, String containerStackTrace) { + public EmitDataImpl(String emitKey, List<Metadata> metadataList, String containerStackTrace) { this(emitKey, metadataList, containerStackTrace, new ParseContext()); } - public EmitData(EmitKey emitKey, List<Metadata> metadataList, String containerStackTrace, ParseContext parseContext) { + public EmitDataImpl(String emitKey, List<Metadata> metadataList, String containerStackTrace, ParseContext parseContext) { this.emitKey = emitKey; this.metadataList = metadataList; this.containerStackTrace = (containerStackTrace == null) ? StringUtils.EMPTY : @@ -50,7 +51,7 @@ public class EmitData implements Serializable { this.parseContext = parseContext; } - public EmitKey getEmitKey() { + public String getEmitKey() { return emitKey; } @@ -63,7 +64,7 @@ public class EmitData implements Serializable { } public long getEstimatedSizeBytes() { - return estimateSizeInBytes(getEmitKey().getEmitKey(), getMetadataList(), containerStackTrace); + return estimateSizeInBytes(getEmitKey(), getMetadataList(), containerStackTrace); } public void setParseContext(ParseContext parseContext) { diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java index 4dcfcc8fa..308c56bc2 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java @@ -21,7 +21,6 @@ import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -29,14 +28,16 @@ import java.util.concurrent.ConcurrentHashMap; import org.pf4j.DefaultPluginManager; import org.pf4j.PluginManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.tika.config.ConfigBase; import org.apache.tika.config.Initializable; import org.apache.tika.config.InitializableProblemHandler; import org.apache.tika.exception.TikaConfigException; import org.apache.tika.pipes.api.emitter.Emitter; -import org.apache.tika.pipes.api.emitter.EmitterConfig; import org.apache.tika.pipes.core.PipesPluginsConfig; +import org.apache.tika.plugins.PluginConfig; /** * Utility class that will apply the appropriate emitter @@ -44,7 +45,10 @@ import org.apache.tika.pipes.core.PipesPluginsConfig; * <p> * This does not allow multiple emitters supporting the same prefix. */ -public class EmitterManager extends ConfigBase { +public class EmitterManager { + + private static final Logger LOG = LoggerFactory.getLogger(EmitterManager.class); + private final Map<String, Emitter> emitterMap = new ConcurrentHashMap<>(); @@ -66,7 +70,8 @@ public class EmitterManager extends ConfigBase { pluginManager.startPlugins(); Map<String, Emitter> emitterMap = new HashMap<>(); for (Emitter emitter : pluginManager.getExtensions(Emitter.class)) { - Optional<EmitterConfig> emitterConfig = pluginsConfig.getEmitterConfig(emitter.getPluginId()); + LOG.warn("EMITTER PLUGIN ID: " + emitter.getPluginId() + " : " + emitter.getClass()); + Optional<PluginConfig> emitterConfig = pluginsConfig.getEmitterConfig(emitter.getPluginId()); if (emitterConfig.isPresent()) { emitter.configure(emitterConfig.get()); if (emitter instanceof Initializable) { diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmptyEmitter.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmptyEmitter.java index 6d9f03b7d..c3430ee98 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmptyEmitter.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmptyEmitter.java @@ -19,24 +19,33 @@ package org.apache.tika.pipes.core.emitter; import java.io.IOException; import java.util.List; +import org.apache.tika.exception.TikaConfigException; import org.apache.tika.metadata.Metadata; import org.apache.tika.parser.ParseContext; +import org.apache.tika.pipes.api.emitter.AbstractEmitter; +import org.apache.tika.pipes.api.emitter.EmitData; +import org.apache.tika.pipes.api.emitter.Emitter; +import org.apache.tika.plugins.PluginConfig; -public class EmptyEmitter implements Emitter { +public class EmptyEmitter extends AbstractEmitter { - @Override - public String getName() { - return "empty"; + public EmptyEmitter() throws IOException { } @Override - public void emit(String emitKey, List<Metadata> metadataList, ParseContext parseContext) - throws IOException, TikaEmitterException { + public void configure(PluginConfig pluginConfig) throws TikaConfigException, IOException { + //no-op + } + @Override + public String getPluginId() { + return ""; } @Override - public void emit(List<? extends EmitData> emitData) throws IOException, TikaEmitterException { + public void emit(String emitKey, List<Metadata> metadataList, ParseContext parseContext) + throws IOException, TikaEmitterException { } + } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java index ac9413583..d00d97662 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java @@ -23,6 +23,8 @@ import java.io.InputStream; import org.apache.tika.exception.TikaConfigException; import org.apache.tika.metadata.Metadata; import org.apache.tika.parser.ParseContext; +import org.apache.tika.pipes.api.emitter.Emitter; +import org.apache.tika.pipes.api.emitter.StreamEmitter; import org.apache.tika.pipes.core.FetchEmitTuple; import org.apache.tika.pipes.core.emitter.EmitKey; import org.apache.tika.pipes.core.emitter.EmitterManager; diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/EmptyFetcher.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/EmptyFetcher.java index 09467b946..a6c576ea6 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/EmptyFetcher.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/EmptyFetcher.java @@ -24,12 +24,12 @@ import org.apache.tika.exception.TikaException; import org.apache.tika.metadata.Metadata; import org.apache.tika.parser.ParseContext; import org.apache.tika.pipes.api.fetcher.Fetcher; -import org.apache.tika.pipes.api.fetcher.FetcherConfig; +import org.apache.tika.plugins.PluginConfig; public class EmptyFetcher implements Fetcher { @Override - public void configure(FetcherConfig fetcherConfig) throws TikaConfigException { + public void configure(PluginConfig pluginConfig) throws TikaConfigException { //no-op } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java index 2f9d4e5c1..dc38202a4 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java @@ -36,8 +36,8 @@ import org.apache.tika.config.InitializableProblemHandler; import org.apache.tika.exception.TikaConfigException; import org.apache.tika.exception.TikaException; import org.apache.tika.pipes.api.fetcher.Fetcher; -import org.apache.tika.pipes.api.fetcher.FetcherConfig; import org.apache.tika.pipes.core.PipesPluginsConfig; +import org.apache.tika.plugins.PluginConfig; /** * Utility class to hold multiple fetchers. @@ -66,7 +66,7 @@ public class FetcherManager { pluginManager.startPlugins(); Map<String, Fetcher> fetcherMap = new HashMap<>(); for (Fetcher fetcher : pluginManager.getExtensions(Fetcher.class)) { - Optional<FetcherConfig> fetcherConfig = pluginsConfig.getFetcherConfig(fetcher.getPluginId()); + Optional<PluginConfig> fetcherConfig = pluginsConfig.getFetcherConfig(fetcher.getPluginId()); if (fetcherConfig.isPresent()) { fetcher.configure(fetcherConfig.get()); if (fetcher instanceof Initializable) { diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/pipesiterator/PipesIterator.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/pipesiterator/PipesIterator.java index ef4473112..5bde0a20f 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/pipesiterator/PipesIterator.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/pipesiterator/PipesIterator.java @@ -65,7 +65,7 @@ public abstract class PipesIterator extends ConfigBase private ArrayBlockingQueue<FetchEmitTuple> queue = null; private int queueSize = DEFAULT_QUEUE_SIZE; private String fetcherPluginId; - private String emitterName; + private String emitterPluginId; private FetchEmitTuple.ON_PARSE_EXCEPTION onParseException = FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT; private BasicContentHandlerFactory.HANDLER_TYPE handlerType = @@ -89,22 +89,22 @@ public abstract class PipesIterator extends ConfigBase } } - public String getFetcherName() { + public String getFetcherPluginId() { return fetcherPluginId; } @Field - public void setFetcherName(String fetcherPluginId) { + public void setFetcherPluginId(String fetcherPluginId) { this.fetcherPluginId = fetcherPluginId; } - public String getEmitterName() { - return emitterName; + public String getEmitterPluginId() { + return emitterPluginId; } @Field - public void setEmitterName(String emitterName) { - this.emitterName = emitterName; + public void setEmitterPluginId(String emitterPluginId) { + this.emitterPluginId = emitterPluginId; } @Field diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/JsonEmitData.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/JsonEmitData.java index cf86bf80c..11b6df585 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/JsonEmitData.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/JsonEmitData.java @@ -25,7 +25,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import org.apache.tika.metadata.Metadata; import org.apache.tika.parser.ParseContext; import org.apache.tika.pipes.core.FetchEmitTuple; -import org.apache.tika.pipes.core.emitter.EmitData; +import org.apache.tika.pipes.core.emitter.EmitDataImpl; import org.apache.tika.serialization.MetadataSerializer; import org.apache.tika.serialization.ParseContextSerializer; @@ -40,7 +40,7 @@ public class JsonEmitData { OBJECT_MAPPER.registerModule(module); } - public static void toJson(EmitData emitDataTuple, Writer writer) throws IOException { + public static void toJson(EmitDataImpl emitDataTuple, Writer writer) throws IOException { OBJECT_MAPPER.writeValue(writer, emitDataTuple); } } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java deleted file mode 100644 index e126ef66e..000000000 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tika.pipes.emitter.fs; - -import java.io.IOException; -import java.io.InputStream; -import java.io.Writer; -import java.nio.charset.StandardCharsets; -import java.nio.file.FileAlreadyExistsException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardCopyOption; -import java.util.List; - -import org.apache.tika.config.Field; -import org.apache.tika.metadata.Metadata; -import org.apache.tika.metadata.TikaCoreProperties; -import org.apache.tika.parser.ParseContext; -import org.apache.tika.pipes.core.emitter.TikaEmitterException; -import org.apache.tika.serialization.JsonMetadataList; - -/** - * Emitter to write to a file system. - * <p> - * This calculates the path to write to based on the {@link #basePath} - * and the value of the {@link TikaCoreProperties#SOURCE_PATH} value. - * - * <pre class="prettyprint"> - * <properties> - * <emitters> - * <emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter> - * <params> - * <!-- required --> - * <param name="name" type="string">fs</param> - * <!-- required --> - * <param name="basePath" type="string">/path/to/output</param> - * <!-- optional; default is 'json' --> - * <param name="fileExtension" type="string">json</param> - * <!-- optional; if the file already exists, - * options ('skip', 'replace', 'exception') - * default is 'exception' --> - * <param name="onExists" type="string">skip</param> - * <!-- optional; whether or not to pretty print the output - * default is false --> - * <param name="prettyPrint" type="boolean">true</param> - * </params> - * </emitter> - * </emitters> - * </properties></pre> - */ -public class FileSystemEmitter extends AbstractEmitter implements StreamEmitter { - - private Path basePath = null; - private String fileExtension = "json"; - private ON_EXISTS onExists = ON_EXISTS.EXCEPTION; - - private boolean prettyPrint = false; - - @Override - public void emit(String emitKey, List<Metadata> metadataList, ParseContext parseContext) throws IOException, TikaEmitterException { - Path output; - if (metadataList == null || metadataList.isEmpty()) { - throw new TikaEmitterException("metadata list must not be null or of size 0"); - } - - if (fileExtension != null && ! fileExtension.isEmpty()) { - emitKey += "." + fileExtension; - } - if (basePath != null) { - output = basePath.resolve(emitKey); - if (!output.toAbsolutePath().normalize().startsWith(basePath.toAbsolutePath().normalize())) { - throw new TikaEmitterException("path traversal?! " + output.toAbsolutePath()); - } - } else { - output = Paths.get(emitKey); - } - - if (output.getParent() != null && !Files.isDirectory(output.getParent())) { - Files.createDirectories(output.getParent()); - } - try (Writer writer = Files.newBufferedWriter(output, StandardCharsets.UTF_8)) { - JsonMetadataList.toJson(metadataList, writer, prettyPrint); - } - } - - @Field - public void setBasePath(String basePath) { - this.basePath = Paths.get(basePath); - } - - /** - * If you want to customize the output file's file extension. - * Do not include the "." - * - * @param fileExtension - */ - @Field - public void setFileExtension(String fileExtension) { - this.fileExtension = fileExtension; - } - - /** - * What to do if the target file already exists. NOTE: if more than one - * thread is trying write to the same file and {@link ON_EXISTS#REPLACE} is chosen, - * you still might get a {@link FileAlreadyExistsException}. - * - * @param onExists - */ - @Field - public void setOnExists(String onExists) { - switch (onExists) { - case "skip": - this.onExists = ON_EXISTS.SKIP; - break; - case "replace": - this.onExists = ON_EXISTS.REPLACE; - break; - case "exception": - this.onExists = ON_EXISTS.EXCEPTION; - break; - default: - throw new IllegalArgumentException("Don't understand '" + onExists + "'; must be one of: 'skip', 'replace', 'exception'"); - } - } - - @Field - public void setPrettyPrint(boolean prettyPrint) { - this.prettyPrint = prettyPrint; - } - - @Override - public void emit(String path, InputStream inputStream, Metadata userMetadata, ParseContext parseContext) throws IOException, TikaEmitterException { - Path target = basePath.resolve(path); - - if (!Files.isDirectory(target.getParent())) { - Files.createDirectories(target.getParent()); - } - if (onExists == ON_EXISTS.REPLACE) { - Files.copy(inputStream, target, StandardCopyOption.REPLACE_EXISTING); - } else if (onExists == ON_EXISTS.EXCEPTION) { - Files.copy(inputStream, target); - } else if (onExists == ON_EXISTS.SKIP) { - if (!Files.isRegularFile(target)) { - try { - Files.copy(inputStream, target); - } catch (FileAlreadyExistsException e) { - //swallow - } - } - } - } - - enum ON_EXISTS { - SKIP, EXCEPTION, REPLACE - } -} diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java index 19199e1b0..3d97845fd 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java @@ -67,8 +67,8 @@ public class FileListPipesIterator extends PipesIterator implements Initializabl String line = reader.readLine(); while (line != null) { if (! line.startsWith("#") && !StringUtils.isBlank(line)) { - FetchKey fetchKey = new FetchKey(getFetcherName(), line); - EmitKey emitKey = new EmitKey(getEmitterName(), line); + FetchKey fetchKey = new FetchKey(getFetcherPluginId(), line); + EmitKey emitKey = new EmitKey(getEmitterPluginId(), line); ParseContext parseContext = new ParseContext(); parseContext.set(HandlerConfig.class, getHandlerConfig()); tryToAdd(new FetchEmitTuple(line, fetchKey, emitKey, @@ -95,8 +95,8 @@ public class FileListPipesIterator extends PipesIterator implements Initializabl throws TikaConfigException { //these should all be fatal TikaConfig.mustNotBeEmpty("fileList", fileList); - TikaConfig.mustNotBeEmpty("fetcherName", getFetcherName()); - TikaConfig.mustNotBeEmpty("emitterName", getFetcherName()); + TikaConfig.mustNotBeEmpty("fetcherPluginId", getFetcherPluginId()); + TikaConfig.mustNotBeEmpty("emitterPluginId", getEmitterPluginId()); fileListPath = Paths.get(fileList); if (!Files.isRegularFile(fileListPath)) { diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java index a8a498808..69a75f97a 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java @@ -79,7 +79,7 @@ public class FileSystemPipesIterator extends PipesIterator } try { - Files.walkFileTree(basePath, new FSFileVisitor(getFetcherName(), getEmitterName())); + Files.walkFileTree(basePath, new FSFileVisitor(getFetcherPluginId(), getEmitterPluginId())); } catch (IOException e) { Throwable cause = e.getCause(); if (cause != null && cause instanceof TimeoutException) { @@ -95,8 +95,8 @@ public class FileSystemPipesIterator extends PipesIterator throws TikaConfigException { //these should all be fatal TikaConfig.mustNotBeEmpty("basePath", basePath); - TikaConfig.mustNotBeEmpty("fetcherName", getFetcherName()); - TikaConfig.mustNotBeEmpty("emitterName", getFetcherName()); + TikaConfig.mustNotBeEmpty("fetcherPluginId", getFetcherPluginId()); + TikaConfig.mustNotBeEmpty("emitterPluginId", getEmitterPluginId()); } @Override diff --git a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java index f241b79d5..06e745503 100644 --- a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java +++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java @@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test; import org.apache.tika.config.AbstractTikaConfigTest; import org.apache.tika.exception.TikaConfigException; +import org.apache.tika.pipes.api.emitter.Emitter; import org.apache.tika.pipes.core.emitter.EmitterManager; import org.apache.tika.pipes.core.pipesiterator.PipesIterator; @@ -63,7 +64,7 @@ public class TikaPipesConfigTest extends AbstractTikaConfigTest { FetcherManager fetcherManager = FetcherManager.load( getConfigFilePath("fetchers-nobasepath-config.xml")); - }*/ + } @Test public void testEmitters() throws Exception { @@ -81,12 +82,13 @@ public class TikaPipesConfigTest extends AbstractTikaConfigTest { EmitterManager.load(getConfigFilePath("emitters-duplicate-config.xml")); }); } + */ @Test public void testPipesIterator() throws Exception { PipesIterator it = PipesIterator.build(getConfigFilePath("pipes-iterator-config.xml")); - assertEquals("fs1", it.getFetcherName()); + assertEquals("fsf1", it.getFetcherPluginId()); } @Test @@ -94,7 +96,7 @@ public class TikaPipesConfigTest extends AbstractTikaConfigTest { assertThrows(TikaConfigException.class, () -> { PipesIterator it = PipesIterator.build(getConfigFilePath("pipes-iterator-multiple-config.xml")); - assertEquals("fs1", it.getFetcherName()); + assertEquals("fsf1", it.getFetcherPluginId()); }); } } diff --git a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/emitter/MockEmitter.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/emitter/MockEmitter.java deleted file mode 100644 index 6d32ea2c4..000000000 --- a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/emitter/MockEmitter.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tika.pipes.core.emitter; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import org.apache.tika.config.Field; -import org.apache.tika.config.Initializable; -import org.apache.tika.config.InitializableProblemHandler; -import org.apache.tika.config.Param; -import org.apache.tika.exception.TikaConfigException; -import org.apache.tika.metadata.Metadata; -import org.apache.tika.parser.ParseContext; - -public class MockEmitter extends AbstractEmitter implements Initializable { - - @Field - private boolean throwOnCheck = false; - - @Override - public void initialize(Map<String, Param> params) throws TikaConfigException { - - } - - public void setThrowOnCheck(boolean throwOnCheck) { - this.throwOnCheck = throwOnCheck; - } - - @Override - public void checkInitialization(InitializableProblemHandler problemHandler) - throws TikaConfigException { - - if (throwOnCheck) { - throw new TikaConfigException("throw on check"); - } - - } - - @Override - public void emit(String emitKey, List<Metadata> metadataList, ParseContext parseContext) - throws IOException, TikaEmitterException { - - } -} diff --git a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/pipesiterator/filelist/FileListPipesIteratorTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/pipesiterator/filelist/FileListPipesIteratorTest.java index 42606b0d7..2cf38ef51 100644 --- a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/pipesiterator/filelist/FileListPipesIteratorTest.java +++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/pipesiterator/filelist/FileListPipesIteratorTest.java @@ -36,8 +36,8 @@ public class FileListPipesIteratorTest { public void testBasic() throws Exception { Path p = Paths.get(this.getClass().getResource("/test-documents/file-list.txt").toURI()); FileListPipesIterator it = new FileListPipesIterator(); - it.setFetcherName("f"); - it.setEmitterName("e"); + it.setFetcherPluginId("f"); + it.setEmitterPluginId("e"); it.setFileList(p.toAbsolutePath().toString()); it.setHasHeader(false); it.checkInitialization(InitializableProblemHandler.DEFAULT); @@ -59,8 +59,8 @@ public class FileListPipesIteratorTest { public void testHasHeader() throws Exception { Path p = Paths.get(this.getClass().getResource("/test-documents/file-list.txt").toURI()); FileListPipesIterator it = new FileListPipesIterator(); - it.setFetcherName("f"); - it.setEmitterName("e"); + it.setFetcherPluginId("f"); + it.setEmitterPluginId("e"); it.setFileList(p.toAbsolutePath().toString()); it.setHasHeader(true); it.checkInitialization(InitializableProblemHandler.DEFAULT); diff --git a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIteratorTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIteratorTest.java index cfcb12318..9178aeb2b 100644 --- a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIteratorTest.java +++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIteratorTest.java @@ -61,9 +61,9 @@ public class FileSystemPipesIteratorTest { truthSet.add(fetchString); } - String fetcherName = "fs"; + String fetcherName = "file-system-fetcher"; PipesIterator it = new FileSystemPipesIterator(root); - it.setFetcherName(fetcherName); + it.setFetcherPluginId(fetcherName); it.setQueueSize(2); Set<String> iteratorSet = new HashSet<>(); diff --git a/tika-pipes/tika-pipes-core/src/test/resources/configs/fetchers.json b/tika-pipes/tika-pipes-core/src/test/resources/configs/fetchers.json index e153df966..b0e9e0ff0 100644 --- a/tika-pipes/tika-pipes-core/src/test/resources/configs/fetchers.json +++ b/tika-pipes/tika-pipes-core/src/test/resources/configs/fetchers.json @@ -1,5 +1,5 @@ { - "pipesPluginsConfig" : { + "plugins" : { "fetchers": { "file-system-fetcher": { "basePath": "{BASE_PATH}", diff --git a/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/config/pipes-iterator-config.xml b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/config/pipes-iterator-config.xml index 50d006142..31f7f0a1a 100644 --- a/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/config/pipes-iterator-config.xml +++ b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/config/pipes-iterator-config.xml @@ -18,7 +18,8 @@ <properties> <pipesIterator class="org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator"> <params> - <fetcherPluginId>fs1</fetcherPluginId> + <fetcherPluginId>fsf1</fetcherPluginId> + <emitterPluginId>fse1</emitterPluginId> <basePath>/my/base/path1</basePath> </params> </pipesIterator> diff --git a/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/config/pipes-iterator-multiple-config.xml b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/config/pipes-iterator-multiple-config.xml index e4e127d30..f7bed7010 100644 --- a/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/config/pipes-iterator-multiple-config.xml +++ b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/config/pipes-iterator-multiple-config.xml @@ -18,13 +18,15 @@ <properties> <pipesIterator class="org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator"> <params> - <fetcherPluginId>fs1</fetcherPluginId> + <fetcherPluginId>fsf1</fetcherPluginId> + <emitterPluginId>fse1</emitterPluginId> <basePath>/my/base/path1</basePath> </params> </pipesIterator> <pipesIterator class="org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator"> <params> - <fetcherPluginId>fs2</fetcherPluginId> + <fetcherPluginId>fsf2</fetcherPluginId> + <emitterPluginId>fse2</emitterPluginId> <basePath>/my/base/path2</basePath> </params> </pipesIterator> diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/main/java/org/apache/tika/pipes/pipesiterator/azblob/AZBlobPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/main/java/org/apache/tika/pipes/pipesiterator/azblob/AZBlobPipesIterator.java index ed4ad524f..d059128af 100644 --- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/main/java/org/apache/tika/pipes/pipesiterator/azblob/AZBlobPipesIterator.java +++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/main/java/org/apache/tika/pipes/pipesiterator/azblob/AZBlobPipesIterator.java @@ -88,7 +88,7 @@ public class AZBlobPipesIterator extends PipesIterator implements Initializable @Override protected void enqueue() throws InterruptedException, IOException, TimeoutException { String fetcherPluginId = getFetcherName(); - String emitterName = getEmitterName(); + String emitterName = getEmitterPluginId(); long start = System.currentTimeMillis(); int count = 0; HandlerConfig handlerConfig = getHandlerConfig(); diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java index 44663a22e..cdc600b2b 100644 --- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java +++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java @@ -114,7 +114,7 @@ public class CSVPipesIterator extends PipesIterator implements Initializable { @Override protected void enqueue() throws InterruptedException, IOException, TimeoutException { String fetcherPluginId = getFetcherName(); - String emitterName = getEmitterName(); + String emitterName = getEmitterPluginId(); try (Reader reader = Files.newBufferedReader(csvPath, charset)) { Iterable<CSVRecord> records = CSVFormat.EXCEL.parse(reader); List<String> headers = new ArrayList<>(); diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/java/TestCSVPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/java/TestCSVPipesIterator.java index dccc1f70c..e04dfa866 100644 --- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/java/TestCSVPipesIterator.java +++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/java/TestCSVPipesIterator.java @@ -44,7 +44,7 @@ public class TestCSVPipesIterator { Path p = get("test-simple.csv"); CSVPipesIterator it = new CSVPipesIterator(); it.setFetcherName("fsf"); - it.setEmitterName("fse"); + it.setEmitterPluginId("fse"); it.setCsvPath(p); it.setFetchKeyColumn("fetchKey"); int numConsumers = 2; diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/src/main/java/org/apache/tika/pipes/pipesiterator/gcs/GCSPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/src/main/java/org/apache/tika/pipes/pipesiterator/gcs/GCSPipesIterator.java index 857e2ac0a..93129f375 100644 --- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/src/main/java/org/apache/tika/pipes/pipesiterator/gcs/GCSPipesIterator.java +++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/src/main/java/org/apache/tika/pipes/pipesiterator/gcs/GCSPipesIterator.java @@ -93,7 +93,7 @@ public class GCSPipesIterator extends PipesIterator implements Initializable { @Override protected void enqueue() throws InterruptedException, IOException, TimeoutException { String fetcherPluginId = getFetcherName(); - String emitterName = getEmitterName(); + String emitterName = getEmitterPluginId(); long start = System.currentTimeMillis(); int count = 0; HandlerConfig handlerConfig = getHandlerConfig(); diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java index 39fc3fe45..c1cf0c998 100644 --- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java +++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java @@ -141,7 +141,7 @@ public class JDBCPipesIterator extends PipesIterator implements Initializable { @Override protected void enqueue() throws InterruptedException, IOException, TimeoutException { String fetcherPluginId = getFetcherName(); - String emitterName = getEmitterName(); + String emitterName = getEmitterPluginId(); FetchEmitKeyIndices fetchEmitKeyIndices = null; List<String> headers = new ArrayList<>(); int rowCount = 0; @@ -343,11 +343,11 @@ public class JDBCPipesIterator extends PipesIterator implements Initializable { throw new TikaConfigException("If you specify a 'fetchKeyColumn', you must specify a 'fetcherPluginId'"); } - if (StringUtils.isBlank(getEmitterName()) && !StringUtils.isBlank(emitKeyColumn)) { + if (StringUtils.isBlank(getEmitterPluginId()) && !StringUtils.isBlank(emitKeyColumn)) { throw new TikaConfigException("If you specify an 'emitKeyColumn', you must specify an 'emitterName'"); } - if (StringUtils.isBlank(getEmitterName()) && StringUtils.isBlank(getFetcherName())) { + if (StringUtils.isBlank(getEmitterPluginId()) && StringUtils.isBlank(getFetcherName())) { LOGGER.warn("no fetcher or emitter specified?!"); } diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java index 26474284a..1d95d3016 100644 --- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java +++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java @@ -149,7 +149,7 @@ public class KafkaPipesIterator extends PipesIterator implements Initializable { @Override protected void enqueue() throws InterruptedException, TimeoutException { String fetcherPluginId = getFetcherName(); - String emitterName = getEmitterName(); + String emitterName = getEmitterPluginId(); long start = System.currentTimeMillis(); int count = 0; HandlerConfig handlerConfig = getHandlerConfig(); diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java index a35a559fe..f399925c2 100644 --- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java +++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java @@ -197,7 +197,7 @@ public class S3PipesIterator extends PipesIterator implements Initializable { @Override protected void enqueue() throws InterruptedException, IOException, TimeoutException { String fetcherPluginId = getFetcherName(); - String emitterName = getEmitterName(); + String emitterName = getEmitterPluginId(); long start = System.currentTimeMillis(); int count = 0; HandlerConfig handlerConfig = getHandlerConfig(); diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/pipesiterator/solr/SolrPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/pipesiterator/solr/SolrPipesIterator.java index a133e607d..956fa6604 100644 --- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/pipesiterator/solr/SolrPipesIterator.java +++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/pipesiterator/solr/SolrPipesIterator.java @@ -175,7 +175,7 @@ public class SolrPipesIterator extends PipesIterator implements Initializable { @Override protected void enqueue() throws InterruptedException, IOException, TimeoutException { String fetcherPluginId = getFetcherName(); - String emitterName = getEmitterName(); + String emitterName = getEmitterPluginId(); try (SolrClient solrClient = createSolrClient()) { int fileCount = 0; diff --git a/tika-serialization/src/main/java/org/apache/tika/serialization/PluginConfigLoader.java b/tika-serialization/src/main/java/org/apache/tika/serialization/PluginConfigLoader.java new file mode 100644 index 000000000..00fd6f151 --- /dev/null +++ b/tika-serialization/src/main/java/org/apache/tika/serialization/PluginConfigLoader.java @@ -0,0 +1,34 @@ +package org.apache.tika.serialization; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.StandardCharsets; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; + +import org.apache.tika.plugins.PluginConfig; +import org.apache.tika.plugins.PluginConfigs; + +public class PluginConfigLoader { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + static { + SimpleModule module = new SimpleModule(); + module.addSerializer(PluginConfig.class, new PluginsConfigSerializer()); + OBJECT_MAPPER.registerModule(module); + OBJECT_MAPPER.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + } + + public static PluginConfigs load(InputStream is) throws IOException { + try (Reader reader = new InputStreamReader(is, StandardCharsets.UTF_8)) { + return OBJECT_MAPPER.readValue(reader, PluginConfigs.class); + } + } + +} diff --git a/tika-serialization/src/main/java/org/apache/tika/serialization/PluginsConfigDeserializer.java b/tika-serialization/src/main/java/org/apache/tika/serialization/PluginsConfigDeserializer.java new file mode 100644 index 000000000..e773c01c9 --- /dev/null +++ b/tika-serialization/src/main/java/org/apache/tika/serialization/PluginsConfigDeserializer.java @@ -0,0 +1,27 @@ +package org.apache.tika.serialization; + +import java.io.IOException; + +import com.fasterxml.jackson.core.JacksonException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; + +import org.apache.tika.plugins.PluginConfig; + +public class PluginsConfigDeserializer extends JsonDeserializer<PluginConfig> { + + @Override + public PluginConfig deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JacksonException { + JsonNode node = jsonParser.getCodec().readTree(jsonParser); + + String pluginId = node.get("pluginId").asText(); + + JsonNode jsonConfigNode = node.get("jsonConfig"); + + String jsonConfigRaw = jsonConfigNode.toString(); + + return new PluginConfig(pluginId, jsonConfigRaw); + } +} diff --git a/tika-serialization/src/main/java/org/apache/tika/serialization/PluginsConfigSerializer.java b/tika-serialization/src/main/java/org/apache/tika/serialization/PluginsConfigSerializer.java new file mode 100644 index 000000000..feadf4ead --- /dev/null +++ b/tika-serialization/src/main/java/org/apache/tika/serialization/PluginsConfigSerializer.java @@ -0,0 +1,21 @@ +package org.apache.tika.serialization; + +import java.io.IOException; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; + +import org.apache.tika.plugins.PluginConfig; + +public class PluginsConfigSerializer extends JsonSerializer<PluginConfig> { + + @Override + public void serialize(PluginConfig pluginsConfig, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeStringField("pluginId", pluginsConfig.pluginId()); + jsonGenerator.writeFieldName("jsonConfig"); + jsonGenerator.writeRawValue(pluginsConfig.jsonConfig()); + jsonGenerator.writeEndObject(); + } +} diff --git a/tika-serialization/src/test/java/org/apache/tika/serialization/PluginsConfigTest.java b/tika-serialization/src/test/java/org/apache/tika/serialization/PluginsConfigTest.java new file mode 100644 index 000000000..56808006a --- /dev/null +++ b/tika-serialization/src/test/java/org/apache/tika/serialization/PluginsConfigTest.java @@ -0,0 +1,69 @@ +package org.apache.tika.serialization; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.HashMap; +import java.util.Map; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import org.junit.jupiter.api.Test; + +import org.apache.tika.plugins.PluginConfig; +import org.apache.tika.plugins.PluginConfigs; + +public class PluginsConfigTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + static { + SimpleModule module = new SimpleModule(); + module.addDeserializer(PluginConfig.class, new PluginsConfigDeserializer()); + module.addSerializer(PluginConfig.class, new PluginsConfigSerializer()); + OBJECT_MAPPER.registerModule(module); + OBJECT_MAPPER.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + } + + @Test + public void testBasic() throws Exception { + + PluginConfig p1 = new PluginConfig("pluginId", + """ + {"basePath":"/my/docs","includeSystemInfo":true} + """); + String json = OBJECT_MAPPER.writeValueAsString(p1); + + PluginConfig deserialized = OBJECT_MAPPER.readValue(json, PluginConfig.class); + assertEquals(p1.pluginId(), deserialized.pluginId()); + assertEquals(flatten(p1.jsonConfig()), flatten(deserialized.jsonConfig())); + } + + @Test + public void testMap() throws Exception { + PluginConfig p1 = new PluginConfig("pluginId1", + """ + {"basePath":"/my/docs1","includeSystemInfo":true} + """); + PluginConfig p2 = new PluginConfig("pluginId2", + """ + {"basePath":"/my/docs2","includeSystemInfo":false} + """); + Map<String, PluginConfig> map = new HashMap<>(); + map.put(p1.pluginId(), p1); + map.put(p2.pluginId(), p2); + PluginConfigs pluginConfigManager = new PluginConfigs(map); + + String json = OBJECT_MAPPER.writeValueAsString(pluginConfigManager); + + PluginConfigs deserialized = OBJECT_MAPPER.readValue(json, PluginConfigs.class); + assertEquals(pluginConfigManager.get(p1.pluginId()).get().pluginId(), deserialized.get(p1.pluginId()).get().pluginId()); + assertEquals(flatten(pluginConfigManager.get(p1.pluginId()).get().jsonConfig()), + flatten(deserialized.get(p1.pluginId()).get().jsonConfig())); + } + + private static String flatten(String s) { + return s.replaceAll("[\r\n]", ""); + } +} diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java index efc5120fc..1ce5ffe05 100644 --- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java @@ -46,7 +46,7 @@ import org.apache.tika.parser.ParseContext; import org.apache.tika.pipes.core.FetchEmitTuple; import org.apache.tika.pipes.core.async.AsyncProcessor; import org.apache.tika.pipes.core.async.OfferLargerThanQueueSize; -import org.apache.tika.pipes.core.emitter.EmitData; +import org.apache.tika.pipes.core.emitter.EmitDataImpl; import org.apache.tika.pipes.core.emitter.EmitterManager; import org.apache.tika.pipes.core.extractor.EmbeddedDocumentBytesConfig; import org.apache.tika.pipes.core.fetcher.FetchKey; @@ -73,7 +73,7 @@ public class AsyncResource { return queue; } - public ArrayBlockingQueue<EmitData> getEmitDataQueue(int size) { + public ArrayBlockingQueue<EmitDataImpl> getEmitDataQueue(int size) { return new ArrayBlockingQueue<>(size); }
