[FLINK-3921] Add support for setting the encoding in CsvReader and StringParser; this extends the first commit for the issue.
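For reference, a minimal usage sketch of the new reader-level API from the DataSet side. The input path, field delimiter, and tuple types below are illustrative assumptions, not part of this commit; the charset is now configured by name (a String) rather than a java.nio.charset.Charset.

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.CsvReader;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class CsvCharsetUsageSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // The reader takes the charset by name instead of a Charset instance.
            CsvReader reader = env.readCsvFile("file:///path/to/input.csv");  // illustrative path
            reader.setCharset("UTF-16LE");

            // The charset is forwarded to the CsvInputFormat (and from there to the FieldParsers)
            // when the input format is configured.
            DataSource<Tuple2<String, Integer>> source = reader
                .fieldDelimiter("|")
                .types(String.class, Integer.class);

            source.print();
        }
    }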
This closes #2901. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/41d5875b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/41d5875b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/41d5875b Branch: refs/heads/master Commit: 41d5875bfc272f2cd5c7e8c8523036684865c1ce Parents: f2186af Author: Greg Hogan <[email protected]> Authored: Mon Nov 28 12:43:47 2016 -0500 Committer: Fabian Hueske <[email protected]> Committed: Thu Dec 8 21:21:48 2016 +0100 ---------------------------------------------------------------------- .../api/common/io/DelimitedInputFormat.java | 67 ++++++++-- .../api/common/io/GenericCsvInputFormat.java | 79 +++++------- .../apache/flink/types/parser/FieldParser.java | 18 ++- .../api/common/io/DelimitedInputFormatTest.java | 94 ++++++++++---- .../common/io/GenericCsvInputFormatTest.java | 125 ++++++++++--------- .../types/parser/VarLengthStringParserTest.java | 12 +- .../org/apache/flink/api/java/io/CsvReader.java | 24 ++-- .../apache/flink/api/java/io/CSVReaderTest.java | 23 ++-- .../flink/api/java/io/CsvInputFormatTest.java | 6 +- .../runtime/io/RowCsvInputFormatTest.scala | 6 +- .../flink/api/scala/io/CsvInputFormatTest.scala | 14 +-- 11 files changed, 273 insertions(+), 195 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java index fd02c82..5c8dfc1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java @@ -20,17 +20,18 @@ package org.apache.flink.api.common.io; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.Charset; @@ -56,8 +57,11 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple */ private static final Logger LOG = LoggerFactory.getLogger(DelimitedInputFormat.class); - /** The default charset to convert strings to bytes */ - private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8"); + // The charset used to convert strings to bytes + private String charsetName = "UTF-8"; + + // Charset is not serializable + private transient Charset charset; /** * The default read buffer size = 1MB. 
@@ -157,9 +161,12 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple // -------------------------------------------------------------------------------------------- // The configuration parameters. Configured on the instance and serialized to be shipped. // -------------------------------------------------------------------------------------------- - + + // The delimiter may be set with a byte-sequence or a String. In the latter + // case the byte representation is updated consistent with current charset. private byte[] delimiter = new byte[] {'\n'}; - + private String delimiterString = null; + private int lineLengthLimit = Integer.MAX_VALUE; private int bufferSize = -1; @@ -182,8 +189,42 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple } loadConfigParameters(configuration); } - - + + /** + * Get the character set used for the row delimiter. This is also used by + * subclasses to interpret field delimiters, comment strings, and for + * configuring {@link FieldParser}s. + * + * @return the charset + */ + @PublicEvolving + public Charset getCharset() { + if (this.charset == null) { + this.charset = Charset.forName(charsetName); + } + return this.charset; + } + + /** + * Set the name of the character set used for the row delimiter. This is + * also used by subclasses to interpret field delimiters, comment strings, + * and for configuring {@link FieldParser}s. + * + * These fields are interpreted when set. Changing the charset thereafter + * may cause unexpected results. + * + * @param charset name of the charset + */ + @PublicEvolving + public void setCharset(String charset) { + this.charsetName = Preconditions.checkNotNull(charset); + this.charset = null; + + if (this.delimiterString != null) { + this.delimiter = delimiterString.getBytes(getCharset()); + } + } + public byte[] getDelimiter() { return delimiter; } @@ -193,6 +234,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple throw new IllegalArgumentException("Delimiter must not be null"); } this.delimiter = delimiter; + this.delimiterString = null; } public void setDelimiter(char delimiter) { @@ -203,7 +245,8 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple if (delimiter == null) { throw new IllegalArgumentException("Delimiter must not be null"); } - this.delimiter = delimiter.getBytes(UTF_8_CHARSET); + this.delimiter = delimiter.getBytes(getCharset()); + this.delimiterString = delimiter; } public int getLineLengthLimit() { @@ -264,7 +307,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple // -------------------------------------------------------------------------------------------- /** - * Configures this input format by reading the path to the file from the configuration andge the string that + * Configures this input format by reading the path to the file from the configuration and the string that * defines the record delimiter. * * @param parameters The configuration object to read the parameters from. 
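To illustrate the DelimitedInputFormat change above, a small sketch modeled on the testReadCustomDelimiterWithCharset test added further down: the charset is stored by name (the Charset itself is not serializable and is resolved lazily), and a row delimiter set as a String is re-encoded whenever the charset changes. The anonymous subclass and delimiter value are illustrative only.

    import org.apache.flink.api.common.io.DelimitedInputFormat;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class DelimiterCharsetSketch {
        public static void main(String[] args) {
            // Anonymous subclass as in DelimitedInputFormatTest; readRecord decodes with the format's charset.
            DelimitedInputFormat<String> format = new DelimitedInputFormat<String>() {
                @Override
                public String readRecord(String reuse, byte[] bytes, int offset, int numBytes) throws IOException {
                    return new String(bytes, offset, numBytes, getCharset());
                }
            };

            format.setDelimiter("\u05c0\u05c0");  // remembered as a String, encoded with the current charset (UTF-8 by default)
            format.setCharset("UTF-16BE");        // the remembered delimiter String is re-encoded with the new charset

            // getCharset() lazily resolves the (non-serializable) Charset from its name.
            System.out.println(format.getCharset().name());  // UTF-16BE
            System.out.println(Arrays.equals(
                format.getDelimiter(), "\u05c0\u05c0".getBytes(StandardCharsets.UTF_16BE)));  // true
        }
    }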
http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java index 0ced22b..20c643e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java @@ -25,12 +25,10 @@ import org.apache.flink.types.parser.FieldParser; import org.apache.flink.types.parser.StringParser; import org.apache.flink.types.parser.StringValueParser; import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Map; import java.util.TreeMap; @@ -46,9 +44,6 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class); - /** The charset used to convert strings to bytes */ - private Charset charset = Charset.forName("UTF-8"); - private static final Class<?>[] EMPTY_TYPES = new Class<?>[0]; private static final boolean[] EMPTY_INCLUDED = new boolean[0]; @@ -79,9 +74,12 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> private Class<?>[] fieldTypes = EMPTY_TYPES; protected boolean[] fieldIncluded = EMPTY_INCLUDED; - + + // The byte representation of the delimiter is updated consistent with + // current charset. private byte[] fieldDelim = DEFAULT_FIELD_DELIMITER; - + private String fieldDelimString = null; + private boolean lenient; private boolean skipFirstLineAsHeader; @@ -90,7 +88,10 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> private byte quoteCharacter; + // The byte representation of the comment prefix is updated consistent with + // current charset. 
protected byte[] commentPrefix = null; + private String commentPrefixString = null; // -------------------------------------------------------------------------------------------- @@ -105,11 +106,6 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> super(filePath, null); } - protected GenericCsvInputFormat(Path filePath, Charset charset) { - this(filePath); - this.charset = Preconditions.checkNotNull(charset); - } - // -------------------------------------------------------------------------------------------- public int getNumberOfFieldsTotal() { @@ -120,43 +116,43 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> return this.fieldTypes.length; } + @Override + public void setCharset(String charset) { + super.setCharset(charset); + + if (this.fieldDelimString != null) { + this.fieldDelim = fieldDelimString.getBytes(getCharset()); + } + + if (this.commentPrefixString != null) { + this.commentPrefix = commentPrefixString.getBytes(getCharset()); + } + } + public byte[] getCommentPrefix() { return commentPrefix; } public void setCommentPrefix(String commentPrefix) { - setCommentPrefix(commentPrefix, charset); - } - - private void setCommentPrefix(String commentPrefix, Charset charset) { - if (charset == null) { - throw new IllegalArgumentException("Charset must not be null"); - } if (commentPrefix != null) { - this.commentPrefix = commentPrefix.getBytes(charset); + this.commentPrefix = commentPrefix.getBytes(getCharset()); } else { this.commentPrefix = null; } + this.commentPrefixString = commentPrefix; } public byte[] getFieldDelimiter() { return fieldDelim; } - public void setFieldDelimiter(byte[] delimiter) { + public void setFieldDelimiter(String delimiter) { if (delimiter == null) { throw new IllegalArgumentException("Delimiter must not be null"); } - this.fieldDelim = delimiter; - } - - public void setFieldDelimiter(char delimiter) { - setFieldDelimiter(String.valueOf(delimiter)); - } - - public void setFieldDelimiter(String delimiter) { - this.fieldDelim = delimiter.getBytes(charset); + this.fieldDelim = delimiter.getBytes(getCharset()); + this.fieldDelimString = delimiter; } public boolean isLenient() { @@ -296,25 +292,6 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> this.fieldIncluded = includedMask; } - /** - * Gets the character set for the parser. Default is set to UTF-8. - * - * @return The charset for the parser. - */ - Charset getCharset() { - return this.charset; - } - - /** - * Sets the charset of the parser. Called by subclasses of the parser to set the type of charset - * when doing a parse. - * - * @param charset The character set to set. 
- */ - public void setCharset(Charset charset) { - this.charset = Preconditions.checkNotNull(charset); - } - // -------------------------------------------------------------------------------------------- // Runtime methods // -------------------------------------------------------------------------------------------- @@ -322,7 +299,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> @Override public void open(FileInputSplit split) throws IOException { super.open(split); - + // instantiate the parsers FieldParser<?>[] parsers = new FieldParser<?>[fieldTypes.length]; @@ -335,7 +312,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> FieldParser<?> p = InstantiationUtil.instantiate(parserType, FieldParser.class); - p.setCharset(this.getCharset()); + p.setCharset(getCharset()); if (this.quotedStringParsing) { if (p instanceof StringParser) { ((StringParser)p).enableQuotedStringParsing(this.quoteCharacter); http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java index d9eeecc..cf3c83d 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java @@ -32,6 +32,7 @@ import org.apache.flink.types.StringValue; import java.math.BigDecimal; import java.math.BigInteger; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -79,7 +80,7 @@ public abstract class FieldParser<T> { BOOLEAN_INVALID } - private Charset charset = Charset.forName("UTF-8"); + private Charset charset = StandardCharsets.UTF_8; private ParseErrorState errorState = ParseErrorState.NONE; @@ -105,9 +106,7 @@ public abstract class FieldParser<T> { /** * Each parser's logic should be implemented inside this method - * - * @see {@link FieldParser#parseField(byte[], int, int, byte[], Object)} - * */ + */ protected abstract int parseField(byte[] bytes, int startPos, int limit, byte[] delim, T reuse); /** @@ -221,20 +220,19 @@ public abstract class FieldParser<T> { return limitedLength; } - /* - * Gets the Charset for the parser.Default is set to ASCII + /** + * Gets the character set used for this parser. * - * @return The charset for the parser. + * @return the charset used for this parser. */ public Charset getCharset() { return this.charset; } /** - * Sets the charset of the parser. Called by subclasses of the parser to set the type of charset - * when doing a parse. + * Sets the character set used for this parser. * - * @param charset The charset to set. + * @param charset charset used for this parser. 
*/ public void setCharset(Charset charset) { this.charset = charset; http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java index 93d5f9f..219365a 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java @@ -18,12 +18,13 @@ package org.apache.flink.api.common.io; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -33,17 +34,17 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.OutputStreamWriter; +import java.io.Writer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class DelimitedInputFormatTest { @@ -200,6 +201,45 @@ public class DelimitedInputFormatTest { } } + @Test + public void testReadCustomDelimiterWithCharset() throws IOException { + // Unicode row fragments + String[] records = new String[]{"\u020e\u021f\u05c0\u020b\u020f", "Apache", "\nFlink", "\u0000", "\u05c0"}; + + // Unicode delimiter + String delimiter = "\u05c0\u05c0"; + + String fileContent = StringUtils.join(records, delimiter); + + for (final String charset : new String[]{ "UTF-8", "UTF-16BE", "UTF-16LE" }) { + // use charset when instantiating the record String + DelimitedInputFormat<String> format = new DelimitedInputFormat<String>() { + @Override + public String readRecord(String reuse, byte[] bytes, int offset, int numBytes) throws IOException { + return new String(bytes, offset, numBytes, charset); + } + }; + format.setFilePath("file:///some/file/that/will/not/be/read"); + + final FileInputSplit split = createTempFile(fileContent, charset); + + format.setDelimiter(delimiter); + // use the same encoding to parse the file as used to read the file; + // the delimiter is reinterpreted when the charset is set + format.setCharset(charset); + format.configure(new Configuration()); + format.open(split); + + for (String record : records) { + String value = format.nextRecord(null); + assertEquals(record, value); + } + + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); + } + } + /** * Tests that the 
records are read correctly when the split boundary is in the middle of a record. */ @@ -363,19 +403,29 @@ public class DelimitedInputFormatTest { fail(e.getMessage()); } } - - private static FileInputSplit createTempFile(String contents) throws IOException { + + static FileInputSplit createTempFile(String contents) throws IOException { File tempFile = File.createTempFile("test_contents", "tmp"); tempFile.deleteOnExit(); - - OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile)); - wrt.write(contents); - wrt.close(); - + + try (Writer out = new OutputStreamWriter(new FileOutputStream(tempFile))) { + out.write(contents); + } + return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"}); } - - + + static FileInputSplit createTempFile(String contents, String charset) throws IOException { + File tempFile = File.createTempFile("test_contents", "tmp"); + tempFile.deleteOnExit(); + + try (Writer out = new OutputStreamWriter(new FileOutputStream(tempFile), charset)) { + out.write(contents); + } + + return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"}); + } + protected static final class MyTextInputFormat extends DelimitedInputFormat<String> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java index d063ddc..c11a573 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java @@ -18,21 +18,7 @@ package org.apache.flink.api.common.io; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.Arrays; -import java.util.zip.DeflaterOutputStream; -import java.util.zip.GZIPOutputStream; - +import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; @@ -41,15 +27,29 @@ import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; import org.apache.flink.types.StringValue; import org.apache.flink.types.Value; - import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.Arrays; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.GZIPOutputStream; + +import static org.apache.flink.api.common.io.DelimitedInputFormatTest.createTempFile; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + 
public class GenericCsvInputFormatTest { - private File tempFile; - private TestCsvInputFormat format; // -------------------------------------------------------------------------------------------- @@ -65,9 +65,6 @@ public class GenericCsvInputFormatTest { if (this.format != null) { this.format.close(); } - if (this.tempFile != null) { - this.tempFile.delete(); - } } @Test @@ -87,7 +84,7 @@ public class GenericCsvInputFormatTest { public void testReadNoPosAll() throws IOException { try { final String fileContent = "111|222|333|444|555\n666|777|888|999|000|"; - final FileInputSplit split = createTempFile(fileContent); + final FileInputSplit split = createTempFile(fileContent); final Configuration parameters = new Configuration(); @@ -485,8 +482,7 @@ public class GenericCsvInputFormatTest { format.nextRecord(values); fail("Input format accepted on invalid input."); } - catch (ParseException e) { - // all good + catch (ParseException ignored) { } } catch (Exception ex) { @@ -551,40 +547,64 @@ public class GenericCsvInputFormatTest { @Test public void testReadWithCharset() throws IOException { - try { - final String fileContent = "\u00bf|Flink|\u00f1"; - final FileInputSplit split = createTempFile(fileContent); + // Unicode row fragments + String[] records = new String[]{"\u020e\u021f", "Flink", "\u020b\u020f"}; - final Configuration parameters = new Configuration(); + // Unicode delimiter + String delimiter = "\u05c0\u05c0"; - format.setCharset(Charset.forName("UTF-8")); - format.setFieldDelimiter("|"); - format.setFieldTypesGeneric(StringValue.class, StringValue.class, StringValue.class); + String fileContent = StringUtils.join(records, delimiter); - format.configure(parameters); - format.open(split); + // StringValueParser does not use charset so rely on StringParser + GenericCsvInputFormat<String[]> format = new GenericCsvInputFormat<String[]>() { + @Override + public String[] readRecord(String[] target, byte[] bytes, int offset, int numBytes) throws IOException { + return parseRecord(target, bytes, offset, numBytes) ? 
target : null; + } + }; + format.setFilePath("file:///some/file/that/will/not/be/read"); - Value[] values = new Value[] { new StringValue(), new StringValue(), new StringValue()}; + for (String charset : new String[]{ "UTF-8", "UTF-16BE", "UTF-16LE" }) { + File tempFile = File.createTempFile("test_contents", "tmp"); + tempFile.deleteOnExit(); + // write string with proper encoding + try (Writer out = new OutputStreamWriter(new FileOutputStream(tempFile), charset)) { + out.write(fileContent); + } + + FileInputSplit split = new FileInputSplit(0, new Path(tempFile.toURI().toString()), + 0, tempFile.length(), new String[]{ "localhost" }); + + format.setFieldDelimiter(delimiter); + format.setFieldTypesGeneric(String.class, String.class, String.class); + // use the same encoding to parse the file as used to read the file; + // the field delimiter is reinterpreted when the charset is set + format.setCharset(charset); + format.configure(new Configuration()); + format.open(split); + + String[] values = new String[]{ "", "", "" }; values = format.nextRecord(values); + + // validate results assertNotNull(values); - assertEquals("\u00bf", ((StringValue) values[0]).getValue()); - assertEquals("Flink", ((StringValue) values[1]).getValue()); - assertEquals("\u00f1", ((StringValue) values[2]).getValue()); + for (int i = 0 ; i < records.length ; i++) { + assertEquals(records[i], values[i]); + } assertNull(format.nextRecord(values)); assertTrue(format.reachedEnd()); } - catch (Exception ex) { - fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); - } + + format.close(); } @Test public void readWithEmptyField() { try { final String fileContent = "abc|def|ghijk\nabc||hhg\n|||"; - final FileInputSplit split = createTempFile(fileContent); + final FileInputSplit split = createTempFile(fileContent); final Configuration parameters = new Configuration(); @@ -721,37 +741,26 @@ public class GenericCsvInputFormatTest { } } - private FileInputSplit createTempFile(String content) throws IOException { - this.tempFile = File.createTempFile("test_contents", "tmp"); - this.tempFile.deleteOnExit(); - - DataOutputStream dos = new DataOutputStream(new FileOutputStream(tempFile)); - dos.writeBytes(content); - dos.close(); - - return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"}); - } - private FileInputSplit createTempDeflateFile(String content) throws IOException { - this.tempFile = File.createTempFile("test_contents", "tmp.deflate"); - this.tempFile.deleteOnExit(); + File tempFile = File.createTempFile("test_contents", "tmp.deflate"); + tempFile.deleteOnExit(); DataOutputStream dos = new DataOutputStream(new DeflaterOutputStream(new FileOutputStream(tempFile))); dos.writeBytes(content); dos.close(); - return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"}); + return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"}); } private FileInputSplit createTempGzipFile(String content) throws IOException { - this.tempFile = File.createTempFile("test_contents", "tmp.gz"); - this.tempFile.deleteOnExit(); + File tempFile = File.createTempFile("test_contents", "tmp.gz"); + tempFile.deleteOnExit(); DataOutputStream dos = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(tempFile))); dos.writeBytes(content); dos.close(); - return new FileInputSplit(0, new 
Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"}); + return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"}); } private Value[] createIntValues(int num) { http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java index 1c5579e..718274e 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java @@ -19,13 +19,15 @@ package org.apache.flink.types.parser; -import static org.junit.Assert.assertTrue; - import org.apache.flink.types.StringValue; import org.apache.flink.types.Value; import org.junit.Test; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class VarLengthStringParserTest { @@ -200,7 +202,7 @@ public class VarLengthStringParserTest { @Test public void testParseValidMixedStringsWithCharset() { - Charset charset = Charset.forName("US-ASCII"); + Charset charset = StandardCharsets.US_ASCII; this.parser = new StringValueParser(); this.parser.enableQuotedStringParsing((byte) '@'); @@ -211,7 +213,7 @@ public class VarLengthStringParserTest { int startPos = 0; parser.setCharset(charset); startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[]{'|'}, s); - assertTrue(startPos == 11); - assertTrue(s.getValue().equals("abcde|gh")); + assertEquals(11, startPos); + assertEquals("abcde|gh", s.getValue()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java index b13b8aa..cbac386 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java @@ -18,24 +18,22 @@ package org.apache.flink.api.java.io; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; - import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.operators.DataSource; +//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator +import org.apache.flink.api.java.tuple.*; +//CHECKSTYLE.ON: AvoidStarImport import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.fs.Path; import org.apache.flink.util.Preconditions; -//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator -import org.apache.flink.api.java.tuple.*; -//CHECKSTYLE.ON: AvoidStarImport +import java.util.ArrayList; +import java.util.Arrays; /** * A builder class to instantiate a CSV parsing data source. 
The CSV reader configures the field types, @@ -66,7 +64,7 @@ public class CsvReader { protected boolean ignoreInvalidLines = false; - private Charset charset = Charset.forName("UTF-8"); + private String charset = "UTF-8"; // -------------------------------------------------------------------------------------------- @@ -162,11 +160,12 @@ public class CsvReader { } /** - * Gets the character set for the reader. Default is set to UTF-8. + * Gets the character set for the reader. Default is UTF-8. * * @return The charset for the reader. */ - public Charset getCharset() { + @PublicEvolving + public String getCharset() { return this.charset; } @@ -175,7 +174,8 @@ public class CsvReader { * * @param charset The character set to set. */ - public void setCharset(Charset charset) { + @PublicEvolving + public void setCharset(String charset) { this.charset = Preconditions.checkNotNull(charset); } @@ -356,12 +356,12 @@ public class CsvReader { // -------------------------------------------------------------------------------------------- private void configureInputFormat(CsvInputFormat<?> format) { + format.setCharset(this.charset); format.setDelimiter(this.lineDelimiter); format.setFieldDelimiter(this.fieldDelimiter); format.setCommentPrefix(this.commentPrefix); format.setSkipFirstLineAsHeader(skipFirstLineAsHeader); format.setLenient(ignoreInvalidLines); - format.setCharset(this.charset); if (this.parseQuotedStrings) { format.enableQuotedStringParsing(this.quoteCharacter); } http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java index e1c8023..de57e5c 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java @@ -18,15 +18,9 @@ package org.apache.flink.api.java.io; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.Arrays; - import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.tuple.Tuple5; @@ -47,7 +41,12 @@ import org.apache.flink.types.StringValue; import org.apache.flink.types.Value; import org.junit.Assert; import org.junit.Test; -import org.apache.flink.api.java.ExecutionEnvironment; + +import java.io.IOException; +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; /** * Tests for the CSV reader builder. 
@@ -80,11 +79,11 @@ public class CSVReaderTest { @Test public void testCharset() { CsvReader reader = getCsvReader(); - assertEquals(reader.getCharset(), Charset.forName("UTF-8")); - reader.setCharset(Charset.forName("US-ASCII")); - assertEquals(reader.getCharset(), Charset.forName("US-ASCII")); + assertEquals("UTF-8", reader.getCharset()); + reader.setCharset("US-ASCII"); + assertEquals("US-ASCII", reader.getCharset()); } - + @Test public void testIncludeFieldsDense() { CsvReader reader = getCsvReader(); http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java index 54f226c..cc0d5bc 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java @@ -771,7 +771,7 @@ public class CsvInputFormatTest { final CsvInputFormat<Tuple5<Integer, String, String, String, Double>> format = new TupleCsvInputFormat<Tuple5<Integer, String, String, String, Double>>(PATH, typeInfo); format.setSkipFirstLineAsHeader(true); - format.setFieldDelimiter(','); + format.setFieldDelimiter(","); format.configure(new Configuration()); format.open(split); @@ -1077,7 +1077,7 @@ public class CsvInputFormatTest { CsvInputFormat<Tuple2<String, String>> inputFormat = new TupleCsvInputFormat<Tuple2<String, String>>(new Path(tempFile.toURI().toString()), typeInfo, new boolean[]{true, false, true}); inputFormat.enableQuotedStringParsing('"'); - inputFormat.setFieldDelimiter('|'); + inputFormat.setFieldDelimiter("|"); inputFormat.setDelimiter('\n'); inputFormat.configure(new Configuration()); @@ -1107,7 +1107,7 @@ public class CsvInputFormatTest { CsvInputFormat<Tuple2<String, String>> inputFormat = new TupleCsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo); inputFormat.enableQuotedStringParsing('"'); - inputFormat.setFieldDelimiter('|'); + inputFormat.setFieldDelimiter("|"); inputFormat.setDelimiter('\n'); inputFormat.configure(new Configuration()); http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala index d176b79..d72e7a8 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala @@ -656,7 +656,7 @@ class RowCsvInputFormatTest { val format = new RowCsvInputFormat(PATH, typeInfo) format.setSkipFirstLineAsHeader(true) - format.setFieldDelimiter(',') + format.setFieldDelimiter(",") format.configure(new Configuration) format.open(split) @@ -745,7 +745,7 @@ class RowCsvInputFormatTest { rowTypeInfo = typeInfo, includedFieldsMask = Array(true, false, true)) inputFormat.enableQuotedStringParsing('"') - inputFormat.setFieldDelimiter('|') + inputFormat.setFieldDelimiter("|") inputFormat.setDelimiter('\n') 
inputFormat.configure(new Configuration) @@ -776,7 +776,7 @@ class RowCsvInputFormatTest { new Path(tempFile.toURI.toString), rowTypeInfo = typeInfo) inputFormat.enableQuotedStringParsing('"') - inputFormat.setFieldDelimiter('|') + inputFormat.setFieldDelimiter("|") inputFormat.setDelimiter('\n') inputFormat.configure(new Configuration) http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala index 24d86e7..539a257 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala @@ -56,7 +56,7 @@ class CsvInputFormatTest { createTypeInformation[(String, Integer, Double)] .asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]]) format.setDelimiter("\n") - format.setFieldDelimiter('|') + format.setFieldDelimiter("|") format.setCommentPrefix("#") val parameters = new Configuration format.configure(parameters) @@ -98,7 +98,7 @@ class CsvInputFormatTest { createTypeInformation[(String, Integer, Double)] .asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]]) format.setDelimiter("\n") - format.setFieldDelimiter('|') + format.setFieldDelimiter("|") format.setCommentPrefix("//") val parameters = new Configuration format.configure(parameters) @@ -443,7 +443,7 @@ class CsvInputFormatTest { val format = new PojoCsvInputFormat[POJOItem](PATH, typeInfo) format.setDelimiter('\n') - format.setFieldDelimiter(',') + format.setFieldDelimiter(",") format.configure(new Configuration) format.open(tempFile) @@ -460,7 +460,7 @@ class CsvInputFormatTest { val format = new TupleCsvInputFormat[CaseClassItem](PATH, typeInfo) format.setDelimiter('\n') - format.setFieldDelimiter(',') + format.setFieldDelimiter(",") format.configure(new Configuration) format.open(tempFile) @@ -477,7 +477,7 @@ class CsvInputFormatTest { PATH, typeInfo, Array("field2", "field1", "field3")) format.setDelimiter('\n') - format.setFieldDelimiter(',') + format.setFieldDelimiter(",") format.configure(new Configuration) format.open(tempFile) @@ -495,7 +495,7 @@ class CsvInputFormatTest { Array(true, true, false, true, false)) format.setDelimiter('\n') - format.setFieldDelimiter(',') + format.setFieldDelimiter(",") format.configure(new Configuration) format.open(tempFile) @@ -511,7 +511,7 @@ class CsvInputFormatTest { val format = new PojoCsvInputFormat[TwitterPOJO](PATH, typeInfo) format.setDelimiter('\n') - format.setFieldDelimiter(',') + format.setFieldDelimiter(",") format.configure(new Configuration) format.open(tempFile)
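To round off, a sketch of the CSV-level changes: field delimiters and comment prefixes are now set as Strings (the char and byte[] overloads are gone, hence the test updates above), and all of them are re-encoded together with the row delimiter when the charset is set. The file path and tuple type are illustrative assumptions.

    import org.apache.flink.api.java.io.TupleCsvInputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    import org.apache.flink.core.fs.Path;

    public class CsvFormatCharsetSketch {
        public static void main(String[] args) {
            TupleTypeInfo<Tuple2<String, String>> typeInfo =
                TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);

            TupleCsvInputFormat<Tuple2<String, String>> format =
                new TupleCsvInputFormat<>(new Path("file:///path/to/input.csv"), typeInfo);  // illustrative path

            // Delimiters are Strings so they can be re-encoded under a different charset.
            format.setFieldDelimiter("|");
            format.setCommentPrefix("#");

            // Re-encodes the row delimiter, field delimiter, and comment prefix; the charset is also
            // passed to the FieldParsers when the format is opened.
            format.setCharset("UTF-16LE");
        }
    }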
