[BEAM-2060] Allow specifying the charset in XmlIO

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ffc77813
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ffc77813
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ffc77813

Branch: refs/heads/master
Commit: ffc77813bb7883d894f65d1a70a88f6c2f56ca89
Parents: 9c39682
Author: Jean-Baptiste Onofré <jbono...@apache.org>
Authored: Mon Apr 24 16:37:40 2017 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Apr 24 09:21:20 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/xml/XmlIO.java  | 44 +++++++++++++++++-
 .../org/apache/beam/sdk/io/xml/XmlSource.java   |  8 ++--
 .../apache/beam/sdk/io/xml/XmlSourceTest.java   | 49 ++++++++++++++++++++
 3 files changed, 97 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ffc77813/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java 
b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
index bf0e1b5..ef07925 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
@@ -21,6 +21,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
+
 import javax.annotation.Nullable;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
@@ -77,6 +78,29 @@ public class XmlIO {
    *     .withRecordClass(Record.class));
    * }</pre>
    *
+   * <p>By default, the UTF-8 charset is used. If your file uses a different charset, specify
+   * it with {@code withCharset()} as follows:
+   *
+   * <pre>{@code
+   * PCollection<Record> output = p.apply(XmlIO.<Record>read()
+   *      .from(file.toPath().toString())
+   *      .withRootElement("root")
+   *      .withRecordElement("record")
+   *      .withRecordClass(Record.class)
+   *      .withCharset("ISO-8859-1"));
+   * }</pre>
+   *
+   * <p>Or:
+   *
+   * <pre>{@code
+   * PCollection<Record> output = p.apply(XmlIO.<Record>read()
+   *      .from(file.toPath().toString())
+   *      .withRootElement("root")
+   *      .withRecordElement("record")
+   *      .withRecordClass(Record.class)
+   *      .withCharset(StandardCharsets.ISO_8859_1.name()));
+   * }</pre>
+   *
    * <p>Currently, only XML files that use single-byte characters are 
supported. Using a file that
    * contains multi-byte characters may result in data loss or duplication.
    *
@@ -94,6 +118,7 @@ public class XmlIO {
     return new AutoValue_XmlIO_Read.Builder<T>()
         .setMinBundleSize(Read.DEFAULT_MIN_BUNDLE_SIZE)
         .setCompressionType(Read.CompressionType.AUTO)
+        .setCharset("UTF-8")
         .build();
   }
 
@@ -220,6 +245,9 @@ public class XmlIO {
 
     abstract long getMinBundleSize();
 
+    @Nullable
+    abstract String getCharset();
+
     abstract Builder<T> toBuilder();
 
     @AutoValue.Builder
@@ -236,6 +264,8 @@ public class XmlIO {
 
       abstract Builder<T> setCompressionType(CompressionType compressionType);
 
+      abstract Builder<T> setCharset(String charset);
+
       abstract Read<T> build();
     }
 
@@ -325,6 +355,13 @@ public class XmlIO {
       return toBuilder().setCompressionType(compressionType).build();
     }
 
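+    /**
+     * Sets the charset used to decode the XML file, for example {@code "ISO-8859-1"} or
+     * {@code StandardCharsets.ISO_8859_1.name()}. Defaults to UTF-8.
+     */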
+    public Read<T> withCharset(String charset) {
+      return toBuilder().setCharset(charset).build();
+    }
+
     @Override
     public void validate(PBegin input) {
       checkNotNull(
@@ -336,6 +373,9 @@ public class XmlIO {
       checkNotNull(
           getRecordClass(),
           "recordClass is null. Use builder method withRecordClass() to set 
this.");
+      checkNotNull(
+          getCharset(),
+          "charset is null. Use builder method withCharset() to set this.");
     }
 
     @Override
@@ -351,7 +391,9 @@ public class XmlIO {
           .addIfNotNull(
               DisplayData.item("recordElement", 
getRecordElement()).withLabel("XML Record Element"))
           .addIfNotNull(
-              DisplayData.item("recordClass", getRecordClass()).withLabel("XML 
Record Class"));
+              DisplayData.item("recordClass", getRecordClass()).withLabel("XML 
Record Class"))
+          .addIfNotNull(
+              DisplayData.item("charset", getCharset()).withLabel("Charset"));
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/beam/blob/ffc77813/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java 
b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
index 876c782..1eb0e06 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
@@ -185,9 +185,11 @@ public class XmlSource<T> extends FileBasedSource<T> {
 
       byte[] dummyStartDocumentBytes =
           (String.format(
-                  "<?xml version=\"%s\" encoding=\"UTF-8\" ?><%s>",
+                  "<?xml version=\"%s\" encoding=\""
+                      + getCurrentSource().spec.getCharset()
+                      + "\"?><%s>",
                   XML_VERSION, getCurrentSource().spec.getRootElement()))
-              .getBytes(StandardCharsets.UTF_8);
+              .getBytes(getCurrentSource().spec.getCharset());
       preambleByteBuffer.write(dummyStartDocumentBytes);
       // Gets the byte offset (in the input file) of the first record in 
ReadableByteChannel. This
       // method returns the offset and stores any bytes that should be used 
when creating the XML
@@ -339,7 +341,7 @@ public class XmlSource<T> extends FileBasedSource<T> {
         this.parser = xmlInputFactory.createXMLStreamReader(
             new SequenceInputStream(
                 new ByteArrayInputStream(lookAhead), 
Channels.newInputStream(channel)),
-            "UTF-8");
+            getCurrentSource().spec.getCharset());
 
         // Current offset should be the offset before reading the record 
element.
         while (true) {

http://git-wip-us.apache.org/repos/asf/beam/blob/ffc77813/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java 
b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
index 5b33be3..9321ac3 100644
--- 
a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
+++ 
b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
@@ -158,6 +158,11 @@ public class XmlSourceTest {
       + "</train>"
       + "</trains>";
 
+  String trainXMLWithISO88591 =
+      "<trains>"
+      + "<train 
size=\"small\"><name>Cédric</name><number>7</number><color>blue</color></train>"
+      + "</trains>";
+
   @XmlRootElement
   static class Train {
     public static final int TRAIN_NUMBER_UNDEFINED = -1;
@@ -594,6 +599,50 @@ public class XmlSourceTest {
   }
 
   @Test
+  public void testReadXMLWithCharset() throws IOException {
+    File file = tempFolder.newFile("trainXMLISO88591");
+    Files.write(file.toPath(), 
trainXMLWithISO88591.getBytes(StandardCharsets.ISO_8859_1));
+
+    PCollection<Train> output =
+        p.apply("ReadFileData",
+            XmlIO.<Train>read()
+                .from(file.toPath().toString())
+                .withRootElement("trains")
+                .withRecordElement("train")
+                .withRecordClass(Train.class)
+                .withMinBundleSize(1024)
+                .withCharset(StandardCharsets.ISO_8859_1.name()));
+
+    List<Train> expectedResults =
+        ImmutableList.of(new Train("Cédric", 7, "blue", "small"));
+
+    PAssert.that(output).containsInAnyOrder(expectedResults);
+    p.run();
+  }
+
+  @Test
+  public void testReadXMLWithCharsetAsString() throws IOException {
+    File file = tempFolder.newFile("trainXMLISO88591");
+    Files.write(file.toPath(), 
trainXMLWithISO88591.getBytes(StandardCharsets.ISO_8859_1));
+
+    PCollection<Train> output =
+        p.apply("ReadFileData",
+            XmlIO.<Train>read()
+                .from(file.toPath().toString())
+                .withRootElement("trains")
+                .withRecordElement("train")
+                .withRecordClass(Train.class)
+                .withMinBundleSize(1024)
+                .withCharset("ISO-8859-1"));
+
+    List<Train> expectedResults =
+        ImmutableList.of(new Train("Cédric", 7, "blue", "small"));
+
+    PAssert.that(output).containsInAnyOrder(expectedResults);
+    p.run();
+  }
+
+  @Test
   @Category(NeedsRunner.class)
   public void testReadXMLSmallPipeline() throws IOException {
     File file = tempFolder.newFile("trainXMLSmall");

Reply via email to