Repository: beam Updated Branches: refs/heads/master b700bc47f -> d60b29ff7
[BEAM-2328] Add TikaIO Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2265b6ce Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2265b6ce Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2265b6ce Branch: refs/heads/master Commit: 2265b6ce6569ee9b63a697a052ecc72b1d2f2cdb Parents: b700bc4 Author: Sergey Beryozkin <[email protected]> Authored: Thu May 25 16:47:59 2017 +0100 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Mon Sep 11 08:01:29 2017 +0200 ---------------------------------------------------------------------- sdks/java/io/pom.xml | 1 + sdks/java/io/tika/pom.xml | 118 +++++ .../org/apache/beam/sdk/io/tika/TikaIO.java | 307 ++++++++++++ .../apache/beam/sdk/io/tika/TikaOptions.java | 78 ++++ .../org/apache/beam/sdk/io/tika/TikaSource.java | 466 +++++++++++++++++++ .../apache/beam/sdk/io/tika/package-info.java | 22 + .../org/apache/beam/sdk/io/tika/TikaIOTest.java | 261 +++++++++++ .../apache/beam/sdk/io/tika/TikaReaderTest.java | 82 ++++ .../apache/beam/sdk/io/tika/TikaSourceTest.java | 73 +++ .../src/test/resources/apache-beam-tika-pdf.zip | Bin 0 -> 11685 bytes .../src/test/resources/apache-beam-tika.pdf | Bin 0 -> 12392 bytes .../src/test/resources/apache-beam-tika1.odt | Bin 0 -> 12540 bytes .../src/test/resources/apache-beam-tika2.odt | Bin 0 -> 11412 bytes .../java/io/tika/src/test/resources/damaged.pdf | 2 + 14 files changed, 1410 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml index c291e5d..c1bb2f2 100644 --- a/sdks/java/io/pom.xml +++ b/sdks/java/io/pom.xml @@ -57,6 +57,7 @@ <module>mongodb</module> <module>mqtt</module> <module>solr</module> + <module>tika</module> <module>xml</module> </modules> 
http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/tika/pom.xml b/sdks/java/io/tika/pom.xml
new file mode 100644
index 0000000..c653d1e
--- /dev/null
+++ b/sdks/java/io/tika/pom.xml
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-io-tika</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: Tika</name>
+  <description>Tika Input to parse files.</description>
+
+
+  <properties>
+    <tika.version>1.16</tika.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.tika</groupId>
+      <artifactId>tika-core</artifactId>
+      <version>${tika.version}</version>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.tika</groupId>
+      <artifactId>tika-parsers</artifactId>
+      <version>${tika.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java
new file mode 100644
index 0000000..5d6eea7
--- /dev/null
+++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.tika;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Read.Bounded;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.tika.metadata.Metadata;
+
+/**
+ * {@link PTransform} for parsing arbitrary files using Apache Tika.
+ * Files in many well known text, binary or scientific formats can be processed.
+ *
+ * <p>To read a {@link PCollection} from one or more files
+ * use {@link TikaIO.Read#from(String)}
+ * to specify the path of the file(s) to be read.
+ *
+ * <p>{@link TikaIO.Read} returns a bounded {@link PCollection} of {@link String Strings},
+ * each corresponding to a sequence of characters reported by Apache Tika SAX Parser.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * // A simple Read of a local PDF file (only runs locally):
+ * PCollection<String> content = p.apply(TikaIO.read().from("/local/path/to/file.pdf"));
+ * }</pre>
+ */
+public class TikaIO {
+
+  /**
+   * A {@link PTransform} that parses one or more files and returns a bounded {@link PCollection}
+   * containing one element for each sequence of characters reported by Apache Tika SAX Parser.
+   *
+   * <p>The returned transform uses {@link Read#DEFAULT_QUEUE_POLL_TIME} and
+   * {@link Read#DEFAULT_QUEUE_MAX_POLL_TIME}; a file pattern must still be set with
+   * {@link Read#from(String)}.
+   */
+  public static Read read() {
+    return new AutoValue_TikaIO_Read.Builder()
+        .setQueuePollTime(Read.DEFAULT_QUEUE_POLL_TIME)
+        .setQueueMaxPollTime(Read.DEFAULT_QUEUE_MAX_POLL_TIME)
+        .build();
+  }
+
+  /** Implementation of {@link #read}. */
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
+    private static final long serialVersionUID = 2198301984784351829L;
+    /** Default time, in milliseconds, to wait for the asynchronous parser on each queue poll. */
+    public static final long DEFAULT_QUEUE_POLL_TIME = 50L;
+    /** Default total time, in milliseconds, to wait for the next piece of parsed content. */
+    public static final long DEFAULT_QUEUE_MAX_POLL_TIME = 3000L;
+
+    @Nullable abstract ValueProvider<String> getFilepattern();
+    @Nullable abstract ValueProvider<String> getTikaConfigPath();
+    @Nullable abstract Metadata getInputMetadata();
+    @Nullable abstract Boolean getReadOutputMetadata();
+    @Nullable abstract Long getQueuePollTime();
+    @Nullable abstract Long getQueueMaxPollTime();
+    @Nullable abstract Integer getMinimumTextLength();
+    @Nullable abstract Boolean getParseSynchronously();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setFilepattern(ValueProvider<String> filepattern);
+      abstract Builder setTikaConfigPath(ValueProvider<String> tikaConfigPath);
+      abstract Builder setInputMetadata(Metadata metadata);
+      abstract Builder setReadOutputMetadata(Boolean value);
+      abstract Builder setQueuePollTime(Long value);
+      abstract Builder setQueueMaxPollTime(Long value);
+      abstract Builder setMinimumTextLength(Integer value);
+      abstract Builder setParseSynchronously(Boolean value);
+
+      abstract Read build();
+    }
+
+    /**
+     * A {@link PTransform} that parses one or more files with the given filename
+     * or filename pattern and returns a bounded {@link PCollection} containing
+     * one element for each sequence of characters reported by Apache Tika SAX Parser.
+     *
+     * <p>Filepattern can be a local path (if running locally), or a Google Cloud Storage
+     * filename or filename pattern of the form {@code "gs://<bucket>/<filepath>"}
+     * (if running locally or using remote execution service).
+     *
+     * <p>Standard <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html" >Java
+     * Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
+     */
+    public Read from(String filepattern) {
+      checkNotNull(filepattern, "Filepattern cannot be empty.");
+      return from(StaticValueProvider.of(filepattern));
+    }
+
+    /**
+     * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}.
+     *
+     * <p>Only the file pattern is replaced: settings applied earlier, such as
+     * {@link #withQueuePollTime(Long)}, are kept.
+     */
+    public Read from(ValueProvider<String> filepattern) {
+      checkNotNull(filepattern, "Filepattern cannot be empty.");
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    /**
+     * Returns a new transform which will use the custom TikaConfig.
+     */
+    public Read withTikaConfigPath(String tikaConfigPath) {
+      checkNotNull(tikaConfigPath, "TikaConfigPath cannot be empty.");
+      return withTikaConfigPath(StaticValueProvider.of(tikaConfigPath));
+    }
+
+    /**
+     * Same as {@code withTikaConfigPath(tikaConfigPath)}, but accepting a {@link ValueProvider}.
+     */
+    public Read withTikaConfigPath(ValueProvider<String> tikaConfigPath) {
+      checkNotNull(tikaConfigPath, "TikaConfigPath cannot be empty.");
+      return toBuilder()
+          .setTikaConfigPath(tikaConfigPath)
+          .build();
+    }
+
+    /**
+     * Returns a new transform which will use the provided content type hint
+     * to make the file parser detection more efficient.
+     */
+    public Read withContentTypeHint(String contentType) {
+      checkNotNull(contentType, "ContentType cannot be empty.");
+      Metadata metadata = new Metadata();
+      metadata.add(Metadata.CONTENT_TYPE, contentType);
+      return withInputMetadata(metadata);
+    }
+
+    /**
+     * Returns a new transform which will use the provided input metadata
+     * for parsing the files.
+     *
+     * <p>The entries are merged into a copy of any previously configured input metadata;
+     * each name present in {@code metadata} replaces all earlier values for that name.
+     * Neither {@code metadata} nor the metadata held by this transform is modified.
+     */
+    public Read withInputMetadata(Metadata metadata) {
+      checkNotNull(metadata, "Metadata cannot be empty.");
+      Metadata merged = copyMetadata(getInputMetadata());
+      for (String name : metadata.names()) {
+        merged.remove(name);
+        for (String value : metadata.getValues(name)) {
+          merged.add(name, value);
+        }
+      }
+      return toBuilder().setInputMetadata(merged).build();
+    }
+
+    /**
+     * Returns a new transform which will report the metadata.
+     */
+    public Read withReadOutputMetadata(Boolean value) {
+      return toBuilder().setReadOutputMetadata(value).build();
+    }
+
+    /**
+     * Returns a new transform which will use the specified queue poll time.
+     */
+    public Read withQueuePollTime(Long value) {
+      return toBuilder().setQueuePollTime(value).build();
+    }
+
+    /**
+     * Returns a new transform which will use the specified queue max poll time.
+     */
+    public Read withQueueMaxPollTime(Long value) {
+      return toBuilder().setQueueMaxPollTime(value).build();
+    }
+
+    /**
+     * Returns a new transform which will operate on the text blocks with the
+     * given minimum text length.
+     */
+    public Read withMinimumTextlength(Integer value) {
+      return toBuilder().setMinimumTextLength(value).build();
+    }
+
+    /**
+     * Returns a new transform which will use the synchronous reader.
+     */
+    public Read withParseSynchronously(Boolean value) {
+      return toBuilder().setParseSynchronously(value).build();
+    }
+
+    /**
+     * Returns a new transform configured from {@link TikaOptions}. The input file pattern,
+     * queue poll times, minimum text length and parse mode are always copied from the options;
+     * the content type hint and Tika config path only when set, and the read-output-metadata
+     * flag only when it is {@code true}.
+     */
+    public Read withOptions(TikaOptions options) {
+      checkNotNull(options, "TikaOptions cannot be empty.");
+      Builder builder = toBuilder();
+      builder.setFilepattern(StaticValueProvider.of(options.getInput()))
+          .setQueuePollTime(options.getQueuePollTime())
+          .setQueueMaxPollTime(options.getQueueMaxPollTime())
+          .setMinimumTextLength(options.getMinimumTextLength())
+          .setParseSynchronously(options.getParseSynchronously());
+      if (options.getContentTypeHint() != null) {
+        // Work on a copy: the Metadata held by this transform must not be mutated.
+        Metadata metadata = copyMetadata(getInputMetadata());
+        metadata.set(Metadata.CONTENT_TYPE, options.getContentTypeHint());
+        builder.setInputMetadata(metadata);
+      }
+      if (options.getTikaConfigPath() != null) {
+        builder.setTikaConfigPath(StaticValueProvider.of(options.getTikaConfigPath()));
+      }
+      if (Boolean.TRUE.equals(options.getReadOutputMetadata())) {
+        builder.setReadOutputMetadata(options.getReadOutputMetadata());
+      }
+      return builder.build();
+    }
+
+    @Override
+    public PCollection<String> expand(PBegin input) {
+      checkNotNull(this.getFilepattern(), "Filepattern cannot be empty.");
+      final Bounded<String> read = org.apache.beam.sdk.io.Read.from(new TikaSource(this));
+      PCollection<String> pcol = input.getPipeline().apply(read);
+      pcol.setCoder(getDefaultOutputCoder());
+      return pcol;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+
+      ValueProvider<String> filepattern = getFilepattern();
+      if (filepattern != null) {
+        String filepatternDisplay = filepattern.isAccessible()
+            ? filepattern.get() : filepattern.toString();
+        builder
+            .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
+                .withLabel("File Pattern"));
+      }
+      ValueProvider<String> tikaConfigPath = getTikaConfigPath();
+      if (tikaConfigPath != null) {
+        String tikaConfigPathDisplay = tikaConfigPath.isAccessible()
+            ? tikaConfigPath.get() : tikaConfigPath.toString();
+        builder.add(DisplayData.item("tikaConfigPath", tikaConfigPathDisplay)
+            .withLabel("TikaConfig Path"));
+      }
+      Metadata metadata = getInputMetadata();
+      if (metadata != null) {
+        StringBuilder sb = new StringBuilder();
+        sb.append('[');
+        for (String name : metadata.names()) {
+          if (sb.length() > 1) {
+            sb.append(',');
+          }
+          sb.append(name).append('=').append(metadata.get(name));
+        }
+        sb.append(']');
+        builder
+            .add(DisplayData.item("inputMetadata", sb.toString())
+                .withLabel("Input Metadata"));
+      }
+      if (Boolean.TRUE.equals(getParseSynchronously())) {
+        builder
+            .add(DisplayData.item("parseMode", "synchronous")
+                .withLabel("Parse Mode"));
+      } else {
+        builder
+            .add(DisplayData.item("parseMode", "asynchronous")
+                .withLabel("Parse Mode"));
+        // Poll times are null if explicitly set so, e.g. via withQueuePollTime(null).
+        builder
+            .addIfNotNull(DisplayData.item("queuePollTime", asString(getQueuePollTime()))
+                .withLabel("Queue Poll Time"))
+            .addIfNotNull(DisplayData.item("queueMaxPollTime", asString(getQueueMaxPollTime()))
+                .withLabel("Queue Max Poll Time"));
+      }
+      Integer minTextLen = getMinimumTextLength();
+      if (minTextLen != null && minTextLen > 0) {
+        builder
+            .add(DisplayData.item("minTextLen", minTextLen.toString())
+                .withLabel("Minimum Text Length"));
+      }
+      if (Boolean.TRUE.equals(getReadOutputMetadata())) {
+        builder
+            .add(DisplayData.item("readOutputMetadata", "true")
+                .withLabel("Read Output Metadata"));
+      }
+    }
+
+    @Override
+    protected Coder<String> getDefaultOutputCoder() {
+      return StringUtf8Coder.of();
+    }
+
+    /** Returns a copy of all name/value pairs of {@code source}; empty if it is null. */
+    private static Metadata copyMetadata(@Nullable Metadata source) {
+      Metadata copy = new Metadata();
+      if (source != null) {
+        for (String name : source.names()) {
+          for (String value : source.getValues(name)) {
+            copy.add(name, value);
+          }
+        }
+      }
+      return copy;
+    }
+
+    /** Returns {@code value.toString()}, or null when {@code value} is null. */
+    @Nullable
+    private static String asString(@Nullable Object value) {
+      return value == null ? null : value.toString();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java
new file mode 100644
index 0000000..fb97678
--- /dev/null
+++
b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.tika;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.Validation;
+
+/**
+ * Pipeline options for command-line applications that read files with {@link TikaIO}.
+ *
+ * <p>Most of these options can be applied to a {@link TikaIO.Read} transform with
+ * {@link TikaIO.Read#withOptions(TikaOptions)}.
+ */
+public interface TikaOptions extends PipelineOptions {
+
+  @Description("Input path")
+  @Validation.Required
+  String getInput();
+  void setInput(String value);
+
+  @Description("Tika Config path")
+  String getTikaConfigPath();
+  void setTikaConfigPath(String value);
+
+  @Description("Tika Parser Content Type hint")
+  String getContentTypeHint();
+  void setContentTypeHint(String value);
+
+  @Description("Metadata report status")
+  @Default.Boolean(false)
+  Boolean getReadOutputMetadata();
+  void setReadOutputMetadata(Boolean value);
+
+  @Description("Optional use of the synchronous reader")
+  @Default.Boolean(false)
+  Boolean getParseSynchronously();
+  void setParseSynchronously(Boolean value);
+
+  @Description("Tika Parser queue poll time in milliseconds")
+  @Default.Long(TikaIO.Read.DEFAULT_QUEUE_POLL_TIME)
+  Long getQueuePollTime();
+  void setQueuePollTime(Long value);
+
+  @Description("Tika Parser queue maximum poll time in milliseconds")
+  @Default.Long(TikaIO.Read.DEFAULT_QUEUE_MAX_POLL_TIME)
+  Long getQueueMaxPollTime();
+  void setQueueMaxPollTime(Long value);
+
+  @Description("Minimum text fragment length for Tika Parser to report")
+  @Default.Integer(0)
+  Integer getMinimumTextLength();
+  void setMinimumTextLength(Integer value);
+
+  @Description("Pipeline name")
+  @Default.String("TikaRead")
+  String getPipelineName();
+  void setPipelineName(String value);
+
+  @Description("Output path")
+  @Default.String("/tmp/tika/out")
+  String getOutput();
+  void setOutput(String value);
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java
new file mode 100644
index 0000000..7c8852b
--- /dev/null
+++
b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.tika;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.nio.channels.Channels;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.parser.AutoDetectParser;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+import org.xml.sax.helpers.DefaultHandler;
+
+/**
+ * Implementation detail of {@link TikaIO.Read}.
+ *
+ * <p>A {@link Source} which can represent the content of the files parsed by Apache Tika.
+ */
+class TikaSource extends BoundedSource<String> {
+  private static final long serialVersionUID = -509574062910491122L;
+  private static final Logger LOG = LoggerFactory.getLogger(TikaSource.class);
+
+  @Nullable
+  private MatchResult.Metadata singleFileMetadata;
+  private final Mode mode;
+  private final TikaIO.Read spec;
+
+  /**
+   * Source mode: {@code FILEPATTERN} reads every file matched by the file pattern,
+   * {@code SINGLE_FILE} reads one file that has already been matched.
+   */
+  public enum Mode {
+    FILEPATTERN, SINGLE_FILE
+  }
+
+  TikaSource(TikaIO.Read spec) {
+    this.mode = Mode.FILEPATTERN;
+    this.spec = spec;
+  }
+
+  TikaSource(Metadata fileMetadata, TikaIO.Read spec) {
+    mode = Mode.SINGLE_FILE;
+    this.singleFileMetadata = checkNotNull(fileMetadata, "fileMetadata");
+    this.spec = spec;
+  }
+
+  @Override
+  public BoundedReader<String> createReader(PipelineOptions options) throws IOException {
+    this.validate();
+    // Guava's checkState only understands %s placeholders.
+    checkState(spec.getFilepattern().isAccessible(),
+        "Cannot create a Tika reader without access to the file"
+        + " or pattern specification: %s.", spec.getFilepattern());
+    if (spec.getTikaConfigPath() != null) {
+      checkState(spec.getTikaConfigPath().isAccessible(),
+          "Cannot create a Tika reader without access to its configuration: %s.",
+          spec.getTikaConfigPath());
+    }
+
+    if (mode == Mode.SINGLE_FILE) {
+      return new TikaReader(this, singleFileMetadata.resourceId().toString());
+    }
+    List<Metadata> fileMetadata = expandFilePattern(spec.getFilepattern().get());
+    List<TikaReader> fileReaders = new ArrayList<>();
+    for (Metadata metadata : fileMetadata) {
+      fileReaders.add(new TikaReader(this, metadata.resourceId().toString()));
+    }
+    if (fileReaders.size() == 1) {
+      return fileReaders.get(0);
+    }
+    return new FilePatternTikaReader(this, fileReaders);
+  }
+
+  @Override
+  public List<? extends TikaSource> split(long desiredBundleSizeBytes, PipelineOptions options)
+      throws Exception {
+    if (mode == Mode.SINGLE_FILE) {
+      return ImmutableList.of(this);
+    }
+    List<Metadata> fileMetadata = expandFilePattern(spec.getFilepattern().get());
+    List<TikaSource> splitResults = new ArrayList<>(fileMetadata.size());
+    for (Metadata metadata : fileMetadata) {
+      splitResults.add(new TikaSource(metadata, spec));
+    }
+    return splitResults;
+  }
+
+  public TikaIO.Read getTikaInputRead() {
+    return spec;
+  }
+
+  @Override
+  public Coder<String> getDefaultOutputCoder() {
+    return StringUtf8Coder.of();
+  }
+
+  @Override
+  public void validate() {
+    switch (mode) {
+      case FILEPATTERN:
+        checkArgument(this.singleFileMetadata == null,
+            "Unexpected initialized singleFileMetadata value");
+        break;
+      case SINGLE_FILE:
+        checkNotNull(this.singleFileMetadata,
+            "Unexpected null singleFileMetadata value");
+        break;
+      default:
+        throw new IllegalStateException("Unknown mode: " + mode);
+    }
+  }
+
+  /**
+   * Returns the size of the single file in {@link Mode#SINGLE_FILE} mode, otherwise the
+   * total size of all files matched by the file pattern.
+   */
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+    if (mode == Mode.SINGLE_FILE) {
+      return singleFileMetadata.sizeBytes();
+    }
+    long totalSize = 0;
+    for (Metadata metadata : expandFilePattern(spec.getFilepattern().get())) {
+      totalSize += metadata.sizeBytes();
+    }
+    return totalSize;
+  }
+
+  Mode getMode() {
+    return this.mode;
+  }
+
+  Metadata getSingleFileMetadata() {
+    return this.singleFileMetadata;
+  }
+
+  private static List<Metadata> expandFilePattern(String fileOrPattern) throws IOException {
+    MatchResult matches = Iterables.getOnlyElement(
+        FileSystems.match(Collections.singletonList(fileOrPattern)));
+    LOG.info("Matched {} files for pattern {}", matches.metadata().size(), fileOrPattern);
+    List<Metadata> metadata = ImmutableList.copyOf(matches.metadata());
+    checkArgument(!metadata.isEmpty(),
+        "Unable to find any files matching %s", fileOrPattern);
+
+    return metadata;
+  }
+
+  /**
+   * Reads the matched files one after another, delegating to one {@link TikaReader} per file.
+   *
+   * <p>TODO: This is mostly a copy of FileBasedSource internal file-pattern reader
+   * so that code would need to be generalized as part of the future contribution
+   */
+  static class FilePatternTikaReader extends BoundedReader<String> {
+    private final TikaSource source;
+    final ListIterator<TikaReader> fileReadersIterator;
+    TikaReader currentReader = null;
+
+    public FilePatternTikaReader(TikaSource source, List<TikaReader> fileReaders) {
+      this.source = source;
+      this.fileReadersIterator = fileReaders.listIterator();
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      return startNextNonemptyReader();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      checkState(currentReader != null, "Call start() before advance()");
+      if (currentReader.advance()) {
+        return true;
+      }
+      // The current file is exhausted: release its executor before moving on.
+      currentReader.close();
+      return startNextNonemptyReader();
+    }
+
+    private boolean startNextNonemptyReader() throws IOException {
+      while (fileReadersIterator.hasNext()) {
+        currentReader = fileReadersIterator.next();
+        if (currentReader.start()) {
+          return true;
+        }
+        currentReader.close();
+      }
+      return false;
+    }
+
+    @Override
+    public String getCurrent() throws NoSuchElementException {
+      if (currentReader == null) {
+        throw new NoSuchElementException();
+      }
+      return currentReader.getCurrent();
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      if (currentReader == null) {
+        throw new NoSuchElementException();
+      }
+      return currentReader.getCurrentTimestamp();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (currentReader != null) {
+        currentReader.close();
+      }
+      while (fileReadersIterator.hasNext()) {
+        fileReadersIterator.next().close();
+      }
+    }
+
+    @Override
+    public TikaSource getCurrentSource() {
+      return source;
+    }
+  }
+
+  /**
+   * Reads the text content (and, optionally, the metadata) of a single file parsed by Tika.
+   */
+  static class TikaReader extends BoundedReader<String> {
+    private ExecutorService execService;
+    private final ContentHandlerImpl tikaHandler = new ContentHandlerImpl();
+    private String current;
+    private final TikaSource source;
+    private final String filePath;
+    private final TikaIO.Read spec;
+    private org.apache.tika.metadata.Metadata tikaMetadata;
+    private Iterator<String> metadataIterator;
+
+    TikaReader(TikaSource source, String filePath) {
+      this.source = source;
+      this.filePath = filePath;
+      this.spec = source.getTikaInputRead();
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      // Build the parser before opening the file so that a bad Tika configuration
+      // cannot leak an open stream.
+      TikaConfig tikaConfig = null;
+      if (spec.getTikaConfigPath() != null) {
+        try {
+          tikaConfig = new TikaConfig(spec.getTikaConfigPath().get());
+        } catch (TikaException | SAXException e) {
+          throw new IOException(e);
+        }
+      }
+      final Parser parser = tikaConfig == null ? new AutoDetectParser()
+          : new AutoDetectParser(tikaConfig);
+      final ParseContext context = new ParseContext();
+      context.set(Parser.class, parser);
+      // Tika writes the extracted metadata into this object, so each reader works on its
+      // own copy rather than on the Metadata instance shared through the transform spec.
+      tikaMetadata = copyMetadata(spec.getInputMetadata());
+
+      if (spec.getMinimumTextLength() != null) {
+        tikaHandler.setMinTextLength(spec.getMinimumTextLength());
+      }
+
+      // Open the file through Beam's FileSystems, which also matched it, so that files on
+      // any registered file system can be read and not only local paths.
+      ResourceId resourceId = FileSystems.matchNewResource(filePath, false /* isDirectory */);
+      final InputStream is =
+          TikaInputStream.get(Channels.newInputStream(FileSystems.open(resourceId)));
+
+      if (!Boolean.TRUE.equals(spec.getParseSynchronously())) {
+        // Try to parse the file on the executor thread to make the best effort
+        // at letting the pipeline thread advancing over the file content
+        // without immediately parsing all of it
+        execService = Executors.newFixedThreadPool(1);
+        execService.submit(new Runnable() {
+          @Override
+          public void run() {
+            // Close the stream and flag the end of parsing on every path, including failures,
+            // so the reader thread never waits for content that will not arrive.
+            try (InputStream stream = is) {
+              parser.parse(stream, tikaHandler, tikaMetadata, context);
+            } catch (Exception ex) {
+              tikaHandler.setParseException(ex);
+            } catch (Throwable t) {
+              // Errors from a parser must not be mistaken for a normal end of the document.
+              tikaHandler.setParseException(new RuntimeException(t));
+            } finally {
+              tikaHandler.markParseFinished();
+            }
+          }
+        });
+      } else {
+        // Some parsers might not be able to report the content in chunks.
+        // It does not make sense to create extra threads in such cases
+        try (InputStream stream = is) {
+          parser.parse(stream, tikaHandler, tikaMetadata, context);
+        } catch (Exception ex) {
+          throw new IOException(ex);
+        } finally {
+          tikaHandler.markParseFinished();
+        }
+      }
+      return advanceToNext();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      checkState(current != null, "Call start() before advance()");
+      return advanceToNext();
+    }
+
+    protected boolean advanceToNext() throws IOException {
+      current = null;
+      // The content is reported first
+      if (metadataIterator == null) {
+        // Check if some content is already available
+        current = tikaHandler.getCurrent();
+
+        if (current == null && !Boolean.TRUE.equals(spec.getParseSynchronously())) {
+          long configuredMaxPollTime = spec.getQueueMaxPollTime() == null
+              ? TikaIO.Read.DEFAULT_QUEUE_MAX_POLL_TIME : spec.getQueueMaxPollTime();
+          long configuredPollTime = spec.getQueuePollTime() == null
+              ? TikaIO.Read.DEFAULT_QUEUE_POLL_TIME : spec.getQueuePollTime();
+
+          // Poll the queue till the next piece of data is available or the document ends.
+          long totalPollTime = 0;
+          boolean docEnded = false;
+          while (current == null && !docEnded && totalPollTime < configuredMaxPollTime) {
+            docEnded = tikaHandler.waitForNext(configuredPollTime);
+            current = tikaHandler.getCurrent();
+            totalPollTime += configuredPollTime;
+          }
+          if (current == null && !docEnded) {
+            LOG.warn("Stopped waiting for more content of {} after {} ms although Tika is"
+                + " still parsing it; the remaining content will not be read",
+                filePath, totalPollTime);
+          }
+        }
+        // No more content ?
+        if (current == null && Boolean.TRUE.equals(spec.getReadOutputMetadata())) {
+          // Time to report the metadata
+          metadataIterator = Arrays.asList(tikaMetadata.names()).iterator();
+        }
+      }
+
+      if (metadataIterator != null && metadataIterator.hasNext()) {
+        String key = metadataIterator.next();
+        // Reported as name=value; Metadata.get returns only the first value of a name.
+        current = key + "=" + tikaMetadata.get(key);
+      }
+      return current != null;
+    }
+
+    @Override
+    public String getCurrent() throws NoSuchElementException {
+      if (current == null) {
+        throw new NoSuchElementException();
+      }
+      return current;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (execService != null) {
+        // A parse already submitted still completes (closing its stream); then the thread ends.
+        execService.shutdown();
+      }
+    }
+
+    ExecutorService getExecutorService() {
+      return execService;
+    }
+
+    @Override
+    public BoundedSource<String> getCurrentSource() {
+      return source;
+    }
+
+    /** Returns a copy of all name/value pairs of {@code source}; empty if it is null. */
+    private static org.apache.tika.metadata.Metadata copyMetadata(
+        @Nullable org.apache.tika.metadata.Metadata source) {
+      org.apache.tika.metadata.Metadata copy = new org.apache.tika.metadata.Metadata();
+      if (source != null) {
+        for (String name : source.names()) {
+          for (String value : source.getValues(name)) {
+            copy.add(name, value);
+          }
+        }
+      }
+      return copy;
+    }
+  }
+
+  /**
+   * Tika Parser Content Handler.
+   *
+   * <p>Queues the non-blank text reported by the parser for the reader to drain, merging
+   * fragments until they reach {@code minTextLength} characters when that is positive.
+   */
+  static class ContentHandlerImpl extends DefaultHandler {
+    private final Queue<String> queue = new ConcurrentLinkedQueue<>();
+    private volatile boolean documentEnded;
+    private volatile Exception parseException;
+    // Text held back until it reaches minTextLength; flushed once the document has ended.
+    private volatile String pendingText;
+    private int minTextLength;
+
+    @Override
+    public void characters(char[] ch, int start, int length) throws SAXException {
+      String value = new String(ch, start, length).trim();
+      if (!value.isEmpty()) {
+        if (minTextLength <= 0) {
+          queue.add(value);
+        } else {
+          pendingText = pendingText == null ? value : pendingText + " " + value;
+          if (pendingText.length() >= minTextLength) {
+            queue.add(pendingText);
+            pendingText = null;
+          }
+        }
+      }
+    }
+
+    public synchronized void setParseException(Exception ex) {
+      this.parseException = ex;
+      // Wake up a reader blocked in waitForNext so it can report the failure promptly.
+      notifyAll();
+    }
+
+    /**
+     * Waits up to {@code pollTime} milliseconds for more content, unless content is already
+     * queued or the parse has already finished or failed.
+     *
+     * @return whether the document has ended
+     * @throws InterruptedIOException if the calling thread is interrupted while waiting
+     */
+    public synchronized boolean waitForNext(long pollTime) throws IOException {
+      if (!documentEnded && parseException == null && queue.isEmpty()) {
+        try {
+          wait(pollTime);
+        } catch (InterruptedException ex) {
+          // Preserve the interrupt and stop reading instead of silently spinning.
+          Thread.currentThread().interrupt();
+          throw new InterruptedIOException("Interrupted while waiting for Tika content");
+        }
+      }
+      return documentEnded;
+    }
+
+    @Override
+    public synchronized void endDocument() throws SAXException {
+      this.documentEnded = true;
+      notifyAll();
+    }
+
+    /**
+     * Marks the document as ended once the parser has returned or failed, even if it never
+     * called {@link #endDocument()}.
+     */
+    public synchronized void markParseFinished() {
+      this.documentEnded = true;
+      notifyAll();
+    }
+
+    public String getCurrent() throws IOException {
+      checkParseException();
+      // Read the flag before polling: the parser ends the document only after its last
+      // characters() call, so an empty queue seen after that means there is no more text.
+      boolean ended = documentEnded;
+      String value = queue.poll();
+      if (value == null && ended) {
+        // The parser thread records a failure before marking the parse finished.
+        checkParseException();
+        // Flush the trailing text shorter than minTextLength exactly once.
+        value = pendingText;
+        pendingText = null;
+      }
+      return value;
+    }
+
+    public void checkParseException() throws IOException {
+      if (parseException != null) {
+        throw new IOException(parseException);
+      }
+    }
+
+    public void setMinTextLength(int minTextLength) {
+      this.minTextLength = minTextLength;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/package-info.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/package-info.java new file mode 100644 index 0000000..972d69f --- /dev/null +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Transform for reading and parsing files with Apache Tika. + */ +package org.apache.beam.sdk.io.tika; http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java new file mode 100644 index 0000000..368eff5 --- /dev/null +++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.tika; + +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; + +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PCollection; +import org.apache.tika.exception.TikaException; +import org.junit.Rule; +import org.junit.Test; + +/** + * Tests TikaInput. + */ +public class TikaIOTest { + private static final String[] PDF_FILE = new String[] { + "Combining", "can help to ingest", "Apache Beam", "in most known formats.", + "the content from the files", "and", "Apache Tika" + }; + private static final String[] PDF_ZIP_FILE = new String[] { + "Combining", "can help to ingest", "Apache Beam", "in most known formats.", + "the content from the files", "and", "Apache Tika", + "apache-beam-tika.pdf" + }; + private static final String[] ODT_FILE = new String[] { + "Combining", "can help to ingest", "Apache", "Beam", "in most known formats.", + "the content from the files", "and", "Apache Tika" + }; + private static final String[] ODT_FILE_WITH_METADATA = new String[] { + "Combining", "can help to ingest", "Apache", "Beam", "in most known formats.", + "the content from the files", "and", "Apache Tika", + "Author=BeamTikaUser" + }; + private static final String[] ODT_FILE_WITH_MIN_TEXT_LEN = new String[] { + "Combining Apache Beam", "and Apache Tika can help to ingest", "in most known formats.", + "the content from the files" + }; + private static final String[] ODT_FILES = new String[] { + "Combining", "can help to 
ingest", "Apache", "Beam", "in most known formats.", + "the content from the files", "and", "Apache Tika", + "Open Office", "Text", "PDF", "Excel", "Scientific", + "and other formats", "are supported." + }; + + @Rule + public TestPipeline p = TestPipeline.create(); + + @Test + public void testReadPdfFile() throws IOException { + + String resourcePath = getClass().getResource("/apache-beam-tika.pdf").getPath(); + + doTestReadFiles(resourcePath, PDF_FILE); + } + + @Test + public void testReadZipPdfFile() throws IOException { + + String resourcePath = getClass().getResource("/apache-beam-tika-pdf.zip").getPath(); + + doTestReadFiles(resourcePath, PDF_ZIP_FILE); + } + + @Test + public void testReadOdtFile() throws IOException { + + String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); + + doTestReadFiles(resourcePath, ODT_FILE); + } + + @Test + public void testReadOdtFiles() throws IOException { + String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); + resourcePath = resourcePath.replace("apache-beam-tika1", "*"); + + doTestReadFiles(resourcePath, ODT_FILES); + } + + private void doTestReadFiles(String resourcePath, String[] expected) throws IOException { + PCollection<String> output = p.apply("ParseFiles", TikaIO.read().from(resourcePath)); + PAssert.that(output).containsInAnyOrder(expected); + p.run(); + } + + @Test + public void testReadOdtFileWithMetadata() throws IOException { + + String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); + + PCollection<String> output = p.apply("ParseOdtFile", + TikaIO.read().from(resourcePath).withReadOutputMetadata(true)) + .apply(ParDo.of(new FilterMetadataFn())); + PAssert.that(output).containsInAnyOrder(ODT_FILE_WITH_METADATA); + p.run(); + } + + @Test + public void testReadOdtFileWithMinTextLength() throws IOException { + + String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); + + PCollection<String> output = 
p.apply("ParseOdtFile", + TikaIO.read().from(resourcePath).withMinimumTextlength(20)); + PAssert.that(output).containsInAnyOrder(ODT_FILE_WITH_MIN_TEXT_LEN); + p.run(); + } + + @Test + public void testReadPdfFileSync() throws IOException { + + String resourcePath = getClass().getResource("/apache-beam-tika.pdf").getPath(); + + PCollection<String> output = p.apply("ParsePdfFile", + TikaIO.read().from(resourcePath).withParseSynchronously(true)); + PAssert.that(output).containsInAnyOrder(PDF_FILE); + p.run(); + } + + @Test + public void testReadDamagedPdfFile() throws IOException { + + doTestReadDamagedPdfFile(false); + } + + @Test + public void testReadDamagedPdfFileSync() throws IOException { + doTestReadDamagedPdfFile(true); + } + + private void doTestReadDamagedPdfFile(boolean sync) throws IOException { + + String resourcePath = getClass().getResource("/damaged.pdf").getPath(); + + p.apply("ParseInvalidPdfFile", + TikaIO.read().from(resourcePath).withParseSynchronously(sync)); + try { + p.run(); + fail("Transform failure is expected"); + } catch (RuntimeException ex) { + assertTrue(ex.getCause().getCause() instanceof TikaException); + } + } + + @Test + public void testReadDisplayData() { + TikaIO.Read read = TikaIO.read() + .from("foo.*") + .withTikaConfigPath("tikaconfigpath") + .withContentTypeHint("application/pdf") + .withMinimumTextlength(100) + .withReadOutputMetadata(true); + + DisplayData displayData = DisplayData.from(read); + + assertThat(displayData, hasDisplayItem("filePattern", "foo.*")); + assertThat(displayData, hasDisplayItem("tikaConfigPath", "tikaconfigpath")); + assertThat(displayData, hasDisplayItem("inputMetadata", + "[Content-Type=application/pdf]")); + assertThat(displayData, hasDisplayItem("readOutputMetadata", "true")); + assertThat(displayData, hasDisplayItem("parseMode", "asynchronous")); + assertThat(displayData, hasDisplayItem("queuePollTime", "50")); + assertThat(displayData, hasDisplayItem("queueMaxPollTime", "3000")); + 
assertThat(displayData, hasDisplayItem("minTextLen", "100")); + assertEquals(8, displayData.items().size()); + } + + @Test + public void testReadDisplayDataSyncMode() { + TikaIO.Read read = TikaIO.read() + .from("foo.*") + .withParseSynchronously(true); + + DisplayData displayData = DisplayData.from(read); + + assertThat(displayData, hasDisplayItem("filePattern", "foo.*")); + assertThat(displayData, hasDisplayItem("parseMode", "synchronous")); + assertEquals(2, displayData.items().size()); + } + + @Test + public void testReadDisplayDataWithDefaultOptions() { + final String[] args = new String[]{"--input=/input/tika.pdf"}; + TikaIO.Read read = TikaIO.read().withOptions(createOptions(args)); + + DisplayData displayData = DisplayData.from(read); + + assertThat(displayData, hasDisplayItem("filePattern", "/input/tika.pdf")); + assertThat(displayData, hasDisplayItem("parseMode", "asynchronous")); + assertThat(displayData, hasDisplayItem("queuePollTime", "50")); + assertThat(displayData, hasDisplayItem("queueMaxPollTime", "3000")); + assertEquals(4, displayData.items().size()); + } + @Test + public void testReadDisplayDataWithCustomOptions() { + final String[] args = new String[]{"--input=/input/tika.pdf", + "--tikaConfigPath=/tikaConfigPath", + "--queuePollTime=10", + "--queueMaxPollTime=1000", + "--contentTypeHint=application/pdf", + "--readOutputMetadata=true"}; + TikaIO.Read read = TikaIO.read().withOptions(createOptions(args)); + + DisplayData displayData = DisplayData.from(read); + + assertThat(displayData, hasDisplayItem("filePattern", "/input/tika.pdf")); + assertThat(displayData, hasDisplayItem("tikaConfigPath", "/tikaConfigPath")); + assertThat(displayData, hasDisplayItem("parseMode", "asynchronous")); + assertThat(displayData, hasDisplayItem("queuePollTime", "10")); + assertThat(displayData, hasDisplayItem("queueMaxPollTime", "1000")); + assertThat(displayData, hasDisplayItem("inputMetadata", + "[Content-Type=application/pdf]")); + assertThat(displayData, 
hasDisplayItem("readOutputMetadata", "true")); + assertEquals(7, displayData.items().size()); + } + + private static TikaOptions createOptions(String[] args) { + return PipelineOptionsFactory.fromArgs(args) + .withValidation().as(TikaOptions.class); + } + + static class FilterMetadataFn extends DoFn<String, String> { + private static final long serialVersionUID = 6338014219600516621L; + + @ProcessElement + public void processElement(ProcessContext c) { + String word = c.element(); + if (word.contains("=") && !word.startsWith("Author")) { + return; + } + c.output(word); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java new file mode 100644 index 0000000..5c4e754 --- /dev/null +++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.tika; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +import org.apache.beam.sdk.io.tika.TikaSource.TikaReader; +import org.junit.Test; + +/** + * Tests TikaReader. + */ +public class TikaReaderTest { + private static final List<String> ODT_FILE = Arrays.asList( + "Combining", "can help to ingest", "Apache", "Beam", "in most known formats.", + "the content from the files", "and", "Apache Tika"); + + @Test + public void testOdtFileAsyncReader() throws Exception { + doTestOdtFileReader(false); + } + @Test + public void testOdtFileSyncReader() throws Exception { + doTestOdtFileReader(true); + } + private void doTestOdtFileReader(boolean sync) throws Exception { + String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); + TikaSource source = new TikaSource(TikaIO.read() + .withParseSynchronously(sync) + .from(resourcePath)); + TikaReader reader = (TikaReader) source.createReader(null); + + List<String> content = new LinkedList<String>(); + for (boolean available = reader.start(); available; available = reader.advance()) { + content.add(reader.getCurrent()); + } + assertTrue(content.containsAll(ODT_FILE)); + if (!sync) { + assertNotNull(reader.getExecutorService()); + } else { + assertNull(reader.getExecutorService()); + } + reader.close(); + } + + @Test + public void testOdtFilesReader() throws Exception { + String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); + String filePattern = resourcePath.replace("apache-beam-tika1", "*"); + + TikaSource source = new TikaSource(TikaIO.read().from(filePattern)); + TikaSource.FilePatternTikaReader reader = + (TikaSource.FilePatternTikaReader) source.createReader(null); + List<String> content = new LinkedList<String>(); + for (boolean available = reader.start(); 
available; available = reader.advance()) { + content.add(reader.getCurrent()); + } + assertTrue(content.containsAll(ODT_FILE)); + reader.close(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java new file mode 100644 index 0000000..550f469 --- /dev/null +++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.tika; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.tika.TikaSource.TikaReader; +import org.junit.Test; + +/** + * Tests TikaSource. 
+ */ +public class TikaSourceTest { + + @Test + public void testOdtFileSource() throws Exception { + String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); + TikaSource source = new TikaSource(TikaIO.read().from(resourcePath)); + assertEquals(StringUtf8Coder.of(), source.getDefaultOutputCoder()); + + assertEquals(TikaSource.Mode.FILEPATTERN, source.getMode()); + assertTrue(source.createReader(null) instanceof TikaReader); + + List<? extends TikaSource> sources = source.split(1, null); + assertEquals(1, sources.size()); + TikaSource nextSource = sources.get(0); + assertEquals(TikaSource.Mode.SINGLE_FILE, nextSource.getMode()); + assertEquals(resourcePath, nextSource.getSingleFileMetadata().resourceId().toString()); + } + + @Test + public void testOdtFilesSource() throws Exception { + String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); + String resourcePath2 = getClass().getResource("/apache-beam-tika2.odt").getPath(); + String filePattern = resourcePath.replace("apache-beam-tika1", "*"); + + TikaSource source = new TikaSource(TikaIO.read().from(filePattern)); + assertEquals(StringUtf8Coder.of(), source.getDefaultOutputCoder()); + + assertEquals(TikaSource.Mode.FILEPATTERN, source.getMode()); + assertTrue(source.createReader(null) instanceof TikaSource.FilePatternTikaReader); + + List<? 
extends TikaSource> sources = source.split(1, null); + assertEquals(2, sources.size()); + TikaSource nextSource = sources.get(0); + assertEquals(TikaSource.Mode.SINGLE_FILE, nextSource.getMode()); + String nextSourceResource = nextSource.getSingleFileMetadata().resourceId().toString(); + TikaSource nextSource2 = sources.get(1); + assertEquals(TikaSource.Mode.SINGLE_FILE, nextSource2.getMode()); + String nextSourceResource2 = nextSource2.getSingleFileMetadata().resourceId().toString(); + assertTrue(nextSourceResource.equals(resourcePath) && nextSourceResource2.equals(resourcePath2) + || nextSourceResource.equals(resourcePath2) && nextSourceResource2.equals(resourcePath)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/test/resources/apache-beam-tika-pdf.zip ---------------------------------------------------------------------- diff --git a/sdks/java/io/tika/src/test/resources/apache-beam-tika-pdf.zip b/sdks/java/io/tika/src/test/resources/apache-beam-tika-pdf.zip new file mode 100644 index 0000000..4c0e0ef Binary files /dev/null and b/sdks/java/io/tika/src/test/resources/apache-beam-tika-pdf.zip differ http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/test/resources/apache-beam-tika.pdf ---------------------------------------------------------------------- diff --git a/sdks/java/io/tika/src/test/resources/apache-beam-tika.pdf b/sdks/java/io/tika/src/test/resources/apache-beam-tika.pdf new file mode 100644 index 0000000..d3c7f14 Binary files /dev/null and b/sdks/java/io/tika/src/test/resources/apache-beam-tika.pdf differ http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/test/resources/apache-beam-tika1.odt ---------------------------------------------------------------------- diff --git a/sdks/java/io/tika/src/test/resources/apache-beam-tika1.odt b/sdks/java/io/tika/src/test/resources/apache-beam-tika1.odt new file mode 100644 index 0000000..87c5577 Binary files 
/dev/null and b/sdks/java/io/tika/src/test/resources/apache-beam-tika1.odt differ http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/test/resources/apache-beam-tika2.odt ---------------------------------------------------------------------- diff --git a/sdks/java/io/tika/src/test/resources/apache-beam-tika2.odt b/sdks/java/io/tika/src/test/resources/apache-beam-tika2.odt new file mode 100644 index 0000000..a0ff320 Binary files /dev/null and b/sdks/java/io/tika/src/test/resources/apache-beam-tika2.odt differ http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/test/resources/damaged.pdf ---------------------------------------------------------------------- diff --git a/sdks/java/io/tika/src/test/resources/damaged.pdf b/sdks/java/io/tika/src/test/resources/damaged.pdf new file mode 100644 index 0000000..7653b4b --- /dev/null +++ b/sdks/java/io/tika/src/test/resources/damaged.pdf @@ -0,0 +1,2 @@ +%PDF-1.4 +
