[FLINK-5188] [table] [connectors] [core] Adjust imports and method calls to new Row type.

- Port RowCsvInputFormat to Java and move it to flink-core.

This closes #3003.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d27f8f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d27f8f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d27f8f2

Branch: refs/heads/master
Commit: 4d27f8f2deef9fad845ebc91cef121cf9b35f825
Parents: a9e6ec8
Author: tonycox <[email protected]>
Authored: Fri Dec 9 21:41:36 2016 +0400
Committer: Fabian Hueske <[email protected]>
Committed: Thu Dec 15 11:36:40 2016 +0100

----------------------------------------------------------------------
 .../kafka/Kafka010JsonTableSource.java          |   2 +-
 .../connectors/kafka/Kafka010TableSource.java   |   2 +-
 .../connectors/kafka/Kafka08JsonTableSink.java  |   2 +-
 .../kafka/Kafka08JsonTableSource.java           |   2 +-
 .../connectors/kafka/Kafka08TableSource.java    |   2 +-
 .../kafka/Kafka08JsonTableSinkTest.java         |   2 +-
 .../kafka/Kafka08JsonTableSourceTest.java       |   2 +-
 .../connectors/kafka/Kafka09JsonTableSink.java  |   2 +-
 .../kafka/Kafka09JsonTableSource.java           |   2 +-
 .../connectors/kafka/Kafka09TableSource.java    |   2 +-
 .../kafka/Kafka09JsonTableSinkTest.java         |   2 +-
 .../kafka/Kafka09JsonTableSourceTest.java       |   2 +-
 .../connectors/kafka/KafkaJsonTableSink.java    |   2 +-
 .../connectors/kafka/KafkaTableSink.java        |   4 +-
 .../connectors/kafka/KafkaTableSource.java      |   4 +-
 .../JsonRowDeserializationSchema.java           |   4 +-
 .../JsonRowSerializationSchema.java             |   8 +-
 .../kafka/JsonRowDeserializationSchemaTest.java |  14 +-
 .../kafka/JsonRowSerializationSchemaTest.java   |   8 +-
 .../kafka/KafkaTableSinkTestBase.java           |   4 +-
 .../kafka/KafkaTableSourceTestBase.java         |   2 +-
 .../flink/api/java/io/jdbc/JDBCInputFormat.java |   6 +-
 .../api/java/io/jdbc/JDBCOutputFormat.java      |  44 +-
 .../flink/api/java/io/jdbc/JDBCFullTest.java    |   2 +-
 .../api/java/io/jdbc/JDBCInputFormatTest.java   |  38 +-
 .../api/java/io/jdbc/JDBCOutputFormatTest.java  |  24 +-
 .../flink/api/java/io/jdbc/JDBCTestBase.java    |   2 +-
 .../flink/api/java/io/RowCsvInputFormat.java    | 174 ++++
 .../api/java/io/RowCsvInputFormatTest.java      | 879 +++++++++++++++++++
 .../api/java/table/BatchTableEnvironment.scala  |   4 +-
 .../api/java/table/StreamTableEnvironment.scala |   4 +-
 .../api/scala/table/BatchTableEnvironment.scala |   2 +-
 .../scala/table/StreamTableEnvironment.scala    |   2 +-
 .../apache/flink/api/scala/table/package.scala  |   3 +-
 .../flink/api/table/BatchTableEnvironment.scala |   3 +-
 .../api/table/StreamTableEnvironment.scala      |   1 +
 .../flink/api/table/TableEnvironment.scala      |   7 +-
 .../flink/api/table/codegen/CodeGenUtils.scala  |   3 +-
 .../flink/api/table/codegen/CodeGenerator.scala |   5 +-
 .../api/table/codegen/ExpressionReducer.scala   |  10 +-
 .../plan/nodes/dataset/DataSetAggregate.scala   |   8 +-
 .../nodes/datastream/DataStreamAggregate.scala  |   8 +-
 .../DataSetAggregateWithNullValuesRule.scala    |   1 +
 .../table/plan/schema/TableSourceTable.scala    |   6 +-
 .../api/table/runtime/aggregate/Aggregate.scala |   2 +-
 .../AggregateAllTimeWindowFunction.scala        |   2 +-
 .../aggregate/AggregateAllWindowFunction.scala  |   2 +-
 .../aggregate/AggregateMapFunction.scala        |   6 +-
 .../AggregateReduceCombineFunction.scala        |   4 +-
 .../AggregateReduceGroupFunction.scala          |   4 +-
 .../aggregate/AggregateTimeWindowFunction.scala |   2 +-
 .../table/runtime/aggregate/AggregateUtil.scala |   7 +-
 .../aggregate/AggregateWindowFunction.scala     |   2 +-
 .../table/runtime/aggregate/AvgAggregate.scala  |  62 +-
 .../runtime/aggregate/CountAggregate.scala      |   8 +-
 ...rementalAggregateAllTimeWindowFunction.scala |   2 +-
 .../IncrementalAggregateAllWindowFunction.scala |   4 +-
 .../IncrementalAggregateReduceFunction.scala    |   4 +-
 ...IncrementalAggregateTimeWindowFunction.scala |   2 +-
 .../IncrementalAggregateWindowFunction.scala    |   4 +-
 .../table/runtime/aggregate/MaxAggregate.scala  |  14 +-
 .../table/runtime/aggregate/MinAggregate.scala  |  14 +-
 .../table/runtime/aggregate/SumAggregate.scala  |  14 +-
 .../aggregate/TimeWindowPropertyCollector.scala |   4 +-
 .../flink/api/table/sinks/CsvTableSink.scala    |  12 +-
 .../api/table/sources/CsvTableSource.scala      |   9 +-
 .../api/table/typeutils/TypeConverter.scala     |  11 +-
 .../api/java/batch/TableEnvironmentITCase.java  |   2 +-
 .../flink/api/java/batch/TableSourceITCase.java |   4 +-
 .../flink/api/java/batch/sql/SqlITCase.java     |   2 +-
 .../java/batch/table/AggregationsITCase.java    |   2 +-
 .../flink/api/java/batch/table/CalcITCase.java  |   2 +-
 .../api/java/batch/table/CastingITCase.java     |   2 +-
 .../flink/api/java/batch/table/JoinITCase.java  |   2 +-
 .../flink/api/java/stream/sql/SqlITCase.java    |   2 +-
 .../batch/ProjectableTableSourceITCase.scala    |   7 +-
 .../scala/batch/TableEnvironmentITCase.scala    |   3 +-
 .../api/scala/batch/TableSourceITCase.scala     |   9 +-
 .../scala/batch/sql/AggregationsITCase.scala    |   3 +-
 .../flink/api/scala/batch/sql/CalcITCase.scala  |   3 +-
 .../flink/api/scala/batch/sql/JoinITCase.scala  |   3 +-
 .../scala/batch/sql/SetOperatorsITCase.scala    |   3 +-
 .../flink/api/scala/batch/sql/SortITCase.scala  |  32 +-
 .../scala/batch/sql/TableWithSQLITCase.scala    |   3 +-
 .../scala/batch/table/AggregationsITCase.scala  |   3 +-
 .../api/scala/batch/table/CalcITCase.scala      |   3 +-
 .../api/scala/batch/table/JoinITCase.scala      |   3 +-
 .../scala/batch/table/SetOperatorsITCase.scala  |   3 +-
 .../api/scala/batch/table/SortITCase.scala      |  49 +-
 .../table/UserDefinedTableFunctionTest.scala    |   7 +-
 .../api/scala/stream/TableSourceITCase.scala    |   7 +-
 .../flink/api/scala/stream/sql/SqlITCase.scala  |   3 +-
 .../scala/stream/table/AggregationsITCase.scala |   3 +-
 .../api/scala/stream/table/CalcITCase.scala     |   3 +-
 .../api/scala/stream/table/UnionITCase.scala    |   3 +-
 .../table/UserDefinedTableFunctionTest.scala    |   8 +-
 .../api/scala/stream/utils/StreamITCase.scala   |   2 +-
 .../api/table/expressions/ArrayTypeTest.scala   |   9 +-
 .../table/expressions/CompositeAccessTest.scala |   9 +-
 .../api/table/expressions/DecimalTypeTest.scala |   9 +-
 .../expressions/NonDeterministicTests.scala     |   6 +-
 .../table/expressions/ScalarFunctionsTest.scala |   9 +-
 .../table/expressions/ScalarOperatorsTest.scala |   9 +-
 .../table/expressions/SqlExpressionTest.scala   |   6 +-
 .../table/expressions/TemporalTypesTest.scala   |   9 +-
 .../UserDefinedScalarFunctionTest.scala         |   9 +-
 .../expressions/utils/ExpressionTestBase.scala  |   7 +-
 .../runtime/aggregate/AggregateTestBase.scala   |   2 +-
 .../dataset/DataSetCorrelateITCase.scala        |   3 +-
 .../datastream/DataStreamCorrelateITCase.scala  |   3 +-
 .../table/utils/UserDefinedTableFunctions.scala |   8 +-
 111 files changed, 1457 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
