[FLINK-3921] Add support to set encoding in CsvReader and StringParser.

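- extends first commit: the charset is now configured by name on DelimitedInputFormat, and row delimiters, field delimiters, and comment prefixes that were set as Strings are re-encoded when the charset changes.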

This closes #2901.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/41d5875b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/41d5875b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/41d5875b

Branch: refs/heads/master
Commit: 41d5875bfc272f2cd5c7e8c8523036684865c1ce
Parents: f2186af
Author: Greg Hogan <[email protected]>
Authored: Mon Nov 28 12:43:47 2016 -0500
Committer: Fabian Hueske <[email protected]>
Committed: Thu Dec 8 21:21:48 2016 +0100

----------------------------------------------------------------------
 .../api/common/io/DelimitedInputFormat.java     |  67 ++++++++--
 .../api/common/io/GenericCsvInputFormat.java    |  79 +++++-------
 .../apache/flink/types/parser/FieldParser.java  |  18 ++-
 .../api/common/io/DelimitedInputFormatTest.java |  94 ++++++++++----
 .../common/io/GenericCsvInputFormatTest.java    | 125 ++++++++++---------
 .../types/parser/VarLengthStringParserTest.java |  12 +-
 .../org/apache/flink/api/java/io/CsvReader.java |  24 ++--
 .../apache/flink/api/java/io/CSVReaderTest.java |  23 ++--
 .../flink/api/java/io/CsvInputFormatTest.java   |   6 +-
 .../runtime/io/RowCsvInputFormatTest.scala      |   6 +-
 .../flink/api/scala/io/CsvInputFormatTest.scala |  14 +--
 11 files changed, 273 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index fd02c82..5c8dfc1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -20,17 +20,18 @@ package org.apache.flink.api.common.io;
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -56,8 +57,11 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
         */
        private static final Logger LOG = LoggerFactory.getLogger(DelimitedInputFormat.class);
 
-       /** The default charset  to convert strings to bytes */
-       private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
+       // The charset used to convert strings to bytes
+       private String charsetName = "UTF-8";
+
+       // Charset is not serializable
+       private transient Charset charset;
 
        /**
         * The default read buffer size = 1MB.
@@ -157,9 +161,12 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
        // --------------------------------------------------------------------------------------------
        //  The configuration parameters. Configured on the instance and serialized to be shipped.
        // --------------------------------------------------------------------------------------------
-       
+
+       // The delimiter may be set with a byte-sequence or a String. In the latter
+       // case the byte representation is updated consistent with current charset.
        private byte[] delimiter = new byte[] {'\n'};
-       
+       private String delimiterString = null;
+
        private int lineLengthLimit = Integer.MAX_VALUE;
        
        private int bufferSize = -1;
@@ -182,8 +189,42 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
                }
                loadConfigParameters(configuration);
        }
-       
-       
+
+       /**
+        * Get the character set used for the row delimiter. This is also used by
+        * subclasses to interpret field delimiters, comment strings, and for
+        * configuring {@link FieldParser}s.
+        *
+        * @return the charset
+        */
+       @PublicEvolving
+       public Charset getCharset() {
+               if (this.charset == null) {
+                       this.charset = Charset.forName(charsetName);
+               }
+               return this.charset;
+       }
+
+       /**
+        * Set the name of the character set used for the row delimiter. This is
+        * also used by subclasses to interpret field delimiters, comment strings,
+        * and for configuring {@link FieldParser}s.
+        *
+        * These fields are interpreted when set. Changing the charset thereafter
+        * may cause unexpected results.
+        *
+        * @param charset name of the charset
+        */
+       @PublicEvolving
+       public void setCharset(String charset) {
+               this.charsetName = Preconditions.checkNotNull(charset);
+               this.charset = null;
+
+               if (this.delimiterString != null) {
+                       this.delimiter = delimiterString.getBytes(getCharset());
+               }
+       }
+
        public byte[] getDelimiter() {
                return delimiter;
        }
@@ -193,6 +234,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
                        throw new IllegalArgumentException("Delimiter must not be null");
                }
                this.delimiter = delimiter;
+               this.delimiterString = null;
        }
 
        public void setDelimiter(char delimiter) {
@@ -203,7 +245,8 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
                if (delimiter == null) {
                        throw new IllegalArgumentException("Delimiter must not be null");
                }
-               this.delimiter = delimiter.getBytes(UTF_8_CHARSET);
+               this.delimiter = delimiter.getBytes(getCharset());
+               this.delimiterString = delimiter;
        }
        
        public int getLineLengthLimit() {
@@ -264,7 +307,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
        // --------------------------------------------------------------------------------------------
        
        /**
-        * Configures this input format by reading the path to the file from the configuration andge the string that
+        * Configures this input format by reading the path to the file from the configuration and the string that
         * defines the record delimiter.
         * 
         * @param parameters The configuration object to read the parameters from.

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 0ced22b..20c643e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -25,12 +25,10 @@ import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.types.parser.StringParser;
 import org.apache.flink.types.parser.StringValueParser;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.TreeMap;
@@ -46,9 +44,6 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
        
        private static final Logger LOG = 
LoggerFactory.getLogger(GenericCsvInputFormat.class);
 
-       /** The charset used to convert strings to bytes */
-       private Charset charset = Charset.forName("UTF-8");
-
        private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
        
        private static final boolean[] EMPTY_INCLUDED = new boolean[0];
@@ -79,9 +74,12 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
        private Class<?>[] fieldTypes = EMPTY_TYPES;
        
        protected boolean[] fieldIncluded = EMPTY_INCLUDED;
-               
+
+       // The byte representation of the delimiter is updated consistent with
+       // current charset.
        private byte[] fieldDelim = DEFAULT_FIELD_DELIMITER;
-       
+       private String fieldDelimString = null;
+
        private boolean lenient;
        
        private boolean skipFirstLineAsHeader;
@@ -90,7 +88,10 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
 
        private byte quoteCharacter;
 
+       // The byte representation of the comment prefix is updated consistent with
+       // current charset.
        protected byte[] commentPrefix = null;
+       private String commentPrefixString = null;
 
 
        // 
--------------------------------------------------------------------------------------------
@@ -105,11 +106,6 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                super(filePath, null);
        }
 
-       protected GenericCsvInputFormat(Path filePath, Charset charset) {
-               this(filePath);
-               this.charset = Preconditions.checkNotNull(charset);
-       }
-
        // 
--------------------------------------------------------------------------------------------
 
        public int getNumberOfFieldsTotal() {
@@ -120,43 +116,43 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                return this.fieldTypes.length;
        }
 
+       @Override
+       public void setCharset(String charset) {
+               super.setCharset(charset);
+
+               if (this.fieldDelimString != null) {
+                       this.fieldDelim = 
fieldDelimString.getBytes(getCharset());
+               }
+
+               if (this.commentPrefixString != null) {
+                       this.commentPrefix = 
commentPrefixString.getBytes(getCharset());
+               }
+       }
+
        public byte[] getCommentPrefix() {
                return commentPrefix;
        }
 
        public void setCommentPrefix(String commentPrefix) {
-               setCommentPrefix(commentPrefix, charset);
-       }
-
-       private void setCommentPrefix(String commentPrefix, Charset charset) {
-               if (charset == null) {
-                       throw new IllegalArgumentException("Charset must not be 
null");
-               }
                if (commentPrefix != null) {
-                       this.commentPrefix = commentPrefix.getBytes(charset);
+                       this.commentPrefix = 
commentPrefix.getBytes(getCharset());
                } else {
                        this.commentPrefix = null;
                }
+               this.commentPrefixString = commentPrefix;
        }
 
        public byte[] getFieldDelimiter() {
                return fieldDelim;
        }
 
-       public void setFieldDelimiter(byte[] delimiter) {
+       public void setFieldDelimiter(String delimiter) {
                if (delimiter == null) {
                        throw new IllegalArgumentException("Delimiter must not 
be null");
                }
 
-               this.fieldDelim = delimiter;
-       }
-
-       public void setFieldDelimiter(char delimiter) {
-               setFieldDelimiter(String.valueOf(delimiter));
-       }
-
-       public void setFieldDelimiter(String delimiter) {
-               this.fieldDelim = delimiter.getBytes(charset);
+               this.fieldDelim = delimiter.getBytes(getCharset());
+               this.fieldDelimString = delimiter;
        }
 
        public boolean isLenient() {
@@ -296,25 +292,6 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                this.fieldIncluded = includedMask;
        }
 
-       /**
-        * Gets the character set for the parser. Default is set to UTF-8.
-        *
-        * @return The charset for the parser.
-        */
-       Charset getCharset() {
-               return this.charset;
-       }
-
-       /**
-        * Sets the charset of the parser. Called by subclasses of the parser 
to set the type of charset
-        * when doing a parse.
-        *
-        * @param charset The character set to set.
-        */
-       public void setCharset(Charset charset) {
-               this.charset = Preconditions.checkNotNull(charset);
-       }
-
        // 
--------------------------------------------------------------------------------------------
        //  Runtime methods
        // 
--------------------------------------------------------------------------------------------
@@ -322,7 +299,7 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
        @Override
        public void open(FileInputSplit split) throws IOException {
                super.open(split);
-               
+
                // instantiate the parsers
                FieldParser<?>[] parsers = new 
FieldParser<?>[fieldTypes.length];
                
@@ -335,7 +312,7 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
 
                                FieldParser<?> p = 
InstantiationUtil.instantiate(parserType, FieldParser.class);
 
-                               p.setCharset(this.getCharset());
+                               p.setCharset(getCharset());
                                if (this.quotedStringParsing) {
                                        if (p instanceof StringParser) {
                                                
((StringParser)p).enableQuotedStringParsing(this.quoteCharacter);

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index d9eeecc..cf3c83d 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -32,6 +32,7 @@ import org.apache.flink.types.StringValue;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -79,7 +80,7 @@ public abstract class FieldParser<T> {
                BOOLEAN_INVALID
        }
 
-       private Charset charset = Charset.forName("UTF-8");
+       private Charset charset = StandardCharsets.UTF_8;
 
        private ParseErrorState errorState = ParseErrorState.NONE;
 
@@ -105,9 +106,7 @@ public abstract class FieldParser<T> {
 
        /**
         * Each parser's logic should be implemented inside this method
-        *
-        * @see {@link FieldParser#parseField(byte[], int, int, byte[], Object)}
-        * */
+        */
        protected abstract int parseField(byte[] bytes, int startPos, int 
limit, byte[] delim, T reuse);
 
        /**
@@ -221,20 +220,19 @@ public abstract class FieldParser<T> {
                return limitedLength;
        }
 
-       /*
-        * Gets the Charset for the parser.Default is set to ASCII
+       /**
+        * Gets the character set used for this parser.
         *
-        * @return The charset for the parser.
+        * @return the charset used for this parser.
         */
        public Charset getCharset() {
                return this.charset;
        }
 
        /**
-        * Sets the charset of the parser. Called by subclasses of the parser 
to set the type of charset
-        * when doing a parse.
+        * Sets the character set used for this parser.
         *
-        * @param charset The charset  to set.
+        * @param charset charset used for this parser.
         */
        public void setCharset(Charset charset) {
                this.charset = charset;

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 93d5f9f..219365a 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.api.common.io;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -33,17 +34,17 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStreamWriter;
+import java.io.Writer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class DelimitedInputFormatTest {
        
@@ -200,6 +201,45 @@ public class DelimitedInputFormatTest {
                }
        }
 
+       @Test
+       public void testReadCustomDelimiterWithCharset() throws IOException {
+               // Unicode row fragments
+               String[] records = new 
String[]{"\u020e\u021f\u05c0\u020b\u020f", "Apache", "\nFlink", "\u0000", 
"\u05c0"};
+
+               // Unicode delimiter
+               String delimiter = "\u05c0\u05c0";
+
+               String fileContent = StringUtils.join(records, delimiter);
+
+               for (final String charset : new String[]{ "UTF-8", "UTF-16BE", 
"UTF-16LE" }) {
+                       // use charset when instantiating the record String
+                       DelimitedInputFormat<String> format = new 
DelimitedInputFormat<String>() {
+                               @Override
+                               public String readRecord(String reuse, byte[] 
bytes, int offset, int numBytes) throws IOException {
+                                       return new String(bytes, offset, 
numBytes, charset);
+                               }
+                       };
+                       
format.setFilePath("file:///some/file/that/will/not/be/read");
+
+                       final FileInputSplit split = 
createTempFile(fileContent, charset);
+
+                       format.setDelimiter(delimiter);
+                       // use the same encoding to parse the file as used to 
read the file;
+                       // the delimiter is reinterpreted when the charset is 
set
+                       format.setCharset(charset);
+                       format.configure(new Configuration());
+                       format.open(split);
+
+                       for (String record : records) {
+                               String value = format.nextRecord(null);
+                               assertEquals(record, value);
+                       }
+
+                       assertNull(format.nextRecord(null));
+                       assertTrue(format.reachedEnd());
+               }
+       }
+
        /**
         * Tests that the records are read correctly when the split boundary is 
in the middle of a record.
         */
@@ -363,19 +403,29 @@ public class DelimitedInputFormatTest {
                        fail(e.getMessage());
                }
        }
-       
-       private static FileInputSplit createTempFile(String contents) throws 
IOException {
+
+       static FileInputSplit createTempFile(String contents) throws 
IOException {
                File tempFile = File.createTempFile("test_contents", "tmp");
                tempFile.deleteOnExit();
-               
-               OutputStreamWriter wrt = new OutputStreamWriter(new 
FileOutputStream(tempFile));
-               wrt.write(contents);
-               wrt.close();
-               
+
+               try (Writer out = new OutputStreamWriter(new 
FileOutputStream(tempFile))) {
+                       out.write(contents);
+               }
+
                return new FileInputSplit(0, new 
Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] 
{"localhost"});
        }
-       
-       
+
+       static FileInputSplit createTempFile(String contents, String charset) 
throws IOException {
+               File tempFile = File.createTempFile("test_contents", "tmp");
+               tempFile.deleteOnExit();
+
+               try (Writer out = new OutputStreamWriter(new 
FileOutputStream(tempFile), charset)) {
+                       out.write(contents);
+               }
+
+               return new FileInputSplit(0, new 
Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] 
{"localhost"});
+       }
+
        protected static final class MyTextInputFormat extends 
DelimitedInputFormat<String> {
                private static final long serialVersionUID = 1L;
                

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
index d063ddc..c11a573 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
@@ -18,21 +18,7 @@
 
 package org.apache.flink.api.common.io;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.GZIPOutputStream;
-
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -41,15 +27,29 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.Arrays;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static 
org.apache.flink.api.common.io.DelimitedInputFormatTest.createTempFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class GenericCsvInputFormatTest {
 
-       private File tempFile;
-       
        private TestCsvInputFormat format;
        
        // 
--------------------------------------------------------------------------------------------
@@ -65,9 +65,6 @@ public class GenericCsvInputFormatTest {
                if (this.format != null) {
                        this.format.close();
                }
-               if (this.tempFile != null) {
-                       this.tempFile.delete();
-               }
        }
        
        @Test
@@ -87,7 +84,7 @@ public class GenericCsvInputFormatTest {
        public void testReadNoPosAll() throws IOException {
                try {
                        final String fileContent = 
"111|222|333|444|555\n666|777|888|999|000|";
-                       final FileInputSplit split = 
createTempFile(fileContent);       
+                       final FileInputSplit split = 
createTempFile(fileContent);
                
                        final Configuration parameters = new Configuration();
                        
@@ -485,8 +482,7 @@ public class GenericCsvInputFormatTest {
                                format.nextRecord(values);
                                fail("Input format accepted on invalid input.");
                        }
-                       catch (ParseException e) {
-                               // all good
+                       catch (ParseException ignored) {
                        }
                }
                catch (Exception ex) {
@@ -551,40 +547,64 @@ public class GenericCsvInputFormatTest {
 
        @Test
        public void testReadWithCharset() throws IOException {
-               try {
-                       final String fileContent = "\u00bf|Flink|\u00f1";
-                       final FileInputSplit split = 
createTempFile(fileContent);
+               // Unicode row fragments
+               String[] records = new String[]{"\u020e\u021f", "Flink", 
"\u020b\u020f"};
 
-                       final Configuration parameters = new Configuration();
+               // Unicode delimiter
+               String delimiter = "\u05c0\u05c0";
 
-                       format.setCharset(Charset.forName("UTF-8"));
-                       format.setFieldDelimiter("|");
-                       format.setFieldTypesGeneric(StringValue.class, 
StringValue.class, StringValue.class);
+               String fileContent = StringUtils.join(records, delimiter);
 
-                       format.configure(parameters);
-                       format.open(split);
+               // StringValueParser does not use charset so rely on 
StringParser
+               GenericCsvInputFormat<String[]> format = new 
GenericCsvInputFormat<String[]>() {
+                       @Override
+                       public String[] readRecord(String[] target, byte[] 
bytes, int offset, int numBytes) throws IOException {
+                               return parseRecord(target, bytes, offset, 
numBytes) ? target : null;
+                       }
+               };
+               format.setFilePath("file:///some/file/that/will/not/be/read");
 
-                       Value[] values = new Value[] { new StringValue(), new 
StringValue(), new StringValue()};
+               for (String charset : new String[]{ "UTF-8", "UTF-16BE", 
"UTF-16LE" }) {
+                       File tempFile = File.createTempFile("test_contents", 
"tmp");
+                       tempFile.deleteOnExit();
 
+                       // write string with proper encoding
+                       try (Writer out = new OutputStreamWriter(new 
FileOutputStream(tempFile), charset)) {
+                               out.write(fileContent);
+                       }
+
+                       FileInputSplit split = new FileInputSplit(0, new 
Path(tempFile.toURI().toString()),
+                               0, tempFile.length(), new String[]{ "localhost" 
});
+
+                       format.setFieldDelimiter(delimiter);
+                       format.setFieldTypesGeneric(String.class, String.class, 
String.class);
+                       // use the same encoding to parse the file as used to 
read the file;
+                       // the field delimiter is reinterpreted when the 
charset is set
+                       format.setCharset(charset);
+                       format.configure(new Configuration());
+                       format.open(split);
+
+                       String[] values = new String[]{ "", "", "" };
                        values = format.nextRecord(values);
+
+                       // validate results
                        assertNotNull(values);
-                       assertEquals("\u00bf", ((StringValue) 
values[0]).getValue());
-                       assertEquals("Flink", ((StringValue) 
values[1]).getValue());
-                       assertEquals("\u00f1", ((StringValue) 
values[2]).getValue());
+                       for (int i = 0 ; i < records.length ; i++) {
+                               assertEquals(records[i], values[i]);
+                       }
 
                        assertNull(format.nextRecord(values));
                        assertTrue(format.reachedEnd());
                }
-               catch (Exception ex) {
-                       fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
-               }
+
+               format.close();
        }
 
        @Test
        public void readWithEmptyField() {
                try {
                        final String fileContent = 
"abc|def|ghijk\nabc||hhg\n|||";
-                       final FileInputSplit split = 
createTempFile(fileContent);       
+                       final FileInputSplit split = 
createTempFile(fileContent);
                
                        final Configuration parameters = new Configuration();
 
@@ -721,37 +741,26 @@ public class GenericCsvInputFormatTest {
                }
        }
 
-       private FileInputSplit createTempFile(String content) throws 
IOException {
-               this.tempFile = File.createTempFile("test_contents", "tmp");
-               this.tempFile.deleteOnExit();
-               
-               DataOutputStream dos = new DataOutputStream(new 
FileOutputStream(tempFile));
-               dos.writeBytes(content);
-               dos.close();
-                       
-               return new FileInputSplit(0, new 
Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] 
{"localhost"});
-       }
-
        private FileInputSplit createTempDeflateFile(String content) throws 
IOException {
-               this.tempFile = File.createTempFile("test_contents", 
"tmp.deflate");
-               this.tempFile.deleteOnExit();
+               File tempFile = File.createTempFile("test_contents", 
"tmp.deflate");
+               tempFile.deleteOnExit();
 
                DataOutputStream dos = new DataOutputStream(new 
DeflaterOutputStream(new FileOutputStream(tempFile)));
                dos.writeBytes(content);
                dos.close();
 
-               return new FileInputSplit(0, new 
Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] 
{"localhost"});
+               return new FileInputSplit(0, new 
Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] 
{"localhost"});
        }
 
        private FileInputSplit createTempGzipFile(String content) throws 
IOException {
-               this.tempFile = File.createTempFile("test_contents", "tmp.gz");
-               this.tempFile.deleteOnExit();
+               File tempFile = File.createTempFile("test_contents", "tmp.gz");
+               tempFile.deleteOnExit();
 
                DataOutputStream dos = new DataOutputStream(new 
GZIPOutputStream(new FileOutputStream(tempFile)));
                dos.writeBytes(content);
                dos.close();
 
-               return new FileInputSplit(0, new 
Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] 
{"localhost"});
+               return new FileInputSplit(0, new 
Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] 
{"localhost"});
        }
        
        private Value[] createIntValues(int num) {

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
index 1c5579e..718274e 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
@@ -19,13 +19,15 @@
 
 package org.apache.flink.types.parser;
 
-import static org.junit.Assert.assertTrue;
-
 import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
 import org.junit.Test;
 
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class VarLengthStringParserTest {
 
@@ -200,7 +202,7 @@ public class VarLengthStringParserTest {
        @Test
        public void testParseValidMixedStringsWithCharset() {
 
-               Charset charset = Charset.forName("US-ASCII");
+               Charset charset = StandardCharsets.US_ASCII;
                this.parser = new StringValueParser();
                this.parser.enableQuotedStringParsing((byte) '@');
 
@@ -211,7 +213,7 @@ public class VarLengthStringParserTest {
                int startPos = 0;
                parser.setCharset(charset);
                startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[]{'|'}, s);
-               assertTrue(startPos == 11);
-               assertTrue(s.getValue().equals("abcde|gh"));
+               assertEquals(11, startPos);
+               assertEquals("abcde|gh", s.getValue());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java 
b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index b13b8aa..cbac386 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -18,24 +18,22 @@
 
 package org.apache.flink.api.java.io;
 
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.operators.DataSource;
+//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
+import org.apache.flink.api.java.tuple.*;
+//CHECKSTYLE.ON: AvoidStarImport
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Preconditions;
 
-//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
-import org.apache.flink.api.java.tuple.*;
-//CHECKSTYLE.ON: AvoidStarImport
+import java.util.ArrayList;
+import java.util.Arrays;
 
 /**
  * A builder class to instantiate a CSV parsing data source. The CSV reader 
configures the field types,
@@ -66,7 +64,7 @@ public class CsvReader {
        
        protected boolean ignoreInvalidLines = false;
 
-       private Charset charset = Charset.forName("UTF-8");
+       private String charset = "UTF-8";
        
        // 
--------------------------------------------------------------------------------------------
        
@@ -162,11 +160,12 @@ public class CsvReader {
        }
 
        /**
-        * Gets the character set for the reader. Default is set to UTF-8.
+        * Gets the character set for the reader. Default is UTF-8.
         *
         * @return The charset for the reader.
         */
-       public Charset getCharset() {
+       @PublicEvolving
+       public String getCharset() {
                return this.charset;
        }
 
@@ -175,7 +174,8 @@ public class CsvReader {
         *
         * @param charset The character set to set.
         */
-       public void setCharset(Charset charset) {
+       @PublicEvolving
+       public void setCharset(String charset) {
                this.charset = Preconditions.checkNotNull(charset);
        }
 
@@ -356,12 +356,12 @@ public class CsvReader {
        // 
--------------------------------------------------------------------------------------------
        
        private void configureInputFormat(CsvInputFormat<?> format) {
+               format.setCharset(this.charset);
                format.setDelimiter(this.lineDelimiter);
                format.setFieldDelimiter(this.fieldDelimiter);
                format.setCommentPrefix(this.commentPrefix);
                format.setSkipFirstLineAsHeader(skipFirstLineAsHeader);
                format.setLenient(ignoreInvalidLines);
-               format.setCharset(this.charset);
                if (this.parseQuotedStrings) {
                        format.enableQuotedStringParsing(this.quoteCharacter);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
index e1c8023..de57e5c 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
@@ -18,15 +18,9 @@
 
 package org.apache.flink.api.java.io;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
@@ -47,7 +41,12 @@ import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
 import org.junit.Assert;
 import org.junit.Test;
-import org.apache.flink.api.java.ExecutionEnvironment;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 /**
  * Tests for the CSV reader builder.
@@ -80,11 +79,11 @@ public class CSVReaderTest {
        @Test
        public void testCharset() {
                CsvReader reader = getCsvReader();
-               assertEquals(reader.getCharset(), Charset.forName("UTF-8"));
-               reader.setCharset(Charset.forName("US-ASCII"));
-               assertEquals(reader.getCharset(), Charset.forName("US-ASCII"));
+               assertEquals("UTF-8", reader.getCharset());
+               reader.setCharset("US-ASCII");
+               assertEquals("US-ASCII", reader.getCharset());
        }
-       
+
        @Test
        public void testIncludeFieldsDense() {
                CsvReader reader = getCsvReader();

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 54f226c..cc0d5bc 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -771,7 +771,7 @@ public class CsvInputFormatTest {
                final CsvInputFormat<Tuple5<Integer, String, String, String, 
Double>> format = new TupleCsvInputFormat<Tuple5<Integer, String, String, 
String, Double>>(PATH, typeInfo);
 
                format.setSkipFirstLineAsHeader(true);
-               format.setFieldDelimiter(',');
+               format.setFieldDelimiter(",");
 
                format.configure(new Configuration());
                format.open(split);
@@ -1077,7 +1077,7 @@ public class CsvInputFormatTest {
                CsvInputFormat<Tuple2<String, String>> inputFormat = new 
TupleCsvInputFormat<Tuple2<String, String>>(new 
Path(tempFile.toURI().toString()), typeInfo, new boolean[]{true, false, true});
 
                inputFormat.enableQuotedStringParsing('"');
-               inputFormat.setFieldDelimiter('|');
+               inputFormat.setFieldDelimiter("|");
                inputFormat.setDelimiter('\n');
 
                inputFormat.configure(new Configuration());
@@ -1107,7 +1107,7 @@ public class CsvInputFormatTest {
                CsvInputFormat<Tuple2<String, String>> inputFormat = new 
TupleCsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo);
 
                inputFormat.enableQuotedStringParsing('"');
-               inputFormat.setFieldDelimiter('|');
+               inputFormat.setFieldDelimiter("|");
                inputFormat.setDelimiter('\n');
 
                inputFormat.configure(new Configuration());

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
index d176b79..d72e7a8 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
@@ -656,7 +656,7 @@ class RowCsvInputFormatTest {
 
     val format = new RowCsvInputFormat(PATH, typeInfo)
     format.setSkipFirstLineAsHeader(true)
-    format.setFieldDelimiter(',')
+    format.setFieldDelimiter(",")
     format.configure(new Configuration)
     format.open(split)
 
@@ -745,7 +745,7 @@ class RowCsvInputFormatTest {
       rowTypeInfo = typeInfo,
       includedFieldsMask = Array(true, false, true))
     inputFormat.enableQuotedStringParsing('"')
-    inputFormat.setFieldDelimiter('|')
+    inputFormat.setFieldDelimiter("|")
     inputFormat.setDelimiter('\n')
     inputFormat.configure(new Configuration)
 
@@ -776,7 +776,7 @@ class RowCsvInputFormatTest {
       new Path(tempFile.toURI.toString),
       rowTypeInfo = typeInfo)
     inputFormat.enableQuotedStringParsing('"')
-    inputFormat.setFieldDelimiter('|')
+    inputFormat.setFieldDelimiter("|")
     inputFormat.setDelimiter('\n')
     inputFormat.configure(new Configuration)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
index 24d86e7..539a257 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
@@ -56,7 +56,7 @@ class CsvInputFormatTest {
         createTypeInformation[(String, Integer, Double)]
           .asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]])
       format.setDelimiter("\n")
-      format.setFieldDelimiter('|')
+      format.setFieldDelimiter("|")
       format.setCommentPrefix("#")
       val parameters = new Configuration
       format.configure(parameters)
@@ -98,7 +98,7 @@ class CsvInputFormatTest {
         createTypeInformation[(String, Integer, Double)]
           .asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]])
       format.setDelimiter("\n")
-      format.setFieldDelimiter('|')
+      format.setFieldDelimiter("|")
       format.setCommentPrefix("//")
       val parameters = new Configuration
       format.configure(parameters)
@@ -443,7 +443,7 @@ class CsvInputFormatTest {
     val format = new PojoCsvInputFormat[POJOItem](PATH, typeInfo)
 
     format.setDelimiter('\n')
-    format.setFieldDelimiter(',')
+    format.setFieldDelimiter(",")
     format.configure(new Configuration)
     format.open(tempFile)
 
@@ -460,7 +460,7 @@ class CsvInputFormatTest {
     val format = new TupleCsvInputFormat[CaseClassItem](PATH, typeInfo)
 
     format.setDelimiter('\n')
-    format.setFieldDelimiter(',')
+    format.setFieldDelimiter(",")
     format.configure(new Configuration)
     format.open(tempFile)
 
@@ -477,7 +477,7 @@ class CsvInputFormatTest {
       PATH, typeInfo, Array("field2", "field1", "field3"))
 
     format.setDelimiter('\n')
-    format.setFieldDelimiter(',')
+    format.setFieldDelimiter(",")
     format.configure(new Configuration)
     format.open(tempFile)
 
@@ -495,7 +495,7 @@ class CsvInputFormatTest {
       Array(true, true, false, true, false))
 
     format.setDelimiter('\n')
-    format.setFieldDelimiter(',')
+    format.setFieldDelimiter(",")
     format.configure(new Configuration)
     format.open(tempFile)
 
@@ -511,7 +511,7 @@ class CsvInputFormatTest {
     val format = new PojoCsvInputFormat[TwitterPOJO](PATH, typeInfo)
 
     format.setDelimiter('\n')
-    format.setFieldDelimiter(',')
+    format.setFieldDelimiter(",")
     format.configure(new Configuration)
     format.open(tempFile)
 

Reply via email to