[BEAM-2060] Allow specifying the charset in XmlIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ffc77813 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ffc77813 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ffc77813 Branch: refs/heads/master Commit: ffc77813bb7883d894f65d1a70a88f6c2f56ca89 Parents: 9c39682 Author: Jean-Baptiste Onofré <jbono...@apache.org> Authored: Mon Apr 24 16:37:40 2017 +0200 Committer: Luke Cwik <lc...@google.com> Committed: Mon Apr 24 09:21:20 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/xml/XmlIO.java | 44 +++++++++++++++++- .../org/apache/beam/sdk/io/xml/XmlSource.java | 8 ++-- .../apache/beam/sdk/io/xml/XmlSourceTest.java | 49 ++++++++++++++++++++ 3 files changed, 97 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ffc77813/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java index bf0e1b5..ef07925 100644 --- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; + import javax.annotation.Nullable; import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBException; @@ -77,6 +78,29 @@ public class XmlIO { * .withRecordClass(Record.class)); * }</pre> * + * <p>By default, UTF-8 charset is used. 
If your file uses a different charset, you have to + * specify it as follows: + * + * <pre>{@code + * PCollection<Record> output = p.apply(XmlIO.<Record>read() + * .from(file.toPath().toString()) + * .withRootElement("root") + * .withRecordElement("record") + * .withRecordClass(Record.class) + * .withCharset("ISO-8859-1")); + * }</pre> + * + * <p>Or: + * + * <pre>{@code + * PCollection<Record> output = p.apply(XmlIO.<Record>read() + * .from(file.toPath().toString()) + * .withRootElement("root") + * .withRecordElement("record") + * .withRecordClass(Record.class) + * .withCharset(StandardCharsets.ISO_8859_1.name())); + * }</pre> + * + * <p>Currently, only XML files that use single-byte characters are supported. Using a file that * contains multi-byte characters may result in data loss or duplication. * @@ -94,6 +118,7 @@ public class XmlIO { return new AutoValue_XmlIO_Read.Builder<T>() .setMinBundleSize(Read.DEFAULT_MIN_BUNDLE_SIZE) .setCompressionType(Read.CompressionType.AUTO) + .setCharset("UTF-8") .build(); } @@ -220,6 +245,9 @@ public class XmlIO { abstract long getMinBundleSize(); + @Nullable + abstract String getCharset(); + abstract Builder<T> toBuilder(); @AutoValue.Builder @@ -236,6 +264,8 @@ public class XmlIO { abstract Builder<T> setCompressionType(CompressionType compressionType); + abstract Builder<T> setCharset(String charset); + abstract Read<T> build(); } @@ -325,6 +355,13 @@ public class XmlIO { return toBuilder().setCompressionType(compressionType).build(); } + /** + * Sets the charset of the XML file. Defaults to UTF-8. + */ + public Read<T> withCharset(String charset) { + return toBuilder().setCharset(charset).build(); + } + @Override public void validate(PBegin input) { checkNotNull( @@ -336,6 +373,9 @@ public class XmlIO { checkNotNull( getRecordClass(), "recordClass is null. Use builder method withRecordClass() to set this."); + checkNotNull( + getCharset(), + "charset is null.
Use builder method withCharset() to set this."); } @Override @@ -351,7 +391,9 @@ public class XmlIO { .addIfNotNull( DisplayData.item("recordElement", getRecordElement()).withLabel("XML Record Element")) .addIfNotNull( - DisplayData.item("recordClass", getRecordClass()).withLabel("XML Record Class")); + DisplayData.item("recordClass", getRecordClass()).withLabel("XML Record Class")) + .addIfNotNull( + DisplayData.item("charset", getCharset()).withLabel("Charset")); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/beam/blob/ffc77813/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java index 876c782..1eb0e06 100644 --- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java @@ -185,9 +185,11 @@ public class XmlSource<T> extends FileBasedSource<T> { byte[] dummyStartDocumentBytes = (String.format( - "<?xml version=\"%s\" encoding=\"UTF-8\" ?><%s>", + "<?xml version=\"%s\" encoding=\"" + + getCurrentSource().spec.getCharset() + + "\"?><%s>", XML_VERSION, getCurrentSource().spec.getRootElement())) - .getBytes(StandardCharsets.UTF_8); + .getBytes(getCurrentSource().spec.getCharset()); preambleByteBuffer.write(dummyStartDocumentBytes); // Gets the byte offset (in the input file) of the first record in ReadableByteChannel. 
This // method returns the offset and stores any bytes that should be used when creating the XML @@ -339,7 +341,7 @@ public class XmlSource<T> extends FileBasedSource<T> { this.parser = xmlInputFactory.createXMLStreamReader( new SequenceInputStream( new ByteArrayInputStream(lookAhead), Channels.newInputStream(channel)), - "UTF-8"); + getCurrentSource().spec.getCharset()); // Current offset should be the offset before reading the record element. while (true) { http://git-wip-us.apache.org/repos/asf/beam/blob/ffc77813/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java index 5b33be3..9321ac3 100644 --- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java +++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java @@ -158,6 +158,11 @@ public class XmlSourceTest { + "</train>" + "</trains>"; + String trainXMLWithISO88591 = + "<trains>" + + "<train size=\"small\"><name>Cédric</name><number>7</number><color>blue</color></train>" + + "</trains>"; + @XmlRootElement static class Train { public static final int TRAIN_NUMBER_UNDEFINED = -1; @@ -594,6 +599,50 @@ public class XmlSourceTest { } @Test + public void testReadXMLWithCharset() throws IOException { + File file = tempFolder.newFile("trainXMLISO88591"); + Files.write(file.toPath(), trainXMLWithISO88591.getBytes(StandardCharsets.ISO_8859_1)); + + PCollection<Train> output = + p.apply("ReadFileData", + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024) + .withCharset(StandardCharsets.ISO_8859_1.name())); + + List<Train> expectedResults = + ImmutableList.of(new Train("Cédric", 7, "blue", 
"small")); + + PAssert.that(output).containsInAnyOrder(expectedResults); + p.run(); + } + + @Test + public void testReadXMLWithCharsetAsString() throws IOException { + File file = tempFolder.newFile("trainXMLISO88591"); + Files.write(file.toPath(), trainXMLWithISO88591.getBytes(StandardCharsets.ISO_8859_1)); + + PCollection<Train> output = + p.apply("ReadFileData", + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024) + .withCharset("ISO-8859-1")); + + List<Train> expectedResults = + ImmutableList.of(new Train("Cédric", 7, "blue", "small")); + + PAssert.that(output).containsInAnyOrder(expectedResults); + p.run(); + } + + @Test @Category(NeedsRunner.class) public void testReadXMLSmallPipeline() throws IOException { File file = tempFolder.newFile("trainXMLSmall");