http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
deleted file mode 100644
index 0120b8b..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
+++ /dev/null
@@ -1,892 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import static 
org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
-import static 
org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
-import static 
org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
-import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.hamcrest.Matchers.both;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.ImmutableList;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlRootElement;
-import org.apache.beam.sdk.io.Source.Reader;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.PCollection;
-import org.hamcrest.Matchers;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests XmlSource.
- */
-@RunWith(JUnit4.class)
-public class XmlSourceTest {
-
-  @Rule
-  public TestPipeline p = TestPipeline.create();
-
-  @Rule
-  public TemporaryFolder tempFolder = new TemporaryFolder();
-
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
-  String tinyXML =
-      
"<trains><train><name>Thomas</name></train><train><name>Henry</name></train>"
-      + "<train><name>James</name></train></trains>";
-
-  String xmlWithMultiByteElementName =
-      
"<දුම්රියන්><දුම්රිය><name>Thomas</name></දුම්රිය><දුම්රිය><name>Henry</name></දුම්රිය>"
-      + 
"<දුම්රිය><name>James</name></දුම්රිය></දුම්රියන්>";
-
-  String xmlWithMultiByteChars =
-      
"<trains><train><name>Thomas¥</name></train><train><name>Hen¶ry</name></train>"
-      + "<train><name>Jamßes</name></train></trains>";
-
-  String trainXML =
-      "<trains>"
-      + 
"<train><name>Thomas</name><number>1</number><color>blue</color></train>"
-      + 
"<train><name>Henry</name><number>3</number><color>green</color></train>"
-      + 
"<train><name>Toby</name><number>7</number><color>brown</color></train>"
-      + 
"<train><name>Gordon</name><number>4</number><color>blue</color></train>"
-      + 
"<train><name>Emily</name><number>-1</number><color>red</color></train>"
-      + 
"<train><name>Percy</name><number>6</number><color>green</color></train>"
-      + "</trains>";
-
-  String trainXMLWithEmptyTags =
-      "<trains>"
-      + "<train/>"
-      + 
"<train><name>Thomas</name><number>1</number><color>blue</color></train>"
-      + 
"<train><name>Henry</name><number>3</number><color>green</color></train>"
-      + "<train/>"
-      + 
"<train><name>Toby</name><number>7</number><color>brown</color></train>"
-      + 
"<train><name>Gordon</name><number>4</number><color>blue</color></train>"
-      + 
"<train><name>Emily</name><number>-1</number><color>red</color></train>"
-      + 
"<train><name>Percy</name><number>6</number><color>green</color></train>"
-      + "</trains>";
-
-  String trainXMLWithAttributes =
-      "<trains>"
-      + "<train 
size=\"small\"><name>Thomas</name><number>1</number><color>blue</color></train>"
-      + "<train 
size=\"big\"><name>Henry</name><number>3</number><color>green</color></train>"
-      + "<train 
size=\"small\"><name>Toby</name><number>7</number><color>brown</color></train>"
-      + "<train 
size=\"big\"><name>Gordon</name><number>4</number><color>blue</color></train>"
-      + "<train 
size=\"small\"><name>Emily</name><number>-1</number><color>red</color></train>"
-      + "<train 
size=\"small\"><name>Percy</name><number>6</number><color>green</color></train>"
-      + "</trains>";
-
-  String trainXMLWithSpaces =
-      "<trains>"
-      + "<train><name>Thomas   </name>   
<number>1</number><color>blue</color></train>"
-      + 
"<train><name>Henry</name><number>3</number><color>green</color></train>\n"
-      + "<train><name>Toby</name><number>7</number><color>  brown  
</color></train>  "
-      + "<train><name>Gordon</name>   
<number>4</number><color>blue</color>\n</train>\t"
-      + 
"<train><name>Emily</name><number>-1</number>\t<color>red</color></train>"
-      + "<train>\n<name>Percy</name>   <number>6  </number>   
<color>green</color></train>"
-      + "</trains>";
-
-  String trainXMLWithAllFeaturesMultiByte =
-      "<දුම්රියන්>"
-      + "<දුම්රිය/>"
-      + "<දුම්රිය size=\"small\"><name> 
Thomas¥</name><number>1</number><color>blue</color>"
-      + "</දුම්රිය>"
-      + "<දුම්රිය size=\"big\"><name>He 
nry</name><number>3</number><color>green</color></දුම්රිය>"
-      + "<දුම්රිය size=\"small\"><name>Toby  
</name><number>7</number><color>br¶own</color>"
-      + "</දුම්රිය>"
-      + "<දුම්රිය/>"
-      + "<දුම්රිය 
size=\"big\"><name>Gordon</name><number>4</number><color> 
blue</color></දුම්රිය>"
-      + "<දුම්රිය 
size=\"small\"><name>Emily</name><number>-1</number><color>red</color></දුම්රිය>"
-      + "<දුම්රිය 
size=\"small\"><name>Percy</name><number>6</number><color>green</color>"
-      + "</දුම්රිය>"
-      + "</දුම්රියන්>";
-
-  String trainXMLWithAllFeaturesSingleByte =
-      "<trains>"
-      + "<train/>"
-      + "<train size=\"small\"><name> 
Thomas</name><number>1</number><color>blue</color>"
-      + "</train>"
-      + "<train size=\"big\"><name>He 
nry</name><number>3</number><color>green</color></train>"
-      + "<train size=\"small\"><name>Toby  
</name><number>7</number><color>brown</color>"
-      + "</train>"
-      + "<train/>"
-      + "<train size=\"big\"><name>Gordon</name><number>4</number><color> 
blue</color></train>"
-      + "<train 
size=\"small\"><name>Emily</name><number>-1</number><color>red</color></train>"
-      + "<train 
size=\"small\"><name>Percy</name><number>6</number><color>green</color>"
-      + "</train>"
-      + "</trains>";
-
-  @XmlRootElement
-  static class Train {
-    public static final int TRAIN_NUMBER_UNDEFINED = -1;
-    public String name = null;
-    public String color = null;
-    public int number = TRAIN_NUMBER_UNDEFINED;
-
-    @XmlAttribute(name = "size")
-    public String size = null;
-
-    public Train() {}
-
-    public Train(String name, int number, String color, String size) {
-      this.name = name;
-      this.number = number;
-      this.color = color;
-      this.size = size;
-    }
-
-    @Override
-    public int hashCode() {
-      int hashCode = 1;
-      hashCode = 31 * hashCode + (name == null ? 0 : name.hashCode());
-      hashCode = 31 * hashCode + number;
-      hashCode = 31 * hashCode + (color == null ? 0 : name.hashCode());
-      hashCode = 31 * hashCode + (size == null ? 0 : name.hashCode());
-      return hashCode;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (!(obj instanceof Train)) {
-        return false;
-      }
-
-      Train other = (Train) obj;
-      return (name == null || name.equals(other.name)) && (number == 
other.number)
-          && (color == null || color.equals(other.color))
-          && (size == null || size.equals(other.size));
-    }
-
-    @Override
-    public String toString() {
-      String str = "Train[";
-      boolean first = true;
-      if (name != null) {
-        str = str + "name=" + name;
-        first = false;
-      }
-      if (number != Integer.MIN_VALUE) {
-        if (!first) {
-          str = str + ",";
-        }
-        str = str + "number=" + number;
-        first = false;
-      }
-      if (color != null) {
-        if (!first) {
-          str = str + ",";
-        }
-        str = str + "color=" + color;
-        first = false;
-      }
-      if (size != null) {
-        if (!first) {
-          str = str + ",";
-        }
-        str = str + "size=" + size;
-      }
-      str = str + "]";
-      return str;
-    }
-  }
-
-  private List<Train> generateRandomTrainList(int size) {
-    String[] names = {"Thomas", "Henry", "Gordon", "Emily", "Toby", "Percy", 
"Mavis", "Edward",
-        "Bertie", "Harold", "Hiro", "Terence", "Salty", "Trevor"};
-    int[] numbers = {-1, 1, 2, 3, 4, 5, 6, 7, 8, 9};
-    String[] colors = {"red", "blue", "green", "orange", "brown", "black", 
"white"};
-    String[] sizes = {"small", "medium", "big"};
-
-    Random random = new Random(System.currentTimeMillis());
-
-    List<Train> trains = new ArrayList<>();
-    for (int i = 0; i < size; i++) {
-      trains.add(new Train(names[random.nextInt(names.length - 1)],
-          numbers[random.nextInt(numbers.length - 1)], 
colors[random.nextInt(colors.length - 1)],
-          sizes[random.nextInt(sizes.length - 1)]));
-    }
-
-    return trains;
-  }
-
-  private String trainToXMLElement(Train train) {
-    return "<train size=\"" + train.size + "\"><name>" + train.name + 
"</name><number>"
-        + train.number + "</number><color>" + train.color + "</color></train>";
-  }
-
-  private File createRandomTrainXML(String fileName, List<Train> trains) 
throws IOException {
-    File file = tempFolder.newFile(fileName);
-    try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
-      writer.write("<trains>");
-      writer.newLine();
-      for (Train train : trains) {
-        String str = trainToXMLElement(train);
-        writer.write(str);
-        writer.newLine();
-      }
-      writer.write("</trains>");
-      writer.newLine();
-    }
-    return file;
-  }
-
-  private List<Train> readEverythingFromReader(Reader<Train> reader) throws 
IOException {
-    List<Train> results = new ArrayList<>();
-    for (boolean available = reader.start(); available; available = 
reader.advance()) {
-      Train train = reader.getCurrent();
-      results.add(train);
-    }
-    return results;
-  }
-
-  @Test
-  public void testReadXMLTiny() throws IOException {
-    File file = tempFolder.newFile("trainXMLTiny");
-    Files.write(file.toPath(), tinyXML.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordElement("train")
-            .withRecordClass(Train.class)
-            .withMinBundleSize(1024)
-            .createSource();
-
-    List<Train> expectedResults = ImmutableList.of(
-        new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null),
-        new Train("Henry", Train.TRAIN_NUMBER_UNDEFINED, null, null),
-        new Train("James", Train.TRAIN_NUMBER_UNDEFINED, null, null));
-
-    assertThat(
-        trainsToStrings(expectedResults),
-        containsInAnyOrder(
-            
trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
-  }
-
-  @Test
-  public void testReadXMLWithMultiByteChars() throws IOException {
-    File file = tempFolder.newFile("trainXMLTiny");
-    Files.write(file.toPath(), 
xmlWithMultiByteChars.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordElement("train")
-            .withRecordClass(Train.class)
-            .withMinBundleSize(1024)
-            .createSource();
-
-    List<Train> expectedResults = ImmutableList.of(
-        new Train("Thomas¥", Train.TRAIN_NUMBER_UNDEFINED, null, null),
-        new Train("Hen¶ry", Train.TRAIN_NUMBER_UNDEFINED, null, null),
-        new Train("Jamßes", Train.TRAIN_NUMBER_UNDEFINED, null, null));
-
-    assertThat(
-        trainsToStrings(expectedResults),
-        containsInAnyOrder(
-            
trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
-  }
-
-  @Test
-  @Ignore(
-      "Multi-byte characters in XML are not supported because the parser "
-          + "currently does not correctly report byte offsets")
-  public void testReadXMLWithMultiByteElementName() throws IOException {
-    File file = tempFolder.newFile("trainXMLTiny");
-    Files.write(file.toPath(), 
xmlWithMultiByteElementName.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("දුම්රියන්")
-            .withRecordElement("දුම්රිය")
-            .withRecordClass(Train.class)
-            .withMinBundleSize(1024)
-            .createSource();
-
-    List<Train> expectedResults = ImmutableList.of(
-        new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null),
-        new Train("Henry", Train.TRAIN_NUMBER_UNDEFINED, null, null),
-        new Train("James", Train.TRAIN_NUMBER_UNDEFINED, null, null));
-
-    assertThat(
-        trainsToStrings(expectedResults),
-        containsInAnyOrder(
-            
trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
-  }
-
-  @Test
-  public void testSplitWithEmptyBundleAtEnd() throws Exception {
-    File file = tempFolder.newFile("trainXMLTiny");
-    Files.write(file.toPath(), tinyXML.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordElement("train")
-            .withRecordClass(Train.class)
-            .withMinBundleSize(10)
-            .createSource();
-    List<? extends BoundedSource<Train>> splits = source.split(50, null);
-
-    assertTrue(splits.size() > 2);
-
-    List<Train> results = new ArrayList<>();
-    for (BoundedSource<Train> split : splits) {
-      results.addAll(readEverythingFromReader(split.createReader(null)));
-    }
-
-    List<Train> expectedResults = ImmutableList.of(
-        new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null),
-        new Train("Henry", Train.TRAIN_NUMBER_UNDEFINED, null, null),
-        new Train("James", Train.TRAIN_NUMBER_UNDEFINED, null, null));
-
-    assertThat(
-        trainsToStrings(expectedResults), 
containsInAnyOrder(trainsToStrings(results).toArray()));
-  }
-
-  List<String> trainsToStrings(List<Train> input) {
-    List<String> strings = new ArrayList<>();
-    for (Object data : input) {
-      strings.add(data.toString());
-    }
-    return strings;
-  }
-
-  @Test
-  public void testReadXMLSmall() throws IOException {
-    File file = tempFolder.newFile("trainXMLSmall");
-    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordElement("train")
-            .withRecordClass(Train.class)
-            .withMinBundleSize(1024)
-            .createSource();
-
-    List<Train> expectedResults =
-        ImmutableList.of(new Train("Thomas", 1, "blue", null), new 
Train("Henry", 3, "green", null),
-            new Train("Toby", 7, "brown", null), new Train("Gordon", 4, 
"blue", null),
-            new Train("Emily", -1, "red", null), new Train("Percy", 6, 
"green", null));
-
-    assertThat(
-        trainsToStrings(expectedResults),
-        containsInAnyOrder(
-            
trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
-  }
-
-  @Test
-  public void testReadXMLNoRootElement() throws IOException {
-    File file = tempFolder.newFile("trainXMLSmall");
-    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRecordElement("train")
-            .withRecordClass(Train.class)
-            .createSource();
-
-    exception.expect(NullPointerException.class);
-    exception.expectMessage(
-        "rootElement is null. Use builder method withRootElement() to set 
this.");
-    readEverythingFromReader(source.createReader(null));
-  }
-
-  @Test
-  public void testReadXMLNoRecordElement() throws IOException {
-    File file = tempFolder.newFile("trainXMLSmall");
-    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordClass(Train.class)
-            .createSource();
-
-    exception.expect(NullPointerException.class);
-    exception.expectMessage(
-        "recordElement is null. Use builder method withRecordElement() to set 
this.");
-    readEverythingFromReader(source.createReader(null));
-  }
-
-  @Test
-  public void testReadXMLNoRecordClass() throws IOException {
-    File file = tempFolder.newFile("trainXMLSmall");
-    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordElement("train")
-            .createSource();
-
-    exception.expect(NullPointerException.class);
-    exception.expectMessage(
-        "recordClass is null. Use builder method withRecordClass() to set 
this.");
-    readEverythingFromReader(source.createReader(null));
-  }
-
-  @Test
-  public void testReadXMLIncorrectRootElement() throws IOException {
-    File file = tempFolder.newFile("trainXMLSmall");
-    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("something")
-            .withRecordElement("train")
-            .withRecordClass(Train.class)
-            .createSource();
-
-    exception.expectMessage("Unexpected close tag </trains>; expected 
</something>.");
-    readEverythingFromReader(source.createReader(null));
-  }
-
-  @Test
-  public void testReadXMLIncorrectRecordElement() throws IOException {
-    File file = tempFolder.newFile("trainXMLSmall");
-    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordElement("something")
-            .withRecordClass(Train.class)
-            .createSource();
-
-    assertEquals(readEverythingFromReader(source.createReader(null)), new 
ArrayList<Train>());
-  }
-
-  @XmlRootElement
-  private static class WrongTrainType {
-    @SuppressWarnings("unused")
-    public String something;
-  }
-
-  @Test
-  public void testReadXMLInvalidRecordClass() throws IOException {
-    File file = tempFolder.newFile("trainXMLSmall");
-    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<WrongTrainType> source =
-        XmlIO.<WrongTrainType>read()
-            .from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordElement("train")
-            .withRecordClass(WrongTrainType.class)
-            .createSource();
-
-    exception.expect(RuntimeException.class);
-
-    // JAXB internationalizes the error message. So this is all we can match 
for.
-    
exception.expectMessage(both(containsString("name")).and(Matchers.containsString("something")));
-    try (Reader<WrongTrainType> reader = source.createReader(null)) {
-
-      List<WrongTrainType> results = new ArrayList<>();
-      for (boolean available = reader.start(); available; available = 
reader.advance()) {
-        WrongTrainType train = reader.getCurrent();
-        results.add(train);
-      }
-    }
-  }
-
-  @Test
-  public void testReadXMLNoBundleSize() throws IOException {
-    File file = tempFolder.newFile("trainXMLSmall");
-    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordElement("train")
-            .withRecordClass(Train.class)
-            .createSource();
-
-    List<Train> expectedResults =
-        ImmutableList.of(new Train("Thomas", 1, "blue", null), new 
Train("Henry", 3, "green", null),
-            new Train("Toby", 7, "brown", null), new Train("Gordon", 4, 
"blue", null),
-            new Train("Emily", -1, "red", null), new Train("Percy", 6, 
"green", null));
-
-    assertThat(
-        trainsToStrings(expectedResults),
-        containsInAnyOrder(
-            
trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
-  }
-
-
-  @Test
-  public void testReadXMLWithEmptyTags() throws IOException {
-    File file = tempFolder.newFile("trainXMLSmall");
-    Files.write(file.toPath(), 
trainXMLWithEmptyTags.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordElement("train")
-            .withRecordClass(Train.class)
-            .withMinBundleSize(1024)
-            .createSource();
-
-    List<Train> expectedResults = ImmutableList.of(new Train("Thomas", 1, 
"blue", null),
-        new Train("Henry", 3, "green", null), new Train("Toby", 7, "brown", 
null),
-        new Train("Gordon", 4, "blue", null), new Train("Emily", -1, "red", 
null),
-        new Train("Percy", 6, "green", null), new Train(), new Train());
-
-    assertThat(
-        trainsToStrings(expectedResults),
-        containsInAnyOrder(
-            
trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testReadXMLSmallPipeline() throws IOException {
-    File file = tempFolder.newFile("trainXMLSmall");
-    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
-
-    PCollection<Train> output =
-        p.apply(
-            "ReadFileData",
-            XmlIO.<Train>read()
-                .from(file.toPath().toString())
-                .withRootElement("trains")
-                .withRecordElement("train")
-                .withRecordClass(Train.class)
-                .withMinBundleSize(1024));
-
-    List<Train> expectedResults =
-        ImmutableList.of(new Train("Thomas", 1, "blue", null), new 
Train("Henry", 3, "green", null),
-            new Train("Toby", 7, "brown", null), new Train("Gordon", 4, 
"blue", null),
-            new Train("Emily", -1, "red", null), new Train("Percy", 6, 
"green", null));
-
-    PAssert.that(output).containsInAnyOrder(expectedResults);
-    p.run();
-  }
-
-  @Test
-  public void testReadXMLWithAttributes() throws IOException {
-    File file = tempFolder.newFile("trainXMLSmall");
-    Files.write(file.toPath(), 
trainXMLWithAttributes.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordElement("train")
-            .withRecordClass(Train.class)
-            .withMinBundleSize(1024)
-            .createSource();
-
-    List<Train> expectedResults = ImmutableList.of(new Train("Thomas", 1, 
"blue", "small"),
-        new Train("Henry", 3, "green", "big"), new Train("Toby", 7, "brown", 
"small"),
-        new Train("Gordon", 4, "blue", "big"), new Train("Emily", -1, "red", 
"small"),
-        new Train("Percy", 6, "green", "small"));
-
-    assertThat(
-        trainsToStrings(expectedResults),
-        containsInAnyOrder(
-            
trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
-  }
-
-  @Test
-  public void testReadXMLWithWhitespaces() throws IOException {
-    File file = tempFolder.newFile("trainXMLSmall");
-    Files.write(file.toPath(), 
trainXMLWithSpaces.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordElement("train")
-            .withRecordClass(Train.class)
-            .withMinBundleSize(1024)
-            .createSource();
-
-    List<Train> expectedResults = ImmutableList.of(new Train("Thomas   ", 1, 
"blue", null),
-        new Train("Henry", 3, "green", null), new Train("Toby", 7, "  brown  
", null),
-        new Train("Gordon", 4, "blue", null), new Train("Emily", -1, "red", 
null),
-        new Train("Percy", 6, "green", null));
-
-    assertThat(
-        trainsToStrings(expectedResults),
-        containsInAnyOrder(
-            
trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
-  }
-
-  @Test
-  public void testReadXMLLarge() throws IOException {
-    String fileName = "temp.xml";
-    List<Train> trains = generateRandomTrainList(100);
-    File file = createRandomTrainXML(fileName, trains);
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordElement("train")
-            .withRecordClass(Train.class)
-            .withMinBundleSize(1024)
-            .createSource();
-
-    assertThat(
-        trainsToStrings(trains),
-        containsInAnyOrder(
-            
trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testReadXMLLargePipeline() throws IOException {
-    String fileName = "temp.xml";
-    List<Train> trains = generateRandomTrainList(100);
-    File file = createRandomTrainXML(fileName, trains);
-
-    PCollection<Train> output =
-        p.apply(
-            "ReadFileData",
-            XmlIO.<Train>read()
-                .from(file.toPath().toString())
-                .withRootElement("trains")
-                .withRecordElement("train")
-                .withRecordClass(Train.class)
-                .withMinBundleSize(1024));
-
-    PAssert.that(output).containsInAnyOrder(trains);
-    p.run();
-  }
-
-  @Test
-  public void testSplitWithEmptyBundles() throws Exception {
-    String fileName = "temp.xml";
-    List<Train> trains = generateRandomTrainList(10);
-    File file = createRandomTrainXML(fileName, trains);
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordElement("train")
-            .withRecordClass(Train.class)
-            .withMinBundleSize(10)
-            .createSource();
-    List<? extends BoundedSource<Train>> splits = source.split(100, null);
-
-    assertTrue(splits.size() > 2);
-
-    List<Train> results = new ArrayList<>();
-    for (BoundedSource<Train> split : splits) {
-      results.addAll(readEverythingFromReader(split.createReader(null)));
-    }
-
-    assertThat(trainsToStrings(trains), 
containsInAnyOrder(trainsToStrings(results).toArray()));
-  }
-
-  @Test
-  public void testXMLWithSplits() throws Exception {
-    String fileName = "temp.xml";
-    List<Train> trains = generateRandomTrainList(100);
-    File file = createRandomTrainXML(fileName, trains);
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordElement("train")
-            .withRecordClass(Train.class)
-            .withMinBundleSize(10)
-            .createSource();
-    List<? extends BoundedSource<Train>> splits = source.split(256, null);
-
-    // Not a trivial split
-    assertTrue(splits.size() > 2);
-
-    List<Train> results = new ArrayList<>();
-    for (BoundedSource<Train> split : splits) {
-      results.addAll(readEverythingFromReader(split.createReader(null)));
-    }
-    assertThat(trainsToStrings(trains), 
containsInAnyOrder(trainsToStrings(results).toArray()));
-  }
-
-  @Test
-  public void testSplitAtFraction() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    String fileName = "temp.xml";
-    List<Train> trains = generateRandomTrainList(100);
-    File file = createRandomTrainXML(fileName, trains);
-
-    BoundedSource<Train> fileSource =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordElement("train")
-            .withRecordClass(Train.class)
-            .withMinBundleSize(10)
-            .createSource();
-
-    List<? extends BoundedSource<Train>> splits =
-        fileSource.split(file.length() / 3, null);
-    for (BoundedSource<Train> splitSource : splits) {
-      int numItems = 
readEverythingFromReader(splitSource.createReader(null)).size();
-      // Should not split while unstarted.
-      assertSplitAtFractionFails(splitSource, 0, 0.7, options);
-      assertSplitAtFractionSucceedsAndConsistent(splitSource, 1, 0.7, options);
-      assertSplitAtFractionSucceedsAndConsistent(splitSource, 15, 0.7, 
options);
-      assertSplitAtFractionFails(splitSource, 0, 0.0, options);
-      assertSplitAtFractionFails(splitSource, 20, 0.3, options);
-      assertSplitAtFractionFails(splitSource, numItems, 1.0, options);
-
-      // After reading 100 elements we will be approximately at position
-      // 0.99 * (endOffset - startOffset) hence trying to split at fraction 
0.9 will be
-      // unsuccessful.
-      assertSplitAtFractionFails(splitSource, numItems, 0.9, options);
-
-      // Following passes since we can always find a fraction that is 
extremely close to 1 such that
-      // the position suggested by the fraction will be larger than the 
position the reader is at
-      // after reading "items - 1" elements.
-      // This also passes for "numItemsToReadBeforeSplit = items" if the 
position at suggested
-      // fraction is larger than the position the reader is at after reading 
all "items" elements
-      // (i.e., the start position of the last element). This is true for most 
cases but will not
-      // be true if reader position is only one less than the end position. 
(i.e., the last element
-      // of the bundle start at the last byte that belongs to the bundle).
-      assertSplitAtFractionSucceedsAndConsistent(splitSource, numItems - 1, 
0.999, options);
-    }
-  }
-
-  @Test
-  public void testSplitAtFractionExhaustiveSingleByte() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    File file = tempFolder.newFile("trainXMLSmall");
-    Files.write(file.toPath(), 
trainXMLWithAllFeaturesSingleByte.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordElement("train")
-            .withRecordClass(Train.class)
-            .createSource();
-    assertSplitAtFractionExhaustive(source, options);
-  }
-
-  @Test
-  @Ignore(
-      "Multi-byte characters in XML are not supported because the parser "
-      + "currently does not correctly report byte offsets")
-  public void testSplitAtFractionExhaustiveMultiByte() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    File file = tempFolder.newFile("trainXMLSmall");
-    Files.write(file.toPath(), 
trainXMLWithAllFeaturesMultiByte.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("දුම්රියන්")
-            .withRecordElement("දුම්රිය")
-            .withRecordClass(Train.class)
-            .createSource();
-    assertSplitAtFractionExhaustive(source, options);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testReadXMLFilePattern() throws IOException {
-    List<Train> trains1 = generateRandomTrainList(20);
-    File file = createRandomTrainXML("temp1.xml", trains1);
-    List<Train> trains2 = generateRandomTrainList(10);
-    createRandomTrainXML("temp2.xml", trains2);
-    List<Train> trains3 = generateRandomTrainList(15);
-    createRandomTrainXML("temp3.xml", trains3);
-    generateRandomTrainList(8);
-    createRandomTrainXML("otherfile.xml", trains1);
-
-    PCollection<Train> output =
-        p.apply(
-            "ReadFileData",
-            XmlIO.<Train>read()
-                .from(file.getParent() + "/" + "temp*.xml")
-                .withRootElement("trains")
-                .withRecordElement("train")
-                .withRecordClass(Train.class)
-                .withMinBundleSize(1024));
-
-    List<Train> expectedResults = new ArrayList<>();
-    expectedResults.addAll(trains1);
-    expectedResults.addAll(trains2);
-    expectedResults.addAll(trains3);
-
-    PAssert.that(output).containsInAnyOrder(expectedResults);
-    p.run();
-  }
-
-  @Test
-  public void testDisplayData() {
-    DisplayData displayData =
-        DisplayData.from(
-            XmlIO.<Integer>read()
-                .from("foo.xml")
-                .withRootElement("bird")
-                .withRecordElement("cat")
-                .withMinBundleSize(1234)
-                .withRecordClass(Integer.class));
-
-    assertThat(displayData, hasDisplayItem("filePattern", "foo.xml"));
-    assertThat(displayData, hasDisplayItem("rootElement", "bird"));
-    assertThat(displayData, hasDisplayItem("recordElement", "cat"));
-    assertThat(displayData, hasDisplayItem("recordClass", Integer.class));
-    assertThat(displayData, hasDisplayItem("minBundleSize", 1234));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 27fc614..5b1e243 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -77,6 +77,7 @@
     <module>kinesis</module>
     <module>mongodb</module>
     <module>mqtt</module>
+    <module>xml</module>
   </modules>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/pom.xml b/sdks/java/io/xml/pom.xml
new file mode 100644
index 0000000..49ce239
--- /dev/null
+++ b/sdks/java/io/xml/pom.xml
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>0.7.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-io-xml</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: XML</name>
+  <description>IO to read and write XML files.</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.codehaus.woodstox</groupId>
+      <artifactId>stax2-api</artifactId>
+      <version>${stax2.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.codehaus.woodstox</groupId>
+      <artifactId>woodstox-core-asl</artifactId>
+      <version>${woodstox.version}</version>
+      <scope>runtime</scope>
+      <exclusions>
+        <!-- javax.xml.stream:stax-api is included in JDK 1.6+ -->
+        <exclusion>
+          <groupId>javax.xml.stream</groupId>
+          <artifactId>stax-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <version>1.3</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java 
b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
new file mode 100644
index 0000000..b8b1b79
--- /dev/null
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.xml;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.io.ByteStreams;
+import java.io.FilterInputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.EmptyOnDeserializationThreadLocal;
+import org.apache.beam.sdk.util.Structs;
+import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A coder for JAXB annotated objects. This coder uses JAXB 
marshalling/unmarshalling mechanisms
+ * to encode/decode the objects. Users must provide the {@code Class} of the 
JAXB annotated object.
+ *
+ * @param <T> type of JAXB annotated objects that will be serialized.
+ */
+public class JAXBCoder<T> extends AtomicCoder<T> {
+
+  private final Class<T> jaxbClass;
+  private final TypeDescriptor<T> typeDescriptor;
+  private transient volatile JAXBContext jaxbContext;
+  private final EmptyOnDeserializationThreadLocal<Marshaller> jaxbMarshaller;
+  private final EmptyOnDeserializationThreadLocal<Unmarshaller> 
jaxbUnmarshaller;
+
+  public Class<T> getJAXBClass() {
+    return jaxbClass;
+  }
+
+  private JAXBCoder(Class<T> jaxbClass) {
+    this.jaxbClass = jaxbClass;
+    this.typeDescriptor = TypeDescriptor.of(jaxbClass);
+    this.jaxbMarshaller = new EmptyOnDeserializationThreadLocal<Marshaller>() {
+      @Override
+      protected Marshaller initialValue() {
+        try {
+          JAXBContext jaxbContext = getContext();
+          return jaxbContext.createMarshaller();
+        } catch (JAXBException e) {
+          throw new RuntimeException("Error when creating marshaller from JAXB 
Context.", e);
+        }
+      }
+    };
+    this.jaxbUnmarshaller = new 
EmptyOnDeserializationThreadLocal<Unmarshaller>() {
+      @Override
+      protected Unmarshaller initialValue() {
+        try {
+          JAXBContext jaxbContext = getContext();
+          return jaxbContext.createUnmarshaller();
+        } catch (Exception e) {
+          throw new RuntimeException("Error when creating unmarshaller from 
JAXB Context.", e);
+        }
+      }
+    };
+  }
+
+  /**
+   * Create a coder for a given type of JAXB annotated objects.
+   *
+   * @param jaxbClass the {@code Class} of the JAXB annotated objects.
+   */
+  public static <T> JAXBCoder<T> of(Class<T> jaxbClass) {
+    return new JAXBCoder<>(jaxbClass);
+  }
+
+  @Override
+  public void encode(T value, OutputStream outStream, Context context)
+      throws CoderException, IOException {
+    try {
+      if (!context.isWholeStream) {
+        try {
+          long size = getEncodedElementByteSize(value, Context.OUTER);
+          // record the number of bytes the XML consists of so when reading we 
only read the encoded
+          // value
+          VarInt.encode(size, outStream);
+        } catch (Exception e) {
+          throw new CoderException(
+              "An Exception occured while trying to get the size of an encoded 
representation", e);
+        }
+      }
+
+      jaxbMarshaller.get().marshal(value, new 
CloseIgnoringOutputStream(outStream));
+    } catch (JAXBException e) {
+      throw new CoderException(e);
+    }
+  }
+
+  @Override
+  public T decode(InputStream inStream, Context context) throws 
CoderException, IOException {
+    try {
+      InputStream stream = inStream;
+      if (!context.isWholeStream) {
+        long limit = VarInt.decodeLong(inStream);
+        stream = ByteStreams.limit(inStream, limit);
+      }
+      @SuppressWarnings("unchecked")
+      T obj = (T) jaxbUnmarshaller.get().unmarshal(new 
CloseIgnoringInputStream(stream));
+      return obj;
+    } catch (JAXBException e) {
+      throw new CoderException(e);
+    }
+  }
+
+  private JAXBContext getContext() throws JAXBException {
+    if (jaxbContext == null) {
+      synchronized (this) {
+        if (jaxbContext == null) {
+          jaxbContext = JAXBContext.newInstance(jaxbClass);
+        }
+      }
+    }
+    return jaxbContext;
+  }
+
+  @Override
+  public String getEncodingId() {
+    return getJAXBClass().getName();
+  }
+
+  @Override
+  public TypeDescriptor<T> getEncodedTypeDescriptor() {
+    return typeDescriptor;
+  }
+
+  private static class CloseIgnoringInputStream extends FilterInputStream {
+
+    protected CloseIgnoringInputStream(InputStream in) {
+      super(in);
+    }
+
+    @Override
+    public void close() {
+      // Do nothing. JAXB closes the underlying stream so we must filter out 
those calls.
+    }
+  }
+
+  private static class CloseIgnoringOutputStream extends FilterOutputStream {
+
+    protected CloseIgnoringOutputStream(OutputStream out) {
+      super(out);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // JAXB closes the underlying stream so we must filter out those calls.
+    }
+  }
+
+  
////////////////////////////////////////////////////////////////////////////////////
+  // JSON Serialization details below
+
+  private static final String JAXB_CLASS = "jaxb_class";
+
+  /**
+   * Constructor for JSON deserialization only.
+   */
+  @JsonCreator
+  public static <T> JAXBCoder<T> of(
+      @JsonProperty(JAXB_CLASS) String jaxbClassName) {
+    try {
+      @SuppressWarnings("unchecked")
+      Class<T> jaxbClass = (Class<T>) Class.forName(jaxbClassName);
+      return of(jaxbClass);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  @Override
+  protected CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
+    Structs.addString(result, JAXB_CLASS, jaxbClass.getName());
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java 
b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
new file mode 100644
index 0000000..bf0e1b5
--- /dev/null
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.xml;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import javax.annotation.Nullable;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.CompressedSource;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/** Transforms for reading and writing XML files using JAXB mappers. */
+public class XmlIO {
+  // CHECKSTYLE.OFF: JavadocStyle
+  /**
+   * Reads XML files. This source reads one or more XML files and creates a 
{@link PCollection} of a
+   * given type. Please note the example given below.
+   *
+   * <p>The XML file must be of the following form, where {@code root} and 
{@code record} are XML
+   * element names that are defined by the user:
+   *
+   * <pre>{@code
+   * <root>
+   * <record> ... </record>
+   * <record> ... </record>
+   * <record> ... </record>
+   * ...
+   * <record> ... </record>
+   * </root>
+   * }</pre>
+   *
+   * <p>Basically, the XML document should contain a single root element with 
an inner list
+   * consisting entirely of record elements. The records may contain arbitrary 
XML content; however,
+   * that content <b>must not</b> contain the start {@code <record>} or end 
{@code </record>} tags.
+   * This restriction enables reading from large XML files in parallel from 
different offsets in the
+   * file.
+   *
+   * <p>Root and/or record elements may additionally contain an arbitrary 
number of XML attributes.
+   * Additionally users must provide a class of a JAXB annotated Java type 
that can be used convert
+   * records into Java objects and vice versa using JAXB 
marshalling/unmarshalling mechanisms.
+   * Reading the source will generate a {@code PCollection} of the given JAXB 
annotated Java type.
+   * Optionally users may provide a minimum size of a bundle that should be 
created for the source.
+   *
+   * <p>The following example shows how to use this method in a Beam pipeline:
+   *
+   * <pre>{@code
+   * PCollection<String> output = p.apply(XmlIO.<Record>read()
+   *     .from(file.toPath().toString())
+   *     .withRootElement("root")
+   *     .withRecordElement("record")
+   *     .withRecordClass(Record.class));
+   * }</pre>
+   *
+   * <p>Currently, only XML files that use single-byte characters are 
supported. Using a file that
+   * contains multi-byte characters may result in data loss or duplication.
+   *
+   * <h3>Permissions</h3>
+   *
+   * <p>Permission requirements depend on the {@link 
org.apache.beam.sdk.runners.PipelineRunner
+   * PipelineRunner} that is used to execute the Beam pipeline. Please refer 
to the documentation of
+   * corresponding {@link PipelineRunner PipelineRunners} for more details.
+   *
+   * @param <T> Type of the objects that represent the records of the XML 
file. The {@code
+   *     PCollection} generated by this source will be of this type.
+   */
+  // CHECKSTYLE.ON: JavadocStyle
+  public static <T> Read<T> read() {
+    return new AutoValue_XmlIO_Read.Builder<T>()
+        .setMinBundleSize(Read.DEFAULT_MIN_BUNDLE_SIZE)
+        .setCompressionType(Read.CompressionType.AUTO)
+        .build();
+  }
+
+  // CHECKSTYLE.OFF: JavadocStyle
+  /**
+   * A {@link FileBasedSink} that outputs records as XML-formatted elements. 
Writes a {@link
+   * PCollection} of records from JAXB-annotated classes to a single file 
location.
+   *
+   * <p>Given a PCollection containing records of type T that can be 
marshalled to XML elements,
+   * this Sink will produce a single file consisting of a single root element 
that contains all of
+   * the elements in the PCollection.
+   *
+   * <p>XML Sinks are created with a base filename to write to, a root element 
name that will be
+   * used for the root element of the output files, and a class to bind to an 
XML element. This
+   * class will be used in the marshalling of records in an input PCollection 
to their XML
+   * representation and must be able to be bound using JAXB annotations 
(checked at pipeline
+   * construction time).
+   *
+   * <p>XML Sinks can be written to using the {@link Write} transform:
+   *
+   * <pre>{@code
+   * p.apply(XmlIO.<Type>write()
+   *      .withRecordClass(Type.class)
+   *      .withRootElement(root_element)
+   *      .toFilenamePrefix(output_filename));
+   * }</pre>
+   *
+   * <p>For example, consider the following class with JAXB annotations:
+   *
+   * <pre>
+   *  {@literal @}XmlRootElement(name = "word_count_result")
+   *  {@literal @}XmlType(propOrder = {"word", "frequency"})
+   *  public class WordFrequency {
+   *    private String word;
+   *    private long frequency;
+   *
+   *    public WordFrequency() { }
+   *
+   *    public WordFrequency(String word, long frequency) {
+   *      this.word = word;
+   *      this.frequency = frequency;
+   *    }
+   *
+   *    public void setWord(String word) {
+   *      this.word = word;
+   *    }
+   *
+   *    public void setFrequency(long frequency) {
+   *      this.frequency = frequency;
+   *    }
+   *
+   *    public long getFrequency() {
+   *      return frequency;
+   *    }
+   *
+   *    public String getWord() {
+   *      return word;
+   *    }
+   *  }
+   * </pre>
+   *
+   * <p>The following will produce XML output with a root element named 
"words" from a PCollection
+   * of WordFrequency objects:
+   *
+   * <pre>{@code
+   * p.apply(XmlIO.<WordFrequency>write()
+   *     .withRecordClass(WordFrequency.class)
+   *     .withRootElement("words")
+   *     .toFilenamePrefix(output_file));
+   * }</pre>
+   *
+   * <p>The output of which will look like:
+   *
+   * <pre>{@code
+   * <words>
+   *
+   *  <word_count_result>
+   *    <word>decreased</word>
+   *    <frequency>1</frequency>
+   *  </word_count_result>
+   *
+   *  <word_count_result>
+   *    <word>War</word>
+   *    <frequency>4</frequency>
+   *  </word_count_result>
+   *
+   *  <word_count_result>
+   *    <word>empress'</word>
+   *    <frequency>14</frequency>
+   *  </word_count_result>
+   *
+   *  <word_count_result>
+   *    <word>stoops</word>
+   *    <frequency>6</frequency>
+   *  </word_count_result>
+   *
+   *  ...
+   * </words>
+   * }</pre>
+   */
+  // CHECKSTYLE.ON: JavadocStyle
+  public static <T> Write<T> write() {
+    return new AutoValue_XmlIO_Write.Builder<T>().build();
+  }
+
+  /** Implementation of {@link #read}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, 
PCollection<T>> {
+    private static final int DEFAULT_MIN_BUNDLE_SIZE = 8 * 1024;
+
+    @Nullable
+    abstract String getFileOrPatternSpec();
+
+    @Nullable
+    abstract String getRootElement();
+
+    @Nullable
+    abstract String getRecordElement();
+
+    @Nullable
+    abstract Class<T> getRecordClass();
+
+    abstract CompressionType getCompressionType();
+
+    abstract long getMinBundleSize();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFileOrPatternSpec(String fileOrPatternSpec);
+
+      abstract Builder<T> setRootElement(String rootElement);
+
+      abstract Builder<T> setRecordElement(String recordElement);
+
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+      abstract Builder<T> setMinBundleSize(long minBundleSize);
+
+      abstract Builder<T> setCompressionType(CompressionType compressionType);
+
+      abstract Read<T> build();
+    }
+
+    /** Strategy for determining the compression type of XML files being read. 
*/
+    public enum CompressionType {
+      /** Automatically determine the compression type based on filename 
extension. */
+      AUTO(""),
+      /** Uncompressed (i.e., may be split). */
+      UNCOMPRESSED(""),
+      /** GZipped. */
+      GZIP(".gz"),
+      /** BZipped. */
+      BZIP2(".bz2"),
+      /** Zipped. */
+      ZIP(".zip"),
+      /** Deflate compressed. */
+      DEFLATE(".deflate");
+
+      private String filenameSuffix;
+
+      CompressionType(String suffix) {
+        this.filenameSuffix = suffix;
+      }
+
+      /**
+       * Determine if a given filename matches a compression type based on its 
extension.
+       *
+       * @param filename the filename to match
+       * @return true iff the filename ends with the compression type's known 
extension.
+       */
+      public boolean matches(String filename) {
+        return filename.toLowerCase().endsWith(filenameSuffix.toLowerCase());
+      }
+    }
+
+    /**
+     * Reads a single XML file or a set of XML files defined by a Java "glob" 
file pattern. Each XML
+     * file should be of the form defined in {@link #read}.
+     */
+    public Read<T> from(String fileOrPatternSpec) {
+      return toBuilder().setFileOrPatternSpec(fileOrPatternSpec).build();
+    }
+
+    /**
+     * Sets name of the root element of the XML document. This will be used to 
create a valid
+     * starting root element when initiating a bundle of records created from 
an XML document. This
+     * is a required parameter.
+     */
+    public Read<T> withRootElement(String rootElement) {
+      return toBuilder().setRootElement(rootElement).build();
+    }
+
+    /**
+     * Sets name of the record element of the XML document. This will be used 
to determine offset of
+     * the first record of a bundle created from the XML document. This is a 
required parameter.
+     */
+    public Read<T> withRecordElement(String recordElement) {
+      return toBuilder().setRecordElement(recordElement).build();
+    }
+
+    /**
+     * Sets a JAXB annotated class that can be populated using a record of the 
provided XML file.
+     * This will be used when unmarshalling record objects from the XML file. 
This is a required
+     * parameter.
+     */
+    public Read<T> withRecordClass(Class<T> recordClass) {
+      return toBuilder().setRecordClass(recordClass).build();
+    }
+
+    /**
+     * Sets a parameter {@code minBundleSize} for the minimum bundle size of 
the source. Please
+     * refer to {@link OffsetBasedSource} for the definition of minBundleSize. 
This is an optional
+     * parameter.
+     */
+    public Read<T> withMinBundleSize(long minBundleSize) {
+      return toBuilder().setMinBundleSize(minBundleSize).build();
+    }
+
+    /**
+     * Decompresses all input files using the specified compression type.
+     *
+     * <p>If no compression type is specified, the default is {@link 
CompressionType#AUTO}. In this
+     * mode, the compression type of the file is determined by its extension. 
Supports .gz, .bz2,
+     * .zip and .deflate compression.
+     */
+    public Read<T> withCompressionType(CompressionType compressionType) {
+      return toBuilder().setCompressionType(compressionType).build();
+    }
+
+    @Override
+    public void validate(PBegin input) {
+      checkNotNull(
+          getRootElement(),
+          "rootElement is null. Use builder method withRootElement() to set 
this.");
+      checkNotNull(
+          getRecordElement(),
+          "recordElement is null. Use builder method withRecordElement() to 
set this.");
+      checkNotNull(
+          getRecordClass(),
+          "recordClass is null. Use builder method withRecordClass() to set 
this.");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder
+          .addIfNotDefault(
+              DisplayData.item("minBundleSize", getMinBundleSize())
+                  .withLabel("Minimum Bundle Size"),
+              1L)
+          .add(DisplayData.item("filePattern", 
getFileOrPatternSpec()).withLabel("File Pattern"))
+          .addIfNotNull(
+              DisplayData.item("rootElement", getRootElement()).withLabel("XML 
Root Element"))
+          .addIfNotNull(
+              DisplayData.item("recordElement", 
getRecordElement()).withLabel("XML Record Element"))
+          .addIfNotNull(
+              DisplayData.item("recordClass", getRecordClass()).withLabel("XML 
Record Class"));
+    }
+
+    @VisibleForTesting
+    BoundedSource<T> createSource() {
+      XmlSource<T> source = new XmlSource<>(this);
+      switch (getCompressionType()) {
+        case UNCOMPRESSED:
+          return source;
+        case AUTO:
+          return CompressedSource.from(source);
+        case BZIP2:
+          return CompressedSource.from(source)
+              .withDecompression(CompressedSource.CompressionMode.BZIP2);
+        case GZIP:
+          return CompressedSource.from(source)
+              .withDecompression(CompressedSource.CompressionMode.GZIP);
+        case ZIP:
+          return CompressedSource.from(source)
+              .withDecompression(CompressedSource.CompressionMode.ZIP);
+        case DEFLATE:
+          return CompressedSource.from(source)
+              .withDecompression(CompressedSource.CompressionMode.DEFLATE);
+        default:
+          throw new IllegalArgumentException("Unknown compression type: " + 
getCompressionType());
+      }
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      return input.apply(org.apache.beam.sdk.io.Read.from(createSource()));
+    }
+  }
+
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone> {
+    @Nullable
+    abstract String getFilenamePrefix();
+
+    @Nullable
+    abstract Class<T> getRecordClass();
+
+    @Nullable
+    abstract String getRootElement();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFilenamePrefix(String baseOutputFilename);
+
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+      abstract Builder<T> setRootElement(String rootElement);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Writes to files with the given path prefix.
+     *
+     * <p>Output files will have the name {@literal 
{filenamePrefix}-0000i-of-0000n.xml} where n is
+     * the number of output bundles.
+     */
+    public Write<T> toFilenamePrefix(String filenamePrefix) {
+      return toBuilder().setFilenamePrefix(filenamePrefix).build();
+    }
+
+    /**
+     * Writes objects of the given class mapped to XML elements using JAXB.
+     *
+     * <p>The specified class must be able to be used to create a JAXB context.
+     */
+    public Write<T> withRecordClass(Class<T> recordClass) {
+      return toBuilder().setRecordClass(recordClass).build();
+    }
+
+    /** Sets the enclosing root element for the generated XML files. */
+    public Write<T> withRootElement(String rootElement) {
+      return toBuilder().setRootElement(rootElement).build();
+    }
+
+    @Override
+    public void validate(PCollection<T> input) {
+      checkNotNull(getRecordClass(), "Missing a class to bind to a JAXB 
context.");
+      checkNotNull(getRootElement(), "Missing a root element name.");
+      checkNotNull(getFilenamePrefix(), "Missing a filename to write to.");
+      try {
+        JAXBContext.newInstance(getRecordClass());
+      } catch (JAXBException e) {
+        throw new RuntimeException("Error binding classes to a JAXB Context.", 
e);
+      }
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      return input.apply(org.apache.beam.sdk.io.WriteFiles.to(createSink()));
+    }
+
+    @VisibleForTesting
+    XmlSink<T> createSink() {
+      return new XmlSink<>(this);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      createSink().populateFileBasedDisplayData(builder);
+      builder
+          .addIfNotNull(
+              DisplayData.item("rootElement", getRootElement()).withLabel("XML 
Root Element"))
+          .addIfNotNull(
+              DisplayData.item("recordClass", getRecordClass()).withLabel("XML 
Record Class"));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java 
b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
new file mode 100644
index 0000000..2e7dba1
--- /dev/null
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.xml;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Marshaller;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.CoderUtils;
+
+/** Implementation of {@link XmlIO#write}. */
+class XmlSink<T> extends FileBasedSink<T> {
+  protected static final String XML_EXTENSION = "xml";
+
+  private final XmlIO.Write<T> spec;
+
+  XmlSink(XmlIO.Write<T> spec) {
+    super(spec.getFilenamePrefix(), XML_EXTENSION);
+    this.spec = spec;
+  }
+
+  /**
+   * Validates that the root element, class to bind to a JAXB context, and 
filenamePrefix have
+   * been set and that the class can be bound in a JAXB context.
+   */
+  @Override
+  public void validate(PipelineOptions options) {
+    spec.validate(null);
+  }
+
+  /**
+   * Creates an {@link XmlWriteOperation}.
+   */
+  @Override
+  public XmlWriteOperation<T> createWriteOperation(PipelineOptions options) {
+    return new XmlWriteOperation<>(this);
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    spec.populateDisplayData(builder);
+  }
+
+  void populateFileBasedDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+  }
+
+  /**
+   * {@link FileBasedSink.FileBasedWriteOperation} for XML {@link 
FileBasedSink}s.
+   */
+  protected static final class XmlWriteOperation<T> extends 
FileBasedWriteOperation<T> {
+    public XmlWriteOperation(XmlSink<T> sink) {
+      super(sink);
+    }
+
+    /**
+     * Creates a {@link XmlWriter} with a marshaller for the type it will 
write.
+     */
+    @Override
+    public XmlWriter<T> createWriter(PipelineOptions options) throws Exception 
{
+      JAXBContext context;
+      Marshaller marshaller;
+      context = JAXBContext.newInstance(getSink().spec.getRecordClass());
+      marshaller = context.createMarshaller();
+      marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
+      marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);
+      marshaller.setProperty(Marshaller.JAXB_ENCODING, "UTF-8");
+      return new XmlWriter<>(this, marshaller);
+    }
+
+    /**
+     * Return the XmlSink.Bound for this write operation.
+     */
+    @Override
+    public XmlSink<T> getSink() {
+      return (XmlSink<T>) super.getSink();
+    }
+
+    @VisibleForTesting
+    String getTemporaryDirectory() {
+      return this.tempDirectory.get();
+    }
+  }
+
+  /**
+   * A {@link FileBasedWriter} that can write objects as XML elements.
+   */
+  protected static final class XmlWriter<T> extends FileBasedWriter<T> {
+    final Marshaller marshaller;
+    private OutputStream os = null;
+
+    public XmlWriter(XmlWriteOperation<T> writeOperation, Marshaller 
marshaller) {
+      super(writeOperation);
+      this.marshaller = marshaller;
+    }
+
+    /**
+     * Creates the output stream that elements will be written to.
+     */
+    @Override
+    protected void prepareWrite(WritableByteChannel channel) throws Exception {
+      os = Channels.newOutputStream(channel);
+    }
+
+    /**
+     * Writes the root element opening tag.
+     */
+    @Override
+    protected void writeHeader() throws Exception {
+      String rootElementName = 
getWriteOperation().getSink().spec.getRootElement();
+      os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "<" + 
rootElementName + ">\n"));
+    }
+
+    /**
+     * Writes the root element closing tag.
+     */
+    @Override
+    protected void writeFooter() throws Exception {
+      String rootElementName = 
getWriteOperation().getSink().spec.getRootElement();
+      os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "\n</" + 
rootElementName + ">"));
+    }
+
+    /**
+     * Writes a value to the stream.
+     */
+    @Override
+    public void write(T value) throws Exception {
+      marshaller.marshal(value, os);
+    }
+
+    /**
+     * Return the XmlWriteOperation this write belongs to.
+     */
+    @Override
+    public XmlWriteOperation<T> getWriteOperation() {
+      return (XmlWriteOperation<T>) super.getWriteOperation();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java 
b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
new file mode 100644
index 0000000..876c782
--- /dev/null
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.xml;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.SequenceInputStream;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.NoSuchElementException;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.bind.ValidationEvent;
+import javax.xml.bind.ValidationEventHandler;
+import javax.xml.stream.FactoryConfigurationError;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamConstants;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.codehaus.stax2.XMLInputFactory2;
+
+/** Implementation of {@link XmlIO#read}. */
+public class XmlSource<T> extends FileBasedSource<T> {
+
+  private static final String XML_VERSION = "1.1";
+
+  private final XmlIO.Read<T> spec;
+
+  XmlSource(XmlIO.Read<T> spec) {
+    super(StaticValueProvider.of(spec.getFileOrPatternSpec()), 
spec.getMinBundleSize());
+    this.spec = spec;
+  }
+
+  private XmlSource(XmlIO.Read<T> spec, Metadata metadata, long startOffset, 
long endOffset) {
+    super(metadata, spec.getMinBundleSize(), startOffset, endOffset);
+    this.spec = spec;
+  }
+
+  @Override
+  protected FileBasedSource<T> createForSubrangeOfFile(Metadata metadata, long 
start, long end) {
+    return new XmlSource<T>(spec.from(metadata.toString()), metadata, start, 
end);
+  }
+
+  @Override
+  protected FileBasedReader<T> createSingleFileReader(PipelineOptions options) 
{
+    return new XMLReader<T>(this);
+  }
+
+  @Override
+  public void validate() {
+    super.validate();
+    spec.validate(null);
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    spec.populateDisplayData(builder);
+  }
+
+  @Override
+  public Coder<T> getDefaultOutputCoder() {
+    return JAXBCoder.of(spec.getRecordClass());
+  }
+
+  /**
+   * A {@link Source.Reader} for reading JAXB annotated Java objects from an 
XML file. The XML
+   * file should be of the form defined at {@link XmlSource}.
+   *
+   * <p>Timestamped values are currently unsupported - all values implicitly 
have the timestamp
+   * of {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
+   *
+   * @param <T> Type of objects that will be read by the reader.
+   */
+  private static class XMLReader<T> extends FileBasedReader<T> {
+    // The amount of bytes read from the channel to memory when determining 
the starting offset of
+    // the first record in a bundle. After matching to starting offset of the 
first record the
+    // remaining bytes read to this buffer and the bytes still not read from 
the channel are used to
+    // create the XML parser.
+    private static final int BUF_SIZE = 1024;
+
+    // This should be the maximum number of bytes a character will encode to, 
for any encoding
+    // supported by XmlSource. Currently this is set to 4 since UTF-8 
characters may be
+    // four bytes.
+    private static final int MAX_CHAR_BYTES = 4;
+
+    // In order to support reading starting in the middle of an XML file, we 
construct an imaginary
+    // well-formed document (a header and root tag followed by the contents of 
the input starting at
+    // the record boundary) and feed it to the parser. Because of this, the 
offset reported by the
+    // XML parser is not the same as offset in the original file. They differ 
by a constant amount:
+    // offsetInOriginalFile = parser.getLocation().getCharacterOffset() + 
parserBaseOffset;
+    // Note that this is true only for files with single-byte characters.
+    // It appears that, as of writing, there does not exist a Java XML parser 
capable of correctly
+    // reporting byte offsets of elements in the presence of multi-byte 
characters.
+    private long parserBaseOffset = 0;
+    private boolean readingStarted = false;
+
+    // If true, the current bundle does not contain any records.
+    private boolean emptyBundle = false;
+
+    private Unmarshaller jaxbUnmarshaller = null;
+    private XMLStreamReader parser = null;
+
+    private T currentRecord = null;
+
+    // Byte offset of the current record in the XML file provided when 
creating the source.
+    private long currentByteOffset = 0;
+
+    public XMLReader(XmlSource<T> source) {
+      super(source);
+
+      // Set up a JAXB Unmarshaller that can be used to unmarshall record 
objects.
+      try {
+        JAXBContext jaxbContext = 
JAXBContext.newInstance(getCurrentSource().spec.getRecordClass());
+        jaxbUnmarshaller = jaxbContext.createUnmarshaller();
+
+        // Throw errors if validation fails. JAXB by default ignores 
validation errors.
+        jaxbUnmarshaller.setEventHandler(new ValidationEventHandler() {
+          @Override
+          public boolean handleEvent(ValidationEvent event) {
+            throw new RuntimeException(event.getMessage(), 
event.getLinkedException());
+          }
+        });
+      } catch (JAXBException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public synchronized XmlSource<T> getCurrentSource() {
+      return (XmlSource<T>) super.getCurrentSource();
+    }
+
+    @Override
+    protected void startReading(ReadableByteChannel channel) throws 
IOException {
+      // This method determines the correct starting offset of the first 
record by reading bytes
+      // from the ReadableByteChannel. This implementation does not need the 
channel to be a
+      // SeekableByteChannel.
+      // The method tries to determine the first record element in the byte 
channel. The first
+      // record must start with the characters "<recordElement" where 
"recordElement" is the
+      // record element of the XML document described above. For the match to 
be complete this
+      // has to be followed by one of following.
+      // * any whitespace character
+      // * '>' character
+      // * '/' character (to support empty records).
+      //
+      // After this match this method creates the XML parser for parsing the 
XML document,
+      // feeding it a fake document consisting of an XML header and the 
<rootElement> tag followed
+      // by the contents of channel starting from <recordElement. The 
<rootElement> tag may be never
+      // closed.
+
+      // This stores any bytes that should be used prior to the remaining 
bytes of the channel when
+      // creating an XML parser object.
+      ByteArrayOutputStream preambleByteBuffer = new ByteArrayOutputStream();
+      // A dummy declaration and root for the document with proper XML version 
and encoding. Without
+      // this XML parsing may fail or may produce incorrect results.
+
+      byte[] dummyStartDocumentBytes =
+          (String.format(
+                  "<?xml version=\"%s\" encoding=\"UTF-8\" ?><%s>",
+                  XML_VERSION, getCurrentSource().spec.getRootElement()))
+              .getBytes(StandardCharsets.UTF_8);
+      preambleByteBuffer.write(dummyStartDocumentBytes);
+      // Gets the byte offset (in the input file) of the first record in 
ReadableByteChannel. This
+      // method returns the offset and stores any bytes that should be used 
when creating the XML
+      // parser in preambleByteBuffer.
+      long offsetInFileOfRecordElement =
+          getFirstOccurenceOfRecordElement(channel, preambleByteBuffer);
+      if (offsetInFileOfRecordElement < 0) {
+        // Bundle has no records. So marking this bundle as an empty bundle.
+        emptyBundle = true;
+        return;
+      } else {
+        byte[] preambleBytes = preambleByteBuffer.toByteArray();
+        currentByteOffset = offsetInFileOfRecordElement;
+        setUpXMLParser(channel, preambleBytes);
+        parserBaseOffset = offsetInFileOfRecordElement - 
dummyStartDocumentBytes.length;
+      }
+      readingStarted = true;
+    }
+
+    // Gets the first occurrence of the next record within the given 
ReadableByteChannel. Puts
+    // any bytes read past the starting offset of the next record back to the 
preambleByteBuffer.
+    // If a record is found, returns the starting offset of the record, 
otherwise
+    // returns -1.
+    private long getFirstOccurenceOfRecordElement(
+        ReadableByteChannel channel, ByteArrayOutputStream preambleByteBuffer) 
throws IOException {
+      int byteIndexInRecordElementToMatch = 0;
+      // Index of the byte in the string "<recordElement" to be matched
+      // against the current byte from the stream.
+      boolean recordStartBytesMatched = false; // "<recordElement" matched. 
Still have to match the
+      // next character to confirm if this is a positive match.
+      boolean fullyMatched = false; // If true, record element was fully 
matched.
+
+      // This gives the offset of the byte currently being read. We do a '-1' 
here since we
+      // increment this value at the beginning of the while loop below.
+      long offsetInFileOfCurrentByte = getCurrentSource().getStartOffset() - 1;
+      long startingOffsetInFileOfCurrentMatch = -1;
+      // If this is non-negative, currently there is a match in progress and 
this value gives the
+      // starting offset of the match currently being conducted.
+      boolean matchStarted = false; // If true, a match is currently in 
progress.
+
+      // These two values are used to determine the character immediately 
following a match for
+      // "<recordElement". Please see the comment for 'MAX_CHAR_BYTES' above.
+      byte[] charBytes = new byte[MAX_CHAR_BYTES];
+      int charBytesFound = 0;
+
+      ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
+      byte[] recordStartBytes =
+          ("<" + 
getCurrentSource().spec.getRecordElement()).getBytes(StandardCharsets.UTF_8);
+
+      outer: while (channel.read(buf) > 0) {
+        buf.flip();
+        while (buf.hasRemaining()) {
+          offsetInFileOfCurrentByte++;
+          byte b = buf.get();
+          boolean reset = false;
+          if (recordStartBytesMatched) {
+            // We already matched "<recordElement" reading the next character 
to determine if this
+            // is a positive match for a new record.
+            charBytes[charBytesFound] = b;
+            charBytesFound++;
+            Character c = null;
+            if (charBytesFound == charBytes.length) {
+              CharBuffer charBuf = CharBuffer.allocate(1);
+              InputStream charBufStream = new ByteArrayInputStream(charBytes);
+              java.io.Reader reader =
+                  new InputStreamReader(charBufStream, StandardCharsets.UTF_8);
+              int read = reader.read();
+              if (read <= 0) {
+                return -1;
+              }
+              charBuf.flip();
+              c = (char) read;
+            } else {
+              continue;
+            }
+
+            // Record start may be of following forms
+            // * "<recordElement<whitespace>..."
+            // * "<recordElement>..."
+            // * "<recordElement/..."
+            if (Character.isWhitespace(c) || c == '>' || c == '/') {
+              fullyMatched = true;
+              // Add the recordStartBytes and charBytes to preambleByteBuffer 
since these were
+              // already read from the channel.
+              preambleByteBuffer.write(recordStartBytes);
+              preambleByteBuffer.write(charBytes);
+              // Also add the rest of the current buffer to preambleByteBuffer.
+              while (buf.hasRemaining()) {
+                preambleByteBuffer.write(buf.get());
+              }
+              break outer;
+            } else {
+              // Matching was unsuccessful. Reset the buffer to include bytes 
read for the char.
+              ByteBuffer newbuf = ByteBuffer.allocate(BUF_SIZE);
+              newbuf.put(charBytes);
+              offsetInFileOfCurrentByte -= charBytes.length;
+              while (buf.hasRemaining()) {
+                newbuf.put(buf.get());
+              }
+              newbuf.flip();
+              buf = newbuf;
+
+              // Ignore everything and try again starting from the current 
buffer.
+              reset = true;
+            }
+          } else if (b == recordStartBytes[byteIndexInRecordElementToMatch]) {
+            // Next byte matched.
+            if (!matchStarted) {
+              // Match was for the first byte, record the starting offset.
+              matchStarted = true;
+              startingOffsetInFileOfCurrentMatch = offsetInFileOfCurrentByte;
+            }
+            byteIndexInRecordElementToMatch++;
+          } else {
+            // Not a match. Ignore everything and try again starting at 
current point.
+            reset = true;
+          }
+          if (reset) {
+            // Clear variables and try to match starting from the next byte.
+            byteIndexInRecordElementToMatch = 0;
+            startingOffsetInFileOfCurrentMatch = -1;
+            matchStarted = false;
+            recordStartBytesMatched = false;
+            charBytes = new byte[MAX_CHAR_BYTES];
+            charBytesFound = 0;
+          }
+          if (byteIndexInRecordElementToMatch == recordStartBytes.length) {
+            // "<recordElement" matched. Need to still check next byte since 
this might be an
+            // element that has "recordElement" as a prefix.
+            recordStartBytesMatched = true;
+          }
+        }
+        buf.clear();
+      }
+
+      if (!fullyMatched) {
+        return -1;
+      } else {
+        return startingOffsetInFileOfCurrentMatch;
+      }
+    }
+
+    private void setUpXMLParser(ReadableByteChannel channel, byte[] lookAhead) 
throws IOException {
+      try {
+        // We use Woodstox because the StAX implementation provided by OpenJDK 
reports
+        // character locations incorrectly. Note that Woodstox still currently 
reports *byte*
+        // locations incorrectly when parsing documents that contain 
multi-byte characters.
+        XMLInputFactory2 xmlInputFactory = (XMLInputFactory2) 
XMLInputFactory.newInstance();
+        this.parser = xmlInputFactory.createXMLStreamReader(
+            new SequenceInputStream(
+                new ByteArrayInputStream(lookAhead), 
Channels.newInputStream(channel)),
+            "UTF-8");
+
+        // Current offset should be the offset before reading the record 
element.
+        while (true) {
+          int event = parser.next();
+          if (event == XMLStreamConstants.START_ELEMENT) {
+            String localName = parser.getLocalName();
+            if (localName.equals(getCurrentSource().spec.getRecordElement())) {
+              break;
+            }
+          }
+        }
+      } catch (FactoryConfigurationError | XMLStreamException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    protected boolean readNextRecord() throws IOException {
+      if (emptyBundle) {
+        currentByteOffset = Long.MAX_VALUE;
+        return false;
+      }
+      try {
+        // Update current offset and check if the next value is the record 
element.
+        currentByteOffset = parserBaseOffset + 
parser.getLocation().getCharacterOffset();
+        while (parser.getEventType() != XMLStreamConstants.START_ELEMENT) {
+          parser.next();
+          currentByteOffset = parserBaseOffset + 
parser.getLocation().getCharacterOffset();
+          if (parser.getEventType() == XMLStreamConstants.END_DOCUMENT) {
+            currentByteOffset = Long.MAX_VALUE;
+            return false;
+          }
+        }
+        JAXBElement<T> jb =
+            jaxbUnmarshaller.unmarshal(parser, 
getCurrentSource().spec.getRecordClass());
+        currentRecord = jb.getValue();
+        return true;
+      } catch (JAXBException | XMLStreamException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public T getCurrent() throws NoSuchElementException {
+      if (!readingStarted) {
+        throw new NoSuchElementException();
+      }
+      return currentRecord;
+    }
+
+    @Override
+    protected boolean isAtSplitPoint() {
+      // Every record is at a split point.
+      return true;
+    }
+
+    @Override
+    protected long getCurrentOffset() {
+      return currentByteOffset;
+    }
+  }
+}

Reply via email to