http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java new file mode 100644 index 0000000..a68e81e --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java @@ -0,0 +1,879 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io; + +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.Row; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.types.parser.StringParser; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.HashMap; +import java.util.Map; + +import static junit.framework.TestCase.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +public class RowCsvInputFormatTest { + + private static Path PATH = new Path("an/ignored/file/"); + + // static variables for testing the removal of \r\n to \n + private static String FIRST_PART = "That is the first part"; + private static String SECOND_PART = "That is the second part"; + + @Test + public void ignoreInvalidLines() throws Exception { + String fileContent = + "#description of the data\n" + + "header1|header2|header3|\n" + + "this is|1|2.0|\n" + + "//a comment\n" + + "a test|3|4.0|\n" + + "#next|5|6.0|\n"; + + FileInputSplit split = createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO); + + RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|"); + format.setLenient(false); + Configuration parameters = new Configuration(); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(3); + try { + result = format.nextRecord(result); + fail("Parse Exception was not thrown! (Row too short)"); + } catch (ParseException ignored) { + } // => ok + + try { + result = format.nextRecord(result); + fail("Parse Exception was not thrown! (Invalid int value)"); + } catch (ParseException ignored) { + } // => ok + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("this is", result.getField(0)); + assertEquals(1, result.getField(1)); + assertEquals(2.0, result.getField(2)); + + try { + result = format.nextRecord(result); + fail("Parse Exception was not thrown! (Row too short)"); + } catch (ParseException ignored) { + } // => ok + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("a test", result.getField(0)); + assertEquals(3, result.getField(1)); + assertEquals(4.0, result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("#next", result.getField(0)); + assertEquals(5, result.getField(1)); + assertEquals(6.0, result.getField(2)); + + result = format.nextRecord(result); + assertNull(result); + + // re-open with lenient = true + format.setLenient(true); + format.configure(parameters); + format.open(split); + + result = new Row(3); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("header1", result.getField(0)); + assertNull(result.getField(1)); + assertNull(result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("this is", result.getField(0)); + assertEquals(1, result.getField(1)); + assertEquals(2.0, result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("a test", result.getField(0)); + assertEquals(3, result.getField(1)); + assertEquals(4.0, result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("#next", result.getField(0)); + assertEquals(5, result.getField(1)); + assertEquals(6.0, result.getField(2)); + result = format.nextRecord(result); + assertNull(result); + } + + @Test + public void ignoreSingleCharPrefixComments() throws Exception { + String fileContent = + "#description of the data\n" + + "#successive commented line\n" + + "this is|1|2.0|\n" + + "a test|3|4.0|\n" + + "#next|5|6.0|\n"; + + FileInputSplit split = createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO); + + RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|"); + format.setCommentPrefix("#"); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(3); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("this is", result.getField(0)); + assertEquals(1, result.getField(1)); + assertEquals(2.0, result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("a test", result.getField(0)); + assertEquals(3, result.getField(1)); + assertEquals(4.0, result.getField(2)); + + result = format.nextRecord(result); + assertNull(result); + } + + @Test + public void ignoreMultiCharPrefixComments() throws Exception { + String fileContent = + "//description of the data\n" + + "//successive commented line\n" + + "this is|1|2.0|\n" + + "a test|3|4.0|\n" + + "//next|5|6.0|\n"; + + FileInputSplit split = createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO); + + RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|"); + format.setCommentPrefix("//"); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(3); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("this is", result.getField(0)); + assertEquals(1, result.getField(1)); + assertEquals(2.0, result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("a test", result.getField(0)); + assertEquals(3, result.getField(1)); + assertEquals(4.0, result.getField(2)); + result = format.nextRecord(result); + assertNull(result); + } + + @Test + public void readStringFields() throws Exception { + String fileContent = "abc|def|ghijk\nabc||hhg\n|||"; + + FileInputSplit split = createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO); + + RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|"); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(3); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("abc", result.getField(0)); + assertEquals("def", result.getField(1)); + assertEquals("ghijk", result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("abc", result.getField(0)); + assertEquals("", result.getField(1)); + assertEquals("hhg", result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("", result.getField(0)); + assertEquals("", result.getField(1)); + assertEquals("", result.getField(2)); + + result = format.nextRecord(result); + assertNull(result); + assertTrue(format.reachedEnd()); + } + + @Test + public void readMixedQuotedStringFields() throws Exception { + String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||"; + + FileInputSplit split = createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO); + + RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|"); + format.configure(new Configuration()); + format.enableQuotedStringParsing('@'); + format.open(split); + + Row result = new Row(3); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("a|b|c", result.getField(0)); + assertEquals("def", result.getField(1)); + assertEquals("ghijk", result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("abc", result.getField(0)); + assertEquals("", result.getField(1)); + assertEquals("|hhg", result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("", result.getField(0)); + assertEquals("", result.getField(1)); + assertEquals("", result.getField(2)); + + result = format.nextRecord(result); + assertNull(result); + assertTrue(format.reachedEnd()); + } + + @Test + public void readStringFieldsWithTrailingDelimiters() throws Exception { + String fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n"; + + FileInputSplit split = createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO); + + RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|"); + format.setFieldDelimiter("|-"); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(3); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("abc", result.getField(0)); + assertEquals("def", result.getField(1)); + assertEquals("ghijk", result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("abc", result.getField(0)); + assertEquals("", result.getField(1)); + assertEquals("hhg", result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("", result.getField(0)); + assertEquals("", result.getField(1)); + assertEquals("", result.getField(2)); + + result = format.nextRecord(result); + assertNull(result); + assertTrue(format.reachedEnd()); + } + + @Test + public void testIntegerFields() throws Exception { + String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n"; + + FileInputSplit split = createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO); + + RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|"); + + format.setFieldDelimiter("|"); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(5); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(111, result.getField(0)); + assertEquals(222, result.getField(1)); + assertEquals(333, result.getField(2)); + assertEquals(444, result.getField(3)); + assertEquals(555, result.getField(4)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(666, result.getField(0)); + assertEquals(777, result.getField(1)); + assertEquals(888, result.getField(2)); + assertEquals(999, result.getField(3)); + assertEquals(0, result.getField(4)); + + result = format.nextRecord(result); + assertNull(result); + assertTrue(format.reachedEnd()); + } + + @Test + public void testEmptyFields() throws Exception { + String fileContent = + ",,,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,,\n"; + + FileInputSplit split = createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo( + BasicTypeInfo.BOOLEAN_TYPE_INFO, + BasicTypeInfo.BYTE_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.FLOAT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.SHORT_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO); + + RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, true); + format.setFieldDelimiter(","); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(8); + int linesCnt = fileContent.split("\n").length; + + for (int i = 0; i < linesCnt; i++) { + result = format.nextRecord(result); + assertNull(result.getField(i)); + } + + // ensure no more rows + assertNull(format.nextRecord(result)); + assertTrue(format.reachedEnd()); + } + + @Test + public void testDoubleFields() throws Exception { + String fileContent = "11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n"; + + FileInputSplit split = createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo( + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO); + + RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo); + format.setFieldDelimiter("|"); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(5); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(11.1, result.getField(0)); + assertEquals(22.2, result.getField(1)); + assertEquals(33.3, result.getField(2)); + assertEquals(44.4, result.getField(3)); + assertEquals(55.5, result.getField(4)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(66.6, result.getField(0)); + assertEquals(77.7, result.getField(1)); + assertEquals(88.8, result.getField(2)); + assertEquals(99.9, result.getField(3)); + assertEquals(0.0, result.getField(4)); + + result = format.nextRecord(result); + assertNull(result); + assertTrue(format.reachedEnd()); + } + + @Test + public void testReadFirstN() throws Exception { + String fileContent = "111|222|333|444|555|\n666|777|888|999|000|\n"; + + FileInputSplit split = createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO); + + RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo); + format.setFieldDelimiter("|"); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(2); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(111, result.getField(0)); + assertEquals(222, result.getField(1)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(666, result.getField(0)); + assertEquals(777, result.getField(1)); + + result = format.nextRecord(result); + assertNull(result); + assertTrue(format.reachedEnd()); + } + + @Test + public void testReadSparseWithNullFieldsForTypes() throws Exception { + String fileContent = "111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" + + "000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|"; + + FileInputSplit split = createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO); + + RowCsvInputFormat format = new RowCsvInputFormat( + PATH, + typeInfo, + new boolean[]{true, false, false, true, false, false, false, true}); + format.setFieldDelimiter("|x|"); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(3); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(111, result.getField(0)); + assertEquals(444, result.getField(1)); + assertEquals(888, result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(0, result.getField(0)); + assertEquals(777, result.getField(1)); + assertEquals(333, result.getField(2)); + + result = format.nextRecord(result); + assertNull(result); + assertTrue(format.reachedEnd()); + } + + @Test + public void testReadSparseWithPositionSetter() throws Exception { + String fileContent = "111|222|333|444|555|666|777|888|999|000|\n" + + "000|999|888|777|666|555|444|333|222|111|"; + + FileInputSplit split = createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO); + + RowCsvInputFormat format = new RowCsvInputFormat( + PATH, + typeInfo, + new int[]{0, 3, 7}); + format.setFieldDelimiter("|"); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(3); + result = format.nextRecord(result); + + assertNotNull(result); + assertEquals(111, result.getField(0)); + assertEquals(444, result.getField(1)); + assertEquals(888, result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(0, result.getField(0)); + assertEquals(777, result.getField(1)); + assertEquals(333, result.getField(2)); + + result = format.nextRecord(result); + assertNull(result); + assertTrue(format.reachedEnd()); + } + + @Test + public void testReadSparseWithMask() throws Exception { + String fileContent = "111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" + + "000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&"; + + FileInputSplit split = RowCsvInputFormatTest.createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO); + + RowCsvInputFormat format = new RowCsvInputFormat( + PATH, + typeInfo, + new boolean[]{true, false, false, true, false, false, false, true}); + format.setFieldDelimiter("&&"); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(3); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(111, result.getField(0)); + assertEquals(444, result.getField(1)); + assertEquals(888, result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(0, result.getField(0)); + assertEquals(777, result.getField(1)); + assertEquals(333, result.getField(2)); + + result = format.nextRecord(result); + assertNull(result); + assertTrue(format.reachedEnd()); + } + + @Test + public void testParseStringErrors() throws Exception { + StringParser stringParser = new StringParser(); + + stringParser.enableQuotedStringParsing((byte) '"'); + + Map<String, StringParser.ParseErrorState> failures = new HashMap<>(); + failures.put("\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING); + failures.put("\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING); + + for (Map.Entry<String, StringParser.ParseErrorState> failure : failures.entrySet()) { + int result = stringParser.parseField( + failure.getKey().getBytes(), + 0, + failure.getKey().length(), + new byte[]{(byte) '|'}, + null); + assertEquals(-1, result); + assertEquals(failure.getValue(), stringParser.getErrorState()); + } + } + + // Test disabled because we do not support double-quote escaped quotes right now. + @Test + @Ignore + public void testParserCorrectness() throws Exception { + // RFC 4180 Compliance Test content + // Taken from http://en.wikipedia.org/wiki/Comma-separated_values#Example + String fileContent = "Year,Make,Model,Description,Price\n" + + "1997,Ford,E350,\"ac, abs, moon\",3000.00\n" + + "1999,Chevy,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00\n" + + "1996,Jeep,Grand Cherokee,\"MUST SELL! air, moon roof, loaded\",4799.00\n" + + "1999,Chevy,\"Venture \"\"Extended Edition, Very Large\"\"\",,5000.00\n" + + ",,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00"; + + FileInputSplit split = createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO); + + RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo); + format.setSkipFirstLineAsHeader(true); + format.setFieldDelimiter(","); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(5); + Row r1 = new Row(5); + r1.setField(0, 1997); + r1.setField(1, "Ford"); + r1.setField(2, "E350"); + r1.setField(3, "ac, abs, moon"); + r1.setField(4, 3000.0); + + Row r2 = new Row(5); + r2.setField(0, 1999); + r2.setField(1, "Chevy"); + r2.setField(2, "Venture \"Extended Edition\""); + r2.setField(3, ""); + r2.setField(4, 4900.0); + + Row r3 = new Row(5); + r3.setField(0, 1996); + r3.setField(1, "Jeep"); + r3.setField(2, "Grand Cherokee"); + r3.setField(3, "MUST SELL! air, moon roof, loaded"); + r3.setField(4, 4799.0); + + Row r4 = new Row(5); + r4.setField(0, 1999); + r4.setField(1, "Chevy"); + r4.setField(2, "Venture \"Extended Edition, Very Large\""); + r4.setField(3, ""); + r4.setField(4, 5000.0); + + Row r5 = new Row(5); + r5.setField(0, 0); + r5.setField(1, ""); + r5.setField(2, "Venture \"Extended Edition\""); + r5.setField(3, ""); + r5.setField(4, 4900.0); + + Row[] expectedLines = new Row[]{r1, r2, r3, r4, r5}; + for (Row expected : expectedLines) { + result = format.nextRecord(result); + assertEquals(expected, result); + } + assertNull(format.nextRecord(result)); + assertTrue(format.reachedEnd()); + } + + @Test + public void testWindowsLineEndRemoval() throws Exception { + + // check typical use case -- linux file is correct and it is set up to linux(\n) + testRemovingTrailingCR("\n", "\n"); + + // check typical windows case -- windows file endings and file has windows file endings set up + testRemovingTrailingCR("\r\n", "\r\n"); + + // check problematic case windows file -- windows file endings(\r\n) + // but linux line endings (\n) set up + testRemovingTrailingCR("\r\n", "\n"); + + // check problematic case linux file -- linux file endings (\n) + // but windows file endings set up (\r\n) + // specific setup for windows line endings will expect \r\n because + // it has to be set up and is not standard. + } + + @Test + public void testQuotedStringParsingWithIncludeFields() throws Exception { + String fileContent = "\"20:41:52-1-3-2015\"|\"Re: Taskmanager memory error in Eclipse\"|" + + "\"Blahblah <[email protected]>\"|\"blaaa|\"blubb\""; + File tempFile = File.createTempFile("CsvReaderQuotedString", "tmp"); + tempFile.deleteOnExit(); + tempFile.setWritable(true); + + OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(tempFile)); + writer.write(fileContent); + writer.close(); + + RowTypeInfo typeInfo = new RowTypeInfo( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO); + + RowCsvInputFormat inputFormat = new RowCsvInputFormat( + new Path(tempFile.toURI().toString()), + typeInfo, + new boolean[]{true, false, true}); + inputFormat.enableQuotedStringParsing('"'); + inputFormat.setFieldDelimiter("|"); + inputFormat.setDelimiter('\n'); + inputFormat.configure(new Configuration()); + + FileInputSplit[] splits = inputFormat.createInputSplits(1); + inputFormat.open(splits[0]); + + Row record = inputFormat.nextRecord(new Row(2)); + assertEquals("20:41:52-1-3-2015", record.getField(0)); + assertEquals("Blahblah <[email protected]>", record.getField(1)); + } + + @Test + public void testQuotedStringParsingWithEscapedQuotes() throws Exception { + String fileContent = "\"\\\"Hello\\\" World\"|\"We are\\\" young\""; + File tempFile = File.createTempFile("CsvReaderQuotedString", "tmp"); + tempFile.deleteOnExit(); + tempFile.setWritable(true); + + OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(tempFile)); + writer.write(fileContent); + writer.close(); + + RowTypeInfo typeInfo = new RowTypeInfo( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO); + + RowCsvInputFormat inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI().toString()), typeInfo); + inputFormat.enableQuotedStringParsing('"'); + inputFormat.setFieldDelimiter("|"); + inputFormat.setDelimiter('\n'); + inputFormat.configure(new Configuration()); + + FileInputSplit[] splits = inputFormat.createInputSplits(1); + inputFormat.open(splits[0]); + + Row record = inputFormat.nextRecord(new Row(2)); + assertEquals("\\\"Hello\\\" World", record.getField(0)); + assertEquals("We are\\\" young", record.getField(1)); + } + + @Test + public void testSqlTimeFields() throws Exception { + String fileContent = "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5\n" + + "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5.3\n"; + + FileInputSplit split = createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo( + SqlTimeTypeInfo.DATE, + SqlTimeTypeInfo.TIME, + SqlTimeTypeInfo.TIMESTAMP, + SqlTimeTypeInfo.TIMESTAMP); + + RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo); + format.setFieldDelimiter("|"); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(4); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(Date.valueOf("1990-10-14"), result.getField(0)); + assertEquals(Time.valueOf("02:42:25"), result.getField(1)); + assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.getField(2)); + assertEquals(Timestamp.valueOf("1990-01-04 02:02:05"), result.getField(3)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(Date.valueOf("1990-10-14"), result.getField(0)); + assertEquals(Time.valueOf("02:42:25"), result.getField(1)); + assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.getField(2)); + assertEquals(Timestamp.valueOf("1990-01-04 02:02:05.3"), result.getField(3)); + + result = format.nextRecord(result); + assertNull(result); + assertTrue(format.reachedEnd()); + } + + private static FileInputSplit createTempFile(String content) throws IOException { + File tempFile = File.createTempFile("test_contents", "tmp"); + tempFile.deleteOnExit(); + OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8); + wrt.write(content); + wrt.close(); + return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[]{"localhost"}); + } + + private static void testRemovingTrailingCR(String lineBreakerInFile, String lineBreakerSetup) throws IOException { + String fileContent = FIRST_PART + lineBreakerInFile + SECOND_PART + lineBreakerInFile; + + // create input file + File tempFile = File.createTempFile("CsvInputFormatTest", "tmp"); + tempFile.deleteOnExit(); + tempFile.setWritable(true); + + OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile)); + wrt.write(fileContent); + wrt.close(); + + RowTypeInfo typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO); + + RowCsvInputFormat inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI().toString()), typeInfo); + inputFormat.configure(new Configuration()); + inputFormat.setDelimiter(lineBreakerSetup); + + FileInputSplit[] splits = inputFormat.createInputSplits(1); + inputFormat.open(splits[0]); + + Row result = inputFormat.nextRecord(new Row(1)); + assertNotNull("Expecting to not return null", result); + assertEquals(FIRST_PART, result.getField(0)); + + result = inputFormat.nextRecord(result); + assertNotNull("Expecting to not return null", result); + assertEquals(SECOND_PART, result.getField(0)); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala index 3517338..0f748c5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala @@ -133,7 +133,7 @@ class BatchTableEnvironment( * Converts the given [[Table]] into a [[DataSet]] of a specified type. * * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows: - * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] * types: Fields are mapped by position, field types must match. * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match. * @@ -150,7 +150,7 @@ class BatchTableEnvironment( * Converts the given [[Table]] into a [[DataSet]] of a specified type. * * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows: - * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] * types: Fields are mapped by position, field types must match. * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match. * http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala index 83293e3..3218ced 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala @@ -135,7 +135,7 @@ class StreamTableEnvironment( * Converts the given [[Table]] into a [[DataStream]] of a specified type. * * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: - * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] * types: Fields are mapped by position, field types must match. * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. * @@ -152,7 +152,7 @@ class StreamTableEnvironment( * Converts the given [[Table]] into a [[DataStream]] of a specified type. * * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: - * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] * types: Fields are mapped by position, field types must match. * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. * http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala index f4bfe31..26fe51e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala @@ -128,7 +128,7 @@ class BatchTableEnvironment( * Converts the given [[Table]] into a [[DataSet]] of a specified type. * * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows: - * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] * types: Fields are mapped by position, field types must match. * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match. * http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala index dde69d5..044ace8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala @@ -131,7 +131,7 @@ class StreamTableEnvironment( * Converts the given [[Table]] into a [[DataStream]] of a specified type. * * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: - * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] * types: Fields are mapped by position, field types must match. * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. * http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala index 3bce5cf..1e8bf39 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala @@ -18,7 +18,8 @@ package org.apache.flink.api.scala import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.{Row, Table} +import org.apache.flink.api.table.Table +import org.apache.flink.types.Row import scala.language.implicitConversions import org.apache.flink.streaming.api.scala._ http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala index 918b01b..6d00ab3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala @@ -37,6 +37,7 @@ import org.apache.flink.api.table.plan.rules.FlinkRuleSets import org.apache.flink.api.table.plan.schema.{DataSetTable, TableSourceTable} import org.apache.flink.api.table.sinks.{BatchTableSink, TableSink} import org.apache.flink.api.table.sources.BatchTableSource +import org.apache.flink.types.Row /** * The abstract base class for batch TableEnvironments. @@ -168,7 +169,7 @@ abstract class BatchTableEnvironment( private[flink] def explain(table: Table, extended: Boolean): String = { val ast = table.getRelNode val optimizedPlan = optimize(ast) - val dataSet = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row])) + val dataSet = translate[Row](optimizedPlan) (TypeExtractor.createTypeInfo(classOf[Row])) dataSet.output(new DiscardingOutputFormat[Row]) val env = dataSet.getExecutionEnvironment val jasonSqlPlan = env.getExecutionPlan http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala index 8f00586..da20e07 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala @@ -37,6 +37,7 @@ import org.apache.flink.api.table.sinks.{StreamTableSink, TableSink} import org.apache.flink.api.table.sources.StreamTableSource import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.types.Row /** * The base class for stream TableEnvironments. http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala index b6d0e31..07ea860 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala @@ -32,15 +32,15 @@ import org.apache.calcite.sql.util.ChainedSqlOperatorTable import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets} import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv} -import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo} import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv} import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv} import org.apache.flink.api.table.codegen.ExpressionReducer import org.apache.flink.api.table.expressions.{Alias, Expression, UnresolvedFieldReference} -import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createTableSqlFunctions, createScalarSqlFunction} -import org.apache.flink.api.table.functions.{TableFunction, ScalarFunction} +import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions} +import org.apache.flink.api.table.functions.{ScalarFunction, TableFunction} import org.apache.flink.api.table.plan.cost.DataSetCostFactory import org.apache.flink.api.table.plan.schema.RelTable import org.apache.flink.api.table.sinks.TableSink @@ -347,6 +347,7 @@ abstract class TableEnvironment(val config: TableConfig) { case t: TupleTypeInfo[A] => t.getFieldNames case c: CaseClassTypeInfo[A] => c.getFieldNames case p: PojoTypeInfo[A] => p.getFieldNames + case r: RowTypeInfo => r.getFieldNames case tpe => throw new TableException(s"Type $tpe lacks explicit field naming") } http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala index 4092a24..a706309 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala @@ -28,7 +28,8 @@ import org.apache.flink.api.common.typeinfo.{FractionalTypeInfo, SqlTimeTypeInfo import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo, TypeExtractor} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.api.table.typeutils.{TimeIntervalTypeInfo, RowTypeInfo, TypeCheckUtils} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.table.typeutils.{TimeIntervalTypeInfo, TypeCheckUtils} object CodeGenUtils { http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala index 7caad12..cdb3753 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala @@ -37,7 +37,8 @@ import org.apache.flink.api.table.codegen.Indenter.toISC import org.apache.flink.api.table.codegen.calls.FunctionGenerator import org.apache.flink.api.table.codegen.calls.ScalarOperators._ import org.apache.flink.api.table.functions.UserDefinedFunction -import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.table.typeutils.TypeConverter import org.apache.flink.api.table.typeutils.TypeCheckUtils._ import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig} @@ -1139,7 +1140,7 @@ class CodeGenerator( case ProductAccessor(i) => // Object - val inputCode = s"($fieldTypeTerm) $inputTerm.productElement($i)" + val inputCode = s"($fieldTypeTerm) $inputTerm.getField($i)" generateInputFieldUnboxing(fieldType, inputCode) case ObjectPrivateFieldAccessor(field) => http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala index 731452f..871264e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala @@ -25,8 +25,10 @@ import org.apache.calcite.rex.{RexBuilder, RexNode} import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter} -import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableConfig} +import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.types.Row +import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig} import scala.collection.JavaConverters._ @@ -69,7 +71,7 @@ class ExpressionReducer(config: TableConfig) } val literalTypes = literals.map(e => FlinkTypeFactory.toTypeInfo(e.getType)) - val resultType = new RowTypeInfo(literalTypes) + val resultType = new RowTypeInfo(literalTypes: _*) // generate MapFunction val generator = new CodeGenerator(config, false, EMPTY_ROW_INFO) @@ -105,7 +107,7 @@ class ExpressionReducer(config: TableConfig) reducedValues.add(unreduced) case _ => val literal = rexBuilder.makeLiteral( - reduced.productElement(reducedIdx), + reduced.getField(reducedIdx), unreduced.getType, true) reducedValues.add(literal) http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala index e85ade0..94513d9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala @@ -28,8 +28,10 @@ import org.apache.flink.api.java.DataSet import org.apache.flink.api.table.plan.nodes.FlinkAggregate import org.apache.flink.api.table.runtime.aggregate.AggregateUtil import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair -import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter} -import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory, Row} +import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory} +import org.apache.flink.types.Row import scala.collection.JavaConverters._ @@ -119,7 +121,7 @@ class DataSetAggregate( .map(mapFunction) .name(prepareOpName) - val rowTypeInfo = new RowTypeInfo(fieldTypes) + val rowTypeInfo = new RowTypeInfo(fieldTypes: _*) val result = { if (groupingKeys.length > 0) { http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala index c7d5131..7133773 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala @@ -32,8 +32,10 @@ import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate._ import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._ import org.apache.flink.api.table.runtime.aggregate.{Aggregate, _} import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval -import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, RowTypeInfo, TimeIntervalTypeInfo, TypeConverter} -import org.apache.flink.api.table.{FlinkTypeFactory, Row, StreamTableEnvironment} +import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo, TypeConverter} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.table.{FlinkTypeFactory, StreamTableEnvironment} +import org.apache.flink.types.Row import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream} import org.apache.flink.streaming.api.windowing.assigners._ import org.apache.flink.streaming.api.windowing.time.Time @@ -117,7 +119,7 @@ class DataStreamAggregate( .map(field => FlinkTypeFactory.toTypeInfo(field.getType)) .toArray - val rowTypeInfo = new RowTypeInfo(fieldTypes) + val rowTypeInfo = new RowTypeInfo(fieldTypes: _*) val aggString = aggregationToString( inputType, http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala index 54cb8d1..3bf3e0c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala @@ -25,6 +25,7 @@ import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.logical.{LogicalValues, LogicalUnion, LogicalAggregate} import org.apache.calcite.rex.RexLiteral import org.apache.flink.api.table._ +import org.apache.flink.types.Row import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention} /** http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala index a11e8c1..72be00c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala @@ -18,13 +18,13 @@ package org.apache.flink.api.table.plan.schema -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import org.apache.flink.api.table.sources.TableSource -import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.java.typeutils.RowTypeInfo /** Table which defines an external table via a [[TableSource]] */ class TableSourceTable(val tableSource: TableSource[_]) extends FlinkTable[Row]( - typeInfo = new RowTypeInfo(tableSource.getFieldTypes), + typeInfo = new RowTypeInfo(tableSource.getFieldTypes: _*), fieldIndexes = 0.until(tableSource.getNumberOfFields).toArray, fieldNames = tableSource.getFieldsNames) http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala index 1e91711..273aa60 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala @@ -18,7 +18,7 @@ package org.apache.flink.api.table.runtime.aggregate import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row /** * The interface for all Flink aggregate functions, which expressed in terms of initiate(), http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala index 7ace2c5..4c473d4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala @@ -21,7 +21,7 @@ package org.apache.flink.api.table.runtime.aggregate import java.lang.Iterable import org.apache.flink.api.common.functions.RichGroupReduceFunction -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction import org.apache.flink.streaming.api.windowing.windows.TimeWindow http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala index 4b045be..db5f477 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala @@ -21,7 +21,7 @@ package org.apache.flink.api.table.runtime.aggregate import java.lang.Iterable import org.apache.flink.api.common.functions.RichGroupReduceFunction -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction import org.apache.flink.streaming.api.windowing.windows.Window http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala index 7559cec..0699bfa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala @@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.ResultTypeQueryable -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import org.apache.flink.configuration.Configuration import org.apache.flink.util.Preconditions @@ -47,11 +47,11 @@ class AggregateMapFunction[IN, OUT]( val input = value.asInstanceOf[Row] for (i <- 0 until aggregates.length) { - val fieldValue = input.productElement(aggFields(i)) + val fieldValue = input.getField(aggFields(i)) aggregates(i).prepare(fieldValue, output) } for (i <- 0 until groupingKeys.length) { - output.setField(i, input.productElement(groupingKeys(i))) + output.setField(i, input.getField(groupingKeys(i))) } output.asInstanceOf[OUT] } http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala index ebf0ca7..b2cf07e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala @@ -21,7 +21,7 @@ package org.apache.flink.api.table.runtime.aggregate import java.lang.Iterable import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction} -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import org.apache.flink.configuration.Configuration import org.apache.flink.util.{Collector, Preconditions} @@ -74,7 +74,7 @@ class AggregateReduceCombineFunction( // Set group keys to aggregateBuffer. for (i <- groupKeysMapping.indices) { - aggregateBuffer.setField(i, last.productElement(i)) + aggregateBuffer.setField(i, last.getField(i)) } aggregateBuffer http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala index 8f096cc..6fe712b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala @@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate import java.lang.Iterable import org.apache.flink.api.common.functions.RichGroupReduceFunction -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import org.apache.flink.configuration.Configuration import org.apache.flink.util.{Collector, Preconditions} @@ -78,7 +78,7 @@ class AggregateReduceGroupFunction( // Set group keys value to final output. groupKeysMapping.foreach { case (after, previous) => - output.setField(after, last.productElement(previous)) + output.setField(after, last.getField(previous)) } // Evaluate final aggregate value and set to output. http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala index 9b7ea0b..ff8f6fb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala @@ -22,7 +22,7 @@ import java.lang.Iterable import org.apache.flink.api.common.functions.RichGroupReduceFunction import org.apache.flink.api.java.tuple.Tuple -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction import org.apache.flink.streaming.api.windowing.windows.TimeWindow http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala index 4428963..a181068 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala @@ -31,9 +31,10 @@ import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.api.table.expressions.{WindowEnd, WindowStart} import org.apache.flink.api.table.plan.logical._ -import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.table.typeutils.TypeCheckUtils._ -import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableException} +import org.apache.flink.api.table.{FlinkTypeFactory, TableException} +import org.apache.flink.types.Row import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction} import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} import scala.collection.JavaConversions._ @@ -529,7 +530,7 @@ object AggregateUtil { // concat group key types and aggregation types val allFieldTypes = groupingTypes ++: aggTypes - val partialType = new RowTypeInfo(allFieldTypes) + val partialType = new RowTypeInfo(allFieldTypes: _*) partialType } http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala index 6fd890d..4e77549 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala @@ -22,7 +22,7 @@ import java.lang.Iterable import org.apache.flink.api.common.functions.RichGroupReduceFunction import org.apache.flink.api.java.tuple.Tuple -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction import org.apache.flink.streaming.api.windowing.windows.Window http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala index ce5bc81..998ae62 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala @@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate import com.google.common.math.LongMath import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import java.math.BigDecimal import java.math.BigInteger @@ -52,10 +52,10 @@ abstract class IntegralAvgAggregate[T] extends AvgAggregate[T] { } override def merge(partial: Row, buffer: Row): Unit = { - val partialSum = partial.productElement(partialSumIndex).asInstanceOf[Long] - val partialCount = partial.productElement(partialCountIndex).asInstanceOf[Long] - val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long] - val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long] + val partialSum = partial.getField(partialSumIndex).asInstanceOf[Long] + val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long] + val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long] + val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] buffer.setField(partialSumIndex, LongMath.checkedAdd(partialSum, bufferSum)) buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount)) } @@ -81,8 +81,8 @@ class ByteAvgAggregate extends IntegralAvgAggregate[Byte] { } override def doEvaluate(buffer: Row): Any = { - val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long] - val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long] + val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long] + val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] if (bufferCount == 0L) { null } else { @@ -100,8 +100,8 @@ class ShortAvgAggregate extends IntegralAvgAggregate[Short] { } override def doEvaluate(buffer: Row): Any = { - val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long] - val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long] + val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long] + val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] if (bufferCount == 0L) { null } else { @@ -119,8 +119,8 @@ class IntAvgAggregate extends IntegralAvgAggregate[Int] { } override def doEvaluate(buffer: Row): Any = { - val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long] - val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long] + val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long] + val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] if (bufferCount == 0L) { null } else { @@ -156,17 +156,17 @@ class LongAvgAggregate extends IntegralAvgAggregate[Long] { } override def merge(partial: Row, buffer: Row): Unit = { - val partialSum = partial.productElement(partialSumIndex).asInstanceOf[BigInteger] - val partialCount = partial.productElement(partialCountIndex).asInstanceOf[Long] - val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[BigInteger] - val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long] + val partialSum = partial.getField(partialSumIndex).asInstanceOf[BigInteger] + val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long] + val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigInteger] + val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] buffer.setField(partialSumIndex, partialSum.add(bufferSum)) buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount)) } override def doEvaluate(buffer: Row): Any = { - val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[BigInteger] - val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long] + val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigInteger] + val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] if (bufferCount == 0L) { null } else { @@ -192,10 +192,10 @@ abstract class FloatingAvgAggregate[T: Numeric] extends AvgAggregate[T] { } override def merge(partial: Row, buffer: Row): Unit = { - val partialSum = partial.productElement(partialSumIndex).asInstanceOf[Double] - val partialCount = partial.productElement(partialCountIndex).asInstanceOf[Long] - val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Double] - val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long] + val partialSum = partial.getField(partialSumIndex).asInstanceOf[Double] + val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long] + val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double] + val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] buffer.setField(partialSumIndex, partialSum + bufferSum) buffer.setField(partialCountIndex, partialCount + bufferCount) @@ -224,8 +224,8 @@ class FloatAvgAggregate extends FloatingAvgAggregate[Float] { override def doEvaluate(buffer: Row): Any = { - val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Double] - val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long] + val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double] + val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] if (bufferCount == 0L) { null } else { @@ -243,8 +243,8 @@ class DoubleAvgAggregate extends FloatingAvgAggregate[Double] { } override def doEvaluate(buffer: Row): Any = { - val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Double] - val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long] + val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double] + val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] if (bufferCount == 0L) { null } else { @@ -275,18 +275,18 @@ class DecimalAvgAggregate extends AvgAggregate[BigDecimal] { } override def merge(partial: Row, buffer: Row): Unit = { - val partialSum = partial.productElement(partialSumIndex).asInstanceOf[BigDecimal] - val partialCount = partial.productElement(partialCountIndex).asInstanceOf[Long] - val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[BigDecimal] - val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long] + val partialSum = partial.getField(partialSumIndex).asInstanceOf[BigDecimal] + val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long] + val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigDecimal] + val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] buffer.setField(partialSumIndex, partialSum.add(bufferSum)) buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount)) } override def evaluate(buffer: Row): BigDecimal = { - val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long] + val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long] if (bufferCount != 0) { - val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[BigDecimal] + val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigDecimal] bufferSum.divide(BigDecimal.valueOf(bufferCount)) } else { null.asInstanceOf[BigDecimal] http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala index d9f288a..4d6d20b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala @@ -18,7 +18,7 @@ package org.apache.flink.api.table.runtime.aggregate import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row class CountAggregate extends Aggregate[Long] { private var countIndex: Int = _ @@ -28,13 +28,13 @@ class CountAggregate extends Aggregate[Long] { } override def merge(intermediate: Row, buffer: Row): Unit = { - val partialCount = intermediate.productElement(countIndex).asInstanceOf[Long] - val bufferCount = buffer.productElement(countIndex).asInstanceOf[Long] + val partialCount = intermediate.getField(countIndex).asInstanceOf[Long] + val bufferCount = buffer.getField(countIndex).asInstanceOf[Long] buffer.setField(countIndex, partialCount + bufferCount) } override def evaluate(buffer: Row): Long = { - buffer.productElement(countIndex).asInstanceOf[Long] + buffer.getField(countIndex).asInstanceOf[Long] } override def prepare(value: Any, intermediate: Row): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala index 85ad8e5..48e2313 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala @@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate import java.lang.Iterable -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window} import org.apache.flink.util.Collector http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala index d3f871a..1a85dca 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala @@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate import java.lang.Iterable -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction import org.apache.flink.streaming.api.windowing.windows.Window @@ -66,7 +66,7 @@ class IncrementalAggregateAllWindowFunction[W <: Window]( // Set group keys value to final output. groupKeysMapping.foreach { case (after, previous) => - output.setField(after, record.productElement(previous)) + output.setField(after, record.getField(previous)) } // Evaluate final aggregate value and set to output. aggregateMapping.foreach { http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala index e2830da..5c36821 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala @@ -18,7 +18,7 @@ package org.apache.flink.api.table.runtime.aggregate import org.apache.flink.api.common.functions.ReduceFunction -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import org.apache.flink.util.Preconditions /** @@ -54,7 +54,7 @@ class IncrementalAggregateReduceFunction( // copy all fields of value1 into accumulatorRow (0 until intermediateRowArity) - .foreach(i => accumulatorRow.setField(i, value1.productElement(i))) + .foreach(i => accumulatorRow.setField(i, value1.getField(i))) // merge value2 to accumulatorRow aggregates.foreach(_.merge(value2, accumulatorRow)) http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala index c880f87..2513383 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala @@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate import java.lang.Iterable import org.apache.flink.api.java.tuple.Tuple -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala index 81e6890..d0d71ee 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala @@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate import java.lang.Iterable import org.apache.flink.api.java.tuple.Tuple -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction import org.apache.flink.streaming.api.windowing.windows.Window @@ -68,7 +68,7 @@ class IncrementalAggregateWindowFunction[W <: Window]( // Set group keys value to final output. groupKeysMapping.foreach { case (after, previous) => - output.setField(after, record.productElement(previous)) + output.setField(after, record.getField(previous)) } // Evaluate final aggregate value and set to output. aggregateMapping.foreach { http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala index 9267527..2cb3dc7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala @@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate import java.math.BigDecimal import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] { @@ -57,9 +57,9 @@ abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] { * @param buffer */ override def merge(intermediate: Row, buffer: Row): Unit = { - val partialValue = intermediate.productElement(maxIndex).asInstanceOf[T] + val partialValue = intermediate.getField(maxIndex).asInstanceOf[T] if (partialValue != null) { - val bufferValue = buffer.productElement(maxIndex).asInstanceOf[T] + val bufferValue = buffer.getField(maxIndex).asInstanceOf[T] if (bufferValue != null) { val max : T = if (ord.compare(partialValue, bufferValue) > 0) partialValue else bufferValue buffer.setField(maxIndex, max) @@ -76,7 +76,7 @@ abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] { * @return */ override def evaluate(buffer: Row): T = { - buffer.productElement(maxIndex).asInstanceOf[T] + buffer.getField(maxIndex).asInstanceOf[T] } override def supportPartial: Boolean = true @@ -147,9 +147,9 @@ class DecimalMaxAggregate extends Aggregate[BigDecimal] { } override def merge(partial: Row, buffer: Row): Unit = { - val partialValue = partial.productElement(minIndex).asInstanceOf[BigDecimal] + val partialValue = partial.getField(minIndex).asInstanceOf[BigDecimal] if (partialValue != null) { - val bufferValue = buffer.productElement(minIndex).asInstanceOf[BigDecimal] + val bufferValue = buffer.getField(minIndex).asInstanceOf[BigDecimal] if (bufferValue != null) { val min = if (partialValue.compareTo(bufferValue) > 0) partialValue else bufferValue buffer.setField(minIndex, min) @@ -160,7 +160,7 @@ class DecimalMaxAggregate extends Aggregate[BigDecimal] { } override def evaluate(buffer: Row): BigDecimal = { - buffer.productElement(minIndex).asInstanceOf[BigDecimal] + buffer.getField(minIndex).asInstanceOf[BigDecimal] } override def supportPartial: Boolean = true http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala index 7e2ebf4..c1c79ec 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala @@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate import java.math.BigDecimal import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] { @@ -56,9 +56,9 @@ abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] { * @param buffer */ override def merge(partial: Row, buffer: Row): Unit = { - val partialValue = partial.productElement(minIndex).asInstanceOf[T] + val partialValue = partial.getField(minIndex).asInstanceOf[T] if (partialValue != null) { - val bufferValue = buffer.productElement(minIndex).asInstanceOf[T] + val bufferValue = buffer.getField(minIndex).asInstanceOf[T] if (bufferValue != null) { val min : T = if (ord.compare(partialValue, bufferValue) < 0) partialValue else bufferValue buffer.setField(minIndex, min) @@ -75,7 +75,7 @@ abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] { * @return */ override def evaluate(buffer: Row): T = { - buffer.productElement(minIndex).asInstanceOf[T] + buffer.getField(minIndex).asInstanceOf[T] } override def supportPartial: Boolean = true @@ -146,9 +146,9 @@ class DecimalMinAggregate extends Aggregate[BigDecimal] { } override def merge(partial: Row, buffer: Row): Unit = { - val partialValue = partial.productElement(minIndex).asInstanceOf[BigDecimal] + val partialValue = partial.getField(minIndex).asInstanceOf[BigDecimal] if (partialValue != null) { - val bufferValue = buffer.productElement(minIndex).asInstanceOf[BigDecimal] + val bufferValue = buffer.getField(minIndex).asInstanceOf[BigDecimal] if (bufferValue != null) { val min = if (partialValue.compareTo(bufferValue) < 0) partialValue else bufferValue buffer.setField(minIndex, min) @@ -159,7 +159,7 @@ class DecimalMinAggregate extends Aggregate[BigDecimal] { } override def evaluate(buffer: Row): BigDecimal = { - buffer.productElement(minIndex).asInstanceOf[BigDecimal] + buffer.getField(minIndex).asInstanceOf[BigDecimal] } override def supportPartial: Boolean = true
