[FLINK-3921] Add support to set encoding in CsvReader and StringParser. This closes #2060.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2186af6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2186af6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2186af6 Branch: refs/heads/master Commit: f2186af6702c9fe48c91d5c2d7748378984cd29b Parents: 2d8f03e Author: Joshi <[email protected]> Authored: Wed Jun 1 13:26:47 2016 -0700 Committer: Fabian Hueske <[email protected]> Committed: Thu Dec 8 18:47:36 2016 +0100 ---------------------------------------------------------------------- .../api/common/io/GenericCsvInputFormat.java | 64 ++++++++++---------- .../apache/flink/types/parser/FieldParser.java | 38 +++++++++--- .../apache/flink/types/parser/StringParser.java | 8 +-- .../common/io/GenericCsvInputFormatTest.java | 40 ++++++++++-- .../types/parser/VarLengthStringParserTest.java | 20 ++++++ .../org/apache/flink/api/java/io/CsvReader.java | 24 +++++++- .../apache/flink/api/java/io/CSVReaderTest.java | 9 +++ 7 files changed, 155 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java index 85d9cd8..0ced22b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java @@ -25,14 +25,12 @@ import org.apache.flink.types.parser.FieldParser; import org.apache.flink.types.parser.StringParser; import org.apache.flink.types.parser.StringValueParser; import org.apache.flink.util.InstantiationUtil; - +import 
org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.Charset; -import java.nio.charset.IllegalCharsetNameException; -import java.nio.charset.UnsupportedCharsetException; import java.util.ArrayList; import java.util.Map; import java.util.TreeMap; @@ -48,9 +46,9 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class); - /** The default charset to convert strings to bytes */ - private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8"); - + /** The charset used to convert strings to bytes */ + private Charset charset = Charset.forName("UTF-8"); + private static final Class<?>[] EMPTY_TYPES = new Class<?>[0]; private static final boolean[] EMPTY_INCLUDED = new boolean[0]; @@ -107,6 +105,11 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> super(filePath, null); } + protected GenericCsvInputFormat(Path filePath, Charset charset) { + this(filePath); + this.charset = Preconditions.checkNotNull(charset); + } + // -------------------------------------------------------------------------------------------- public int getNumberOfFieldsTotal() { @@ -121,32 +124,11 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> return commentPrefix; } - public void setCommentPrefix(byte[] commentPrefix) { - this.commentPrefix = commentPrefix; - } - - public void setCommentPrefix(char commentPrefix) { - setCommentPrefix(String.valueOf(commentPrefix)); - } - public void setCommentPrefix(String commentPrefix) { - setCommentPrefix(commentPrefix, UTF_8_CHARSET); + setCommentPrefix(commentPrefix, charset); } - public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException { - if (charsetName == null) { - throw new 
IllegalArgumentException("Charset name must not be null"); - } - - if (commentPrefix != null) { - Charset charset = Charset.forName(charsetName); - setCommentPrefix(commentPrefix, charset); - } else { - this.commentPrefix = null; - } - } - - public void setCommentPrefix(String commentPrefix, Charset charset) { + private void setCommentPrefix(String commentPrefix, Charset charset) { if (charset == null) { throw new IllegalArgumentException("Charset must not be null"); } @@ -174,7 +156,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> } public void setFieldDelimiter(String delimiter) { - this.fieldDelim = delimiter.getBytes(UTF_8_CHARSET); + this.fieldDelim = delimiter.getBytes(charset); } public boolean isLenient() { @@ -314,6 +296,25 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> this.fieldIncluded = includedMask; } + /** + * Gets the character set for the parser. Default is set to UTF-8. + * + * @return The charset for the parser. + */ + Charset getCharset() { + return this.charset; + } + + /** + * Sets the charset of the parser. Called by subclasses of the parser to set the type of charset + * when doing a parse. + * + * @param charset The character set to set. 
+ */ + public void setCharset(Charset charset) { + this.charset = Preconditions.checkNotNull(charset); + } + // -------------------------------------------------------------------------------------------- // Runtime methods // -------------------------------------------------------------------------------------------- @@ -334,6 +335,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> FieldParser<?> p = InstantiationUtil.instantiate(parserType, FieldParser.class); + p.setCharset(this.getCharset()); if (this.quotedStringParsing) { if (p instanceof StringParser) { ((StringParser)p).enableQuotedStringParsing(this.quoteCharacter); @@ -449,7 +451,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> // search for ending quote character, continue when it is escaped i++; - while (i < limit && (bytes[i] != quoteCharacter || bytes[i-1] == BACKSLASH)){ + while (i < limit && (bytes[i] != quoteCharacter || bytes[i-1] == BACKSLASH)) { i++; } i++; http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java index 200d239..d9eeecc 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java @@ -19,11 +19,6 @@ package org.apache.flink.types.parser; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.util.HashMap; -import java.util.Map; - import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.types.BooleanValue; import org.apache.flink.types.ByteValue; @@ -34,6 +29,12 @@ import org.apache.flink.types.LongValue; import org.apache.flink.types.ShortValue; import 
org.apache.flink.types.StringValue; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; + /** * A FieldParser is used parse a field from a sequence of bytes. Fields occur in a byte sequence and are terminated * by the end of the byte sequence or a delimiter. @@ -77,9 +78,11 @@ public abstract class FieldParser<T> { /** Invalid Boolean value **/ BOOLEAN_INVALID } - + + private Charset charset = Charset.forName("UTF-8"); + private ParseErrorState errorState = ParseErrorState.NONE; - + /** * Parses the value of a field from the byte array, taking care of properly reset * the state of this parser. @@ -217,7 +220,26 @@ public abstract class FieldParser<T> { return limitedLength; } - + + /** + * Gets the Charset for the parser. Default is set to UTF-8. + * + * @return The charset for the parser. + */ + public Charset getCharset() { + return this.charset; + } + + /** + * Sets the charset of the parser. Called by subclasses of the parser to set the type of charset + * when doing a parse. + * + * @param charset The charset to set. 
+ */ + public void setCharset(Charset charset) { + this.charset = charset; + } + // -------------------------------------------------------------------------------------------- // Mapping from types to parsers // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java index 1a2c7e3..7b46a7e 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java @@ -63,11 +63,11 @@ public class StringParser extends FieldParser<String> { // check for proper termination if (i == limit) { // either by end of line - this.result = new String(bytes, startPos + 1, i - startPos - 2); + this.result = new String(bytes, startPos + 1, i - startPos - 2, getCharset()); return limit; } else if ( i < delimLimit && delimiterNext(bytes, i, delimiter)) { // or following field delimiter - this.result = new String(bytes, startPos + 1, i - startPos - 2); + this.result = new String(bytes, startPos + 1, i - startPos - 2, getCharset()); return i + delimiter.length; } else { // no proper termination @@ -87,14 +87,14 @@ public class StringParser extends FieldParser<String> { if (limit == startPos) { setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column } - this.result = new String(bytes, startPos, limit - startPos); + this.result = new String(bytes, startPos, limit - startPos, getCharset()); return limit; } else { // delimiter found. 
if (i == startPos) { setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column } - this.result = new String(bytes, startPos, i - startPos); + this.result = new String(bytes, startPos, i - startPos, getCharset()); return i + delimiter.length; } } http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java index e3215c6..d063ddc 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java @@ -28,6 +28,7 @@ import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.charset.Charset; import java.util.Arrays; import java.util.zip.DeflaterOutputStream; import java.util.zip.GZIPOutputStream; @@ -485,7 +486,7 @@ public class GenericCsvInputFormatTest { fail("Input format accepted on invalid input."); } catch (ParseException e) { - ; // all good + // all good } } catch (Exception ex) { @@ -547,7 +548,38 @@ public class GenericCsvInputFormatTest { fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); } } - + + @Test + public void testReadWithCharset() throws IOException { + try { + final String fileContent = "\u00bf|Flink|\u00f1"; + final FileInputSplit split = createTempFile(fileContent); + + final Configuration parameters = new Configuration(); + + format.setCharset(Charset.forName("UTF-8")); + format.setFieldDelimiter("|"); + format.setFieldTypesGeneric(StringValue.class, StringValue.class, StringValue.class); + + format.configure(parameters); + format.open(split); + + Value[] values = new 
Value[] { new StringValue(), new StringValue(), new StringValue()}; + + values = format.nextRecord(values); + assertNotNull(values); + assertEquals("\u00bf", ((StringValue) values[0]).getValue()); + assertEquals("Flink", ((StringValue) values[1]).getValue()); + assertEquals("\u00f1", ((StringValue) values[2]).getValue()); + + assertNull(format.nextRecord(values)); + assertTrue(format.reachedEnd()); + } + catch (Exception ex) { + fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); + } + } + @Test public void readWithEmptyField() { try { @@ -722,7 +754,7 @@ public class GenericCsvInputFormatTest { return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"}); } - private final Value[] createIntValues(int num) { + private Value[] createIntValues(int num) { Value[] v = new Value[num]; for (int i = 0; i < num; i++) { @@ -732,7 +764,7 @@ public class GenericCsvInputFormatTest { return v; } - private final Value[] createLongValues(int num) { + private Value[] createLongValues(int num) { Value[] v = new Value[num]; for (int i = 0; i < num; i++) { http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java index 1fe8850..1c5579e 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java @@ -25,6 +25,8 @@ import org.apache.flink.types.StringValue; import org.apache.flink.types.Value; import org.junit.Test; +import java.nio.charset.Charset; + public class VarLengthStringParserTest { public StringValueParser 
parser = new StringValueParser(); @@ -194,4 +196,22 @@ public class VarLengthStringParserTest { startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos < 0); } + + @Test + public void testParseValidMixedStringsWithCharset() { + + Charset charset = Charset.forName("US-ASCII"); + this.parser = new StringValueParser(); + this.parser.enableQuotedStringParsing((byte) '@'); + + // check valid strings with out whitespaces and trailing delimiter + byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes(); + StringValue s = new StringValue(); + + int startPos = 0; + parser.setCharset(charset); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[]{'|'}, s); + assertTrue(startPos == 11); + assertTrue(s.getValue().equals("abcde|gh")); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java index 8be5dc2..b13b8aa 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.io; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; @@ -64,6 +65,8 @@ public class CsvReader { protected boolean skipFirstLineAsHeader = false; protected boolean ignoreInvalidLines = false; + + private Charset charset = Charset.forName("UTF-8"); // -------------------------------------------------------------------------------------------- @@ -157,7 +160,25 @@ public class CsvReader { this.commentPrefix = commentPrefix; return this; } - + + /** + * Gets the character set for the reader. Default is set to UTF-8. 
+ * + * @return The charset for the reader. + */ + public Charset getCharset() { + return this.charset; + } + + /** + * Sets the charset of the reader + * + * @param charset The character set to set. + */ + public void setCharset(Charset charset) { + this.charset = Preconditions.checkNotNull(charset); + } + /** * Configures which fields of the CSV file should be included and which should be skipped. The * parser will look at the first {@code n} fields, where {@code n} is the length of the boolean @@ -340,6 +361,7 @@ public class CsvReader { format.setCommentPrefix(this.commentPrefix); format.setSkipFirstLineAsHeader(skipFirstLineAsHeader); format.setLenient(ignoreInvalidLines); + format.setCharset(this.charset); if (this.parseQuotedStrings) { format.enableQuotedStringParsing(this.quoteCharacter); } http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java index 8b12315..e1c8023 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import java.io.IOException; +import java.nio.charset.Charset; import java.util.Arrays; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -75,6 +76,14 @@ public class CSVReaderTest { reader.ignoreComments("#"); assertEquals("#", reader.commentPrefix); } + + @Test + public void testCharset() { + CsvReader reader = getCsvReader(); + assertEquals(reader.getCharset(), Charset.forName("UTF-8")); + reader.setCharset(Charset.forName("US-ASCII")); + assertEquals(reader.getCharset(), Charset.forName("US-ASCII")); + } @Test 
public void testIncludeFieldsDense() {
