[BEAM-1871] Move Xml IO and related classes to new sdks/java/io/xml package.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/393a90c7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/393a90c7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/393a90c7 Branch: refs/heads/master Commit: 393a90c74a86d7484d047316a5ccb22cd360a4d0 Parents: 022d5b6 Author: Luke Cwik <[email protected]> Authored: Fri Apr 21 15:45:04 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Fri Apr 21 16:37:44 2017 -0700 ---------------------------------------------------------------------- sdks/java/core/pom.xml | 31 +- .../org/apache/beam/sdk/coders/JAXBCoder.java | 201 ----- .../beam/sdk/coders/StringDelegateCoder.java | 3 +- .../org/apache/beam/sdk/io/FileBasedSource.java | 2 +- .../main/java/org/apache/beam/sdk/io/XmlIO.java | 476 ---------- .../java/org/apache/beam/sdk/io/XmlSink.java | 153 ---- .../java/org/apache/beam/sdk/io/XmlSource.java | 404 --------- .../beam/sdk/testing/SourceTestUtils.java | 2 +- .../apache/beam/sdk/coders/JAXBCoderTest.java | 223 ----- .../org/apache/beam/sdk/io/XmlSinkTest.java | 253 ------ .../org/apache/beam/sdk/io/XmlSourceTest.java | 892 ------------------ sdks/java/io/pom.xml | 1 + sdks/java/io/xml/pom.xml | 118 +++ .../org/apache/beam/sdk/io/xml/JAXBCoder.java | 203 +++++ .../java/org/apache/beam/sdk/io/xml/XmlIO.java | 469 ++++++++++ .../org/apache/beam/sdk/io/xml/XmlSink.java | 160 ++++ .../org/apache/beam/sdk/io/xml/XmlSource.java | 404 +++++++++ .../apache/beam/sdk/io/xml/package-info.java | 22 + .../apache/beam/sdk/io/xml/JAXBCoderTest.java | 228 +++++ .../org/apache/beam/sdk/io/xml/XmlSinkTest.java | 253 ++++++ .../apache/beam/sdk/io/xml/XmlSourceTest.java | 893 +++++++++++++++++++ 21 files changed, 2755 insertions(+), 2636 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index 7af1444..ac7a3bb 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -232,36 +232,7 @@ <artifactId>joda-time</artifactId> </dependency> - <!-- - To use org.apache.beam.io.XmlSource: - - 1. Explicitly declare the following dependency for the stax2 API. - 2. Include a stax2 implementation on the classpath. One example - is given below as an optional runtime dependency on woodstox-core-asl - --> - <dependency> - <groupId>org.codehaus.woodstox</groupId> - <artifactId>stax2-api</artifactId> - <version>${stax2.version}</version> - <optional>true</optional> - </dependency> - - <dependency> - <groupId>org.codehaus.woodstox</groupId> - <artifactId>woodstox-core-asl</artifactId> - <version>${woodstox.version}</version> - <scope>runtime</scope> - <optional>true</optional> - <exclusions> - <!-- javax.xml.stream:stax-api is included in JDK 1.6+ --> - <exclusion> - <groupId>javax.xml.stream</groupId> - <artifactId>stax-api</artifactId> - </exclusion> - </exclusions> - </dependency> - - <!-- To use org.apache.beam.io.AvroSource with XZ-encoded files, please explicitly + <!-- To use org.apache.beam.io.AvroSource with XZ-encoded files, please explicitly declare this dependency to include org.tukaani:xz on the classpath at runtime. --> <dependency> <groupId>org.tukaani</groupId> http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java deleted file mode 100644 index ea636fc..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.coders; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.io.ByteStreams; -import java.io.FilterInputStream; -import java.io.FilterOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Marshaller; -import javax.xml.bind.Unmarshaller; -import org.apache.beam.sdk.util.CloudObject; -import org.apache.beam.sdk.util.EmptyOnDeserializationThreadLocal; -import org.apache.beam.sdk.util.Structs; -import org.apache.beam.sdk.util.VarInt; -import org.apache.beam.sdk.values.TypeDescriptor; - -/** - * A coder for JAXB annotated objects. This coder uses JAXB marshalling/unmarshalling mechanisms - * to encode/decode the objects. Users must provide the {@code Class} of the JAXB annotated object. - * - * @param <T> type of JAXB annotated objects that will be serialized. - */ -public class JAXBCoder<T> extends AtomicCoder<T> { - - private final Class<T> jaxbClass; - private final TypeDescriptor<T> typeDescriptor; - private transient volatile JAXBContext jaxbContext; - private final EmptyOnDeserializationThreadLocal<Marshaller> jaxbMarshaller; - private final EmptyOnDeserializationThreadLocal<Unmarshaller> jaxbUnmarshaller; - - public Class<T> getJAXBClass() { - return jaxbClass; - } - - private JAXBCoder(Class<T> jaxbClass) { - this.jaxbClass = jaxbClass; - this.typeDescriptor = TypeDescriptor.of(jaxbClass); - this.jaxbMarshaller = new EmptyOnDeserializationThreadLocal<Marshaller>() { - @Override - protected Marshaller initialValue() { - try { - JAXBContext jaxbContext = getContext(); - return jaxbContext.createMarshaller(); - } catch (JAXBException e) { - throw new RuntimeException("Error when creating marshaller from JAXB Context.", e); - } - } - }; - this.jaxbUnmarshaller = new EmptyOnDeserializationThreadLocal<Unmarshaller>() { - @Override - protected Unmarshaller initialValue() { - try { - JAXBContext jaxbContext = getContext(); - return jaxbContext.createUnmarshaller(); - } catch (Exception e) { - throw new RuntimeException("Error when creating unmarshaller from JAXB Context.", e); - } - } - }; - } - - /** - * Create a coder for a given type of JAXB annotated objects. - * - * @param jaxbClass the {@code Class} of the JAXB annotated objects. - */ - public static <T> JAXBCoder<T> of(Class<T> jaxbClass) { - return new JAXBCoder<>(jaxbClass); - } - - @Override - public void encode(T value, OutputStream outStream, Context context) - throws CoderException, IOException { - try { - if (!context.isWholeStream) { - try { - long size = getEncodedElementByteSize(value, Context.OUTER); - // record the number of bytes the XML consists of so when reading we only read the encoded - // value - VarInt.encode(size, outStream); - } catch (Exception e) { - throw new CoderException( - "An Exception occured while trying to get the size of an encoded representation", e); - } - } - - jaxbMarshaller.get().marshal(value, new CloseIgnoringOutputStream(outStream)); - } catch (JAXBException e) { - throw new CoderException(e); - } - } - - @Override - public T decode(InputStream inStream, Context context) throws CoderException, IOException { - try { - InputStream stream = inStream; - if (!context.isWholeStream) { - long limit = VarInt.decodeLong(inStream); - stream = ByteStreams.limit(inStream, limit); - } - @SuppressWarnings("unchecked") - T obj = (T) jaxbUnmarshaller.get().unmarshal(new CloseIgnoringInputStream(stream)); - return obj; - } catch (JAXBException e) { - throw new CoderException(e); - } - } - - private JAXBContext getContext() throws JAXBException { - if (jaxbContext == null) { - synchronized (this) { - if (jaxbContext == null) { - jaxbContext = JAXBContext.newInstance(jaxbClass); - } - } - } - return jaxbContext; - } - - @Override - public String getEncodingId() { - return getJAXBClass().getName(); - } - - @Override - public TypeDescriptor<T> getEncodedTypeDescriptor() { - return typeDescriptor; - } - - private static class CloseIgnoringInputStream extends FilterInputStream { - - protected CloseIgnoringInputStream(InputStream in) { - super(in); - } - - @Override - public void close() { - // Do nothing. JAXB closes the underlying stream so we must filter out those calls. - } - } - - private static class CloseIgnoringOutputStream extends FilterOutputStream { - - protected CloseIgnoringOutputStream(OutputStream out) { - super(out); - } - - @Override - public void close() throws IOException { - // JAXB closes the underlying stream so we must filter out those calls. - } - } - - //////////////////////////////////////////////////////////////////////////////////// - // JSON Serialization details below - - private static final String JAXB_CLASS = "jaxb_class"; - - /** - * Constructor for JSON deserialization only. - */ - @JsonCreator - public static <T> JAXBCoder<T> of( - @JsonProperty(JAXB_CLASS) String jaxbClassName) { - try { - @SuppressWarnings("unchecked") - Class<T> jaxbClass = (Class<T>) Class.forName(jaxbClassName); - return of(jaxbClass); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - protected CloudObject initializeCloudObject() { - CloudObject result = CloudObject.forClass(getClass()); - Structs.addString(result, JAXB_CLASS, jaxbClass.getName()); - return result; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java index d4b4ae8..f86369c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java @@ -43,8 +43,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; * * <p>This method of encoding is not designed for ease of evolution of {@code Clazz}; * it should only be used in cases where the class is stable or the encoding is not - * important. If evolution of the class is important, see {@link ProtoCoder}, {@link AvroCoder}, - * or {@link JAXBCoder}. + * important. If evolution of the class is important, see {@link ProtoCoder} or {@link AvroCoder}. * * @param <T> The type of objects coded. */ http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java index b2a4075..20fc4d9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java @@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory; * <p>In addition to the methods left abstract from {@code BoundedSource}, subclasses must implement * methods to create a sub-source and a reader for a range of a single file - * {@link #createForSubrangeOfFile} and {@link #createSingleFileReader}. Please refer to - * {@link XmlSource} for an example implementation of {@code FileBasedSource}. + * {@link TextIO TextIO.TextSource} for an example implementation of {@code FileBasedSource}. * * @param <T> Type of records represented by the source. */ http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java deleted file mode 100644 index 6ced5d4..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java +++ /dev/null @@ -1,476 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.auto.value.AutoValue; -import com.google.common.annotations.VisibleForTesting; -import javax.annotation.Nullable; -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBException; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; - -/** Transforms for reading and writing XML files using JAXB mappers. */ -public class XmlIO { - // CHECKSTYLE.OFF: JavadocStyle - /** - * Reads XML files. This source reads one or more XML files and creates a {@link PCollection} of a - * given type. Please note the example given below. - * - * <p>The XML file must be of the following form, where {@code root} and {@code record} are XML - * element names that are defined by the user: - * - * <pre>{@code - * <root> - * <record> ... </record> - * <record> ... </record> - * <record> ... </record> - * ... - * <record> ... </record> - * </root> - * }</pre> - * - * <p>Basically, the XML document should contain a single root element with an inner list - * consisting entirely of record elements. The records may contain arbitrary XML content; however, - * that content <b>must not</b> contain the start {@code <record>} or end {@code </record>} tags. - * This restriction enables reading from large XML files in parallel from different offsets in the - * file. - * - * <p>Root and/or record elements may additionally contain an arbitrary number of XML attributes. - * Additionally users must provide a class of a JAXB annotated Java type that can be used convert - * records into Java objects and vice versa using JAXB marshalling/unmarshalling mechanisms. - * Reading the source will generate a {@code PCollection} of the given JAXB annotated Java type. - * Optionally users may provide a minimum size of a bundle that should be created for the source. - * - * <p>The following example shows how to use this method in a Beam pipeline: - * - * <pre>{@code - * PCollection<String> output = p.apply(XmlIO.<Record>read() - * .from(file.toPath().toString()) - * .withRootElement("root") - * .withRecordElement("record") - * .withRecordClass(Record.class)); - * }</pre> - * - * <p>Currently, only XML files that use single-byte characters are supported. Using a file that - * contains multi-byte characters may result in data loss or duplication. - * - * <p>To use this method: - * - * <ol> - * <li>Explicitly declare a dependency on org.codehaus.woodstox:stax2-api - * <li>Include a compatible implementation on the classpath at run-time, such as - * org.codehaus.woodstox:woodstox-core-asl - * </ol> - * - * <p>These dependencies have been declared as optional in the sdks/java/core/pom.xml file of - * Apache Beam. - * - * <h3>Permissions</h3> - * - * <p>Permission requirements depend on the {@link org.apache.beam.sdk.runners.PipelineRunner - * PipelineRunner} that is used to execute the Beam pipeline. Please refer to the documentation of - * corresponding {@link PipelineRunner PipelineRunners} for more details. - * - * @param <T> Type of the objects that represent the records of the XML file. The {@code - * PCollection} generated by this source will be of this type. - */ - // CHECKSTYLE.ON: JavadocStyle - public static <T> Read<T> read() { - return new AutoValue_XmlIO_Read.Builder<T>() - .setMinBundleSize(Read.DEFAULT_MIN_BUNDLE_SIZE) - .setCompressionType(Read.CompressionType.AUTO) - .build(); - } - - // CHECKSTYLE.OFF: JavadocStyle - /** - * A {@link FileBasedSink} that outputs records as XML-formatted elements. Writes a {@link - * PCollection} of records from JAXB-annotated classes to a single file location. - * - * <p>Given a PCollection containing records of type T that can be marshalled to XML elements, - * this Sink will produce a single file consisting of a single root element that contains all of - * the elements in the PCollection. - * - * <p>XML Sinks are created with a base filename to write to, a root element name that will be - * used for the root element of the output files, and a class to bind to an XML element. This - * class will be used in the marshalling of records in an input PCollection to their XML - * representation and must be able to be bound using JAXB annotations (checked at pipeline - * construction time). - * - * <p>XML Sinks can be written to using the {@link Write} transform: - * - * <pre>{@code - * p.apply(XmlIO.<Type>write() - * .withRecordClass(Type.class) - * .withRootElement(root_element) - * .toFilenamePrefix(output_filename)); - * }</pre> - * - * <p>For example, consider the following class with JAXB annotations: - * - * <pre> - * {@literal @}XmlRootElement(name = "word_count_result") - * {@literal @}XmlType(propOrder = {"word", "frequency"}) - * public class WordFrequency { - * private String word; - * private long frequency; - * - * public WordFrequency() { } - * - * public WordFrequency(String word, long frequency) { - * this.word = word; - * this.frequency = frequency; - * } - * - * public void setWord(String word) { - * this.word = word; - * } - * - * public void setFrequency(long frequency) { - * this.frequency = frequency; - * } - * - * public long getFrequency() { - * return frequency; - * } - * - * public String getWord() { - * return word; - * } - * } - * </pre> - * - * <p>The following will produce XML output with a root element named "words" from a PCollection - * of WordFrequency objects: - * - * <pre>{@code - * p.apply(XmlIO.<WordFrequency>write() - * .withRecordClass(WordFrequency.class) - * .withRootElement("words") - * .toFilenamePrefix(output_file)); - * }</pre> - * - * <p>The output of which will look like: - * - * <pre>{@code - * <words> - * - * <word_count_result> - * <word>decreased</word> - * <frequency>1</frequency> - * </word_count_result> - * - * <word_count_result> - * <word>War</word> - * <frequency>4</frequency> - * </word_count_result> - * - * <word_count_result> - * <word>empress'</word> - * <frequency>14</frequency> - * </word_count_result> - * - * <word_count_result> - * <word>stoops</word> - * <frequency>6</frequency> - * </word_count_result> - * - * ... - * </words> - * }</pre> - */ - // CHECKSTYLE.ON: JavadocStyle - public static <T> Write<T> write() { - return new AutoValue_XmlIO_Write.Builder<T>().build(); - } - - /** Implementation of {@link #read}. */ - @AutoValue - public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> { - private static final int DEFAULT_MIN_BUNDLE_SIZE = 8 * 1024; - - @Nullable - abstract String getFileOrPatternSpec(); - - @Nullable - abstract String getRootElement(); - - @Nullable - abstract String getRecordElement(); - - @Nullable - abstract Class<T> getRecordClass(); - - abstract CompressionType getCompressionType(); - - abstract long getMinBundleSize(); - - abstract Builder<T> toBuilder(); - - @AutoValue.Builder - abstract static class Builder<T> { - abstract Builder<T> setFileOrPatternSpec(String fileOrPatternSpec); - - abstract Builder<T> setRootElement(String rootElement); - - abstract Builder<T> setRecordElement(String recordElement); - - abstract Builder<T> setRecordClass(Class<T> recordClass); - - abstract Builder<T> setMinBundleSize(long minBundleSize); - - abstract Builder<T> setCompressionType(CompressionType compressionType); - - abstract Read<T> build(); - } - - /** Strategy for determining the compression type of XML files being read. */ - public enum CompressionType { - /** Automatically determine the compression type based on filename extension. */ - AUTO(""), - /** Uncompressed (i.e., may be split). */ - UNCOMPRESSED(""), - /** GZipped. */ - GZIP(".gz"), - /** BZipped. */ - BZIP2(".bz2"), - /** Zipped. */ - ZIP(".zip"), - /** Deflate compressed. */ - DEFLATE(".deflate"); - - private String filenameSuffix; - - CompressionType(String suffix) { - this.filenameSuffix = suffix; - } - - /** - * Determine if a given filename matches a compression type based on its extension. - * - * @param filename the filename to match - * @return true iff the filename ends with the compression type's known extension. - */ - public boolean matches(String filename) { - return filename.toLowerCase().endsWith(filenameSuffix.toLowerCase()); - } - } - - /** - * Reads a single XML file or a set of XML files defined by a Java "glob" file pattern. Each XML - * file should be of the form defined in {@link #read}. - */ - public Read<T> from(String fileOrPatternSpec) { - return toBuilder().setFileOrPatternSpec(fileOrPatternSpec).build(); - } - - /** - * Sets name of the root element of the XML document. This will be used to create a valid - * starting root element when initiating a bundle of records created from an XML document. This - * is a required parameter. - */ - public Read<T> withRootElement(String rootElement) { - return toBuilder().setRootElement(rootElement).build(); - } - - /** - * Sets name of the record element of the XML document. This will be used to determine offset of - * the first record of a bundle created from the XML document. This is a required parameter. - */ - public Read<T> withRecordElement(String recordElement) { - return toBuilder().setRecordElement(recordElement).build(); - } - - /** - * Sets a JAXB annotated class that can be populated using a record of the provided XML file. - * This will be used when unmarshalling record objects from the XML file. This is a required - * parameter. - */ - public Read<T> withRecordClass(Class<T> recordClass) { - return toBuilder().setRecordClass(recordClass).build(); - } - - /** - * Sets a parameter {@code minBundleSize} for the minimum bundle size of the source. Please - * refer to {@link OffsetBasedSource} for the definition of minBundleSize. This is an optional - * parameter. - */ - public Read<T> withMinBundleSize(long minBundleSize) { - return toBuilder().setMinBundleSize(minBundleSize).build(); - } - - /** - * Decompresses all input files using the specified compression type. - * - * <p>If no compression type is specified, the default is {@link CompressionType#AUTO}. In this - * mode, the compression type of the file is determined by its extension. Supports .gz, .bz2, - * .zip and .deflate compression. - */ - public Read<T> withCompressionType(CompressionType compressionType) { - return toBuilder().setCompressionType(compressionType).build(); - } - - @Override - public void validate(PBegin input) { - checkNotNull( - getRootElement(), - "rootElement is null. Use builder method withRootElement() to set this."); - checkNotNull( - getRecordElement(), - "recordElement is null. Use builder method withRecordElement() to set this."); - checkNotNull( - getRecordClass(), - "recordClass is null. Use builder method withRecordClass() to set this."); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder - .addIfNotDefault( - DisplayData.item("minBundleSize", getMinBundleSize()) - .withLabel("Minimum Bundle Size"), - 1L) - .add(DisplayData.item("filePattern", getFileOrPatternSpec()).withLabel("File Pattern")) - .addIfNotNull( - DisplayData.item("rootElement", getRootElement()).withLabel("XML Root Element")) - .addIfNotNull( - DisplayData.item("recordElement", getRecordElement()).withLabel("XML Record Element")) - .addIfNotNull( - DisplayData.item("recordClass", getRecordClass()).withLabel("XML Record Class")); - } - - @VisibleForTesting - BoundedSource<T> createSource() { - XmlSource<T> source = new XmlSource<>(this); - switch (getCompressionType()) { - case UNCOMPRESSED: - return source; - case AUTO: - return CompressedSource.from(source); - case BZIP2: - return CompressedSource.from(source) - .withDecompression(CompressedSource.CompressionMode.BZIP2); - case GZIP: - return CompressedSource.from(source) - .withDecompression(CompressedSource.CompressionMode.GZIP); - case ZIP: - return CompressedSource.from(source) - .withDecompression(CompressedSource.CompressionMode.ZIP); - case DEFLATE: - return CompressedSource.from(source) - .withDecompression(CompressedSource.CompressionMode.DEFLATE); - default: - throw new IllegalArgumentException("Unknown compression type: " + getCompressionType()); - } - } - - @Override - public PCollection<T> expand(PBegin input) { - return input.apply(org.apache.beam.sdk.io.Read.from(createSource())); - } - } - - /** Implementation of {@link #write}. */ - @AutoValue - public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> { - @Nullable - abstract String getFilenamePrefix(); - - @Nullable - abstract Class<T> getRecordClass(); - - @Nullable - abstract String getRootElement(); - - abstract Builder<T> toBuilder(); - - @AutoValue.Builder - abstract static class Builder<T> { - abstract Builder<T> setFilenamePrefix(String baseOutputFilename); - - abstract Builder<T> setRecordClass(Class<T> recordClass); - - abstract Builder<T> setRootElement(String rootElement); - - abstract Write<T> build(); - } - - /** - * Writes to files with the given path prefix. - * - * <p>Output files will have the name {@literal {filenamePrefix}-0000i-of-0000n.xml} where n is - * the number of output bundles. - */ - public Write<T> toFilenamePrefix(String filenamePrefix) { - return toBuilder().setFilenamePrefix(filenamePrefix).build(); - } - - /** - * Writes objects of the given class mapped to XML elements using JAXB. - * - * <p>The specified class must be able to be used to create a JAXB context. - */ - public Write<T> withRecordClass(Class<T> recordClass) { - return toBuilder().setRecordClass(recordClass).build(); - } - - /** Sets the enclosing root element for the generated XML files. */ - public Write<T> withRootElement(String rootElement) { - return toBuilder().setRootElement(rootElement).build(); - } - - @Override - public void validate(PCollection<T> input) { - checkNotNull(getRecordClass(), "Missing a class to bind to a JAXB context."); - checkNotNull(getRootElement(), "Missing a root element name."); - checkNotNull(getFilenamePrefix(), "Missing a filename to write to."); - try { - JAXBContext.newInstance(getRecordClass()); - } catch (JAXBException e) { - throw new RuntimeException("Error binding classes to a JAXB Context.", e); - } - } - - @Override - public PDone expand(PCollection<T> input) { - return input.apply(org.apache.beam.sdk.io.WriteFiles.to(createSink())); - } - - @VisibleForTesting - XmlSink<T> createSink() { - return new XmlSink<>(this); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - createSink().populateFileBasedDisplayData(builder); - builder - .addIfNotNull( - DisplayData.item("rootElement", getRootElement()).withLabel("XML Root Element")) - .addIfNotNull( - DisplayData.item("recordClass", getRecordClass()).withLabel("XML Record Class")); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java deleted file mode 100644 index b890908..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io; - -import java.io.OutputStream; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; -import javax.xml.bind.JAXBContext; -import javax.xml.bind.Marshaller; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.util.CoderUtils; - -/** Implementation of {@link XmlIO#write}. */ -class XmlSink<T> extends FileBasedSink<T> { - protected static final String XML_EXTENSION = "xml"; - - private final XmlIO.Write<T> spec; - - XmlSink(XmlIO.Write<T> spec) { - super(spec.getFilenamePrefix(), XML_EXTENSION); - this.spec = spec; - } - - /** - * Validates that the root element, class to bind to a JAXB context, and filenamePrefix have - * been set and that the class can be bound in a JAXB context. - */ - @Override - public void validate(PipelineOptions options) { - spec.validate(null); - } - - /** - * Creates an {@link XmlWriteOperation}. - */ - @Override - public XmlWriteOperation<T> createWriteOperation(PipelineOptions options) { - return new XmlWriteOperation<>(this); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - spec.populateDisplayData(builder); - } - - void populateFileBasedDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - } - - /** - * {@link FileBasedSink.FileBasedWriteOperation} for XML {@link FileBasedSink}s. - */ - protected static final class XmlWriteOperation<T> extends FileBasedWriteOperation<T> { - public XmlWriteOperation(XmlSink<T> sink) { - super(sink); - } - - /** - * Creates a {@link XmlWriter} with a marshaller for the type it will write. - */ - @Override - public XmlWriter<T> createWriter(PipelineOptions options) throws Exception { - JAXBContext context; - Marshaller marshaller; - context = JAXBContext.newInstance(getSink().spec.getRecordClass()); - marshaller = context.createMarshaller(); - marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE); - marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE); - marshaller.setProperty(Marshaller.JAXB_ENCODING, "UTF-8"); - return new XmlWriter<>(this, marshaller); - } - - /** - * Return the XmlSink.Bound for this write operation. - */ - @Override - public XmlSink<T> getSink() { - return (XmlSink<T>) super.getSink(); - } - } - - /** - * A {@link FileBasedWriter} that can write objects as XML elements. - */ - protected static final class XmlWriter<T> extends FileBasedWriter<T> { - final Marshaller marshaller; - private OutputStream os = null; - - public XmlWriter(XmlWriteOperation<T> writeOperation, Marshaller marshaller) { - super(writeOperation); - this.marshaller = marshaller; - } - - /** - * Creates the output stream that elements will be written to. - */ - @Override - protected void prepareWrite(WritableByteChannel channel) throws Exception { - os = Channels.newOutputStream(channel); - } - - /** - * Writes the root element opening tag. - */ - @Override - protected void writeHeader() throws Exception { - String rootElementName = getWriteOperation().getSink().spec.getRootElement(); - os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "<" + rootElementName + ">\n")); - } - - /** - * Writes the root element closing tag. - */ - @Override - protected void writeFooter() throws Exception { - String rootElementName = getWriteOperation().getSink().spec.getRootElement(); - os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "\n</" + rootElementName + ">")); - } - - /** - * Writes a value to the stream. - */ - @Override - public void write(T value) throws Exception { - marshaller.marshal(value, os); - } - - /** - * Return the XmlWriteOperation this write belongs to. - */ - @Override - public XmlWriteOperation<T> getWriteOperation() { - return (XmlWriteOperation<T>) super.getWriteOperation(); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java deleted file mode 100644 index 4b7d3b4..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java +++ /dev/null @@ -1,404 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.SequenceInputStream; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; -import java.nio.charset.StandardCharsets; -import java.util.NoSuchElementException; -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBElement; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Unmarshaller; -import javax.xml.bind.ValidationEvent; -import javax.xml.bind.ValidationEventHandler; -import javax.xml.stream.FactoryConfigurationError; -import javax.xml.stream.XMLInputFactory; -import javax.xml.stream.XMLStreamConstants; -import javax.xml.stream.XMLStreamException; -import javax.xml.stream.XMLStreamReader; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.JAXBCoder; -import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.codehaus.stax2.XMLInputFactory2; - -/** Implementation of {@link XmlIO#read}. */ -public class XmlSource<T> extends FileBasedSource<T> { - - private static final String XML_VERSION = "1.1"; - - private final XmlIO.Read<T> spec; - - XmlSource(XmlIO.Read<T> spec) { - super(StaticValueProvider.of(spec.getFileOrPatternSpec()), spec.getMinBundleSize()); - this.spec = spec; - } - - private XmlSource(XmlIO.Read<T> spec, Metadata metadata, long startOffset, long endOffset) { - super(metadata, spec.getMinBundleSize(), startOffset, endOffset); - this.spec = spec; - } - - @Override - protected FileBasedSource<T> createForSubrangeOfFile(Metadata metadata, long start, long end) { - return new XmlSource<T>(spec.from(metadata.toString()), metadata, start, end); - } - - @Override - protected FileBasedReader<T> createSingleFileReader(PipelineOptions options) { - return new XMLReader<T>(this); - } - - @Override - public void validate() { - super.validate(); - spec.validate(null); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - spec.populateDisplayData(builder); - } - - @Override - public Coder<T> getDefaultOutputCoder() { - return JAXBCoder.of(spec.getRecordClass()); - } - - /** - * A {@link Source.Reader} for reading JAXB annotated Java objects from an XML file. The XML - * file should be of the form defined at {@link XmlSource}. - * - * <p>Timestamped values are currently unsupported - all values implicitly have the timestamp - * of {@code BoundedWindow.TIMESTAMP_MIN_VALUE}. - * - * @param <T> Type of objects that will be read by the reader. - */ - private static class XMLReader<T> extends FileBasedReader<T> { - // The amount of bytes read from the channel to memory when determining the starting offset of - // the first record in a bundle. After matching to starting offset of the first record the - // remaining bytes read to this buffer and the bytes still not read from the channel are used to - // create the XML parser. - private static final int BUF_SIZE = 1024; - - // This should be the maximum number of bytes a character will encode to, for any encoding - // supported by XmlSource. Currently this is set to 4 since UTF-8 characters may be - // four bytes. - private static final int MAX_CHAR_BYTES = 4; - - // In order to support reading starting in the middle of an XML file, we construct an imaginary - // well-formed document (a header and root tag followed by the contents of the input starting at - // the record boundary) and feed it to the parser. Because of this, the offset reported by the - // XML parser is not the same as offset in the original file. They differ by a constant amount: - // offsetInOriginalFile = parser.getLocation().getCharacterOffset() + parserBaseOffset; - // Note that this is true only for files with single-byte characters. - // It appears that, as of writing, there does not exist a Java XML parser capable of correctly - // reporting byte offsets of elements in the presence of multi-byte characters. - private long parserBaseOffset = 0; - private boolean readingStarted = false; - - // If true, the current bundle does not contain any records. - private boolean emptyBundle = false; - - private Unmarshaller jaxbUnmarshaller = null; - private XMLStreamReader parser = null; - - private T currentRecord = null; - - // Byte offset of the current record in the XML file provided when creating the source. - private long currentByteOffset = 0; - - public XMLReader(XmlSource<T> source) { - super(source); - - // Set up a JAXB Unmarshaller that can be used to unmarshall record objects. - try { - JAXBContext jaxbContext = JAXBContext.newInstance(getCurrentSource().spec.getRecordClass()); - jaxbUnmarshaller = jaxbContext.createUnmarshaller(); - - // Throw errors if validation fails. JAXB by default ignores validation errors. - jaxbUnmarshaller.setEventHandler(new ValidationEventHandler() { - @Override - public boolean handleEvent(ValidationEvent event) { - throw new RuntimeException(event.getMessage(), event.getLinkedException()); - } - }); - } catch (JAXBException e) { - throw new RuntimeException(e); - } - } - - @Override - public synchronized XmlSource<T> getCurrentSource() { - return (XmlSource<T>) super.getCurrentSource(); - } - - @Override - protected void startReading(ReadableByteChannel channel) throws IOException { - // This method determines the correct starting offset of the first record by reading bytes - // from the ReadableByteChannel. This implementation does not need the channel to be a - // SeekableByteChannel. - // The method tries to determine the first record element in the byte channel. The first - // record must start with the characters "<recordElement" where "recordElement" is the - // record element of the XML document described above. For the match to be complete this - // has to be followed by one of following. - // * any whitespace character - // * '>' character - // * '/' character (to support empty records). - // - // After this match this method creates the XML parser for parsing the XML document, - // feeding it a fake document consisting of an XML header and the <rootElement> tag followed - // by the contents of channel starting from <recordElement. The <rootElement> tag may be never - // closed. - - // This stores any bytes that should be used prior to the remaining bytes of the channel when - // creating an XML parser object. - ByteArrayOutputStream preambleByteBuffer = new ByteArrayOutputStream(); - // A dummy declaration and root for the document with proper XML version and encoding. Without - // this XML parsing may fail or may produce incorrect results. - - byte[] dummyStartDocumentBytes = - (String.format( - "<?xml version=\"%s\" encoding=\"UTF-8\" ?><%s>", - XML_VERSION, getCurrentSource().spec.getRootElement())) - .getBytes(StandardCharsets.UTF_8); - preambleByteBuffer.write(dummyStartDocumentBytes); - // Gets the byte offset (in the input file) of the first record in ReadableByteChannel. This - // method returns the offset and stores any bytes that should be used when creating the XML - // parser in preambleByteBuffer. - long offsetInFileOfRecordElement = - getFirstOccurenceOfRecordElement(channel, preambleByteBuffer); - if (offsetInFileOfRecordElement < 0) { - // Bundle has no records. So marking this bundle as an empty bundle. - emptyBundle = true; - return; - } else { - byte[] preambleBytes = preambleByteBuffer.toByteArray(); - currentByteOffset = offsetInFileOfRecordElement; - setUpXMLParser(channel, preambleBytes); - parserBaseOffset = offsetInFileOfRecordElement - dummyStartDocumentBytes.length; - } - readingStarted = true; - } - - // Gets the first occurrence of the next record within the given ReadableByteChannel. Puts - // any bytes read past the starting offset of the next record back to the preambleByteBuffer. - // If a record is found, returns the starting offset of the record, otherwise - // returns -1. - private long getFirstOccurenceOfRecordElement( - ReadableByteChannel channel, ByteArrayOutputStream preambleByteBuffer) throws IOException { - int byteIndexInRecordElementToMatch = 0; - // Index of the byte in the string "<recordElement" to be matched - // against the current byte from the stream. - boolean recordStartBytesMatched = false; // "<recordElement" matched. Still have to match the - // next character to confirm if this is a positive match. - boolean fullyMatched = false; // If true, record element was fully matched. - - // This gives the offset of the byte currently being read. We do a '-1' here since we - // increment this value at the beginning of the while loop below. - long offsetInFileOfCurrentByte = getCurrentSource().getStartOffset() - 1; - long startingOffsetInFileOfCurrentMatch = -1; - // If this is non-negative, currently there is a match in progress and this value gives the - // starting offset of the match currently being conducted. - boolean matchStarted = false; // If true, a match is currently in progress. - - // These two values are used to determine the character immediately following a match for - // "<recordElement". Please see the comment for 'MAX_CHAR_BYTES' above. - byte[] charBytes = new byte[MAX_CHAR_BYTES]; - int charBytesFound = 0; - - ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE); - byte[] recordStartBytes = - ("<" + getCurrentSource().spec.getRecordElement()).getBytes(StandardCharsets.UTF_8); - - outer: while (channel.read(buf) > 0) { - buf.flip(); - while (buf.hasRemaining()) { - offsetInFileOfCurrentByte++; - byte b = buf.get(); - boolean reset = false; - if (recordStartBytesMatched) { - // We already matched "<recordElement" reading the next character to determine if this - // is a positive match for a new record. - charBytes[charBytesFound] = b; - charBytesFound++; - Character c = null; - if (charBytesFound == charBytes.length) { - CharBuffer charBuf = CharBuffer.allocate(1); - InputStream charBufStream = new ByteArrayInputStream(charBytes); - java.io.Reader reader = - new InputStreamReader(charBufStream, StandardCharsets.UTF_8); - int read = reader.read(); - if (read <= 0) { - return -1; - } - charBuf.flip(); - c = (char) read; - } else { - continue; - } - - // Record start may be of following forms - // * "<recordElement<whitespace>..." - // * "<recordElement>..." - // * "<recordElement/..." - if (Character.isWhitespace(c) || c == '>' || c == '/') { - fullyMatched = true; - // Add the recordStartBytes and charBytes to preambleByteBuffer since these were - // already read from the channel. - preambleByteBuffer.write(recordStartBytes); - preambleByteBuffer.write(charBytes); - // Also add the rest of the current buffer to preambleByteBuffer. - while (buf.hasRemaining()) { - preambleByteBuffer.write(buf.get()); - } - break outer; - } else { - // Matching was unsuccessful. Reset the buffer to include bytes read for the char. - ByteBuffer newbuf = ByteBuffer.allocate(BUF_SIZE); - newbuf.put(charBytes); - offsetInFileOfCurrentByte -= charBytes.length; - while (buf.hasRemaining()) { - newbuf.put(buf.get()); - } - newbuf.flip(); - buf = newbuf; - - // Ignore everything and try again starting from the current buffer. - reset = true; - } - } else if (b == recordStartBytes[byteIndexInRecordElementToMatch]) { - // Next byte matched. - if (!matchStarted) { - // Match was for the first byte, record the starting offset. - matchStarted = true; - startingOffsetInFileOfCurrentMatch = offsetInFileOfCurrentByte; - } - byteIndexInRecordElementToMatch++; - } else { - // Not a match. Ignore everything and try again starting at current point. - reset = true; - } - if (reset) { - // Clear variables and try to match starting from the next byte. - byteIndexInRecordElementToMatch = 0; - startingOffsetInFileOfCurrentMatch = -1; - matchStarted = false; - recordStartBytesMatched = false; - charBytes = new byte[MAX_CHAR_BYTES]; - charBytesFound = 0; - } - if (byteIndexInRecordElementToMatch == recordStartBytes.length) { - // "<recordElement" matched. Need to still check next byte since this might be an - // element that has "recordElement" as a prefix. - recordStartBytesMatched = true; - } - } - buf.clear(); - } - - if (!fullyMatched) { - return -1; - } else { - return startingOffsetInFileOfCurrentMatch; - } - } - - private void setUpXMLParser(ReadableByteChannel channel, byte[] lookAhead) throws IOException { - try { - // We use Woodstox because the StAX implementation provided by OpenJDK reports - // character locations incorrectly. Note that Woodstox still currently reports *byte* - // locations incorrectly when parsing documents that contain multi-byte characters. - XMLInputFactory2 xmlInputFactory = (XMLInputFactory2) XMLInputFactory.newInstance(); - this.parser = xmlInputFactory.createXMLStreamReader( - new SequenceInputStream( - new ByteArrayInputStream(lookAhead), Channels.newInputStream(channel)), - "UTF-8"); - - // Current offset should be the offset before reading the record element. - while (true) { - int event = parser.next(); - if (event == XMLStreamConstants.START_ELEMENT) { - String localName = parser.getLocalName(); - if (localName.equals(getCurrentSource().spec.getRecordElement())) { - break; - } - } - } - } catch (FactoryConfigurationError | XMLStreamException e) { - throw new IOException(e); - } - } - - @Override - protected boolean readNextRecord() throws IOException { - if (emptyBundle) { - currentByteOffset = Long.MAX_VALUE; - return false; - } - try { - // Update current offset and check if the next value is the record element. - currentByteOffset = parserBaseOffset + parser.getLocation().getCharacterOffset(); - while (parser.getEventType() != XMLStreamConstants.START_ELEMENT) { - parser.next(); - currentByteOffset = parserBaseOffset + parser.getLocation().getCharacterOffset(); - if (parser.getEventType() == XMLStreamConstants.END_DOCUMENT) { - currentByteOffset = Long.MAX_VALUE; - return false; - } - } - JAXBElement<T> jb = - jaxbUnmarshaller.unmarshal(parser, getCurrentSource().spec.getRecordClass()); - currentRecord = jb.getValue(); - return true; - } catch (JAXBException | XMLStreamException e) { - throw new IOException(e); - } - } - - @Override - public T getCurrent() throws NoSuchElementException { - if (!readingStarted) { - throw new NoSuchElementException(); - } - return currentRecord; - } - - @Override - protected boolean isAtSplitPoint() { - // Every record is at a split point. - return true; - } - - @Override - protected long getCurrentOffset() { - return currentByteOffset; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java index 2ab5b35..fd7ae85 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java @@ -74,7 +74,7 @@ import org.slf4j.LoggerFactory; * </ul> * For example usages, see the unit tests of classes such as * {@link org.apache.beam.sdk.io.AvroSource} or - * {@link org.apache.beam.sdk.io.XmlSource}. + * {@link org.apache.beam.sdk.io.TextIO TextIO.TextSource}. * * <p>Like {@link PAssert}, requires JUnit and Hamcrest to be present in the classpath. */ http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java deleted file mode 100644 index 55701bf..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.coders; - -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -import com.google.common.collect.ImmutableList; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicReference; -import javax.xml.bind.annotation.XmlRootElement; -import org.apache.beam.sdk.testing.CoderProperties; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Unit tests for {@link JAXBCoder}. */ -@RunWith(JUnit4.class) -public class JAXBCoderTest { - - @XmlRootElement - static class TestType { - private String testString = null; - private int testInt; - - public TestType() {} - - public TestType(String testString, int testInt) { - this.testString = testString; - this.testInt = testInt; - } - - public String getTestString() { - return testString; - } - - public void setTestString(String testString) { - this.testString = testString; - } - - public int getTestInt() { - return testInt; - } - - public void setTestInt(int testInt) { - this.testInt = testInt; - } - - @Override - public int hashCode() { - int hashCode = 1; - hashCode = 31 * hashCode + (testString == null ? 0 : testString.hashCode()); - hashCode = 31 * hashCode + testInt; - return hashCode; - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof TestType)) { - return false; - } - - TestType other = (TestType) obj; - return (testString == null || testString.equals(other.testString)) - && (testInt == other.testInt); - } - } - - @Test - public void testEncodeDecodeOuter() throws Exception { - JAXBCoder<TestType> coder = JAXBCoder.of(TestType.class); - - byte[] encoded = CoderUtils.encodeToByteArray(coder, new TestType("abc", 9999)); - assertEquals(new TestType("abc", 9999), CoderUtils.decodeFromByteArray(coder, encoded)); - } - - @Test - public void testEncodeDecodeAfterClone() throws Exception { - JAXBCoder<TestType> coder = SerializableUtils.clone(JAXBCoder.of(TestType.class)); - - byte[] encoded = CoderUtils.encodeToByteArray(coder, new TestType("abc", 9999)); - assertEquals(new TestType("abc", 9999), CoderUtils.decodeFromByteArray(coder, encoded)); - } - - @Test - public void testEncodeDecodeNested() throws Exception { - JAXBCoder<TestType> jaxbCoder = JAXBCoder.of(TestType.class); - TestCoder nesting = new TestCoder(jaxbCoder); - - byte[] encoded = CoderUtils.encodeToByteArray(nesting, new TestType("abc", 9999)); - assertEquals( - new TestType("abc", 9999), CoderUtils.decodeFromByteArray(nesting, encoded)); - } - - @Test - public void testEncodeDecodeMultithreaded() throws Throwable { - final JAXBCoder<TestType> coder = JAXBCoder.of(TestType.class); - int numThreads = 100; - - final CountDownLatch ready = new CountDownLatch(numThreads); - final CountDownLatch start = new CountDownLatch(1); - final CountDownLatch done = new CountDownLatch(numThreads); - - final AtomicReference<Throwable> thrown = new AtomicReference<>(); - - Executor executor = Executors.newCachedThreadPool(); - for (int i = 0; i < numThreads; i++) { - final TestType elem = new TestType("abc", i); - final int index = i; - executor.execute( - new Runnable() { - @Override - public void run() { - ready.countDown(); - try { - start.await(); - } catch (InterruptedException e) { - } - - try { - byte[] encoded = CoderUtils.encodeToByteArray(coder, elem); - assertEquals( - new TestType("abc", index), CoderUtils.decodeFromByteArray(coder, encoded)); - } catch (Throwable e) { - thrown.compareAndSet(null, e); - } - done.countDown(); - } - }); - } - ready.await(); - start.countDown(); - - done.await(); - Throwable actuallyThrown = thrown.get(); - if (actuallyThrown != null) { - throw actuallyThrown; - } - } - - /** - * A coder that surrounds the value with two values, to demonstrate nesting. - */ - private static class TestCoder extends StandardCoder<TestType> { - private final JAXBCoder<TestType> jaxbCoder; - public TestCoder(JAXBCoder<TestType> jaxbCoder) { - this.jaxbCoder = jaxbCoder; - } - - @Override - public void encode(TestType value, OutputStream outStream, Context context) - throws CoderException, IOException { - Context nestedContext = context.nested(); - VarIntCoder.of().encode(3, outStream, nestedContext); - jaxbCoder.encode(value, outStream, nestedContext); - VarLongCoder.of().encode(22L, outStream, context); - } - - @Override - public TestType decode(InputStream inStream, Context context) - throws CoderException, IOException { - Context nestedContext = context.nested(); - VarIntCoder.of().decode(inStream, nestedContext); - TestType result = jaxbCoder.decode(inStream, nestedContext); - VarLongCoder.of().decode(inStream, context); - return result; - } - - @Override - public List<? extends Coder<?>> getCoderArguments() { - return ImmutableList.of(jaxbCoder); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - jaxbCoder.verifyDeterministic(); - } - } - - @Test - public void testEncodable() throws Exception { - CoderProperties.coderSerializable(JAXBCoder.of(TestType.class)); - } - - @Test - public void testEncodingId() throws Exception { - Coder<TestType> coder = JAXBCoder.of(TestType.class); - CoderProperties.coderHasEncodingId( - coder, TestType.class.getName()); - } - - @Test - public void testEncodedTypeDescriptor() throws Exception { - assertThat( - JAXBCoder.of(TestType.class).getEncodedTypeDescriptor(), - equalTo(TypeDescriptor.of(TestType.class))); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java deleted file mode 100644 index 7f559d1..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io; - -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import com.google.common.collect.Lists; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileOutputStream; -import java.io.FileReader; -import java.nio.channels.WritableByteChannel; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import javax.xml.bind.annotation.XmlElement; -import javax.xml.bind.annotation.XmlRootElement; -import javax.xml.bind.annotation.XmlType; -import org.apache.beam.sdk.io.XmlSink.XmlWriteOperation; -import org.apache.beam.sdk.io.XmlSink.XmlWriter; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for XmlSink. - */ -@RunWith(JUnit4.class) -public class XmlSinkTest { - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - private String testRootElement = "testElement"; - private String testFilePrefix = "/path/to/testPrefix"; - - /** - * An XmlWriter correctly writes objects as Xml elements with an enclosing root element. - */ - @Test - public void testXmlWriter() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - XmlWriteOperation<Bird> writeOp = - XmlIO.<Bird>write() - .toFilenamePrefix(testFilePrefix) - .withRecordClass(Bird.class) - .withRootElement("birds") - .createSink() - .createWriteOperation(options); - XmlWriter<Bird> writer = writeOp.createWriter(options); - - List<Bird> bundle = - Lists.newArrayList(new Bird("bemused", "robin"), new Bird("evasive", "goose")); - List<String> lines = Arrays.asList("<birds>", "<bird>", "<species>robin</species>", - "<adjective>bemused</adjective>", "</bird>", "<bird>", "<species>goose</species>", - "<adjective>evasive</adjective>", "</bird>", "</birds>"); - runTestWrite(writer, bundle, lines); - } - - /** - * Builder methods correctly initialize an XML Sink. - */ - @Test - public void testBuildXmlWriteTransform() { - XmlIO.Write<Bird> write = - XmlIO.<Bird>write() - .toFilenamePrefix(testFilePrefix) - .withRecordClass(Bird.class) - .withRootElement(testRootElement); - assertEquals(Bird.class, write.getRecordClass()); - assertEquals(testRootElement, write.getRootElement()); - assertEquals(testFilePrefix, write.getFilenamePrefix()); - } - - /** Validation ensures no fields are missing. */ - @Test - public void testValidateXmlSinkMissingRecordClass() { - thrown.expect(NullPointerException.class); - XmlIO.<Bird>write() - .withRootElement(testRootElement) - .toFilenamePrefix(testFilePrefix) - .validate(null); - } - - @Test - public void testValidateXmlSinkMissingRootElement() { - thrown.expect(NullPointerException.class); - XmlIO.<Bird>write().withRecordClass(Bird.class).toFilenamePrefix(testFilePrefix).validate(null); - } - - @Test - public void testValidateXmlSinkMissingFilePrefix() { - thrown.expect(NullPointerException.class); - XmlIO.<Bird>write().withRecordClass(Bird.class).withRootElement(testRootElement).validate(null); - } - - /** - * An XML Sink correctly creates an XmlWriteOperation. - */ - @Test - public void testCreateWriteOperations() { - PipelineOptions options = PipelineOptionsFactory.create(); - XmlSink<Bird> sink = - XmlIO.<Bird>write() - .withRecordClass(Bird.class) - .withRootElement(testRootElement) - .toFilenamePrefix(testFilePrefix) - .createSink(); - XmlWriteOperation<Bird> writeOp = sink.createWriteOperation(options); - Path outputPath = new File(testFilePrefix).toPath(); - Path tempPath = new File(writeOp.tempDirectory.get()).toPath(); - assertEquals(outputPath.getParent(), tempPath.getParent()); - assertThat( - tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName())); - } - - /** - * An XmlWriteOperation correctly creates an XmlWriter. - */ - @Test - public void testCreateWriter() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - XmlWriteOperation<Bird> writeOp = - XmlIO.<Bird>write() - .withRecordClass(Bird.class) - .withRootElement(testRootElement) - .toFilenamePrefix(testFilePrefix) - .createSink() - .createWriteOperation(options); - XmlWriter<Bird> writer = writeOp.createWriter(options); - Path outputPath = new File(testFilePrefix).toPath(); - Path tempPath = new File(writer.getWriteOperation().tempDirectory.get()).toPath(); - assertEquals(outputPath.getParent(), tempPath.getParent()); - assertThat( - tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName())); - assertNotNull(writer.marshaller); - } - - @Test - public void testDisplayData() { - XmlIO.Write<Integer> write = XmlIO.<Integer>write() - .toFilenamePrefix("foobar") - .withRootElement("bird") - .withRecordClass(Integer.class); - - DisplayData displayData = DisplayData.from(write); - - assertThat(displayData, hasDisplayItem("fileNamePattern", "foobar-SSSSS-of-NNNNN.xml")); - assertThat(displayData, hasDisplayItem("rootElement", "bird")); - assertThat(displayData, hasDisplayItem("recordClass", Integer.class)); - } - - /** - * Write a bundle with an XmlWriter and verify the output is expected. - */ - private <T> void runTestWrite(XmlWriter<T> writer, List<T> bundle, List<String> expected) - throws Exception { - File tmpFile = tmpFolder.newFile("foo.txt"); - try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile)) { - writeBundle(writer, bundle, fileOutputStream.getChannel()); - } - List<String> lines = new ArrayList<>(); - try (BufferedReader reader = new BufferedReader(new FileReader(tmpFile))) { - for (;;) { - String line = reader.readLine(); - if (line == null) { - break; - } - line = line.trim(); - if (line.length() > 0) { - lines.add(line); - } - } - assertEquals(expected, lines); - } - } - - /** - * Write a bundle with an XmlWriter. - */ - private <T> void writeBundle(XmlWriter<T> writer, List<T> elements, WritableByteChannel channel) - throws Exception { - writer.prepareWrite(channel); - writer.writeHeader(); - for (T elem : elements) { - writer.write(elem); - } - writer.writeFooter(); - } - - /** - * Test JAXB annotated class. - */ - @SuppressWarnings("unused") - @XmlRootElement(name = "bird") - @XmlType(propOrder = {"name", "adjective"}) - private static final class Bird { - private String name; - private String adjective; - - @XmlElement(name = "species") - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getAdjective() { - return adjective; - } - - public void setAdjective(String adjective) { - this.adjective = adjective; - } - - public Bird() {} - - public Bird(String adjective, String name) { - this.adjective = adjective; - this.name = name; - } - } -}
