[FLINK-3921] Add support to set encoding in CsvReader and StringParser.

This closes #2060.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2186af6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2186af6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2186af6

Branch: refs/heads/master
Commit: f2186af6702c9fe48c91d5c2d7748378984cd29b
Parents: 2d8f03e
Author: Joshi <[email protected]>
Authored: Wed Jun 1 13:26:47 2016 -0700
Committer: Fabian Hueske <[email protected]>
Committed: Thu Dec 8 18:47:36 2016 +0100

----------------------------------------------------------------------
 .../api/common/io/GenericCsvInputFormat.java    | 64 ++++++++++----------
 .../apache/flink/types/parser/FieldParser.java  | 38 +++++++++---
 .../apache/flink/types/parser/StringParser.java |  8 +--
 .../common/io/GenericCsvInputFormatTest.java    | 40 ++++++++++--
 .../types/parser/VarLengthStringParserTest.java | 20 ++++++
 .../org/apache/flink/api/java/io/CsvReader.java | 24 +++++++-
 .../apache/flink/api/java/io/CSVReaderTest.java |  9 +++
 7 files changed, 155 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 85d9cd8..0ced22b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -25,14 +25,12 @@ import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.types.parser.StringParser;
 import org.apache.flink.types.parser.StringValueParser;
 import org.apache.flink.util.InstantiationUtil;
-
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
-import java.nio.charset.IllegalCharsetNameException;
-import java.nio.charset.UnsupportedCharsetException;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.TreeMap;
@@ -48,9 +46,9 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
        
        private static final Logger LOG = 
LoggerFactory.getLogger(GenericCsvInputFormat.class);
 
-       /** The default charset  to convert strings to bytes */
-       private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
-       
+       /** The charset used to convert strings to bytes */
+       private Charset charset = Charset.forName("UTF-8");
+
        private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
        
        private static final boolean[] EMPTY_INCLUDED = new boolean[0];
@@ -107,6 +105,11 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                super(filePath, null);
        }
 
