Repository: crunch Updated Branches: refs/heads/master 050b4a9e1 -> 5e6e33536
CRUNCH-491: Use the CharsetEncoder to compute the raw byte size of a char Signed-off-by: tzolov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5e6e3353 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5e6e3353 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5e6e3353 Branch: refs/heads/master Commit: 5e6e335363d86630a14067b37bd8efc6e5bc1607 Parents: 050b4a9 Author: tzolov <[email protected]> Authored: Thu Feb 5 16:32:58 2015 +0100 Committer: tzolov <[email protected]> Committed: Wed Feb 11 09:16:54 2015 +0100 ---------------------------------------------------------------------- .../crunch/io/text/xml/XmlInputFormat.java | 24 +++- .../apache/crunch/io/text/xml/XmlSource.java | 2 +- .../crunch/io/text/xml/XmlRecordReaderTest.java | 117 +++++++++++++++++++ .../src/main/resources/xmlSourceSample3.xml | 27 +++++ 4 files changed, 165 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/5e6e3353/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlInputFormat.java index 58157fe..79f867c 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlInputFormat.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlInputFormat.java @@ -22,7 +22,10 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.UnsupportedEncodingException; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -40,7 +43,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Charsets; -import com.google.common.primitives.Chars; /** * Reads records that are delimited by a specific begin/end tag. @@ -67,7 +69,7 @@ public class XmlInputFormat extends TextInputFormat { /** * XMLRecordReader class to read through a given xml document to output xml blocks as records as specified by the - * start tag and end tag + * start tag and end tag. */ public static class XmlRecordReader extends RecordReader<LongWritable, Text> { @@ -84,7 +86,9 @@ public class XmlInputFormat extends TextInputFormat { private final BufferedReader inReader; private final OutputStreamWriter outWriter; private final String inputEncoding; - private int readByteCounter = 0; + private long readByteCounter; + + private CharsetEncoder charsetEncoder; public XmlRecordReader(FileSplit split, Configuration conf) throws IOException { inputEncoding = conf.get(ENCODING, DEFAULT_ENCODING); @@ -98,9 +102,12 @@ public class XmlInputFormat extends TextInputFormat { FileSystem fs = file.getFileSystem(conf); FSDataInputStream fsin = fs.open(split.getPath()); fsin.seek(start); + readByteCounter = start; inReader = new BufferedReader(new InputStreamReader(fsin, Charset.forName(inputEncoding))); outBuffer = new DataOutputBuffer(); outWriter = new OutputStreamWriter(outBuffer, inputEncoding); + + charsetEncoder = Charset.forName(inputEncoding).newEncoder(); } private boolean next(LongWritable key, Text value) throws IOException { @@ -142,7 +149,7 @@ public class XmlInputFormat extends TextInputFormat { while (true) { int nextInCharacter = inReader.read(); - readByteCounter = +Chars.toByteArray((char) nextInCharacter).length; + readByteCounter = readByteCounter + calculateCharacterByteLength((char) nextInCharacter); // end of file: if (nextInCharacter == -1) { @@ -189,5 +196,14 @@ public class XmlInputFormat extends TextInputFormat { currentValue = new Text(); return next(currentKey, currentValue); } + + private int calculateCharacterByteLength(final char character) { + try { + return charsetEncoder.encode(CharBuffer.wrap(new char[] { character })).limit(); + } catch (final CharacterCodingException e) { + throw new RuntimeException("The character attempting to be read (" + character + ") could not be encoded with " + + inputEncoding); + } + } } } http://git-wip-us.apache.org/repos/asf/crunch/blob/5e6e3353/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlSource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlSource.java index 2e434e7..c6ebb5e 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlSource.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/xml/XmlSource.java @@ -26,7 +26,7 @@ import com.google.common.base.Charsets; /** * Large XML documents composed of repetitive XML elements can be broken into chunks delimited by element's start and - * end tag. The {@link XmlSource2} process XML files and extract out the XML between the pre-configured start / end + * end tag. The {@link XmlSource} process XML files and extract out the XML between the pre-configured start / end * tags. Developer should process the content between the tags. * * The {@link XmlSource} does not parse the input XML files and is not aware of the XML semantics. It just splits the http://git-wip-us.apache.org/repos/asf/crunch/blob/5e6e3353/crunch-core/src/test/java/org/apache/crunch/io/text/xml/XmlRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/io/text/xml/XmlRecordReaderTest.java b/crunch-core/src/test/java/org/apache/crunch/io/text/xml/XmlRecordReaderTest.java new file mode 100644 index 0000000..9876b46 --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/io/text/xml/XmlRecordReaderTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.text.xml; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.apache.crunch.impl.mr.run.RuntimeParameters; +import org.apache.crunch.io.text.xml.XmlInputFormat.XmlRecordReader; +import org.apache.crunch.test.TemporaryPath; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +/** + * {@link XmlRecordReader} Test. + * + */ +public class XmlRecordReaderTest { + + @Rule + public transient TemporaryPath tmpDir = new TemporaryPath(RuntimeParameters.TMP_DIR, "hadoop.tmp.dir"); + + private Configuration conf; + + private String xmlFile; + + private long xmlFileLength; + + @Before + public void before() throws IOException { + xmlFile = tmpDir.copyResourceFileName("xmlSourceSample3.xml"); + xmlFileLength = getFileLength(xmlFile); + + conf = new org.apache.hadoop.conf.Configuration(); + conf.set(XmlInputFormat.START_TAG_KEY, "<PLANT"); + conf.set(XmlInputFormat.END_TAG_KEY, "</PLANT>"); + } + + @Test + public void testStartOffsets() throws Exception { + /* + * The xmlSourceSample3.xml file byte ranges: + * + * 50-252 - first PLANT element. + * + * 254-454 - second PLANT element. + * + * 456-658 - third PLANT element. + */ + assertEquals("Starting from offset 0 should read all elements", 3, readXmlElements(createSplit(0, xmlFileLength))); + assertEquals("Offset is in the middle of the first element. Should read only the remaining 2 elements", 2, + readXmlElements(createSplit(100, xmlFileLength))); + assertEquals("Offset is in the middle of the second element. Should read only the remaining 1 element", 1, + readXmlElements(createSplit(300, xmlFileLength))); + assertEquals("Offset is in the middle of the third element. Should read no elements", 0, + readXmlElements(createSplit(500, xmlFileLength))); + } + + @Test + public void readThroughSplitEnd() throws IOException, InterruptedException { + // Third element starts at position: 456 and has length: 202 + assertEquals("Split starts before the 3rd element and ends in the middle of the 3rd element.", 1, + readXmlElements(createSplit(300, ((456 - 300) + 202 / 2)))); + assertEquals("Split starts and ends before the 3rd element.", 0, readXmlElements(createSplit(300, (456 - 300)))); + } + + private FileSplit createSplit(long offset, long length) { + return new FileSplit(new Path(xmlFile), offset, length, new String[] {}); + } + + private long readXmlElements(FileSplit split) throws IOException, InterruptedException { + + int elementCount = 0; + + XmlRecordReader xmlRecordReader = new XmlRecordReader(split, conf); + try { + long lastKey = 0; + while (xmlRecordReader.nextKeyValue()) { + elementCount++; + assertTrue(xmlRecordReader.getCurrentKey().get() > lastKey); + lastKey = xmlRecordReader.getCurrentKey().get(); + assertTrue(xmlRecordReader.getCurrentValue().getLength() > 0); + } + } finally { + xmlRecordReader.close(); + } + + return elementCount; + } + + private long getFileLength(String fileName) throws FileNotFoundException, IOException { + return new File(fileName).length(); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/5e6e3353/crunch-test/src/main/resources/xmlSourceSample3.xml ---------------------------------------------------------------------- diff --git a/crunch-test/src/main/resources/xmlSourceSample3.xml b/crunch-test/src/main/resources/xmlSourceSample3.xml new file mode 100644 index 0000000..9db87fe --- /dev/null +++ b/crunch-test/src/main/resources/xmlSourceSample3.xml @@ -0,0 +1,27 @@ +<?xml version="1.0" encoding="UTF-8"?> +<CATALOG> + <PLANT> + <COMMON>Bloodroot</COMMON> + <BOTANICAL>Sanguinaria canadensis</BOTANICAL> + <ZONE>4</ZONE> + <LIGHT>Mostly Shady</LIGHT> + <PRICE>$2.44</PRICE> + <AVAILABILITY>031599</AVAILABILITY> + </PLANT> + <PLANT> + <COMMON>Columbine</COMMON> + <BOTANICAL>Aquilegia canadensis</BOTANICAL> + <ZONE>3</ZONE> + <LIGHT>Mostly Shady</LIGHT> + <PRICE>$9.37</PRICE> + <AVAILABILITY>030699</AVAILABILITY> + </PLANT> + <PLANT> + <COMMON>Marsh Marigold</COMMON> + <BOTANICAL>Caltha palustris</BOTANICAL> + <ZONE>4</ZONE> + <LIGHT>Mostly Sunny</LIGHT> + <PRICE>$6.81</PRICE> + <AVAILABILITY>051799</AVAILABILITY> + </PLANT> +</CATALOG>