index ddf1ad3..920d718 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.sources.StreamTableSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
index 732440b..127dafc 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.sources.StreamTableSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index b155576..839388f 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
index 63bb57e..6c7d727 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.sources.StreamTableSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
index 8f51237..0e3791c 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.sources.StreamTableSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 6d0b140..0ac452e 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
index a2d66ac..f9ef2ce 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import java.util.Properties;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index 38ea47c..edbebd0 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
index 975ef58..dfcba5f 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.sources.StreamTableSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
index 03b5040..f423003 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.sources.StreamTableSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index 45f70ac..df84a0f 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
index 4a75f50..10b9acc 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import java.util.Properties;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index ee98783..27c4de7 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 714d9cd..6c42943 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.sinks.StreamTableSink;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index fd423d7..498e918 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -19,9 +19,9 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
index 4344810..b4b3341 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
@@ -22,8 +22,8 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
index 077ff13..1998aa6 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.util.serialization;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
 
@@ -49,15 +49,15 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
 
        @Override
        public byte[] serialize(Row row) {
-               if (row.productArity() != fieldNames.length) {
+               if (row.getArity() != fieldNames.length) {
                        throw new IllegalStateException(String.format(
                                "Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
                }
 
                ObjectNode objectNode = mapper.createObjectNode();
 
-               for (int i = 0; i < row.productArity(); i++) {
-                       JsonNode node = mapper.valueToTree(row.productElement(i));
+               for (int i = 0; i < row.getArity(); i++) {
+                       JsonNode node = mapper.valueToTree(row.getField(i));
                        objectNode.set(fieldNames[i], node);
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
index 68225e2..88f62f0 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import org.junit.Test;
 
@@ -61,10 +61,10 @@ public class JsonRowDeserializationSchemaTest {
 
                Row deserialized = deserializationSchema.deserialize(serializedJson);
 
-               assertEquals(3, deserialized.productArity());
-               assertEquals(id, deserialized.productElement(0));
-               assertEquals(name, deserialized.productElement(1));
-               assertArrayEquals(bytes, (byte[]) deserialized.productElement(2));
+               assertEquals(3, deserialized.getArity());
+               assertEquals(id, deserialized.getField(0));
+               assertEquals(name, deserialized.getField(1));
+               assertArrayEquals(bytes, (byte[]) deserialized.getField(2));
        }
 
        /**
@@ -85,8 +85,8 @@ public class JsonRowDeserializationSchemaTest {
 
                Row row = deserializationSchema.deserialize(serializedJson);
 
-               assertEquals(1, row.productArity());
-               assertNull("Missing field not null", row.productElement(0));
+               assertEquals(1, row.getArity());
+               assertNull("Missing field not null", row.getField(0));
 
                deserializationSchema.setFailOnMissingField(true);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
index 92af15d..78dedf4 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import 
org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.junit.Test;
@@ -88,10 +88,10 @@ public class JsonRowSerializationSchemaTest {
 
        private void assertEqualRows(Row expectedRow, Row resultRow) {
                assertEquals("Deserialized row should have expected number of 
fields",
-                       expectedRow.productArity(), resultRow.productArity());
-               for (int i = 0; i < expectedRow.productArity(); i++) {
+                       expectedRow.getArity(), resultRow.getArity());
+               for (int i = 0; i < expectedRow.getArity(); i++) {
                        assertEquals(String.format("Field number %d should be 
as in the original row", i),
-                               expectedRow.productElement(i), 
resultRow.productElement(i));
+                               expectedRow.getField(i), resultRow.getField(i));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index ae0af52..cc1c166 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -18,8 +18,8 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
index 2a281e8..ad51993 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import java.util.Properties;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
index b4246f5..3153f96 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -37,8 +37,8 @@ import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
@@ -276,7 +276,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
                        if (!hasNext) {
                                return null;
                        }
-                       for (int pos = 0; pos < row.productArity(); pos++) {
+                       for (int pos = 0; pos < row.getArity(); pos++) {
                                row.setField(pos, resultSet.getObject(pos + 1));
                        }
                        //update hasNext after we've read the record

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
index da4b1ad..c5585e2 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -26,7 +26,7 @@ import java.sql.SQLException;
 
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -108,22 +108,22 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
        @Override
        public void writeRecord(Row row) throws IOException {
 
-               if (typesArray != null && typesArray.length > 0 && 
typesArray.length != row.productArity()) {
+               if (typesArray != null && typesArray.length > 0 && 
typesArray.length != row.getArity()) {
                        LOG.warn("Column SQL types array doesn't match arity of 
passed Row! Check the passed array...");
                } 
                try {
 
                        if (typesArray == null ) {
                                // no types provided
-                               for (int index = 0; index < row.productArity(); 
index++) {
-                                       LOG.warn("Unknown column type for 
column %s. Best effort approach to set its value: %s.", index + 1, 
row.productElement(index));
-                                       upload.setObject(index + 1, 
row.productElement(index));
+                               for (int index = 0; index < row.getArity(); 
index++) {
+                                       LOG.warn("Unknown column type for 
column %s. Best effort approach to set its value: %s.", index + 1, 
row.getField(index));
+                                       upload.setObject(index + 1, 
row.getField(index));
                                }
                        } else {
                                // types provided
-                               for (int index = 0; index < row.productArity(); 
index++) {
+                               for (int index = 0; index < row.getArity(); 
index++) {
 
-                                       if (row.productElement(index) == null) {
+                                       if (row.getField(index) == null) {
                                                upload.setNull(index + 1, 
typesArray[index]);
                                        } else {
                                                // casting values as suggested 
by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
@@ -133,56 +133,56 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
                                                                break;
                                                        case 
java.sql.Types.BOOLEAN:
                                                        case java.sql.Types.BIT:
-                                                               
upload.setBoolean(index + 1, (boolean) row.productElement(index));
+                                                               
upload.setBoolean(index + 1, (boolean) row.getField(index));
                                                                break;
                                                        case 
java.sql.Types.CHAR:
                                                        case 
java.sql.Types.NCHAR:
                                                        case 
java.sql.Types.VARCHAR:
                                                        case 
java.sql.Types.LONGVARCHAR:
                                                        case 
java.sql.Types.LONGNVARCHAR:
-                                                               
upload.setString(index + 1, (String) row.productElement(index));
+                                                               
upload.setString(index + 1, (String) row.getField(index));
                                                                break;
                                                        case 
java.sql.Types.TINYINT:
-                                                               
upload.setByte(index + 1, (byte) row.productElement(index));
+                                                               
upload.setByte(index + 1, (byte) row.getField(index));
                                                                break;
                                                        case 
java.sql.Types.SMALLINT:
-                                                               
upload.setShort(index + 1, (short) row.productElement(index));
+                                                               
upload.setShort(index + 1, (short) row.getField(index));
                                                                break;
                                                        case 
java.sql.Types.INTEGER:
-                                                               
upload.setInt(index + 1, (int) row.productElement(index));
+                                                               
upload.setInt(index + 1, (int) row.getField(index));
                                                                break;
                                                        case 
java.sql.Types.BIGINT:
-                                                               
upload.setLong(index + 1, (long) row.productElement(index));
+                                                               
upload.setLong(index + 1, (long) row.getField(index));
                                                                break;
                                                        case 
java.sql.Types.REAL:
-                                                               
upload.setFloat(index + 1, (float) row.productElement(index));
+                                                               
upload.setFloat(index + 1, (float) row.getField(index));
                                                                break;
                                                        case 
java.sql.Types.FLOAT:
                                                        case 
java.sql.Types.DOUBLE:
-                                                               
upload.setDouble(index + 1, (double) row.productElement(index));
+                                                               
upload.setDouble(index + 1, (double) row.getField(index));
                                                                break;
                                                        case 
java.sql.Types.DECIMAL:
                                                        case 
java.sql.Types.NUMERIC:
-                                                               
upload.setBigDecimal(index + 1, (java.math.BigDecimal) 
row.productElement(index));
+                                                               
upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.getField(index));
                                                                break;
                                                        case 
java.sql.Types.DATE:
-                                                               
upload.setDate(index + 1, (java.sql.Date) row.productElement(index));
+                                                               
upload.setDate(index + 1, (java.sql.Date) row.getField(index));
                                                                break;
                                                        case 
java.sql.Types.TIME:
-                                                               
upload.setTime(index + 1, (java.sql.Time) row.productElement(index));
+                                                               
upload.setTime(index + 1, (java.sql.Time) row.getField(index));
                                                                break;
                                                        case 
java.sql.Types.TIMESTAMP:
-                                                               
upload.setTimestamp(index + 1, (java.sql.Timestamp) row.productElement(index));
+                                                               
upload.setTimestamp(index + 1, (java.sql.Timestamp) row.getField(index));
                                                                break;
                                                        case 
java.sql.Types.BINARY:
                                                        case 
java.sql.Types.VARBINARY:
                                                        case 
java.sql.Types.LONGVARBINARY:
-                                                               
upload.setBytes(index + 1, (byte[]) row.productElement(index));
+                                                               
upload.setBytes(index + 1, (byte[]) row.getField(index));
                                                                break;
                                                        default:
-                                                               
upload.setObject(index + 1, row.productElement(index));
+                                                               
upload.setObject(index + 1, row.getField(index));
                                                                
LOG.warn("Unmanaged sql type (%s) for column %s. Best effort approach to set 
its value: %s.",
-                                                                       
typesArray[index], index + 1, row.productElement(index));
+                                                                       
typesArray[index], index + 1, row.getField(index));
                                                                // case 
java.sql.Types.SQLXML
                                                                // case 
java.sql.Types.ARRAY:
                                                                // case 
java.sql.Types.JAVA_OBJECT:

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
index da9469b..88aa4fa 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import 
org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
 import 
org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index efae076..b08aa3a 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -25,7 +25,7 @@ import java.sql.ResultSet;
 import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
 import 
org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
 import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.apache.flink.core.io.InputSplit;
 import org.junit.After;
 import org.junit.Assert;
@@ -116,15 +116,15 @@ public class JDBCInputFormatTest extends JDBCTestBase {
                                break;
                        }
                        
-                       if(next.productElement(0)!=null) { 
Assert.assertEquals("Field 0 should be int", Integer.class, 
next.productElement(0).getClass());}
-                       if(next.productElement(1)!=null) { 
Assert.assertEquals("Field 1 should be String", String.class, 
next.productElement(1).getClass());}
-                       if(next.productElement(2)!=null) { 
Assert.assertEquals("Field 2 should be String", String.class, 
next.productElement(2).getClass());}
-                       if(next.productElement(3)!=null) { 
Assert.assertEquals("Field 3 should be float", Double.class, 
next.productElement(3).getClass());}
-                       if(next.productElement(4)!=null) { 
Assert.assertEquals("Field 4 should be int", Integer.class, 
next.productElement(4).getClass());}
+                       if(next.getField(0)!=null) { Assert.assertEquals("Field 
0 should be int", Integer.class, next.getField(0).getClass());}
+                       if(next.getField(1)!=null) { Assert.assertEquals("Field 
1 should be String", String.class, next.getField(1).getClass());}
+                       if(next.getField(2)!=null) { Assert.assertEquals("Field 
2 should be String", String.class, next.getField(2).getClass());}
+                       if(next.getField(3)!=null) { Assert.assertEquals("Field 
3 should be float", Double.class, next.getField(3).getClass());}
+                       if(next.getField(4)!=null) { Assert.assertEquals("Field 
4 should be int", Integer.class, next.getField(4).getClass());}
 
                        for (int x = 0; x < 5; x++) {
                                if(testData[recordCount][x]!=null) {
-                                       
Assert.assertEquals(testData[recordCount][x], next.productElement(x));
+                                       
Assert.assertEquals(testData[recordCount][x], next.getField(x));
                                }
                        }
                        recordCount++;
@@ -162,15 +162,15 @@ public class JDBCInputFormatTest extends JDBCTestBase {
                                if (next == null) {
                                        break;
                                }
-                               if(next.productElement(0)!=null) { 
Assert.assertEquals("Field 0 should be int", Integer.class, 
next.productElement(0).getClass());}
-                               if(next.productElement(1)!=null) { 
Assert.assertEquals("Field 1 should be String", String.class, 
next.productElement(1).getClass());}
-                               if(next.productElement(2)!=null) { 
Assert.assertEquals("Field 2 should be String", String.class, 
next.productElement(2).getClass());}
-                               if(next.productElement(3)!=null) { 
Assert.assertEquals("Field 3 should be float", Double.class, 
next.productElement(3).getClass());}
-                               if(next.productElement(4)!=null) { 
Assert.assertEquals("Field 4 should be int", Integer.class, 
next.productElement(4).getClass());}
+                               if(next.getField(0)!=null) { 
Assert.assertEquals("Field 0 should be int", Integer.class, 
next.getField(0).getClass());}
+                               if(next.getField(1)!=null) { 
Assert.assertEquals("Field 1 should be String", String.class, 
next.getField(1).getClass());}
+                               if(next.getField(2)!=null) { 
Assert.assertEquals("Field 2 should be String", String.class, 
next.getField(2).getClass());}
+                               if(next.getField(3)!=null) { 
Assert.assertEquals("Field 3 should be float", Double.class, 
next.getField(3).getClass());}
+                               if(next.getField(4)!=null) { 
Assert.assertEquals("Field 4 should be int", Integer.class, 
next.getField(4).getClass());}
 
                                for (int x = 0; x < 5; x++) {
                                        if(testData[recordCount][x]!=null) {
-                                               
Assert.assertEquals(testData[recordCount][x], next.productElement(x));
+                                               
Assert.assertEquals(testData[recordCount][x], next.getField(x));
                                        }
                                }
                                recordCount++;
@@ -208,11 +208,11 @@ public class JDBCInputFormatTest extends JDBCTestBase {
                                if (next == null) {
                                        break;
                                }
-                               if(next.productElement(0)!=null) { 
Assert.assertEquals("Field 0 should be int", Integer.class, 
next.productElement(0).getClass());}
-                               if(next.productElement(1)!=null) { 
Assert.assertEquals("Field 1 should be String", String.class, 
next.productElement(1).getClass());}
-                               if(next.productElement(2)!=null) { 
Assert.assertEquals("Field 2 should be String", String.class, 
next.productElement(2).getClass());}
-                               if(next.productElement(3)!=null) { 
Assert.assertEquals("Field 3 should be float", Double.class, 
next.productElement(3).getClass());}
-                               if(next.productElement(4)!=null) { 
Assert.assertEquals("Field 4 should be int", Integer.class, 
next.productElement(4).getClass());}
+                               if(next.getField(0)!=null) { 
Assert.assertEquals("Field 0 should be int", Integer.class, 
next.getField(0).getClass());}
+                               if(next.getField(1)!=null) { 
Assert.assertEquals("Field 1 should be String", String.class, 
next.getField(1).getClass());}
+                               if(next.getField(2)!=null) { 
Assert.assertEquals("Field 2 should be String", String.class, 
next.getField(2).getClass());}
+                               if(next.getField(3)!=null) { 
Assert.assertEquals("Field 3 should be float", Double.class, 
next.getField(3).getClass());}
+                               if(next.getField(4)!=null) { 
Assert.assertEquals("Field 4 should be int", Integer.class, 
next.getField(4).getClass());}
 
                                recordCount++;
                        }
@@ -244,4 +244,4 @@ public class JDBCInputFormatTest extends JDBCTestBase {
                Assert.assertEquals(0, recordsCnt);
        }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
index 086a84c..8de0c34 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -26,7 +26,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 
 import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.table.Row;
+import org.apache.flink.types.Row;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -137,25 +137,25 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
                                for (int i = 0; i < tuple5.getArity(); i++) {
                                        row.setField(i, resultSet.getObject(i + 
1));
                                }
-                               if (row.productElement(0) != null) {
-                                       Assert.assertEquals("Field 0 should be 
int", Integer.class, row.productElement(0).getClass());
+                               if (row.getField(0) != null) {
+                                       Assert.assertEquals("Field 0 should be 
int", Integer.class, row.getField(0).getClass());
                                }
-                               if (row.productElement(1) != null) {
-                                       Assert.assertEquals("Field 1 should be 
String", String.class, row.productElement(1).getClass());
+                               if (row.getField(1) != null) {
+                                       Assert.assertEquals("Field 1 should be 
String", String.class, row.getField(1).getClass());
                                }
-                               if (row.productElement(2) != null) {
-                                       Assert.assertEquals("Field 2 should be 
String", String.class, row.productElement(2).getClass());
+                               if (row.getField(2) != null) {
+                                       Assert.assertEquals("Field 2 should be 
String", String.class, row.getField(2).getClass());
                                }
-                               if (row.productElement(3) != null) {
-                                       Assert.assertEquals("Field 3 should be 
float", Double.class, row.productElement(3).getClass());
+                               if (row.getField(3) != null) {
+                                       Assert.assertEquals("Field 3 should be 
float", Double.class, row.getField(3).getClass());
                                }
-                               if (row.productElement(4) != null) {
-                                       Assert.assertEquals("Field 4 should be 
int", Integer.class, row.productElement(4).getClass());
+                               if (row.getField(4) != null) {
+                                       Assert.assertEquals("Field 4 should be 
int", Integer.class, row.getField(4).getClass());
                                }
 
                                for (int x = 0; x < tuple5.getArity(); x++) {
                                        if 
(JDBCTestBase.testData[recordCount][x] != null) {
-                                               
Assert.assertEquals(JDBCTestBase.testData[recordCount][x], 
row.productElement(x));
+                                               
Assert.assertEquals(JDBCTestBase.testData[recordCount][x], row.getField(x));
                                        }
                                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
index 69ad693..ffcb26f 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
@@ -25,7 +25,7 @@ import java.sql.Statement;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
new file mode 100644
index 0000000..34233f5
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.parser.FieldParser;
+
+@PublicEvolving
+public class RowCsvInputFormat extends CsvInputFormat<Row> {
+
+       private static final long serialVersionUID = 1L;
+
+       private int arity;
+       private boolean emptyColumnAsNull;
+
+       public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, String 
lineDelimiter, String fieldDelimiter, boolean[] includedFieldsMask, boolean 
emptyColumnAsNull) {
+               super(filePath);
+               if (rowTypeInfo.getArity() == 0) {
+                       throw new IllegalArgumentException("Row arity must be 
greater than 0.");
+               }
+               this.arity = rowTypeInfo.getArity();
+
+               boolean[] fieldsMask;
+               if (includedFieldsMask != null) {
+                       fieldsMask = includedFieldsMask;
+               } else {
+                       fieldsMask = createDefaultMask(arity);
+               }
+               this.emptyColumnAsNull = emptyColumnAsNull;
+               setDelimiter(lineDelimiter);
+               setFieldDelimiter(fieldDelimiter);
+               setFieldsGeneric(fieldsMask, extractTypeClasses(rowTypeInfo));
+       }
+
+       public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, String 
lineDelimiter, String fieldDelimiter, int[] includedFieldsMask) {
+               this(filePath, rowTypeInfo, lineDelimiter, fieldDelimiter, 
toBoolMask(includedFieldsMask), false);
+       }
+
+       public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, String 
lineDelimiter, String fieldDelimiter) {
+               this(filePath, rowTypeInfo, lineDelimiter, fieldDelimiter, 
null, false);
+       }
+
+       public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+               this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, 
DEFAULT_FIELD_DELIMITER, includedFieldsMask, false);
+       }
+
+       public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+               this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, 
DEFAULT_FIELD_DELIMITER, includedFieldsMask);
+       }
+
+       public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean emptyColumnAsNull) {
+               this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, 
DEFAULT_FIELD_DELIMITER, null, emptyColumnAsNull);
+       }
+
+       public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+               this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, 
DEFAULT_FIELD_DELIMITER, null);
+       }
+
+       private static Class<?>[] extractTypeClasses(RowTypeInfo rowTypeInfo) {
+               Class<?>[] classes = new Class<?>[rowTypeInfo.getArity()];
+               for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+                       classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+               }
+               return classes;
+       }
+
+       private static boolean[] toBoolMask(int[] includedFieldsMask) {
+               if (includedFieldsMask == null) {
+                       return null;
+               } else {
+                       return toBooleanMask(includedFieldsMask);
+               }
+       }
+
+       @Override
+       protected Row fillRecord(Row reuse, Object[] parsedValues) {
+               Row reuseRow;
+               if (reuse == null) {
+                       reuseRow = new Row(arity);
+               } else {
+                       reuseRow = reuse;
+               }
+               for (int i = 0; i < parsedValues.length; i++) {
+                       reuseRow.setField(i, parsedValues[i]);
+               }
+               return reuseRow;
+       }
+
+       @Override
+       protected boolean parseRecord(Object[] holders, byte[] bytes, int 
offset, int numBytes) throws ParseException {
+               byte[] fieldDelimiter = this.getFieldDelimiter();
+               boolean[] fieldIncluded = this.fieldIncluded;
+
+               int startPos = offset;
+               int limit = offset + numBytes;
+
+               int field = 0;
+               int output = 0;
+               while (field < fieldIncluded.length) {
+
+                       // check valid start position
+                       if (startPos >= limit) {
+                               if (isLenient()) {
+                                       return false;
+                               } else {
+                                       throw new ParseException("Row too 
short: " + new String(bytes, offset, numBytes));
+                               }
+                       }
+
+                       if (fieldIncluded[field]) {
+                               // parse field
+                               FieldParser<Object> parser = 
(FieldParser<Object>) this.getFieldParsers()[output];
+                               int latestValidPos = startPos;
+                               startPos = parser.resetErrorStateAndParse(
+                                       bytes,
+                                       startPos,
+                                       limit,
+                                       fieldDelimiter,
+                                       holders[output]);
+
+                               if (!isLenient() && (parser.getErrorState() != 
FieldParser.ParseErrorState.NONE)) {
+                                       // the error state EMPTY_COLUMN is 
ignored
+                                       if (parser.getErrorState() != 
FieldParser.ParseErrorState.EMPTY_COLUMN) {
+                                               throw new 
ParseException(String.format("Parsing error for column %1$s of row '%2$s' 
originated by %3$s: %4$s.",
+                                                       field, new 
String(bytes, offset, numBytes), parser.getClass().getSimpleName(), 
parser.getErrorState()));
+                                       }
+                               }
+                               holders[output] = parser.getLastResult();
+
+                               // check parse result:
+                               // the result is null if it is invalid
+                               // or empty with emptyColumnAsNull enabled
+                               if (startPos < 0 ||
+                                       (emptyColumnAsNull && 
(parser.getErrorState().equals(FieldParser.ParseErrorState.EMPTY_COLUMN)))) {
+                                       holders[output] = null;
+                                       startPos = skipFields(bytes, 
latestValidPos, limit, fieldDelimiter);
+                               }
+                               output++;
+                       } else {
+                               // skip field
+                               startPos = skipFields(bytes, startPos, limit, 
fieldDelimiter);
+                       }
+
+                       // check if something went wrong
+                       if (startPos < 0) {
+                               throw new 
ParseException(String.format("Unexpected parser position for column %1$s of row 
'%2$s'",
+                                       field, new String(bytes, offset, 
numBytes)));
+                       }
+
+                       field++;
+               }
+               return true;
+       }
+}

Reply via email to