+       protected GenericCsvInputFormat(Path filePath, Charset charset) {
+               this(filePath);
+               this.charset = Preconditions.checkNotNull(charset);
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        public int getNumberOfFieldsTotal() {
@@ -121,32 +124,11 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                return commentPrefix;
        }
 
-       public void setCommentPrefix(byte[] commentPrefix) {
-               this.commentPrefix = commentPrefix;
-       }
-
-       public void setCommentPrefix(char commentPrefix) {
-               setCommentPrefix(String.valueOf(commentPrefix));
-       }
-
        public void setCommentPrefix(String commentPrefix) {
-               setCommentPrefix(commentPrefix, UTF_8_CHARSET);
+               setCommentPrefix(commentPrefix, charset);
        }
 
-       public void setCommentPrefix(String commentPrefix, String charsetName) 
throws IllegalCharsetNameException, UnsupportedCharsetException {
-               if (charsetName == null) {
-                       throw new IllegalArgumentException("Charset name must 
not be null");
-               }
-
-               if (commentPrefix != null) {
-                       Charset charset = Charset.forName(charsetName);
-                       setCommentPrefix(commentPrefix, charset);
-               } else {
-                       this.commentPrefix = null;
-               }
-       }
-
-       public void setCommentPrefix(String commentPrefix, Charset charset) {
+       private void setCommentPrefix(String commentPrefix, Charset charset) {
                if (charset == null) {
                        throw new IllegalArgumentException("Charset must not be 
null");
                }
@@ -174,7 +156,7 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
        }
 
        public void setFieldDelimiter(String delimiter) {
-               this.fieldDelim = delimiter.getBytes(UTF_8_CHARSET);
+               this.fieldDelim = delimiter.getBytes(charset);
        }
 
        public boolean isLenient() {
@@ -314,6 +296,25 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                this.fieldIncluded = includedMask;
        }
 
+       /**
+        * Gets the character set for the parser. Default is set to UTF-8.
+        *
+        * @return The charset for the parser.
+        */
+       Charset getCharset() {
+               return this.charset;
+       }
+
+       /**
+        * Sets the charset used to encode field delimiter and comment prefix Strings set after this
+        * call, and which is handed to the field parsers when they are instantiated.
+        *
+        * @param charset The character set to set.
+        */
+       public void setCharset(Charset charset) {
+               this.charset = Preconditions.checkNotNull(charset);
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Runtime methods
        // 
--------------------------------------------------------------------------------------------
@@ -334,6 +335,7 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
 
                                FieldParser<?> p = 
InstantiationUtil.instantiate(parserType, FieldParser.class);
 
+                               p.setCharset(this.getCharset());
                                if (this.quotedStringParsing) {
                                        if (p instanceof StringParser) {
                                                
((StringParser)p).enableQuotedStringParsing(this.quoteCharacter);
@@ -449,7 +451,7 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                        // search for ending quote character, continue when it 
is escaped
                        i++;
 
-                       while (i < limit && (bytes[i] != quoteCharacter || 
bytes[i-1] == BACKSLASH)){
+                       while (i < limit && (bytes[i] != quoteCharacter || 
bytes[i-1] == BACKSLASH)) {
                                i++;
                        }
                        i++;

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index 200d239..d9eeecc 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -19,11 +19,6 @@
 
 package org.apache.flink.types.parser;
 
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.types.BooleanValue;
 import org.apache.flink.types.ByteValue;
@@ -34,6 +29,12 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.ShortValue;
 import org.apache.flink.types.StringValue;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * A FieldParser is used parse a field from a sequence of bytes. Fields occur 
in a byte sequence and are terminated
  * by the end of the byte sequence or a delimiter.
@@ -77,9 +78,11 @@ public abstract class FieldParser<T> {
                /** Invalid Boolean value **/
                BOOLEAN_INVALID
        }
-       
+
+       private Charset charset = Charset.forName("UTF-8");
+
        private ParseErrorState errorState = ParseErrorState.NONE;
-       
+
        /**
         * Parses the value of a field from the byte array, taking care of 
properly reset
         * the state of this parser.
@@ -217,7 +220,26 @@ public abstract class FieldParser<T> {
 
                return limitedLength;
        }
-       
+
+       /**
+        * Gets the charset for the parser. Default is set to UTF-8.
+        *
+        * @return The charset for the parser.
+        */
+       public Charset getCharset() {
+               return this.charset;
+       }
+
+       /**
+        * Sets the charset of the parser. For example, the StringParser uses it to decode the
+        * bytes of a field into a String.
+        *
+        * @param charset The charset to set.
+        */
+       public void setCharset(Charset charset) {
+               this.charset = charset;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Mapping from types to parsers
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
index 1a2c7e3..7b46a7e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
@@ -63,11 +63,11 @@ public class StringParser extends FieldParser<String> {
                                // check for proper termination
                                if (i == limit) {
                                        // either by end of line
-                                       this.result = new String(bytes, 
startPos + 1, i - startPos - 2);
+                                       this.result = new String(bytes, 
startPos + 1, i - startPos - 2, getCharset());
                                        return limit;
                                } else if ( i < delimLimit && 
delimiterNext(bytes, i, delimiter)) {
                                        // or following field delimiter
-                                       this.result = new String(bytes, 
startPos + 1, i - startPos - 2);
+                                       this.result = new String(bytes, 
startPos + 1, i - startPos - 2, getCharset());
                                        return i + delimiter.length;
                                } else {
                                        // no proper termination
@@ -87,14 +87,14 @@ public class StringParser extends FieldParser<String> {
                                if (limit == startPos) {
                                        
setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
                                }
-                               this.result = new String(bytes, startPos, limit 
- startPos);
+                               this.result = new String(bytes, startPos, limit 
- startPos, getCharset());
                                return limit;
                        } else {
                                // delimiter found.
                                if (i == startPos) {
                                        
setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
                                }
-                               this.result = new String(bytes, startPos, i - 
startPos);
+                               this.result = new String(bytes, startPos, i - 
startPos, getCharset());
                                return i + delimiter.length;
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
index e3215c6..d063ddc 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
@@ -28,6 +28,7 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.GZIPOutputStream;
@@ -485,7 +486,7 @@ public class GenericCsvInputFormatTest {
                                fail("Input format accepted on invalid input.");
                        }
                        catch (ParseException e) {
-                               ; // all good
+                               // all good
                        }
                }
                catch (Exception ex) {
@@ -547,7 +548,38 @@ public class GenericCsvInputFormatTest {
                        fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
                }
        }
-       
+
+       @Test
+       public void testReadWithCharset() throws IOException {
+               try {
+                       final String fileContent = "\u00bf|Flink|\u00f1";
+                       final FileInputSplit split = 
createTempFile(fileContent);
+
+                       final Configuration parameters = new Configuration();
+
+                       format.setCharset(Charset.forName("UTF-8"));
+                       format.setFieldDelimiter("|");
+                       format.setFieldTypesGeneric(StringValue.class, 
StringValue.class, StringValue.class);
+
+                       format.configure(parameters);
+                       format.open(split);
+
+                       Value[] values = new Value[] { new StringValue(), new 
StringValue(), new StringValue()};
+
+                       values = format.nextRecord(values);
+                       assertNotNull(values);
+                       assertEquals("\u00bf", ((StringValue) 
values[0]).getValue());
+                       assertEquals("Flink", ((StringValue) 
values[1]).getValue());
+                       assertEquals("\u00f1", ((StringValue) 
values[2]).getValue());
+
+                       assertNull(format.nextRecord(values));
+                       assertTrue(format.reachedEnd());
+               }
+               catch (Exception ex) {
+                       fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
+               }
+       }
+
        @Test
        public void readWithEmptyField() {
                try {
@@ -722,7 +754,7 @@ public class GenericCsvInputFormatTest {
                return new FileInputSplit(0, new 
Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] 
{"localhost"});
        }
        
-       private final Value[] createIntValues(int num) {
+       private Value[] createIntValues(int num) {
                Value[] v = new Value[num];
                
                for (int i = 0; i < num; i++) {
@@ -732,7 +764,7 @@ public class GenericCsvInputFormatTest {
                return v;
        }
        
-       private final Value[] createLongValues(int num) {
+       private Value[] createLongValues(int num) {
                Value[] v = new Value[num];
                
                for (int i = 0; i < num; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
index 1fe8850..1c5579e 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
 import org.junit.Test;
 
+import java.nio.charset.Charset;
+
 public class VarLengthStringParserTest {
 
        public StringValueParser parser = new StringValueParser();
@@ -194,4 +196,22 @@ public class VarLengthStringParserTest {
                startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos < 0);
        }
+
+       @Test
+       public void testParseValidMixedStringsWithCharset() {
+
+               Charset charset = Charset.forName("US-ASCII");
+               this.parser = new StringValueParser();
+               this.parser.enableQuotedStringParsing((byte) '@');
+
+               // check valid strings without whitespaces and trailing delimiter
+               byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes();
+               StringValue s = new StringValue();
+
+               int startPos = 0;
+               parser.setCharset(charset);
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[]{'|'}, s);
+               assertTrue(startPos == 11);
+               assertTrue(s.getValue().equals("abcde|gh"));
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java 
b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index 8be5dc2..b13b8aa 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.io;
 
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
 
@@ -64,6 +65,8 @@ public class CsvReader {
        protected boolean skipFirstLineAsHeader = false;
        
        protected boolean ignoreInvalidLines = false;
+
+       private Charset charset = Charset.forName("UTF-8");
        
        // 
--------------------------------------------------------------------------------------------
        
@@ -157,7 +160,25 @@ public class CsvReader {
                this.commentPrefix = commentPrefix;
                return this;
        }
-       
+
+       /**
+        * Gets the character set for the reader. Default is set to UTF-8.
+        *
+        * @return The charset for the reader.
+        */
+       public Charset getCharset() {
+               return this.charset;
+       }
+
+       /**
+        * Sets the charset of the reader, which is passed on to the input format it configures.
+        *
+        * @param charset The character set to set.
+        */
+       public void setCharset(Charset charset) {
+               this.charset = Preconditions.checkNotNull(charset);
+       }
+
        /**
         * Configures which fields of the CSV file should be included and which 
should be skipped. The
         * parser will look at the first {@code n} fields, where {@code n} is 
the length of the boolean
@@ -340,6 +361,7 @@ public class CsvReader {
                format.setCommentPrefix(this.commentPrefix);
                format.setSkipFirstLineAsHeader(skipFirstLineAsHeader);
                format.setLenient(ignoreInvalidLines);
+               format.setCharset(this.charset);
                if (this.parseQuotedStrings) {
                        format.enableQuotedStringParsing(this.quoteCharacter);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
index 8b12315..e1c8023 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.Arrays;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -75,6 +76,14 @@ public class CSVReaderTest {
                reader.ignoreComments("#");
                assertEquals("#", reader.commentPrefix);
        }
+
+       @Test
+       public void testCharset() {
+               CsvReader reader = getCsvReader();
+               assertEquals(reader.getCharset(), Charset.forName("UTF-8"));
+               reader.setCharset(Charset.forName("US-ASCII"));
+               assertEquals(reader.getCharset(), Charset.forName("US-ASCII"));
+       }
        
        @Test
        public void testIncludeFieldsDense() {

Reply via email to