http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
new file mode 100644
index 0000000..a68e81e
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
@@ -0,0 +1,879 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+public class RowCsvInputFormatTest {
+
+       private static Path PATH = new Path("an/ignored/file/");
+
+       // static variables for testing the removal of \r\n to \n
+       private static String FIRST_PART = "That is the first part";
+       private static String SECOND_PART = "That is the second part";
+
+       @Test
+       public void ignoreInvalidLines() throws Exception {
+               String fileContent =
+                       "#description of the data\n" +
+                               "header1|header2|header3|\n" +
+                               "this is|1|2.0|\n" +
+                               "//a comment\n" +
+                               "a test|3|4.0|\n" +
+                               "#next|5|6.0|\n";
+
+               FileInputSplit split = createTempFile(fileContent);
+
+               RowTypeInfo typeInfo = new 
RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.DOUBLE_TYPE_INFO);
+
+               RowCsvInputFormat format = new RowCsvInputFormat(PATH, 
typeInfo, "\n", "|");
+               format.setLenient(false);
+               Configuration parameters = new Configuration();
+               format.configure(new Configuration());
+               format.open(split);
+
+               Row result = new Row(3);
+               try {
+                       result = format.nextRecord(result);
+                       fail("Parse Exception was not thrown! (Row too short)");
+               } catch (ParseException ignored) {
+               } // => ok
+
+               try {
+                       result = format.nextRecord(result);
+                       fail("Parse Exception was not thrown! (Invalid int 
value)");
+               } catch (ParseException ignored) {
+               } // => ok
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("this is", result.getField(0));
+               assertEquals(1, result.getField(1));
+               assertEquals(2.0, result.getField(2));
+
+               try {
+                       result = format.nextRecord(result);
+                       fail("Parse Exception was not thrown! (Row too short)");
+               } catch (ParseException ignored) {
+               } // => ok
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("a test", result.getField(0));
+               assertEquals(3, result.getField(1));
+               assertEquals(4.0, result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("#next", result.getField(0));
+               assertEquals(5, result.getField(1));
+               assertEquals(6.0, result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNull(result);
+
+               // re-open with lenient = true
+               format.setLenient(true);
+               format.configure(parameters);
+               format.open(split);
+
+               result = new Row(3);
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("header1", result.getField(0));
+               assertNull(result.getField(1));
+               assertNull(result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("this is", result.getField(0));
+               assertEquals(1, result.getField(1));
+               assertEquals(2.0, result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("a test", result.getField(0));
+               assertEquals(3, result.getField(1));
+               assertEquals(4.0, result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("#next", result.getField(0));
+               assertEquals(5, result.getField(1));
+               assertEquals(6.0, result.getField(2));
+               result = format.nextRecord(result);
+               assertNull(result);
+       }
+
+       @Test
+       public void ignoreSingleCharPrefixComments() throws Exception {
+               String fileContent =
+                       "#description of the data\n" +
+                               "#successive commented line\n" +
+                               "this is|1|2.0|\n" +
+                               "a test|3|4.0|\n" +
+                               "#next|5|6.0|\n";
+
+               FileInputSplit split = createTempFile(fileContent);
+
+               RowTypeInfo typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.STRING_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.DOUBLE_TYPE_INFO);
+
+               RowCsvInputFormat format = new RowCsvInputFormat(PATH, 
typeInfo, "\n", "|");
+               format.setCommentPrefix("#");
+               format.configure(new Configuration());
+               format.open(split);
+
+               Row result = new Row(3);
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("this is", result.getField(0));
+               assertEquals(1, result.getField(1));
+               assertEquals(2.0, result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("a test", result.getField(0));
+               assertEquals(3, result.getField(1));
+               assertEquals(4.0, result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNull(result);
+       }
+
+       @Test
+       public void ignoreMultiCharPrefixComments() throws Exception {
+               String fileContent =
+                       "//description of the data\n" +
+                               "//successive commented line\n" +
+                               "this is|1|2.0|\n" +
+                               "a test|3|4.0|\n" +
+                               "//next|5|6.0|\n";
+
+               FileInputSplit split = createTempFile(fileContent);
+
+               RowTypeInfo typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.STRING_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.DOUBLE_TYPE_INFO);
+
+               RowCsvInputFormat format = new RowCsvInputFormat(PATH, 
typeInfo, "\n", "|");
+               format.setCommentPrefix("//");
+               format.configure(new Configuration());
+               format.open(split);
+
+               Row result = new Row(3);
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("this is", result.getField(0));
+               assertEquals(1, result.getField(1));
+               assertEquals(2.0, result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("a test", result.getField(0));
+               assertEquals(3, result.getField(1));
+               assertEquals(4.0, result.getField(2));
+               result = format.nextRecord(result);
+               assertNull(result);
+       }
+
+       @Test
+       public void readStringFields() throws Exception {
+               String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
+
+               FileInputSplit split = createTempFile(fileContent);
+
+               RowTypeInfo typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.STRING_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO);
+
+               RowCsvInputFormat format = new RowCsvInputFormat(PATH, 
typeInfo, "\n", "|");
+               format.configure(new Configuration());
+               format.open(split);
+
+               Row result = new Row(3);
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("abc", result.getField(0));
+               assertEquals("def", result.getField(1));
+               assertEquals("ghijk", result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("abc", result.getField(0));
+               assertEquals("", result.getField(1));
+               assertEquals("hhg", result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("", result.getField(0));
+               assertEquals("", result.getField(1));
+               assertEquals("", result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNull(result);
+               assertTrue(format.reachedEnd());
+       }
+
+       @Test
+       public void readMixedQuotedStringFields() throws Exception {
+               String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||";
+
+               FileInputSplit split = createTempFile(fileContent);
+
+               RowTypeInfo typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.STRING_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO);
+
+               RowCsvInputFormat format = new RowCsvInputFormat(PATH, 
typeInfo, "\n", "|");
+               format.configure(new Configuration());
+               format.enableQuotedStringParsing('@');
+               format.open(split);
+
+               Row result = new Row(3);
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("a|b|c", result.getField(0));
+               assertEquals("def", result.getField(1));
+               assertEquals("ghijk", result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("abc", result.getField(0));
+               assertEquals("", result.getField(1));
+               assertEquals("|hhg", result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("", result.getField(0));
+               assertEquals("", result.getField(1));
+               assertEquals("", result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNull(result);
+               assertTrue(format.reachedEnd());
+       }
+
+       @Test
+       public void readStringFieldsWithTrailingDelimiters() throws Exception {
+               String fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n";
+
+               FileInputSplit split = createTempFile(fileContent);
+
+               RowTypeInfo typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.STRING_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO);
+
+               RowCsvInputFormat format = new RowCsvInputFormat(PATH, 
typeInfo, "\n", "|");
+               format.setFieldDelimiter("|-");
+               format.configure(new Configuration());
+               format.open(split);
+
+               Row result = new Row(3);
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("abc", result.getField(0));
+               assertEquals("def", result.getField(1));
+               assertEquals("ghijk", result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("abc", result.getField(0));
+               assertEquals("", result.getField(1));
+               assertEquals("hhg", result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("", result.getField(0));
+               assertEquals("", result.getField(1));
+               assertEquals("", result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNull(result);
+               assertTrue(format.reachedEnd());
+       }
+
+       @Test
+       public void testIntegerFields() throws Exception {
+               String fileContent = 
"111|222|333|444|555\n666|777|888|999|000|\n";
+
+               FileInputSplit split = createTempFile(fileContent);
+
+               RowTypeInfo typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO);
+
+               RowCsvInputFormat format = new RowCsvInputFormat(PATH, 
typeInfo, "\n", "|");
+
+               format.setFieldDelimiter("|");
+               format.configure(new Configuration());
+               format.open(split);
+
+               Row result = new Row(5);
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals(111, result.getField(0));
+               assertEquals(222, result.getField(1));
+               assertEquals(333, result.getField(2));
+               assertEquals(444, result.getField(3));
+               assertEquals(555, result.getField(4));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals(666, result.getField(0));
+               assertEquals(777, result.getField(1));
+               assertEquals(888, result.getField(2));
+               assertEquals(999, result.getField(3));
+               assertEquals(0, result.getField(4));
+
+               result = format.nextRecord(result);
+               assertNull(result);
+               assertTrue(format.reachedEnd());
+       }
+
+       @Test
+       public void testEmptyFields() throws Exception {
+               String fileContent =
+                       ",,,,,,,,\n" +
+                               ",,,,,,,,\n" +
+                               ",,,,,,,,\n" +
+                               ",,,,,,,,\n" +
+                               ",,,,,,,,\n" +
+                               ",,,,,,,,\n" +
+                               ",,,,,,,,\n" +
+                               ",,,,,,,,\n";
+
+               FileInputSplit split = createTempFile(fileContent);
+
+               RowTypeInfo typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.BOOLEAN_TYPE_INFO,
+                       BasicTypeInfo.BYTE_TYPE_INFO,
+                       BasicTypeInfo.DOUBLE_TYPE_INFO,
+                       BasicTypeInfo.FLOAT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.LONG_TYPE_INFO,
+                       BasicTypeInfo.SHORT_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO);
+
+               RowCsvInputFormat format = new RowCsvInputFormat(PATH, 
typeInfo, true);
+               format.setFieldDelimiter(",");
+               format.configure(new Configuration());
+               format.open(split);
+
+               Row result = new Row(8);
+               int linesCnt = fileContent.split("\n").length;
+
+               for (int i = 0; i < linesCnt; i++) {
+                       result = format.nextRecord(result);
+                       assertNull(result.getField(i));
+               }
+
+               // ensure no more rows
+               assertNull(format.nextRecord(result));
+               assertTrue(format.reachedEnd());
+       }
+
+       @Test
+       public void testDoubleFields() throws Exception {
+               String fileContent = 
"11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n";
+
+               FileInputSplit split = createTempFile(fileContent);
+
+               RowTypeInfo typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.DOUBLE_TYPE_INFO,
+                       BasicTypeInfo.DOUBLE_TYPE_INFO,
+                       BasicTypeInfo.DOUBLE_TYPE_INFO,
+                       BasicTypeInfo.DOUBLE_TYPE_INFO,
+                       BasicTypeInfo.DOUBLE_TYPE_INFO);
+
+               RowCsvInputFormat format = new RowCsvInputFormat(PATH, 
typeInfo);
+               format.setFieldDelimiter("|");
+               format.configure(new Configuration());
+               format.open(split);
+
+               Row result = new Row(5);
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals(11.1, result.getField(0));
+               assertEquals(22.2, result.getField(1));
+               assertEquals(33.3, result.getField(2));
+               assertEquals(44.4, result.getField(3));
+               assertEquals(55.5, result.getField(4));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals(66.6, result.getField(0));
+               assertEquals(77.7, result.getField(1));
+               assertEquals(88.8, result.getField(2));
+               assertEquals(99.9, result.getField(3));
+               assertEquals(0.0, result.getField(4));
+
+               result = format.nextRecord(result);
+               assertNull(result);
+               assertTrue(format.reachedEnd());
+       }
+
+       @Test
+       public void testReadFirstN() throws Exception {
+               String fileContent = 
"111|222|333|444|555|\n666|777|888|999|000|\n";
+
+               FileInputSplit split = createTempFile(fileContent);
+
+               RowTypeInfo typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO);
+
+               RowCsvInputFormat format = new RowCsvInputFormat(PATH, 
typeInfo);
+               format.setFieldDelimiter("|");
+               format.configure(new Configuration());
+               format.open(split);
+
+               Row result = new Row(2);
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals(111, result.getField(0));
+               assertEquals(222, result.getField(1));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals(666, result.getField(0));
+               assertEquals(777, result.getField(1));
+
+               result = format.nextRecord(result);
+               assertNull(result);
+               assertTrue(format.reachedEnd());
+       }
+
+       @Test
+       public void testReadSparseWithNullFieldsForTypes() throws Exception {
+               String fileContent = 
"111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" +
+                       
"000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|";
+
+               FileInputSplit split = createTempFile(fileContent);
+
+               RowTypeInfo typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO);
+
+               RowCsvInputFormat format = new RowCsvInputFormat(
+                       PATH,
+                       typeInfo,
+                       new boolean[]{true, false, false, true, false, false, 
false, true});
+               format.setFieldDelimiter("|x|");
+               format.configure(new Configuration());
+               format.open(split);
+
+               Row result = new Row(3);
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals(111, result.getField(0));
+               assertEquals(444, result.getField(1));
+               assertEquals(888, result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals(0, result.getField(0));
+               assertEquals(777, result.getField(1));
+               assertEquals(333, result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNull(result);
+               assertTrue(format.reachedEnd());
+       }
+
+       @Test
+       public void testReadSparseWithPositionSetter() throws Exception {
+               String fileContent = 
"111|222|333|444|555|666|777|888|999|000|\n" +
+                       "000|999|888|777|666|555|444|333|222|111|";
+
+               FileInputSplit split = createTempFile(fileContent);
+
+               RowTypeInfo typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO);
+
+               RowCsvInputFormat format = new RowCsvInputFormat(
+                       PATH,
+                       typeInfo,
+                       new int[]{0, 3, 7});
+               format.setFieldDelimiter("|");
+               format.configure(new Configuration());
+               format.open(split);
+
+               Row result = new Row(3);
+               result = format.nextRecord(result);
+
+               assertNotNull(result);
+               assertEquals(111, result.getField(0));
+               assertEquals(444, result.getField(1));
+               assertEquals(888, result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals(0, result.getField(0));
+               assertEquals(777, result.getField(1));
+               assertEquals(333, result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNull(result);
+               assertTrue(format.reachedEnd());
+       }
+
+       @Test
+       public void testReadSparseWithMask() throws Exception {
+               String fileContent = 
"111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" +
+                       "000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&";
+
+               FileInputSplit split = 
RowCsvInputFormatTest.createTempFile(fileContent);
+
+               RowTypeInfo typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO);
+
+               RowCsvInputFormat format = new RowCsvInputFormat(
+                       PATH,
+                       typeInfo,
+                       new boolean[]{true, false, false, true, false, false, 
false, true});
+               format.setFieldDelimiter("&&");
+               format.configure(new Configuration());
+               format.open(split);
+
+               Row result = new Row(3);
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals(111, result.getField(0));
+               assertEquals(444, result.getField(1));
+               assertEquals(888, result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals(0, result.getField(0));
+               assertEquals(777, result.getField(1));
+               assertEquals(333, result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNull(result);
+               assertTrue(format.reachedEnd());
+       }
+
+       @Test
+       public void testParseStringErrors() throws Exception {
+               StringParser stringParser = new StringParser();
+
+               stringParser.enableQuotedStringParsing((byte) '"');
+
+               Map<String, StringParser.ParseErrorState> failures = new 
HashMap<>();
+               failures.put("\"string\" trailing", 
FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING);
+               failures.put("\"unterminated ", 
FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING);
+
+               for (Map.Entry<String, StringParser.ParseErrorState> failure : 
failures.entrySet()) {
+                       int result = stringParser.parseField(
+                               failure.getKey().getBytes(),
+                               0,
+                               failure.getKey().length(),
+                               new byte[]{(byte) '|'},
+                               null);
+                       assertEquals(-1, result);
+                       assertEquals(failure.getValue(), 
stringParser.getErrorState());
+               }
+       }
+
+       // Test disabled because we do not support double-quote escaped quotes 
right now.
+       @Test
+       @Ignore
+       public void testParserCorrectness() throws Exception {
+               // RFC 4180 Compliance Test content
+               // Taken from 
http://en.wikipedia.org/wiki/Comma-separated_values#Example
+               String fileContent = "Year,Make,Model,Description,Price\n" +
+                       "1997,Ford,E350,\"ac, abs, moon\",3000.00\n" +
+                       "1999,Chevy,\"Venture \"\"Extended 
Edition\"\"\",\"\",4900.00\n" +
+                       "1996,Jeep,Grand Cherokee,\"MUST SELL! air, moon roof, 
loaded\",4799.00\n" +
+                       "1999,Chevy,\"Venture \"\"Extended Edition, Very 
Large\"\"\",,5000.00\n" +
+                       ",,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00";
+
+               FileInputSplit split = createTempFile(fileContent);
+
+               RowTypeInfo typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO,
+                       BasicTypeInfo.DOUBLE_TYPE_INFO);
+
+               RowCsvInputFormat format = new RowCsvInputFormat(PATH, 
typeInfo);
+               format.setSkipFirstLineAsHeader(true);
+               format.setFieldDelimiter(",");
+               format.configure(new Configuration());
+               format.open(split);
+
+               Row result = new Row(5);
+               Row r1 = new Row(5);
+               r1.setField(0, 1997);
+               r1.setField(1, "Ford");
+               r1.setField(2, "E350");
+               r1.setField(3, "ac, abs, moon");
+               r1.setField(4, 3000.0);
+
+               Row r2 = new Row(5);
+               r2.setField(0, 1999);
+               r2.setField(1, "Chevy");
+               r2.setField(2, "Venture \"Extended Edition\"");
+               r2.setField(3, "");
+               r2.setField(4, 4900.0);
+
+               Row r3 = new Row(5);
+               r3.setField(0, 1996);
+               r3.setField(1, "Jeep");
+               r3.setField(2, "Grand Cherokee");
+               r3.setField(3, "MUST SELL! air, moon roof, loaded");
+               r3.setField(4, 4799.0);
+
+               Row r4 = new Row(5);
+               r4.setField(0, 1999);
+               r4.setField(1, "Chevy");
+               r4.setField(2, "Venture \"Extended Edition, Very Large\"");
+               r4.setField(3, "");
+               r4.setField(4, 5000.0);
+
+               Row r5 = new Row(5);
+               r5.setField(0, 0);
+               r5.setField(1, "");
+               r5.setField(2, "Venture \"Extended Edition\"");
+               r5.setField(3, "");
+               r5.setField(4, 4900.0);
+
+               Row[] expectedLines = new Row[]{r1, r2, r3, r4, r5};
+               for (Row expected : expectedLines) {
+                       result = format.nextRecord(result);
+                       assertEquals(expected, result);
+               }
+               assertNull(format.nextRecord(result));
+               assertTrue(format.reachedEnd());
+       }
+
+       @Test
+       public void testWindowsLineEndRemoval() throws Exception {
+
+               // check typical use case -- linux file is correct and it is 
set up to linux(\n)
+               testRemovingTrailingCR("\n", "\n");
+
+               // check typical windows case -- windows file endings and file 
has windows file endings set up
+               testRemovingTrailingCR("\r\n", "\r\n");
+
+               // check problematic case windows file -- windows file 
endings(\r\n)
+               // but linux line endings (\n) set up
+               testRemovingTrailingCR("\r\n", "\n");
+
+               // check problematic case linux file -- linux file endings (\n)
+               // but windows file endings set up (\r\n)
+               // specific setup for windows line endings will expect \r\n 
because
+               // it has to be set up and is not standard.
+       }
+
+       @Test
+       public void testQuotedStringParsingWithIncludeFields() throws Exception 
{
+               String fileContent = "\"20:41:52-1-3-2015\"|\"Re: Taskmanager 
memory error in Eclipse\"|" +
+                       "\"Blahblah <[email protected]>\"|\"blaaa|\"blubb\"";
+               File tempFile = File.createTempFile("CsvReaderQuotedString", 
"tmp");
+               tempFile.deleteOnExit();
+               tempFile.setWritable(true);
+
+               OutputStreamWriter writer = new OutputStreamWriter(new 
FileOutputStream(tempFile));
+               writer.write(fileContent);
+               writer.close();
+
+               RowTypeInfo typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.STRING_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO);
+
+               RowCsvInputFormat inputFormat = new RowCsvInputFormat(
+                       new Path(tempFile.toURI().toString()),
+                       typeInfo,
+                       new boolean[]{true, false, true});
+               inputFormat.enableQuotedStringParsing('"');
+               inputFormat.setFieldDelimiter("|");
+               inputFormat.setDelimiter('\n');
+               inputFormat.configure(new Configuration());
+
+               FileInputSplit[] splits = inputFormat.createInputSplits(1);
+               inputFormat.open(splits[0]);
+
+               Row record = inputFormat.nextRecord(new Row(2));
+               assertEquals("20:41:52-1-3-2015", record.getField(0));
+               assertEquals("Blahblah <[email protected]>", 
record.getField(1));
+       }
+
+       @Test
+       public void testQuotedStringParsingWithEscapedQuotes() throws Exception 
{
+               String fileContent = "\"\\\"Hello\\\" World\"|\"We are\\\" 
young\"";
+               File tempFile = File.createTempFile("CsvReaderQuotedString", 
"tmp");
+               tempFile.deleteOnExit();
+               tempFile.setWritable(true);
+
+               OutputStreamWriter writer = new OutputStreamWriter(new 
FileOutputStream(tempFile));
+               writer.write(fileContent);
+               writer.close();
+
+               RowTypeInfo typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.STRING_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO);
+
+               RowCsvInputFormat inputFormat = new RowCsvInputFormat(new 
Path(tempFile.toURI().toString()), typeInfo);
+               inputFormat.enableQuotedStringParsing('"');
+               inputFormat.setFieldDelimiter("|");
+               inputFormat.setDelimiter('\n');
+               inputFormat.configure(new Configuration());
+
+               FileInputSplit[] splits = inputFormat.createInputSplits(1);
+               inputFormat.open(splits[0]);
+
+               Row record = inputFormat.nextRecord(new Row(2));
+               assertEquals("\\\"Hello\\\" World", record.getField(0));
+               assertEquals("We are\\\" young", record.getField(1));
+       }
+
+       @Test
+       public void testSqlTimeFields() throws Exception {
+               String fileContent = "1990-10-14|02:42:25|1990-10-14 
02:42:25.123|1990-1-4 2:2:5\n" +
+                       "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 
2:2:5.3\n";
+
+               FileInputSplit split = createTempFile(fileContent);
+
+               RowTypeInfo typeInfo = new RowTypeInfo(
+                       SqlTimeTypeInfo.DATE,
+                       SqlTimeTypeInfo.TIME,
+                       SqlTimeTypeInfo.TIMESTAMP,
+                       SqlTimeTypeInfo.TIMESTAMP);
+
+               RowCsvInputFormat format = new RowCsvInputFormat(PATH, 
typeInfo);
+               format.setFieldDelimiter("|");
+               format.configure(new Configuration());
+               format.open(split);
+
+               Row result = new Row(4);
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals(Date.valueOf("1990-10-14"), result.getField(0));
+               assertEquals(Time.valueOf("02:42:25"), result.getField(1));
+               assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), 
result.getField(2));
+               assertEquals(Timestamp.valueOf("1990-01-04 02:02:05"), 
result.getField(3));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals(Date.valueOf("1990-10-14"), result.getField(0));
+               assertEquals(Time.valueOf("02:42:25"), result.getField(1));
+               assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), 
result.getField(2));
+               assertEquals(Timestamp.valueOf("1990-01-04 02:02:05.3"), 
result.getField(3));
+
+               result = format.nextRecord(result);
+               assertNull(result);
+               assertTrue(format.reachedEnd());
+       }
+
+       private static FileInputSplit createTempFile(String content) throws 
IOException {
+               File tempFile = File.createTempFile("test_contents", "tmp");
+               tempFile.deleteOnExit();
+               OutputStreamWriter wrt = new OutputStreamWriter(new 
FileOutputStream(tempFile), StandardCharsets.UTF_8);
+               wrt.write(content);
+               wrt.close();
+               return new FileInputSplit(0, new 
Path(tempFile.toURI().toString()), 0, tempFile.length(), new 
String[]{"localhost"});
+       }
+
+       private static void testRemovingTrailingCR(String lineBreakerInFile, 
String lineBreakerSetup) throws IOException {
+               String fileContent = FIRST_PART + lineBreakerInFile + 
SECOND_PART + lineBreakerInFile;
+
+               // create input file
+               File tempFile = File.createTempFile("CsvInputFormatTest", 
"tmp");
+               tempFile.deleteOnExit();
+               tempFile.setWritable(true);
+
+               OutputStreamWriter wrt = new OutputStreamWriter(new 
FileOutputStream(tempFile));
+               wrt.write(fileContent);
+               wrt.close();
+
+               RowTypeInfo typeInfo = new 
RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO);
+
+               RowCsvInputFormat inputFormat = new RowCsvInputFormat(new 
Path(tempFile.toURI().toString()), typeInfo);
+               inputFormat.configure(new Configuration());
+               inputFormat.setDelimiter(lineBreakerSetup);
+
+               FileInputSplit[] splits = inputFormat.createInputSplits(1);
+               inputFormat.open(splits[0]);
+
+               Row result = inputFormat.nextRecord(new Row(1));
+               assertNotNull("Expecting to not return null", result);
+               assertEquals(FIRST_PART, result.getField(0));
+
+               result = inputFormat.nextRecord(result);
+               assertNotNull("Expecting to not return null", result);
+               assertEquals(SECOND_PART, result.getField(0));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
index 3517338..0f748c5 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
@@ -133,7 +133,7 @@ class BatchTableEnvironment(
     * Converts the given [[Table]] into a [[DataSet]] of a specified type.
     *
     * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
-    * - [[org.apache.flink.api.table.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+    * - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
     * types: Fields are mapped by position, field types must match.
     * - POJO [[DataSet]] types: Fields are mapped by field name, field types 
must match.
     *
@@ -150,7 +150,7 @@ class BatchTableEnvironment(
     * Converts the given [[Table]] into a [[DataSet]] of a specified type.
     *
     * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
-    * - [[org.apache.flink.api.table.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+    * - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
     * types: Fields are mapped by position, field types must match.
     * - POJO [[DataSet]] types: Fields are mapped by field name, field types 
must match.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
index 83293e3..3218ced 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
@@ -135,7 +135,7 @@ class StreamTableEnvironment(
     * Converts the given [[Table]] into a [[DataStream]] of a specified type.
     *
     * The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
-    * - [[org.apache.flink.api.table.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+    * - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
     * types: Fields are mapped by position, field types must match.
     * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
     *
@@ -152,7 +152,7 @@ class StreamTableEnvironment(
     * Converts the given [[Table]] into a [[DataStream]] of a specified type.
     *
     * The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
-    * - [[org.apache.flink.api.table.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+    * - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
     * types: Fields are mapped by position, field types must match.
     * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
index f4bfe31..26fe51e 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
@@ -128,7 +128,7 @@ class BatchTableEnvironment(
     * Converts the given [[Table]] into a [[DataSet]] of a specified type.
     *
     * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
-    * - [[org.apache.flink.api.table.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+    * - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
     * types: Fields are mapped by position, field types must match.
     * - POJO [[DataSet]] types: Fields are mapped by field name, field types 
must match.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
index dde69d5..044ace8 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
@@ -131,7 +131,7 @@ class StreamTableEnvironment(
     * Converts the given [[Table]] into a [[DataStream]] of a specified type.
     *
     * The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
-    * - [[org.apache.flink.api.table.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+    * - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
     * types: Fields are mapped by position, field types must match.
     * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
index 3bce5cf..1e8bf39 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
@@ -18,7 +18,8 @@
 package org.apache.flink.api.scala
 
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.{Row, Table}
+import org.apache.flink.api.table.Table
+import org.apache.flink.types.Row
 import scala.language.implicitConversions
 import org.apache.flink.streaming.api.scala._
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index 918b01b..6d00ab3 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -37,6 +37,7 @@ import org.apache.flink.api.table.plan.rules.FlinkRuleSets
 import org.apache.flink.api.table.plan.schema.{DataSetTable, TableSourceTable}
 import org.apache.flink.api.table.sinks.{BatchTableSink, TableSink}
 import org.apache.flink.api.table.sources.BatchTableSource
+import org.apache.flink.types.Row
 
 /**
   * The abstract base class for batch TableEnvironments.
@@ -168,7 +169,7 @@ abstract class BatchTableEnvironment(
   private[flink] def explain(table: Table, extended: Boolean): String = {
     val ast = table.getRelNode
     val optimizedPlan = optimize(ast)
-    val dataSet = 
translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))
+    val dataSet = translate[Row](optimizedPlan) 
(TypeExtractor.createTypeInfo(classOf[Row]))
     dataSet.output(new DiscardingOutputFormat[Row])
     val env = dataSet.getExecutionEnvironment
     val jasonSqlPlan = env.getExecutionPlan

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index 8f00586..da20e07 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -37,6 +37,7 @@ import org.apache.flink.api.table.sinks.{StreamTableSink, 
TableSink}
 import org.apache.flink.api.table.sources.StreamTableSource
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.types.Row
 
 /**
   * The base class for stream TableEnvironments.

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index b6d0e31..07ea860 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -32,15 +32,15 @@ import org.apache.calcite.sql.util.ChainedSqlOperatorTable
 import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, 
RuleSets}
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.java.table.{BatchTableEnvironment => 
JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, 
TupleTypeInfo}
 import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
 import org.apache.flink.api.scala.table.{BatchTableEnvironment => 
ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
 import org.apache.flink.api.table.codegen.ExpressionReducer
 import org.apache.flink.api.table.expressions.{Alias, Expression, 
UnresolvedFieldReference}
-import 
org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation,
 checkNotSingleton, createTableSqlFunctions, createScalarSqlFunction}
-import org.apache.flink.api.table.functions.{TableFunction, ScalarFunction}
+import 
org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation,
 checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
+import org.apache.flink.api.table.functions.{ScalarFunction, TableFunction}
 import org.apache.flink.api.table.plan.cost.DataSetCostFactory
 import org.apache.flink.api.table.plan.schema.RelTable
 import org.apache.flink.api.table.sinks.TableSink
@@ -347,6 +347,7 @@ abstract class TableEnvironment(val config: TableConfig) {
       case t: TupleTypeInfo[A] => t.getFieldNames
       case c: CaseClassTypeInfo[A] => c.getFieldNames
       case p: PojoTypeInfo[A] => p.getFieldNames
+      case r: RowTypeInfo => r.getFieldNames
       case tpe =>
         throw new TableException(s"Type $tpe lacks explicit field naming")
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
index 4092a24..a706309 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
@@ -28,7 +28,8 @@ import 
org.apache.flink.api.common.typeinfo.{FractionalTypeInfo, SqlTimeTypeInfo
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo, 
TypeExtractor}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.table.typeutils.{TimeIntervalTypeInfo, 
RowTypeInfo, TypeCheckUtils}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.{TimeIntervalTypeInfo, 
TypeCheckUtils}
 
 object CodeGenUtils {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index 7caad12..cdb3753 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -37,7 +37,8 @@ import org.apache.flink.api.table.codegen.Indenter.toISC
 import org.apache.flink.api.table.codegen.calls.FunctionGenerator
 import org.apache.flink.api.table.codegen.calls.ScalarOperators._
 import org.apache.flink.api.table.functions.UserDefinedFunction
-import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.TypeConverter
 import org.apache.flink.api.table.typeutils.TypeCheckUtils._
 import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
 
@@ -1139,7 +1140,7 @@ class CodeGenerator(
 
           case ProductAccessor(i) =>
             // Object
-            val inputCode = s"($fieldTypeTerm) $inputTerm.productElement($i)"
+            val inputCode = s"($fieldTypeTerm) $inputTerm.getField($i)"
             generateInputFieldUnboxing(fieldType, inputCode)
 
           case ObjectPrivateFieldAccessor(field) =>

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
index 731452f..871264e 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
@@ -25,8 +25,10 @@ import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter}
-import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableConfig}
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
 
 import scala.collection.JavaConverters._
 
@@ -69,7 +71,7 @@ class ExpressionReducer(config: TableConfig)
     }
 
     val literalTypes = literals.map(e => 
FlinkTypeFactory.toTypeInfo(e.getType))
-    val resultType = new RowTypeInfo(literalTypes)
+    val resultType = new RowTypeInfo(literalTypes: _*)
 
     // generate MapFunction
     val generator = new CodeGenerator(config, false, EMPTY_ROW_INFO)
@@ -105,7 +107,7 @@ class ExpressionReducer(config: TableConfig)
           reducedValues.add(unreduced)
         case _ =>
           val literal = rexBuilder.makeLiteral(
-            reduced.productElement(reducedIdx),
+            reduced.getField(reducedIdx),
             unreduced.getType,
             true)
           reducedValues.add(literal)

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index e85ade0..94513d9 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -28,8 +28,10 @@ import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.plan.nodes.FlinkAggregate
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair
-import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter}
-import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory, 
Row}
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory}
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
 
@@ -119,7 +121,7 @@ class DataSetAggregate(
       .map(mapFunction)
       .name(prepareOpName)
 
-    val rowTypeInfo = new RowTypeInfo(fieldTypes)
+    val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
 
     val result = {
       if (groupingKeys.length > 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
index c7d5131..7133773 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
@@ -32,8 +32,10 @@ import 
org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate._
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
 import org.apache.flink.api.table.runtime.aggregate.{Aggregate, _}
 import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, RowTypeInfo, 
TimeIntervalTypeInfo, TypeConverter}
-import org.apache.flink.api.table.{FlinkTypeFactory, Row, 
StreamTableEnvironment}
+import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, 
TimeIntervalTypeInfo, TypeConverter}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{FlinkTypeFactory, StreamTableEnvironment}
+import org.apache.flink.types.Row
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream, 
DataStream, KeyedStream, WindowedStream}
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.time.Time
@@ -117,7 +119,7 @@ class DataStreamAggregate(
       .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
       .toArray
 
-    val rowTypeInfo = new RowTypeInfo(fieldTypes)
+    val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
 
     val aggString = aggregationToString(
       inputType,

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
index 54cb8d1..3bf3e0c 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.logical.{LogicalValues, LogicalUnion, 
LogicalAggregate}
 import org.apache.calcite.rex.RexLiteral
 import org.apache.flink.api.table._
+import org.apache.flink.types.Row
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, 
DataSetConvention}
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
index a11e8c1..72be00c 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
@@ -18,13 +18,13 @@
 
 package org.apache.flink.api.table.plan.schema
 
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.api.table.sources.TableSource
-import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 
 /** Table which defines an external table via a [[TableSource]] */
 class TableSourceTable(val tableSource: TableSource[_])
   extends FlinkTable[Row](
-    typeInfo = new RowTypeInfo(tableSource.getFieldTypes),
+    typeInfo = new RowTypeInfo(tableSource.getFieldTypes: _*),
     fieldIndexes = 0.until(tableSource.getNumberOfFields).toArray,
     fieldNames = tableSource.getFieldsNames)

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
index 1e91711..273aa60 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.api.table.runtime.aggregate
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 
 /**
  * The interface for all Flink aggregate functions, which expressed in terms 
of initiate(),

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
index 7ace2c5..4c473d4 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.table.runtime.aggregate
 import java.lang.Iterable
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
index 4b045be..db5f477 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.table.runtime.aggregate
 import java.lang.Iterable
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
index 7559cec..0699bfa 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate
 import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.util.Preconditions
 
@@ -47,11 +47,11 @@ class AggregateMapFunction[IN, OUT](
     
     val input = value.asInstanceOf[Row]
     for (i <- 0 until aggregates.length) {
-      val fieldValue = input.productElement(aggFields(i))
+      val fieldValue = input.getField(aggFields(i))
       aggregates(i).prepare(fieldValue, output)
     }
     for (i <- 0 until groupingKeys.length) {
-      output.setField(i, input.productElement(groupingKeys(i)))
+      output.setField(i, input.getField(groupingKeys(i)))
     }
     output.asInstanceOf[OUT]
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
index ebf0ca7..b2cf07e 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.table.runtime.aggregate
 import java.lang.Iterable
 
 import org.apache.flink.api.common.functions.{CombineFunction, 
RichGroupReduceFunction}
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.util.{Collector, Preconditions}
 
@@ -74,7 +74,7 @@ class AggregateReduceCombineFunction(
 
     // Set group keys to aggregateBuffer.
     for (i <- groupKeysMapping.indices) {
-      aggregateBuffer.setField(i, last.productElement(i))
+      aggregateBuffer.setField(i, last.getField(i))
     }
 
     aggregateBuffer

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
index 8f096cc..6fe712b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate
 import java.lang.Iterable
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.util.{Collector, Preconditions}
 
@@ -78,7 +78,7 @@ class AggregateReduceGroupFunction(
     // Set group keys value to final output.
     groupKeysMapping.foreach {
       case (after, previous) =>
-        output.setField(after, last.productElement(previous))
+        output.setField(after, last.getField(previous))
     }
 
     // Evaluate final aggregate value and set to output.

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
index 9b7ea0b..ff8f6fb 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
@@ -22,7 +22,7 @@ import java.lang.Iterable
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
 import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
index 4428963..a181068 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
@@ -31,9 +31,10 @@ import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.api.table.expressions.{WindowEnd, WindowStart}
 import org.apache.flink.api.table.plan.logical._
-import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.table.typeutils.TypeCheckUtils._
-import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableException}
+import org.apache.flink.api.table.{FlinkTypeFactory,  TableException}
+import org.apache.flink.types.Row
 import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, 
WindowFunction}
 import org.apache.flink.streaming.api.windowing.windows.{Window => 
DataStreamWindow}
 import scala.collection.JavaConversions._
@@ -529,7 +530,7 @@ object AggregateUtil {
 
     // concat group key types and aggregation types
     val allFieldTypes = groupingTypes ++: aggTypes
-    val partialType = new RowTypeInfo(allFieldTypes)
+    val partialType = new RowTypeInfo(allFieldTypes: _*)
     partialType
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
index 6fd890d..4e77549 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
@@ -22,7 +22,7 @@ import java.lang.Iterable
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
 import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
index ce5bc81..998ae62 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate
 
 import com.google.common.math.LongMath
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import java.math.BigDecimal
 import java.math.BigInteger
 
@@ -52,10 +52,10 @@ abstract class IntegralAvgAggregate[T] extends 
AvgAggregate[T] {
   }
 
   override def merge(partial: Row, buffer: Row): Unit = {
-    val partialSum = partial.productElement(partialSumIndex).asInstanceOf[Long]
-    val partialCount = 
partial.productElement(partialCountIndex).asInstanceOf[Long]
-    val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
-    val bufferCount = 
buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val partialSum = partial.getField(partialSumIndex).asInstanceOf[Long]
+    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     buffer.setField(partialSumIndex, LongMath.checkedAdd(partialSum, 
bufferSum))
     buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, 
bufferCount))
   }
@@ -81,8 +81,8 @@ class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
   }
 
   override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
-    val bufferCount = 
buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     if (bufferCount == 0L) {
       null
     } else {
@@ -100,8 +100,8 @@ class ShortAvgAggregate extends IntegralAvgAggregate[Short] 
{
   }
 
   override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
-    val bufferCount = 
buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     if (bufferCount == 0L) {
       null
     } else {
@@ -119,8 +119,8 @@ class IntAvgAggregate extends IntegralAvgAggregate[Int] {
   }
 
   override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
-    val bufferCount = 
buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     if (bufferCount == 0L) {
       null
     } else {
@@ -156,17 +156,17 @@ class LongAvgAggregate extends IntegralAvgAggregate[Long] 
{
   }
 
   override def merge(partial: Row, buffer: Row): Unit = {
-    val partialSum = 
partial.productElement(partialSumIndex).asInstanceOf[BigInteger]
-    val partialCount = 
partial.productElement(partialCountIndex).asInstanceOf[Long]
-    val bufferSum = 
buffer.productElement(partialSumIndex).asInstanceOf[BigInteger]
-    val bufferCount = 
buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val partialSum = partial.getField(partialSumIndex).asInstanceOf[BigInteger]
+    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigInteger]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     buffer.setField(partialSumIndex, partialSum.add(bufferSum))
     buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, 
bufferCount))
   }
 
   override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = 
buffer.productElement(partialSumIndex).asInstanceOf[BigInteger]
-    val bufferCount = 
buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigInteger]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     if (bufferCount == 0L) {
       null
     } else {
@@ -192,10 +192,10 @@ abstract class FloatingAvgAggregate[T: Numeric] extends 
AvgAggregate[T] {
   }
 
   override def merge(partial: Row, buffer: Row): Unit = {
-    val partialSum = 
partial.productElement(partialSumIndex).asInstanceOf[Double]
-    val partialCount = 
partial.productElement(partialCountIndex).asInstanceOf[Long]
-    val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Double]
-    val bufferCount = 
buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val partialSum = partial.getField(partialSumIndex).asInstanceOf[Double]
+    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
 
     buffer.setField(partialSumIndex, partialSum + bufferSum)
     buffer.setField(partialCountIndex, partialCount + bufferCount)
@@ -224,8 +224,8 @@ class FloatAvgAggregate extends FloatingAvgAggregate[Float] 
{
 
 
   override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Double]
-    val bufferCount = 
buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     if (bufferCount == 0L) {
       null
     } else {
@@ -243,8 +243,8 @@ class DoubleAvgAggregate extends 
FloatingAvgAggregate[Double] {
   }
 
   override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Double]
-    val bufferCount = 
buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     if (bufferCount == 0L) {
       null
     } else {
@@ -275,18 +275,18 @@ class DecimalAvgAggregate extends 
AvgAggregate[BigDecimal] {
   }
 
   override def merge(partial: Row, buffer: Row): Unit = {
-    val partialSum = 
partial.productElement(partialSumIndex).asInstanceOf[BigDecimal]
-    val partialCount = 
partial.productElement(partialCountIndex).asInstanceOf[Long]
-    val bufferSum = 
buffer.productElement(partialSumIndex).asInstanceOf[BigDecimal]
-    val bufferCount = 
buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val partialSum = partial.getField(partialSumIndex).asInstanceOf[BigDecimal]
+    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigDecimal]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     buffer.setField(partialSumIndex, partialSum.add(bufferSum))
     buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, 
bufferCount))
   }
 
   override def evaluate(buffer: Row): BigDecimal = {
-    val bufferCount = 
buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     if (bufferCount != 0) {
-      val bufferSum = 
buffer.productElement(partialSumIndex).asInstanceOf[BigDecimal]
+      val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigDecimal]
       bufferSum.divide(BigDecimal.valueOf(bufferCount))
     } else {
       null.asInstanceOf[BigDecimal]

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
index d9f288a..4d6d20b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.api.table.runtime.aggregate
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 
 class CountAggregate extends Aggregate[Long] {
   private var countIndex: Int = _
@@ -28,13 +28,13 @@ class CountAggregate extends Aggregate[Long] {
   }
 
   override def merge(intermediate: Row, buffer: Row): Unit = {
-    val partialCount = 
intermediate.productElement(countIndex).asInstanceOf[Long]
-    val bufferCount = buffer.productElement(countIndex).asInstanceOf[Long]
+    val partialCount = intermediate.getField(countIndex).asInstanceOf[Long]
+    val bufferCount = buffer.getField(countIndex).asInstanceOf[Long]
     buffer.setField(countIndex, partialCount + bufferCount)
   }
 
   override def evaluate(buffer: Row): Long = {
-    buffer.productElement(countIndex).asInstanceOf[Long]
+    buffer.getField(countIndex).asInstanceOf[Long]
   }
 
   override def prepare(value: Any, intermediate: Row): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
index 85ad8e5..48e2313 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate
 
 import java.lang.Iterable
 
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
 import org.apache.flink.util.Collector

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
index d3f871a..1a85dca 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate
 
 import java.lang.Iterable
 
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
@@ -66,7 +66,7 @@ class IncrementalAggregateAllWindowFunction[W <: Window](
       // Set group keys value to final output.
       groupKeysMapping.foreach {
         case (after, previous) =>
-          output.setField(after, record.productElement(previous))
+          output.setField(after, record.getField(previous))
       }
       // Evaluate final aggregate value and set to output.
       aggregateMapping.foreach {

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
index e2830da..5c36821 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.api.table.runtime.aggregate
 
 import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.util.Preconditions
 
 /**
@@ -54,7 +54,7 @@ class IncrementalAggregateReduceFunction(
 
     // copy all fields of value1 into accumulatorRow
     (0 until intermediateRowArity)
-    .foreach(i => accumulatorRow.setField(i, value1.productElement(i)))
+    .foreach(i => accumulatorRow.setField(i, value1.getField(i)))
     // merge value2 to accumulatorRow
     aggregates.foreach(_.merge(value2, accumulatorRow))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
index c880f87..2513383 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate
 import java.lang.Iterable
 
 import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.apache.flink.util.Collector

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
index 81e6890..d0d71ee 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate
 import java.lang.Iterable
 
 import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
@@ -68,7 +68,7 @@ class IncrementalAggregateWindowFunction[W <: Window](
       // Set group keys value to final output.
       groupKeysMapping.foreach {
         case (after, previous) =>
-          output.setField(after, record.productElement(previous))
+          output.setField(after, record.getField(previous))
       }
       // Evaluate final aggregate value and set to output.
       aggregateMapping.foreach {

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
index 9267527..2cb3dc7 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate
 import java.math.BigDecimal
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 
 abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] 
{
 
@@ -57,9 +57,9 @@ abstract class MaxAggregate[T](implicit ord: Ordering[T]) 
extends Aggregate[T] {
    * @param buffer
    */
   override def merge(intermediate: Row, buffer: Row): Unit = {
-    val partialValue = intermediate.productElement(maxIndex).asInstanceOf[T]
+    val partialValue = intermediate.getField(maxIndex).asInstanceOf[T]
     if (partialValue != null) {
-      val bufferValue = buffer.productElement(maxIndex).asInstanceOf[T]
+      val bufferValue = buffer.getField(maxIndex).asInstanceOf[T]
       if (bufferValue != null) {
         val max : T = if (ord.compare(partialValue, bufferValue) > 0) 
partialValue else bufferValue
         buffer.setField(maxIndex, max)
@@ -76,7 +76,7 @@ abstract class MaxAggregate[T](implicit ord: Ordering[T]) 
extends Aggregate[T] {
    * @return
    */
   override def evaluate(buffer: Row): T = {
-    buffer.productElement(maxIndex).asInstanceOf[T]
+    buffer.getField(maxIndex).asInstanceOf[T]
   }
 
   override def supportPartial: Boolean = true
@@ -147,9 +147,9 @@ class DecimalMaxAggregate extends Aggregate[BigDecimal] {
   }
 
   override def merge(partial: Row, buffer: Row): Unit = {
-    val partialValue = 
partial.productElement(minIndex).asInstanceOf[BigDecimal]
+    val partialValue = partial.getField(minIndex).asInstanceOf[BigDecimal]
     if (partialValue != null) {
-      val bufferValue = 
buffer.productElement(minIndex).asInstanceOf[BigDecimal]
+      val bufferValue = buffer.getField(minIndex).asInstanceOf[BigDecimal]
       if (bufferValue != null) {
         val min = if (partialValue.compareTo(bufferValue) > 0) partialValue 
else bufferValue
         buffer.setField(minIndex, min)
@@ -160,7 +160,7 @@ class DecimalMaxAggregate extends Aggregate[BigDecimal] {
   }
 
   override def evaluate(buffer: Row): BigDecimal = {
-    buffer.productElement(minIndex).asInstanceOf[BigDecimal]
+    buffer.getField(minIndex).asInstanceOf[BigDecimal]
   }
 
   override def supportPartial: Boolean = true

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
index 7e2ebf4..c1c79ec 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate
 
 import java.math.BigDecimal
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 
 abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] 
{
 
@@ -56,9 +56,9 @@ abstract class MinAggregate[T](implicit ord: Ordering[T]) 
extends Aggregate[T] {
    * @param buffer
    */
   override def merge(partial: Row, buffer: Row): Unit = {
-    val partialValue = partial.productElement(minIndex).asInstanceOf[T]
+    val partialValue = partial.getField(minIndex).asInstanceOf[T]
     if (partialValue != null) {
-      val bufferValue = buffer.productElement(minIndex).asInstanceOf[T]
+      val bufferValue = buffer.getField(minIndex).asInstanceOf[T]
       if (bufferValue != null) {
         val min : T = if (ord.compare(partialValue, bufferValue) < 0) 
partialValue else bufferValue
         buffer.setField(minIndex, min)
@@ -75,7 +75,7 @@ abstract class MinAggregate[T](implicit ord: Ordering[T]) 
extends Aggregate[T] {
    * @return
    */
   override def evaluate(buffer: Row): T = {
-    buffer.productElement(minIndex).asInstanceOf[T]
+    buffer.getField(minIndex).asInstanceOf[T]
   }
 
   override def supportPartial: Boolean = true
@@ -146,9 +146,9 @@ class DecimalMinAggregate extends Aggregate[BigDecimal] {
   }
 
   override def merge(partial: Row, buffer: Row): Unit = {
-    val partialValue = 
partial.productElement(minIndex).asInstanceOf[BigDecimal]
+    val partialValue = partial.getField(minIndex).asInstanceOf[BigDecimal]
     if (partialValue != null) {
-      val bufferValue = 
buffer.productElement(minIndex).asInstanceOf[BigDecimal]
+      val bufferValue = buffer.getField(minIndex).asInstanceOf[BigDecimal]
       if (bufferValue != null) {
         val min = if (partialValue.compareTo(bufferValue) < 0) partialValue 
else bufferValue
         buffer.setField(minIndex, min)
@@ -159,7 +159,7 @@ class DecimalMinAggregate extends Aggregate[BigDecimal] {
   }
 
   override def evaluate(buffer: Row): BigDecimal = {
-    buffer.productElement(minIndex).asInstanceOf[BigDecimal]
+    buffer.getField(minIndex).asInstanceOf[BigDecimal]
   }
 
   override def supportPartial: Boolean = true

Reply via email to