http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java deleted file mode 100644 index 0120b8b..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java +++ /dev/null @@ -1,892 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io; - -import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive; -import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails; -import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent; -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; -import static org.hamcrest.Matchers.both; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.containsString; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -import com.google.common.collect.ImmutableList; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import javax.xml.bind.annotation.XmlAttribute; -import javax.xml.bind.annotation.XmlRootElement; -import org.apache.beam.sdk.io.Source.Reader; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.values.PCollection; -import org.hamcrest.Matchers; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests XmlSource. 
- */ -@RunWith(JUnit4.class) -public class XmlSourceTest { - - @Rule - public TestPipeline p = TestPipeline.create(); - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Rule - public ExpectedException exception = ExpectedException.none(); - - String tinyXML = - "<trains><train><name>Thomas</name></train><train><name>Henry</name></train>" - + "<train><name>James</name></train></trains>"; - - String xmlWithMultiByteElementName = - "<දà·à¶¸à·à¶»à·à¶ºà¶±à·><දà·à¶¸à·à¶»à·à¶º><name>Thomas</name></දà·à¶¸à·à¶»à·à¶º><දà·à¶¸à·à¶»à·à¶º><name>Henry</name></දà·à¶¸à·à¶»à·à¶º>" - + "<දà·à¶¸à·à¶»à·à¶º><name>James</name></දà·à¶¸à·à¶»à·à¶º></දà·à¶¸à·à¶»à·à¶ºà¶±à·>"; - - String xmlWithMultiByteChars = - "<trains><train><name>ThomasÂ¥</name></train><train><name>Hen¶ry</name></train>" - + "<train><name>JamÃes</name></train></trains>"; - - String trainXML = - "<trains>" - + "<train><name>Thomas</name><number>1</number><color>blue</color></train>" - + "<train><name>Henry</name><number>3</number><color>green</color></train>" - + "<train><name>Toby</name><number>7</number><color>brown</color></train>" - + "<train><name>Gordon</name><number>4</number><color>blue</color></train>" - + "<train><name>Emily</name><number>-1</number><color>red</color></train>" - + "<train><name>Percy</name><number>6</number><color>green</color></train>" - + "</trains>"; - - String trainXMLWithEmptyTags = - "<trains>" - + "<train/>" - + "<train><name>Thomas</name><number>1</number><color>blue</color></train>" - + "<train><name>Henry</name><number>3</number><color>green</color></train>" - + "<train/>" - + "<train><name>Toby</name><number>7</number><color>brown</color></train>" - + "<train><name>Gordon</name><number>4</number><color>blue</color></train>" - + "<train><name>Emily</name><number>-1</number><color>red</color></train>" - + "<train><name>Percy</name><number>6</number><color>green</color></train>" - + "</trains>"; - - String trainXMLWithAttributes = - "<trains>" - + "<train 
size=\"small\"><name>Thomas</name><number>1</number><color>blue</color></train>" - + "<train size=\"big\"><name>Henry</name><number>3</number><color>green</color></train>" - + "<train size=\"small\"><name>Toby</name><number>7</number><color>brown</color></train>" - + "<train size=\"big\"><name>Gordon</name><number>4</number><color>blue</color></train>" - + "<train size=\"small\"><name>Emily</name><number>-1</number><color>red</color></train>" - + "<train size=\"small\"><name>Percy</name><number>6</number><color>green</color></train>" - + "</trains>"; - - String trainXMLWithSpaces = - "<trains>" - + "<train><name>Thomas </name> <number>1</number><color>blue</color></train>" - + "<train><name>Henry</name><number>3</number><color>green</color></train>\n" - + "<train><name>Toby</name><number>7</number><color> brown </color></train> " - + "<train><name>Gordon</name> <number>4</number><color>blue</color>\n</train>\t" - + "<train><name>Emily</name><number>-1</number>\t<color>red</color></train>" - + "<train>\n<name>Percy</name> <number>6 </number> <color>green</color></train>" - + "</trains>"; - - String trainXMLWithAllFeaturesMultiByte = - "<දà·à¶¸à·à¶»à·à¶ºà¶±à·>" - + "<දà·à¶¸à·à¶»à·à¶º/>" - + "<දà·à¶¸à·à¶»à·à¶º size=\"small\"><name> ThomasÂ¥</name><number>1</number><color>blue</color>" - + "</දà·à¶¸à·à¶»à·à¶º>" - + "<දà·à¶¸à·à¶»à·à¶º size=\"big\"><name>He nry</name><number>3</number><color>green</color></දà·à¶¸à·à¶»à·à¶º>" - + "<දà·à¶¸à·à¶»à·à¶º size=\"small\"><name>Toby </name><number>7</number><color>br¶own</color>" - + "</දà·à¶¸à·à¶»à·à¶º>" - + "<දà·à¶¸à·à¶»à·à¶º/>" - + "<දà·à¶¸à·à¶»à·à¶º size=\"big\"><name>Gordon</name><number>4</number><color> blue</color></දà·à¶¸à·à¶»à·à¶º>" - + "<දà·à¶¸à·à¶»à·à¶º size=\"small\"><name>Emily</name><number>-1</number><color>red</color></දà·à¶¸à·à¶»à·à¶º>" - + "<දà·à¶¸à·à¶»à·à¶º size=\"small\"><name>Percy</name><number>6</number><color>green</color>" - + "</දà·à¶¸à·à¶»à·à¶º>" - + "</දà·à¶¸à·à¶»à·à¶ºà¶±à·>"; - - String 
trainXMLWithAllFeaturesSingleByte = - "<trains>" - + "<train/>" - + "<train size=\"small\"><name> Thomas</name><number>1</number><color>blue</color>" - + "</train>" - + "<train size=\"big\"><name>He nry</name><number>3</number><color>green</color></train>" - + "<train size=\"small\"><name>Toby </name><number>7</number><color>brown</color>" - + "</train>" - + "<train/>" - + "<train size=\"big\"><name>Gordon</name><number>4</number><color> blue</color></train>" - + "<train size=\"small\"><name>Emily</name><number>-1</number><color>red</color></train>" - + "<train size=\"small\"><name>Percy</name><number>6</number><color>green</color>" - + "</train>" - + "</trains>"; - - @XmlRootElement - static class Train { - public static final int TRAIN_NUMBER_UNDEFINED = -1; - public String name = null; - public String color = null; - public int number = TRAIN_NUMBER_UNDEFINED; - - @XmlAttribute(name = "size") - public String size = null; - - public Train() {} - - public Train(String name, int number, String color, String size) { - this.name = name; - this.number = number; - this.color = color; - this.size = size; - } - - @Override - public int hashCode() { - int hashCode = 1; - hashCode = 31 * hashCode + (name == null ? 0 : name.hashCode()); - hashCode = 31 * hashCode + number; - hashCode = 31 * hashCode + (color == null ? 0 : name.hashCode()); - hashCode = 31 * hashCode + (size == null ? 
0 : name.hashCode()); - return hashCode; - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof Train)) { - return false; - } - - Train other = (Train) obj; - return (name == null || name.equals(other.name)) && (number == other.number) - && (color == null || color.equals(other.color)) - && (size == null || size.equals(other.size)); - } - - @Override - public String toString() { - String str = "Train["; - boolean first = true; - if (name != null) { - str = str + "name=" + name; - first = false; - } - if (number != Integer.MIN_VALUE) { - if (!first) { - str = str + ","; - } - str = str + "number=" + number; - first = false; - } - if (color != null) { - if (!first) { - str = str + ","; - } - str = str + "color=" + color; - first = false; - } - if (size != null) { - if (!first) { - str = str + ","; - } - str = str + "size=" + size; - } - str = str + "]"; - return str; - } - } - - private List<Train> generateRandomTrainList(int size) { - String[] names = {"Thomas", "Henry", "Gordon", "Emily", "Toby", "Percy", "Mavis", "Edward", - "Bertie", "Harold", "Hiro", "Terence", "Salty", "Trevor"}; - int[] numbers = {-1, 1, 2, 3, 4, 5, 6, 7, 8, 9}; - String[] colors = {"red", "blue", "green", "orange", "brown", "black", "white"}; - String[] sizes = {"small", "medium", "big"}; - - Random random = new Random(System.currentTimeMillis()); - - List<Train> trains = new ArrayList<>(); - for (int i = 0; i < size; i++) { - trains.add(new Train(names[random.nextInt(names.length - 1)], - numbers[random.nextInt(numbers.length - 1)], colors[random.nextInt(colors.length - 1)], - sizes[random.nextInt(sizes.length - 1)])); - } - - return trains; - } - - private String trainToXMLElement(Train train) { - return "<train size=\"" + train.size + "\"><name>" + train.name + "</name><number>" - + train.number + "</number><color>" + train.color + "</color></train>"; - } - - private File createRandomTrainXML(String fileName, List<Train> trains) throws IOException { - File file = 
tempFolder.newFile(fileName); - try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) { - writer.write("<trains>"); - writer.newLine(); - for (Train train : trains) { - String str = trainToXMLElement(train); - writer.write(str); - writer.newLine(); - } - writer.write("</trains>"); - writer.newLine(); - } - return file; - } - - private List<Train> readEverythingFromReader(Reader<Train> reader) throws IOException { - List<Train> results = new ArrayList<>(); - for (boolean available = reader.start(); available; available = reader.advance()) { - Train train = reader.getCurrent(); - results.add(train); - } - return results; - } - - @Test - public void testReadXMLTiny() throws IOException { - File file = tempFolder.newFile("trainXMLTiny"); - Files.write(file.toPath(), tinyXML.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .withMinBundleSize(1024) - .createSource(); - - List<Train> expectedResults = ImmutableList.of( - new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null), - new Train("Henry", Train.TRAIN_NUMBER_UNDEFINED, null, null), - new Train("James", Train.TRAIN_NUMBER_UNDEFINED, null, null)); - - assertThat( - trainsToStrings(expectedResults), - containsInAnyOrder( - trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); - } - - @Test - public void testReadXMLWithMultiByteChars() throws IOException { - File file = tempFolder.newFile("trainXMLTiny"); - Files.write(file.toPath(), xmlWithMultiByteChars.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .withMinBundleSize(1024) - .createSource(); - - List<Train> expectedResults = ImmutableList.of( - new Train("ThomasÂ¥", 
Train.TRAIN_NUMBER_UNDEFINED, null, null), - new Train("Hen¶ry", Train.TRAIN_NUMBER_UNDEFINED, null, null), - new Train("JamÃes", Train.TRAIN_NUMBER_UNDEFINED, null, null)); - - assertThat( - trainsToStrings(expectedResults), - containsInAnyOrder( - trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); - } - - @Test - @Ignore( - "Multi-byte characters in XML are not supported because the parser " - + "currently does not correctly report byte offsets") - public void testReadXMLWithMultiByteElementName() throws IOException { - File file = tempFolder.newFile("trainXMLTiny"); - Files.write(file.toPath(), xmlWithMultiByteElementName.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("දà·à¶¸à·à¶»à·à¶ºà¶±à·") - .withRecordElement("දà·à¶¸à·à¶»à·à¶º") - .withRecordClass(Train.class) - .withMinBundleSize(1024) - .createSource(); - - List<Train> expectedResults = ImmutableList.of( - new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null), - new Train("Henry", Train.TRAIN_NUMBER_UNDEFINED, null, null), - new Train("James", Train.TRAIN_NUMBER_UNDEFINED, null, null)); - - assertThat( - trainsToStrings(expectedResults), - containsInAnyOrder( - trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); - } - - @Test - public void testSplitWithEmptyBundleAtEnd() throws Exception { - File file = tempFolder.newFile("trainXMLTiny"); - Files.write(file.toPath(), tinyXML.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .withMinBundleSize(10) - .createSource(); - List<? 
extends BoundedSource<Train>> splits = source.split(50, null); - - assertTrue(splits.size() > 2); - - List<Train> results = new ArrayList<>(); - for (BoundedSource<Train> split : splits) { - results.addAll(readEverythingFromReader(split.createReader(null))); - } - - List<Train> expectedResults = ImmutableList.of( - new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null), - new Train("Henry", Train.TRAIN_NUMBER_UNDEFINED, null, null), - new Train("James", Train.TRAIN_NUMBER_UNDEFINED, null, null)); - - assertThat( - trainsToStrings(expectedResults), containsInAnyOrder(trainsToStrings(results).toArray())); - } - - List<String> trainsToStrings(List<Train> input) { - List<String> strings = new ArrayList<>(); - for (Object data : input) { - strings.add(data.toString()); - } - return strings; - } - - @Test - public void testReadXMLSmall() throws IOException { - File file = tempFolder.newFile("trainXMLSmall"); - Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .withMinBundleSize(1024) - .createSource(); - - List<Train> expectedResults = - ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null), - new Train("Toby", 7, "brown", null), new Train("Gordon", 4, "blue", null), - new Train("Emily", -1, "red", null), new Train("Percy", 6, "green", null)); - - assertThat( - trainsToStrings(expectedResults), - containsInAnyOrder( - trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); - } - - @Test - public void testReadXMLNoRootElement() throws IOException { - File file = tempFolder.newFile("trainXMLSmall"); - Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRecordElement("train") - 
.withRecordClass(Train.class) - .createSource(); - - exception.expect(NullPointerException.class); - exception.expectMessage( - "rootElement is null. Use builder method withRootElement() to set this."); - readEverythingFromReader(source.createReader(null)); - } - - @Test - public void testReadXMLNoRecordElement() throws IOException { - File file = tempFolder.newFile("trainXMLSmall"); - Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordClass(Train.class) - .createSource(); - - exception.expect(NullPointerException.class); - exception.expectMessage( - "recordElement is null. Use builder method withRecordElement() to set this."); - readEverythingFromReader(source.createReader(null)); - } - - @Test - public void testReadXMLNoRecordClass() throws IOException { - File file = tempFolder.newFile("trainXMLSmall"); - Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .createSource(); - - exception.expect(NullPointerException.class); - exception.expectMessage( - "recordClass is null. 
Use builder method withRecordClass() to set this."); - readEverythingFromReader(source.createReader(null)); - } - - @Test - public void testReadXMLIncorrectRootElement() throws IOException { - File file = tempFolder.newFile("trainXMLSmall"); - Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("something") - .withRecordElement("train") - .withRecordClass(Train.class) - .createSource(); - - exception.expectMessage("Unexpected close tag </trains>; expected </something>."); - readEverythingFromReader(source.createReader(null)); - } - - @Test - public void testReadXMLIncorrectRecordElement() throws IOException { - File file = tempFolder.newFile("trainXMLSmall"); - Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("something") - .withRecordClass(Train.class) - .createSource(); - - assertEquals(readEverythingFromReader(source.createReader(null)), new ArrayList<Train>()); - } - - @XmlRootElement - private static class WrongTrainType { - @SuppressWarnings("unused") - public String something; - } - - @Test - public void testReadXMLInvalidRecordClass() throws IOException { - File file = tempFolder.newFile("trainXMLSmall"); - Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<WrongTrainType> source = - XmlIO.<WrongTrainType>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(WrongTrainType.class) - .createSource(); - - exception.expect(RuntimeException.class); - - // JAXB internationalizes the error message. So this is all we can match for. 
- exception.expectMessage(both(containsString("name")).and(Matchers.containsString("something"))); - try (Reader<WrongTrainType> reader = source.createReader(null)) { - - List<WrongTrainType> results = new ArrayList<>(); - for (boolean available = reader.start(); available; available = reader.advance()) { - WrongTrainType train = reader.getCurrent(); - results.add(train); - } - } - } - - @Test - public void testReadXMLNoBundleSize() throws IOException { - File file = tempFolder.newFile("trainXMLSmall"); - Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .createSource(); - - List<Train> expectedResults = - ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null), - new Train("Toby", 7, "brown", null), new Train("Gordon", 4, "blue", null), - new Train("Emily", -1, "red", null), new Train("Percy", 6, "green", null)); - - assertThat( - trainsToStrings(expectedResults), - containsInAnyOrder( - trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); - } - - - @Test - public void testReadXMLWithEmptyTags() throws IOException { - File file = tempFolder.newFile("trainXMLSmall"); - Files.write(file.toPath(), trainXMLWithEmptyTags.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .withMinBundleSize(1024) - .createSource(); - - List<Train> expectedResults = ImmutableList.of(new Train("Thomas", 1, "blue", null), - new Train("Henry", 3, "green", null), new Train("Toby", 7, "brown", null), - new Train("Gordon", 4, "blue", null), new Train("Emily", -1, "red", null), - new Train("Percy", 6, "green", null), new Train(), new Train()); - - assertThat( - 
trainsToStrings(expectedResults), - containsInAnyOrder( - trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); - } - - @Test - @Category(NeedsRunner.class) - public void testReadXMLSmallPipeline() throws IOException { - File file = tempFolder.newFile("trainXMLSmall"); - Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - - PCollection<Train> output = - p.apply( - "ReadFileData", - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .withMinBundleSize(1024)); - - List<Train> expectedResults = - ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null), - new Train("Toby", 7, "brown", null), new Train("Gordon", 4, "blue", null), - new Train("Emily", -1, "red", null), new Train("Percy", 6, "green", null)); - - PAssert.that(output).containsInAnyOrder(expectedResults); - p.run(); - } - - @Test - public void testReadXMLWithAttributes() throws IOException { - File file = tempFolder.newFile("trainXMLSmall"); - Files.write(file.toPath(), trainXMLWithAttributes.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .withMinBundleSize(1024) - .createSource(); - - List<Train> expectedResults = ImmutableList.of(new Train("Thomas", 1, "blue", "small"), - new Train("Henry", 3, "green", "big"), new Train("Toby", 7, "brown", "small"), - new Train("Gordon", 4, "blue", "big"), new Train("Emily", -1, "red", "small"), - new Train("Percy", 6, "green", "small")); - - assertThat( - trainsToStrings(expectedResults), - containsInAnyOrder( - trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); - } - - @Test - public void testReadXMLWithWhitespaces() throws IOException { - File file = tempFolder.newFile("trainXMLSmall"); - 
Files.write(file.toPath(), trainXMLWithSpaces.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .withMinBundleSize(1024) - .createSource(); - - List<Train> expectedResults = ImmutableList.of(new Train("Thomas ", 1, "blue", null), - new Train("Henry", 3, "green", null), new Train("Toby", 7, " brown ", null), - new Train("Gordon", 4, "blue", null), new Train("Emily", -1, "red", null), - new Train("Percy", 6, "green", null)); - - assertThat( - trainsToStrings(expectedResults), - containsInAnyOrder( - trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); - } - - @Test - public void testReadXMLLarge() throws IOException { - String fileName = "temp.xml"; - List<Train> trains = generateRandomTrainList(100); - File file = createRandomTrainXML(fileName, trains); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .withMinBundleSize(1024) - .createSource(); - - assertThat( - trainsToStrings(trains), - containsInAnyOrder( - trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); - } - - @Test - @Category(NeedsRunner.class) - public void testReadXMLLargePipeline() throws IOException { - String fileName = "temp.xml"; - List<Train> trains = generateRandomTrainList(100); - File file = createRandomTrainXML(fileName, trains); - - PCollection<Train> output = - p.apply( - "ReadFileData", - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .withMinBundleSize(1024)); - - PAssert.that(output).containsInAnyOrder(trains); - p.run(); - } - - @Test - public void testSplitWithEmptyBundles() throws Exception { - String fileName = "temp.xml"; - 
List<Train> trains = generateRandomTrainList(10); - File file = createRandomTrainXML(fileName, trains); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .withMinBundleSize(10) - .createSource(); - List<? extends BoundedSource<Train>> splits = source.split(100, null); - - assertTrue(splits.size() > 2); - - List<Train> results = new ArrayList<>(); - for (BoundedSource<Train> split : splits) { - results.addAll(readEverythingFromReader(split.createReader(null))); - } - - assertThat(trainsToStrings(trains), containsInAnyOrder(trainsToStrings(results).toArray())); - } - - @Test - public void testXMLWithSplits() throws Exception { - String fileName = "temp.xml"; - List<Train> trains = generateRandomTrainList(100); - File file = createRandomTrainXML(fileName, trains); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .withMinBundleSize(10) - .createSource(); - List<? 
extends BoundedSource<Train>> splits = source.split(256, null); - - // Not a trivial split - assertTrue(splits.size() > 2); - - List<Train> results = new ArrayList<>(); - for (BoundedSource<Train> split : splits) { - results.addAll(readEverythingFromReader(split.createReader(null))); - } - assertThat(trainsToStrings(trains), containsInAnyOrder(trainsToStrings(results).toArray())); - } - - @Test - public void testSplitAtFraction() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - String fileName = "temp.xml"; - List<Train> trains = generateRandomTrainList(100); - File file = createRandomTrainXML(fileName, trains); - - BoundedSource<Train> fileSource = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .withMinBundleSize(10) - .createSource(); - - List<? extends BoundedSource<Train>> splits = - fileSource.split(file.length() / 3, null); - for (BoundedSource<Train> splitSource : splits) { - int numItems = readEverythingFromReader(splitSource.createReader(null)).size(); - // Should not split while unstarted. - assertSplitAtFractionFails(splitSource, 0, 0.7, options); - assertSplitAtFractionSucceedsAndConsistent(splitSource, 1, 0.7, options); - assertSplitAtFractionSucceedsAndConsistent(splitSource, 15, 0.7, options); - assertSplitAtFractionFails(splitSource, 0, 0.0, options); - assertSplitAtFractionFails(splitSource, 20, 0.3, options); - assertSplitAtFractionFails(splitSource, numItems, 1.0, options); - - // After reading 100 elements we will be approximately at position - // 0.99 * (endOffset - startOffset) hence trying to split at fraction 0.9 will be - // unsuccessful. 
- assertSplitAtFractionFails(splitSource, numItems, 0.9, options); - - // Following passes since we can always find a fraction that is extremely close to 1 such that - // the position suggested by the fraction will be larger than the position the reader is at - // after reading "items - 1" elements. - // This also passes for "numItemsToReadBeforeSplit = items" if the position at suggested - // fraction is larger than the position the reader is at after reading all "items" elements - // (i.e., the start position of the last element). This is true for most cases but will not - // be true if reader position is only one less than the end position. (i.e., the last element - // of the bundle start at the last byte that belongs to the bundle). - assertSplitAtFractionSucceedsAndConsistent(splitSource, numItems - 1, 0.999, options); - } - } - - @Test - public void testSplitAtFractionExhaustiveSingleByte() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - File file = tempFolder.newFile("trainXMLSmall"); - Files.write(file.toPath(), trainXMLWithAllFeaturesSingleByte.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .createSource(); - assertSplitAtFractionExhaustive(source, options); - } - - @Test - @Ignore( - "Multi-byte characters in XML are not supported because the parser " - + "currently does not correctly report byte offsets") - public void testSplitAtFractionExhaustiveMultiByte() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - File file = tempFolder.newFile("trainXMLSmall"); - Files.write(file.toPath(), trainXMLWithAllFeaturesMultiByte.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("දà·à¶¸à·à¶»à·à¶ºà¶±à·") - 
.withRecordElement("දà·à¶¸à·à¶»à·à¶º") - .withRecordClass(Train.class) - .createSource(); - assertSplitAtFractionExhaustive(source, options); - } - - @Test - @Category(NeedsRunner.class) - public void testReadXMLFilePattern() throws IOException { - List<Train> trains1 = generateRandomTrainList(20); - File file = createRandomTrainXML("temp1.xml", trains1); - List<Train> trains2 = generateRandomTrainList(10); - createRandomTrainXML("temp2.xml", trains2); - List<Train> trains3 = generateRandomTrainList(15); - createRandomTrainXML("temp3.xml", trains3); - generateRandomTrainList(8); - createRandomTrainXML("otherfile.xml", trains1); - - PCollection<Train> output = - p.apply( - "ReadFileData", - XmlIO.<Train>read() - .from(file.getParent() + "/" + "temp*.xml") - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .withMinBundleSize(1024)); - - List<Train> expectedResults = new ArrayList<>(); - expectedResults.addAll(trains1); - expectedResults.addAll(trains2); - expectedResults.addAll(trains3); - - PAssert.that(output).containsInAnyOrder(expectedResults); - p.run(); - } - - @Test - public void testDisplayData() { - DisplayData displayData = - DisplayData.from( - XmlIO.<Integer>read() - .from("foo.xml") - .withRootElement("bird") - .withRecordElement("cat") - .withMinBundleSize(1234) - .withRecordClass(Integer.class)); - - assertThat(displayData, hasDisplayItem("filePattern", "foo.xml")); - assertThat(displayData, hasDisplayItem("rootElement", "bird")); - assertThat(displayData, hasDisplayItem("recordElement", "cat")); - assertThat(displayData, hasDisplayItem("recordClass", Integer.class)); - assertThat(displayData, hasDisplayItem("minBundleSize", 1234)); - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml index 27fc614..5b1e243 100644 --- a/sdks/java/io/pom.xml +++ b/sdks/java/io/pom.xml @@ -77,6 +77,7 @@ <module>kinesis</module> <module>mongodb</module> <module>mqtt</module> + <module>xml</module> </modules> <profiles> http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/pom.xml b/sdks/java/io/xml/pom.xml new file mode 100644 index 0000000..49ce239 --- /dev/null +++ b/sdks/java/io/xml/pom.xml @@ -0,0 +1,118 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-parent</artifactId> + <version>0.7.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-sdks-java-io-xml</artifactId> + <name>Apache Beam :: SDKs :: Java :: IO :: XML</name> + <description>IO to read and write XML files.</description> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + </dependency> + + <dependency> + <groupId>org.codehaus.woodstox</groupId> + <artifactId>stax2-api</artifactId> + <version>${stax2.version}</version> + </dependency> + + <dependency> + <groupId>org.codehaus.woodstox</groupId> + <artifactId>woodstox-core-asl</artifactId> + <version>${woodstox.version}</version> + <scope>runtime</scope> + <exclusions> + <!-- javax.xml.stream:stax-api is included in JDK 1.6+ --> + <exclusion> + <groupId>javax.xml.stream</groupId> + <artifactId>stax-api</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + + <dependency> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value</artifactId> + <scope>provided</scope> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + 
<scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-core</artifactId> + <version>1.3</version> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java new file mode 100644 index 0000000..b8b1b79 --- /dev/null +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.xml; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.io.ByteStreams; +import java.io.FilterInputStream; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; +import javax.xml.bind.Unmarshaller; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.util.CloudObject; +import org.apache.beam.sdk.util.EmptyOnDeserializationThreadLocal; +import org.apache.beam.sdk.util.Structs; +import org.apache.beam.sdk.util.VarInt; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** + * A coder for JAXB annotated objects. This coder uses JAXB marshalling/unmarshalling mechanisms + * to encode/decode the objects. Users must provide the {@code Class} of the JAXB annotated object. + * + * @param <T> type of JAXB annotated objects that will be serialized. 
+ */ +public class JAXBCoder<T> extends AtomicCoder<T> { + + private final Class<T> jaxbClass; + private final TypeDescriptor<T> typeDescriptor; + private transient volatile JAXBContext jaxbContext; + private final EmptyOnDeserializationThreadLocal<Marshaller> jaxbMarshaller; + private final EmptyOnDeserializationThreadLocal<Unmarshaller> jaxbUnmarshaller; + + public Class<T> getJAXBClass() { + return jaxbClass; + } + + private JAXBCoder(Class<T> jaxbClass) { + this.jaxbClass = jaxbClass; + this.typeDescriptor = TypeDescriptor.of(jaxbClass); + this.jaxbMarshaller = new EmptyOnDeserializationThreadLocal<Marshaller>() { + @Override + protected Marshaller initialValue() { + try { + JAXBContext jaxbContext = getContext(); + return jaxbContext.createMarshaller(); + } catch (JAXBException e) { + throw new RuntimeException("Error when creating marshaller from JAXB Context.", e); + } + } + }; + this.jaxbUnmarshaller = new EmptyOnDeserializationThreadLocal<Unmarshaller>() { + @Override + protected Unmarshaller initialValue() { + try { + JAXBContext jaxbContext = getContext(); + return jaxbContext.createUnmarshaller(); + } catch (Exception e) { + throw new RuntimeException("Error when creating unmarshaller from JAXB Context.", e); + } + } + }; + } + + /** + * Create a coder for a given type of JAXB annotated objects. + * + * @param jaxbClass the {@code Class} of the JAXB annotated objects. 
+ */ + public static <T> JAXBCoder<T> of(Class<T> jaxbClass) { + return new JAXBCoder<>(jaxbClass); + } + + @Override + public void encode(T value, OutputStream outStream, Context context) + throws CoderException, IOException { + try { + if (!context.isWholeStream) { + try { + long size = getEncodedElementByteSize(value, Context.OUTER); + // record the number of bytes the XML consists of so when reading we only read the encoded + // value + VarInt.encode(size, outStream); + } catch (Exception e) { + throw new CoderException( + "An Exception occured while trying to get the size of an encoded representation", e); + } + } + + jaxbMarshaller.get().marshal(value, new CloseIgnoringOutputStream(outStream)); + } catch (JAXBException e) { + throw new CoderException(e); + } + } + + @Override + public T decode(InputStream inStream, Context context) throws CoderException, IOException { + try { + InputStream stream = inStream; + if (!context.isWholeStream) { + long limit = VarInt.decodeLong(inStream); + stream = ByteStreams.limit(inStream, limit); + } + @SuppressWarnings("unchecked") + T obj = (T) jaxbUnmarshaller.get().unmarshal(new CloseIgnoringInputStream(stream)); + return obj; + } catch (JAXBException e) { + throw new CoderException(e); + } + } + + private JAXBContext getContext() throws JAXBException { + if (jaxbContext == null) { + synchronized (this) { + if (jaxbContext == null) { + jaxbContext = JAXBContext.newInstance(jaxbClass); + } + } + } + return jaxbContext; + } + + @Override + public String getEncodingId() { + return getJAXBClass().getName(); + } + + @Override + public TypeDescriptor<T> getEncodedTypeDescriptor() { + return typeDescriptor; + } + + private static class CloseIgnoringInputStream extends FilterInputStream { + + protected CloseIgnoringInputStream(InputStream in) { + super(in); + } + + @Override + public void close() { + // Do nothing. JAXB closes the underlying stream so we must filter out those calls. 
+ } + } + + private static class CloseIgnoringOutputStream extends FilterOutputStream { + + protected CloseIgnoringOutputStream(OutputStream out) { + super(out); + } + + @Override + public void close() throws IOException { + // JAXB closes the underlying stream so we must filter out those calls. + } + } + + //////////////////////////////////////////////////////////////////////////////////// + // JSON Serialization details below + + private static final String JAXB_CLASS = "jaxb_class"; + + /** + * Constructor for JSON deserialization only. + */ + @JsonCreator + public static <T> JAXBCoder<T> of( + @JsonProperty(JAXB_CLASS) String jaxbClassName) { + try { + @SuppressWarnings("unchecked") + Class<T> jaxbClass = (Class<T>) Class.forName(jaxbClassName); + return of(jaxbClass); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException(e); + } + } + + @Override + protected CloudObject initializeCloudObject() { + CloudObject result = CloudObject.forClass(getClass()); + Structs.addString(result, JAXB_CLASS, jaxbClass.getName()); + return result; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java new file mode 100644 index 0000000..bf0e1b5 --- /dev/null +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.xml; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import com.google.common.annotations.VisibleForTesting; +import javax.annotation.Nullable; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.CompressedSource; +import org.apache.beam.sdk.io.FileBasedSink; +import org.apache.beam.sdk.io.OffsetBasedSource; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + +/** Transforms for reading and writing XML files using JAXB mappers. */ +public class XmlIO { + // CHECKSTYLE.OFF: JavadocStyle + /** + * Reads XML files. This source reads one or more XML files and creates a {@link PCollection} of a + * given type. Please note the example given below. + * + * <p>The XML file must be of the following form, where {@code root} and {@code record} are XML + * element names that are defined by the user: + * + * <pre>{@code + * <root> + * <record> ... </record> + * <record> ... </record> + * <record> ... </record> + * ... + * <record> ... </record> + * </root> + * }</pre> + * + * <p>Basically, the XML document should contain a single root element with an inner list + * consisting entirely of record elements. 
The records may contain arbitrary XML content; however, + * that content <b>must not</b> contain the start {@code <record>} or end {@code </record>} tags. + * This restriction enables reading from large XML files in parallel from different offsets in the + * file. + * + * <p>Root and/or record elements may additionally contain an arbitrary number of XML attributes. + * Additionally users must provide a class of a JAXB annotated Java type that can be used convert + * records into Java objects and vice versa using JAXB marshalling/unmarshalling mechanisms. + * Reading the source will generate a {@code PCollection} of the given JAXB annotated Java type. + * Optionally users may provide a minimum size of a bundle that should be created for the source. + * + * <p>The following example shows how to use this method in a Beam pipeline: + * + * <pre>{@code + * PCollection<String> output = p.apply(XmlIO.<Record>read() + * .from(file.toPath().toString()) + * .withRootElement("root") + * .withRecordElement("record") + * .withRecordClass(Record.class)); + * }</pre> + * + * <p>Currently, only XML files that use single-byte characters are supported. Using a file that + * contains multi-byte characters may result in data loss or duplication. + * + * <h3>Permissions</h3> + * + * <p>Permission requirements depend on the {@link org.apache.beam.sdk.runners.PipelineRunner + * PipelineRunner} that is used to execute the Beam pipeline. Please refer to the documentation of + * corresponding {@link PipelineRunner PipelineRunners} for more details. + * + * @param <T> Type of the objects that represent the records of the XML file. The {@code + * PCollection} generated by this source will be of this type. 
+ */ + // CHECKSTYLE.ON: JavadocStyle + public static <T> Read<T> read() { + return new AutoValue_XmlIO_Read.Builder<T>() + .setMinBundleSize(Read.DEFAULT_MIN_BUNDLE_SIZE) + .setCompressionType(Read.CompressionType.AUTO) + .build(); + } + + // CHECKSTYLE.OFF: JavadocStyle + /** + * A {@link FileBasedSink} that outputs records as XML-formatted elements. Writes a {@link + * PCollection} of records from JAXB-annotated classes to a single file location. + * + * <p>Given a PCollection containing records of type T that can be marshalled to XML elements, + * this Sink will produce a single file consisting of a single root element that contains all of + * the elements in the PCollection. + * + * <p>XML Sinks are created with a base filename to write to, a root element name that will be + * used for the root element of the output files, and a class to bind to an XML element. This + * class will be used in the marshalling of records in an input PCollection to their XML + * representation and must be able to be bound using JAXB annotations (checked at pipeline + * construction time). 
+ * + * <p>XML Sinks can be written to using the {@link Write} transform: + * + * <pre>{@code + * p.apply(XmlIO.<Type>write() + * .withRecordClass(Type.class) + * .withRootElement(root_element) + * .toFilenamePrefix(output_filename)); + * }</pre> + * + * <p>For example, consider the following class with JAXB annotations: + * + * <pre> + * {@literal @}XmlRootElement(name = "word_count_result") + * {@literal @}XmlType(propOrder = {"word", "frequency"}) + * public class WordFrequency { + * private String word; + * private long frequency; + * + * public WordFrequency() { } + * + * public WordFrequency(String word, long frequency) { + * this.word = word; + * this.frequency = frequency; + * } + * + * public void setWord(String word) { + * this.word = word; + * } + * + * public void setFrequency(long frequency) { + * this.frequency = frequency; + * } + * + * public long getFrequency() { + * return frequency; + * } + * + * public String getWord() { + * return word; + * } + * } + * </pre> + * + * <p>The following will produce XML output with a root element named "words" from a PCollection + * of WordFrequency objects: + * + * <pre>{@code + * p.apply(XmlIO.<WordFrequency>write() + * .withRecordClass(WordFrequency.class) + * .withRootElement("words") + * .toFilenamePrefix(output_file)); + * }</pre> + * + * <p>The output of which will look like: + * + * <pre>{@code + * <words> + * + * <word_count_result> + * <word>decreased</word> + * <frequency>1</frequency> + * </word_count_result> + * + * <word_count_result> + * <word>War</word> + * <frequency>4</frequency> + * </word_count_result> + * + * <word_count_result> + * <word>empress'</word> + * <frequency>14</frequency> + * </word_count_result> + * + * <word_count_result> + * <word>stoops</word> + * <frequency>6</frequency> + * </word_count_result> + * + * ... 
+ * </words> + * }</pre> + */ + // CHECKSTYLE.ON: JavadocStyle + public static <T> Write<T> write() { + return new AutoValue_XmlIO_Write.Builder<T>().build(); + } + + /** Implementation of {@link #read}. */ + @AutoValue + public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> { + private static final int DEFAULT_MIN_BUNDLE_SIZE = 8 * 1024; + + @Nullable + abstract String getFileOrPatternSpec(); + + @Nullable + abstract String getRootElement(); + + @Nullable + abstract String getRecordElement(); + + @Nullable + abstract Class<T> getRecordClass(); + + abstract CompressionType getCompressionType(); + + abstract long getMinBundleSize(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setFileOrPatternSpec(String fileOrPatternSpec); + + abstract Builder<T> setRootElement(String rootElement); + + abstract Builder<T> setRecordElement(String recordElement); + + abstract Builder<T> setRecordClass(Class<T> recordClass); + + abstract Builder<T> setMinBundleSize(long minBundleSize); + + abstract Builder<T> setCompressionType(CompressionType compressionType); + + abstract Read<T> build(); + } + + /** Strategy for determining the compression type of XML files being read. */ + public enum CompressionType { + /** Automatically determine the compression type based on filename extension. */ + AUTO(""), + /** Uncompressed (i.e., may be split). */ + UNCOMPRESSED(""), + /** GZipped. */ + GZIP(".gz"), + /** BZipped. */ + BZIP2(".bz2"), + /** Zipped. */ + ZIP(".zip"), + /** Deflate compressed. */ + DEFLATE(".deflate"); + + private String filenameSuffix; + + CompressionType(String suffix) { + this.filenameSuffix = suffix; + } + + /** + * Determine if a given filename matches a compression type based on its extension. + * + * @param filename the filename to match + * @return true iff the filename ends with the compression type's known extension. 
+ */ + public boolean matches(String filename) { + return filename.toLowerCase().endsWith(filenameSuffix.toLowerCase()); + } + } + + /** + * Reads a single XML file or a set of XML files defined by a Java "glob" file pattern. Each XML + * file should be of the form defined in {@link #read}. + */ + public Read<T> from(String fileOrPatternSpec) { + return toBuilder().setFileOrPatternSpec(fileOrPatternSpec).build(); + } + + /** + * Sets name of the root element of the XML document. This will be used to create a valid + * starting root element when initiating a bundle of records created from an XML document. This + * is a required parameter. + */ + public Read<T> withRootElement(String rootElement) { + return toBuilder().setRootElement(rootElement).build(); + } + + /** + * Sets name of the record element of the XML document. This will be used to determine offset of + * the first record of a bundle created from the XML document. This is a required parameter. + */ + public Read<T> withRecordElement(String recordElement) { + return toBuilder().setRecordElement(recordElement).build(); + } + + /** + * Sets a JAXB annotated class that can be populated using a record of the provided XML file. + * This will be used when unmarshalling record objects from the XML file. This is a required + * parameter. + */ + public Read<T> withRecordClass(Class<T> recordClass) { + return toBuilder().setRecordClass(recordClass).build(); + } + + /** + * Sets a parameter {@code minBundleSize} for the minimum bundle size of the source. Please + * refer to {@link OffsetBasedSource} for the definition of minBundleSize. This is an optional + * parameter. + */ + public Read<T> withMinBundleSize(long minBundleSize) { + return toBuilder().setMinBundleSize(minBundleSize).build(); + } + + /** + * Decompresses all input files using the specified compression type. + * + * <p>If no compression type is specified, the default is {@link CompressionType#AUTO}. 
In this + * mode, the compression type of the file is determined by its extension. Supports .gz, .bz2, + * .zip and .deflate compression. + */ + public Read<T> withCompressionType(CompressionType compressionType) { + return toBuilder().setCompressionType(compressionType).build(); + } + + @Override + public void validate(PBegin input) { + checkNotNull( + getRootElement(), + "rootElement is null. Use builder method withRootElement() to set this."); + checkNotNull( + getRecordElement(), + "recordElement is null. Use builder method withRecordElement() to set this."); + checkNotNull( + getRecordClass(), + "recordClass is null. Use builder method withRecordClass() to set this."); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .addIfNotDefault( + DisplayData.item("minBundleSize", getMinBundleSize()) + .withLabel("Minimum Bundle Size"), + 1L) + .add(DisplayData.item("filePattern", getFileOrPatternSpec()).withLabel("File Pattern")) + .addIfNotNull( + DisplayData.item("rootElement", getRootElement()).withLabel("XML Root Element")) + .addIfNotNull( + DisplayData.item("recordElement", getRecordElement()).withLabel("XML Record Element")) + .addIfNotNull( + DisplayData.item("recordClass", getRecordClass()).withLabel("XML Record Class")); + } + + @VisibleForTesting + BoundedSource<T> createSource() { + XmlSource<T> source = new XmlSource<>(this); + switch (getCompressionType()) { + case UNCOMPRESSED: + return source; + case AUTO: + return CompressedSource.from(source); + case BZIP2: + return CompressedSource.from(source) + .withDecompression(CompressedSource.CompressionMode.BZIP2); + case GZIP: + return CompressedSource.from(source) + .withDecompression(CompressedSource.CompressionMode.GZIP); + case ZIP: + return CompressedSource.from(source) + .withDecompression(CompressedSource.CompressionMode.ZIP); + case DEFLATE: + return CompressedSource.from(source) + .withDecompression(CompressedSource.CompressionMode.DEFLATE); + default: + 
throw new IllegalArgumentException("Unknown compression type: " + getCompressionType()); + } + } + + @Override + public PCollection<T> expand(PBegin input) { + return input.apply(org.apache.beam.sdk.io.Read.from(createSource())); + } + } + + /** Implementation of {@link #write}. */ + @AutoValue + public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> { + @Nullable + abstract String getFilenamePrefix(); + + @Nullable + abstract Class<T> getRecordClass(); + + @Nullable + abstract String getRootElement(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setFilenamePrefix(String baseOutputFilename); + + abstract Builder<T> setRecordClass(Class<T> recordClass); + + abstract Builder<T> setRootElement(String rootElement); + + abstract Write<T> build(); + } + + /** + * Writes to files with the given path prefix. + * + * <p>Output files will have the name {@literal {filenamePrefix}-0000i-of-0000n.xml} where n is + * the number of output bundles. + */ + public Write<T> toFilenamePrefix(String filenamePrefix) { + return toBuilder().setFilenamePrefix(filenamePrefix).build(); + } + + /** + * Writes objects of the given class mapped to XML elements using JAXB. + * + * <p>The specified class must be able to be used to create a JAXB context. + */ + public Write<T> withRecordClass(Class<T> recordClass) { + return toBuilder().setRecordClass(recordClass).build(); + } + + /** Sets the enclosing root element for the generated XML files. 
*/ + public Write<T> withRootElement(String rootElement) { + return toBuilder().setRootElement(rootElement).build(); + } + + @Override + public void validate(PCollection<T> input) { + checkNotNull(getRecordClass(), "Missing a class to bind to a JAXB context."); + checkNotNull(getRootElement(), "Missing a root element name."); + checkNotNull(getFilenamePrefix(), "Missing a filename to write to."); + try { + JAXBContext.newInstance(getRecordClass()); + } catch (JAXBException e) { + throw new RuntimeException("Error binding classes to a JAXB Context.", e); + } + } + + @Override + public PDone expand(PCollection<T> input) { + return input.apply(org.apache.beam.sdk.io.WriteFiles.to(createSink())); + } + + @VisibleForTesting + XmlSink<T> createSink() { + return new XmlSink<>(this); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + createSink().populateFileBasedDisplayData(builder); + builder + .addIfNotNull( + DisplayData.item("rootElement", getRootElement()).withLabel("XML Root Element")) + .addIfNotNull( + DisplayData.item("recordClass", getRecordClass()).withLabel("XML Record Class")); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java new file mode 100644 index 0000000..2e7dba1 --- /dev/null +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.xml; + +import com.google.common.annotations.VisibleForTesting; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.Marshaller; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.FileBasedSink; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.CoderUtils; + +/** Implementation of {@link XmlIO#write}. */ +class XmlSink<T> extends FileBasedSink<T> { + protected static final String XML_EXTENSION = "xml"; + + private final XmlIO.Write<T> spec; + + XmlSink(XmlIO.Write<T> spec) { + super(spec.getFilenamePrefix(), XML_EXTENSION); + this.spec = spec; + } + + /** + * Validates that the root element, class to bind to a JAXB context, and filenamePrefix have + * been set and that the class can be bound in a JAXB context. + */ + @Override + public void validate(PipelineOptions options) { + spec.validate(null); + } + + /** + * Creates an {@link XmlWriteOperation}. 
+ */ + @Override + public XmlWriteOperation<T> createWriteOperation(PipelineOptions options) { + return new XmlWriteOperation<>(this); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + spec.populateDisplayData(builder); + } + + void populateFileBasedDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + } + + /** + * {@link FileBasedSink.FileBasedWriteOperation} for XML {@link FileBasedSink}s. + */ + protected static final class XmlWriteOperation<T> extends FileBasedWriteOperation<T> { + public XmlWriteOperation(XmlSink<T> sink) { + super(sink); + } + + /** + * Creates a {@link XmlWriter} with a marshaller for the type it will write. + */ + @Override + public XmlWriter<T> createWriter(PipelineOptions options) throws Exception { + JAXBContext context; + Marshaller marshaller; + context = JAXBContext.newInstance(getSink().spec.getRecordClass()); + marshaller = context.createMarshaller(); + marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE); + marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE); + marshaller.setProperty(Marshaller.JAXB_ENCODING, "UTF-8"); + return new XmlWriter<>(this, marshaller); + } + + /** + * Return the XmlSink.Bound for this write operation. + */ + @Override + public XmlSink<T> getSink() { + return (XmlSink<T>) super.getSink(); + } + + @VisibleForTesting + String getTemporaryDirectory() { + return this.tempDirectory.get(); + } + } + + /** + * A {@link FileBasedWriter} that can write objects as XML elements. + */ + protected static final class XmlWriter<T> extends FileBasedWriter<T> { + final Marshaller marshaller; + private OutputStream os = null; + + public XmlWriter(XmlWriteOperation<T> writeOperation, Marshaller marshaller) { + super(writeOperation); + this.marshaller = marshaller; + } + + /** + * Creates the output stream that elements will be written to. 
+ */ + @Override + protected void prepareWrite(WritableByteChannel channel) throws Exception { + os = Channels.newOutputStream(channel); + } + + /** + * Writes the root element opening tag. + */ + @Override + protected void writeHeader() throws Exception { + String rootElementName = getWriteOperation().getSink().spec.getRootElement(); + os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "<" + rootElementName + ">\n")); + } + + /** + * Writes the root element closing tag. + */ + @Override + protected void writeFooter() throws Exception { + String rootElementName = getWriteOperation().getSink().spec.getRootElement(); + os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "\n</" + rootElementName + ">")); + } + + /** + * Writes a value to the stream. + */ + @Override + public void write(T value) throws Exception { + marshaller.marshal(value, os); + } + + /** + * Return the XmlWriteOperation this write belongs to. + */ + @Override + public XmlWriteOperation<T> getWriteOperation() { + return (XmlWriteOperation<T>) super.getWriteOperation(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java new file mode 100644 index 0000000..876c782 --- /dev/null +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.xml; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.SequenceInputStream; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.charset.StandardCharsets; +import java.util.NoSuchElementException; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBElement; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import javax.xml.bind.ValidationEvent; +import javax.xml.bind.ValidationEventHandler; +import javax.xml.stream.FactoryConfigurationError; +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamConstants; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.FileBasedSource; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.codehaus.stax2.XMLInputFactory2; + +/** Implementation of {@link XmlIO#read}. 
*/
+public class XmlSource<T> extends FileBasedSource<T> {
+
+  private static final String XML_VERSION = "1.1";
+
+  private final XmlIO.Read<T> spec;
+
+  XmlSource(XmlIO.Read<T> spec) {
+    super(StaticValueProvider.of(spec.getFileOrPatternSpec()), spec.getMinBundleSize());
+    this.spec = spec;
+  }
+
+  private XmlSource(XmlIO.Read<T> spec, Metadata metadata, long startOffset, long endOffset) {
+    super(metadata, spec.getMinBundleSize(), startOffset, endOffset);
+    this.spec = spec;
+  }
+
+  @Override
+  protected FileBasedSource<T> createForSubrangeOfFile(Metadata metadata, long start, long end) {
+    // Point the sub-source's spec at the single file being split. Use the resource id rather
+    // than Metadata#toString(), which renders the whole metadata object instead of a file spec.
+    return new XmlSource<T>(spec.from(metadata.resourceId().toString()), metadata, start, end);
+  }
+
+  @Override
+  protected FileBasedReader<T> createSingleFileReader(PipelineOptions options) {
+    return new XMLReader<T>(this);
+  }
+
+  @Override
+  public void validate() {
+    super.validate();
+    spec.validate(null);
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    spec.populateDisplayData(builder);
+  }
+
+  @Override
+  public Coder<T> getDefaultOutputCoder() {
+    return JAXBCoder.of(spec.getRecordClass());
+  }
+
+  /**
+   * A {@link org.apache.beam.sdk.io.Source.Reader} for reading JAXB annotated Java objects from an
+   * XML file. The XML file should be of the form defined at {@link XmlSource}.
+   *
+   * <p>Timestamped values are currently unsupported - all values implicitly have the timestamp
+   * of {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
+   *
+   * @param <T> Type of objects that will be read by the reader.
+   */
+  private static class XMLReader<T> extends FileBasedReader<T> {
+    // The number of bytes read from the channel into memory at a time while determining the
+    // starting offset of the first record in a bundle. Once that offset is found, the bytes left
+    // in this buffer and the bytes not yet read from the channel are used to create the XML
+    // parser.
+    private static final int BUF_SIZE = 1024;
+
+    // This should be the maximum number of bytes a character will encode to, for any encoding
+    // supported by XmlSource. Currently this is set to 4 since UTF-8 characters may be
+    // four bytes.
+    private static final int MAX_CHAR_BYTES = 4;
+
+    // In order to support reading starting in the middle of an XML file, we construct an imaginary
+    // well-formed document (a header and root tag followed by the contents of the input starting at
+    // the record boundary) and feed it to the parser. Because of this, the offset reported by the
+    // XML parser is not the same as offset in the original file. They differ by a constant amount:
+    // offsetInOriginalFile = parser.getLocation().getCharacterOffset() + parserBaseOffset;
+    // Note that this is true only for files with single-byte characters.
+    // It appears that, as of writing, there does not exist a Java XML parser capable of correctly
+    // reporting byte offsets of elements in the presence of multi-byte characters.
+    private long parserBaseOffset = 0;
+    private boolean readingStarted = false;
+
+    // If true, the current bundle does not contain any records.
+    private boolean emptyBundle = false;
+
+    private Unmarshaller jaxbUnmarshaller = null;
+    private XMLStreamReader parser = null;
+
+    private T currentRecord = null;
+
+    // Byte offset of the current record in the XML file provided when creating the source.
+    private long currentByteOffset = 0;
+
+    public XMLReader(XmlSource<T> source) {
+      super(source);
+
+      // Set up a JAXB Unmarshaller that can be used to unmarshall record objects.
+      try {
+        JAXBContext jaxbContext = JAXBContext.newInstance(getCurrentSource().spec.getRecordClass());
+        jaxbUnmarshaller = jaxbContext.createUnmarshaller();
+
+        // Throw errors if validation fails. JAXB by default ignores validation errors.
+        jaxbUnmarshaller.setEventHandler(new ValidationEventHandler() {
+          @Override
+          public boolean handleEvent(ValidationEvent event) {
+            throw new RuntimeException(event.getMessage(), event.getLinkedException());
+          }
+        });
+      } catch (JAXBException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public synchronized XmlSource<T> getCurrentSource() {
+      return (XmlSource<T>) super.getCurrentSource();
+    }
+
+    @Override
+    protected void startReading(ReadableByteChannel channel) throws IOException {
+      // This method determines the correct starting offset of the first record by reading bytes
+      // from the ReadableByteChannel. This implementation does not need the channel to be a
+      // SeekableByteChannel.
+      // The method tries to determine the first record element in the byte channel. The first
+      // record must start with the characters "<recordElement" where "recordElement" is the
+      // record element of the XML document described above. For the match to be complete this
+      // has to be followed by one of following.
+      // * any whitespace character
+      // * '>' character
+      // * '/' character (to support empty records).
+      //
+      // After this match this method creates the XML parser for parsing the XML document,
+      // feeding it a fake document consisting of an XML header and the <rootElement> tag followed
+      // by the contents of channel starting from <recordElement. The <rootElement> tag may never
+      // be closed.
+
+      // This stores any bytes that should be used prior to the remaining bytes of the channel when
+      // creating an XML parser object.
+      ByteArrayOutputStream preambleByteBuffer = new ByteArrayOutputStream();
+      // A dummy declaration and root for the document with proper XML version and encoding.
+      // Without this XML parsing may fail or may produce incorrect results.
+      byte[] dummyStartDocumentBytes =
+          (String.format(
+                  "<?xml version=\"%s\" encoding=\"UTF-8\" ?><%s>",
+                  XML_VERSION, getCurrentSource().spec.getRootElement()))
+              .getBytes(StandardCharsets.UTF_8);
+      preambleByteBuffer.write(dummyStartDocumentBytes);
+      // Gets the byte offset (in the input file) of the first record in ReadableByteChannel. This
+      // method returns the offset and stores any bytes that should be used when creating the XML
+      // parser in preambleByteBuffer.
+      long offsetInFileOfRecordElement =
+          getFirstOccurrenceOfRecordElement(channel, preambleByteBuffer);
+      if (offsetInFileOfRecordElement < 0) {
+        // Bundle has no records. So marking this bundle as an empty bundle.
+        emptyBundle = true;
+        return;
+      } else {
+        byte[] preambleBytes = preambleByteBuffer.toByteArray();
+        currentByteOffset = offsetInFileOfRecordElement;
+        setUpXMLParser(channel, preambleBytes);
+        parserBaseOffset = offsetInFileOfRecordElement - dummyStartDocumentBytes.length;
+      }
+      readingStarted = true;
+    }
+
+    // Gets the first occurrence of the next record within the given ReadableByteChannel. Puts
+    // any bytes read past the starting offset of the next record back to the preambleByteBuffer.
+    // If a record is found, returns the starting offset of the record, otherwise
+    // returns -1.
+    private long getFirstOccurrenceOfRecordElement(
+        ReadableByteChannel channel, ByteArrayOutputStream preambleByteBuffer) throws IOException {
+      // Index of the byte in the string "<recordElement" to be matched
+      // against the current byte from the stream.
+      int byteIndexInRecordElementToMatch = 0;
+      // "<recordElement" matched. Still have to match the next character to confirm if this is a
+      // positive match.
+      boolean recordStartBytesMatched = false;
+      boolean fullyMatched = false; // If true, record element was fully matched.
+
+      // This gives the offset of the byte currently being read. We do a '-1' here since we
+      // increment this value at the beginning of the while loop below.
+      long offsetInFileOfCurrentByte = getCurrentSource().getStartOffset() - 1;
+      // If this is non-negative, currently there is a match in progress and this value gives the
+      // starting offset of the match currently being conducted.
+      long startingOffsetInFileOfCurrentMatch = -1;
+      boolean matchStarted = false; // If true, a match is currently in progress.
+
+      // These two values are used to determine the character immediately following a match for
+      // "<recordElement". Please see the comment for 'MAX_CHAR_BYTES' above.
+      byte[] charBytes = new byte[MAX_CHAR_BYTES];
+      int charBytesFound = 0;
+
+      ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
+      byte[] recordStartBytes =
+          ("<" + getCurrentSource().spec.getRecordElement()).getBytes(StandardCharsets.UTF_8);
+
+      outer: while (channel.read(buf) > 0) {
+        buf.flip();
+        while (buf.hasRemaining()) {
+          offsetInFileOfCurrentByte++;
+          byte b = buf.get();
+          boolean reset = false;
+          if (recordStartBytesMatched) {
+            // We already matched "<recordElement"; read the next character to determine whether
+            // this is a positive match for a new record.
+            charBytes[charBytesFound] = b;
+            charBytesFound++;
+            if (charBytesFound < charBytes.length) {
+              // Not enough bytes yet to be sure that a complete character has been read.
+              continue;
+            }
+            // NOTE(review): charBuf is never read; it could be removed together with the
+            // CharBuffer import.
+            CharBuffer charBuf = CharBuffer.allocate(1);
+            InputStream charBufStream = new ByteArrayInputStream(charBytes);
+            java.io.Reader reader = new InputStreamReader(charBufStream, StandardCharsets.UTF_8);
+            int read = reader.read();
+            // Reader#read() signals end of stream with -1; 0 is a valid (NUL) character and must
+            // not be mistaken for the end of the input.
+            if (read < 0) {
+              return -1;
+            }
+            charBuf.flip();
+            char c = (char) read;
+
+            // Record start may be of following forms
+            // * "<recordElement<whitespace>..."
+            // * "<recordElement>..."
+            // * "<recordElement/..."
+            if (Character.isWhitespace(c) || c == '>' || c == '/') {
+              fullyMatched = true;
+              // Add the recordStartBytes and charBytes to preambleByteBuffer since these were
+              // already read from the channel.
+              preambleByteBuffer.write(recordStartBytes);
+              preambleByteBuffer.write(charBytes);
+              // Also add the rest of the current buffer to preambleByteBuffer.
+              while (buf.hasRemaining()) {
+                preambleByteBuffer.write(buf.get());
+              }
+              break outer;
+            } else {
+              // Matching was unsuccessful. Push the bytes read for the char back in front of the
+              // unread remainder of the current buffer so that they are scanned again.
+              // Some of the char bytes may have come from the previous buffer, in which case
+              // charBytes.length + buf.remaining() exceeds BUF_SIZE. Size the new buffer to fit
+              // both; a fixed BUF_SIZE buffer would throw BufferOverflowException here.
+              ByteBuffer newbuf =
+                  ByteBuffer.allocate(Math.max(BUF_SIZE, charBytes.length + buf.remaining()));
+              newbuf.put(charBytes);
+              offsetInFileOfCurrentByte -= charBytes.length;
+              newbuf.put(buf);
+              newbuf.flip();
+              buf = newbuf;
+
+              // Ignore everything and try again starting from the current buffer.
+              reset = true;
+            }
+          } else if (b == recordStartBytes[byteIndexInRecordElementToMatch]) {
+            // Next byte matched.
+            if (!matchStarted) {
+              // Match was for the first byte, record the starting offset.
+              matchStarted = true;
+              startingOffsetInFileOfCurrentMatch = offsetInFileOfCurrentByte;
+            }
+            byteIndexInRecordElementToMatch++;
+          } else {
+            // Not a match. Ignore everything and try again starting at current point.
+            reset = true;
+          }
+          if (reset) {
+            // Clear variables and try to match starting from the next byte.
+            byteIndexInRecordElementToMatch = 0;
+            startingOffsetInFileOfCurrentMatch = -1;
+            matchStarted = false;
+            recordStartBytesMatched = false;
+            charBytes = new byte[MAX_CHAR_BYTES];
+            charBytesFound = 0;
+          }
+          if (byteIndexInRecordElementToMatch == recordStartBytes.length) {
+            // "<recordElement" matched. Need to still check next byte since this might be an
+            // element that has "recordElement" as a prefix.
+            recordStartBytesMatched = true;
+          }
+        }
+        buf.clear();
+      }
+
+      if (!fullyMatched) {
+        return -1;
+      } else {
+        return startingOffsetInFileOfCurrentMatch;
+      }
+    }
+
+    private void setUpXMLParser(ReadableByteChannel channel, byte[] lookAhead) throws IOException {
+      try {
+        // We use Woodstox because the StAX implementation provided by OpenJDK reports
+        // character locations incorrectly. Note that Woodstox still currently reports *byte*
+        // locations incorrectly when parsing documents that contain multi-byte characters.
+        XMLInputFactory2 xmlInputFactory = (XMLInputFactory2) XMLInputFactory.newInstance();
+        this.parser = xmlInputFactory.createXMLStreamReader(
+            new SequenceInputStream(
+                new ByteArrayInputStream(lookAhead), Channels.newInputStream(channel)),
+            "UTF-8");
+
+        // Current offset should be the offset before reading the record element.
+        while (true) {
+          int event = parser.next();
+          if (event == XMLStreamConstants.START_ELEMENT) {
+            String localName = parser.getLocalName();
+            if (localName.equals(getCurrentSource().spec.getRecordElement())) {
+              break;
+            }
+          }
+        }
+      } catch (FactoryConfigurationError | XMLStreamException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    protected boolean readNextRecord() throws IOException {
+      if (emptyBundle) {
+        currentByteOffset = Long.MAX_VALUE;
+        return false;
+      }
+      try {
+        // Update current offset and check if the next value is the record element.
+        currentByteOffset = parserBaseOffset + parser.getLocation().getCharacterOffset();
+        while (parser.getEventType() != XMLStreamConstants.START_ELEMENT) {
+          parser.next();
+          currentByteOffset = parserBaseOffset + parser.getLocation().getCharacterOffset();
+          if (parser.getEventType() == XMLStreamConstants.END_DOCUMENT) {
+            currentByteOffset = Long.MAX_VALUE;
+            return false;
+          }
+        }
+        JAXBElement<T> jb =
+            jaxbUnmarshaller.unmarshal(parser, getCurrentSource().spec.getRecordClass());
+        currentRecord = jb.getValue();
+        return true;
+      } catch (JAXBException | XMLStreamException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public T getCurrent() throws NoSuchElementException {
+      if (!readingStarted) {
+        throw new NoSuchElementException();
+      }
+      return currentRecord;
+    }
+
+    @Override
+    protected boolean isAtSplitPoint() {
+      // Every record is at a split point.
+      return true;
+    }
+
+    @Override
+    protected long getCurrentOffset() {
+      return currentByteOffset;
+    }
+  }
+}
