[FLINK-5188] [table] [connectors] [core] Adjust imports and method calls to new Row type.
- Port RowCsvInputFormat to Java and move it to flink-core. This closes #3003. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d27f8f2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d27f8f2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d27f8f2 Branch: refs/heads/master Commit: 4d27f8f2deef9fad845ebc91cef121cf9b35f825 Parents: a9e6ec8 Author: tonycox <[email protected]> Authored: Fri Dec 9 21:41:36 2016 +0400 Committer: Fabian Hueske <[email protected]> Committed: Thu Dec 15 11:36:40 2016 +0100 ---------------------------------------------------------------------- .../kafka/Kafka010JsonTableSource.java | 2 +- .../connectors/kafka/Kafka010TableSource.java | 2 +- .../connectors/kafka/Kafka08JsonTableSink.java | 2 +- .../kafka/Kafka08JsonTableSource.java | 2 +- .../connectors/kafka/Kafka08TableSource.java | 2 +- .../kafka/Kafka08JsonTableSinkTest.java | 2 +- .../kafka/Kafka08JsonTableSourceTest.java | 2 +- .../connectors/kafka/Kafka09JsonTableSink.java | 2 +- .../kafka/Kafka09JsonTableSource.java | 2 +- .../connectors/kafka/Kafka09TableSource.java | 2 +- .../kafka/Kafka09JsonTableSinkTest.java | 2 +- .../kafka/Kafka09JsonTableSourceTest.java | 2 +- .../connectors/kafka/KafkaJsonTableSink.java | 2 +- .../connectors/kafka/KafkaTableSink.java | 4 +- .../connectors/kafka/KafkaTableSource.java | 4 +- .../JsonRowDeserializationSchema.java | 4 +- .../JsonRowSerializationSchema.java | 8 +- .../kafka/JsonRowDeserializationSchemaTest.java | 14 +- .../kafka/JsonRowSerializationSchemaTest.java | 8 +- .../kafka/KafkaTableSinkTestBase.java | 4 +- .../kafka/KafkaTableSourceTestBase.java | 2 +- .../flink/api/java/io/jdbc/JDBCInputFormat.java | 6 +- .../api/java/io/jdbc/JDBCOutputFormat.java | 44 +- .../flink/api/java/io/jdbc/JDBCFullTest.java | 2 +- .../api/java/io/jdbc/JDBCInputFormatTest.java | 38 +- .../api/java/io/jdbc/JDBCOutputFormatTest.java | 24 +- .../flink/api/java/io/jdbc/JDBCTestBase.java | 2 +- .../flink/api/java/io/RowCsvInputFormat.java | 174 ++++ .../api/java/io/RowCsvInputFormatTest.java | 879 +++++++++++++++++++ .../api/java/table/BatchTableEnvironment.scala | 4 +- .../api/java/table/StreamTableEnvironment.scala | 4 +- .../api/scala/table/BatchTableEnvironment.scala | 2 +- .../scala/table/StreamTableEnvironment.scala | 2 +- .../apache/flink/api/scala/table/package.scala | 3 +- .../flink/api/table/BatchTableEnvironment.scala | 3 +- .../api/table/StreamTableEnvironment.scala | 1 + .../flink/api/table/TableEnvironment.scala | 7 +- .../flink/api/table/codegen/CodeGenUtils.scala | 3 +- .../flink/api/table/codegen/CodeGenerator.scala | 5 +- .../api/table/codegen/ExpressionReducer.scala | 10 +- .../plan/nodes/dataset/DataSetAggregate.scala | 8 +- .../nodes/datastream/DataStreamAggregate.scala | 8 +- .../DataSetAggregateWithNullValuesRule.scala | 1 + .../table/plan/schema/TableSourceTable.scala | 6 +- .../api/table/runtime/aggregate/Aggregate.scala | 2 +- .../AggregateAllTimeWindowFunction.scala | 2 +- .../aggregate/AggregateAllWindowFunction.scala | 2 +- .../aggregate/AggregateMapFunction.scala | 6 +- .../AggregateReduceCombineFunction.scala | 4 +- .../AggregateReduceGroupFunction.scala | 4 +- .../aggregate/AggregateTimeWindowFunction.scala | 2 +- .../table/runtime/aggregate/AggregateUtil.scala | 7 +- .../aggregate/AggregateWindowFunction.scala | 2 +- .../table/runtime/aggregate/AvgAggregate.scala | 62 +- .../runtime/aggregate/CountAggregate.scala | 8 +- ...rementalAggregateAllTimeWindowFunction.scala | 2 +- .../IncrementalAggregateAllWindowFunction.scala | 4 +- .../IncrementalAggregateReduceFunction.scala | 4 +- ...IncrementalAggregateTimeWindowFunction.scala | 2 +- .../IncrementalAggregateWindowFunction.scala | 4 +- .../table/runtime/aggregate/MaxAggregate.scala | 14 +- .../table/runtime/aggregate/MinAggregate.scala | 14 +- .../table/runtime/aggregate/SumAggregate.scala | 14 +- .../aggregate/TimeWindowPropertyCollector.scala | 4 +- .../flink/api/table/sinks/CsvTableSink.scala | 12 +- .../api/table/sources/CsvTableSource.scala | 9 +- .../api/table/typeutils/TypeConverter.scala | 11 +- .../api/java/batch/TableEnvironmentITCase.java | 2 +- .../flink/api/java/batch/TableSourceITCase.java | 4 +- .../flink/api/java/batch/sql/SqlITCase.java | 2 +- .../java/batch/table/AggregationsITCase.java | 2 +- .../flink/api/java/batch/table/CalcITCase.java | 2 +- .../api/java/batch/table/CastingITCase.java | 2 +- .../flink/api/java/batch/table/JoinITCase.java | 2 +- .../flink/api/java/stream/sql/SqlITCase.java | 2 +- .../batch/ProjectableTableSourceITCase.scala | 7 +- .../scala/batch/TableEnvironmentITCase.scala | 3 +- .../api/scala/batch/TableSourceITCase.scala | 9 +- .../scala/batch/sql/AggregationsITCase.scala | 3 +- .../flink/api/scala/batch/sql/CalcITCase.scala | 3 +- .../flink/api/scala/batch/sql/JoinITCase.scala | 3 +- .../scala/batch/sql/SetOperatorsITCase.scala | 3 +- .../flink/api/scala/batch/sql/SortITCase.scala | 32 +- .../scala/batch/sql/TableWithSQLITCase.scala | 3 +- .../scala/batch/table/AggregationsITCase.scala | 3 +- .../api/scala/batch/table/CalcITCase.scala | 3 +- .../api/scala/batch/table/JoinITCase.scala | 3 +- .../scala/batch/table/SetOperatorsITCase.scala | 3 +- .../api/scala/batch/table/SortITCase.scala | 49 +- .../table/UserDefinedTableFunctionTest.scala | 7 +- .../api/scala/stream/TableSourceITCase.scala | 7 +- .../flink/api/scala/stream/sql/SqlITCase.scala | 3 +- .../scala/stream/table/AggregationsITCase.scala | 3 +- .../api/scala/stream/table/CalcITCase.scala | 3 +- .../api/scala/stream/table/UnionITCase.scala | 3 +- .../table/UserDefinedTableFunctionTest.scala | 8 +- .../api/scala/stream/utils/StreamITCase.scala | 2 +- .../api/table/expressions/ArrayTypeTest.scala | 9 +- .../table/expressions/CompositeAccessTest.scala | 9 +- .../api/table/expressions/DecimalTypeTest.scala | 9 +- .../expressions/NonDeterministicTests.scala | 6 +- .../table/expressions/ScalarFunctionsTest.scala | 9 +- .../table/expressions/ScalarOperatorsTest.scala | 9 +- .../table/expressions/SqlExpressionTest.scala | 6 +- .../table/expressions/TemporalTypesTest.scala | 9 +- .../UserDefinedScalarFunctionTest.scala | 9 +- .../expressions/utils/ExpressionTestBase.scala | 7 +- .../runtime/aggregate/AggregateTestBase.scala | 2 +- .../dataset/DataSetCorrelateITCase.scala | 3 +- .../datastream/DataStreamCorrelateITCase.scala | 3 +- .../table/utils/UserDefinedTableFunctions.scala | 8 +- 111 files changed, 1457 insertions(+), 327 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java index ddf1ad3..920d718 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.api.table.sources.StreamTableSource; import org.apache.flink.streaming.util.serialization.DeserializationSchema; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java index 732440b..127dafc 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.api.table.sources.StreamTableSource; import org.apache.flink.streaming.util.serialization.DeserializationSchema; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java index b155576..839388f 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java @@ -17,7 +17,7 @@ */ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.SerializationSchema; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java index 63bb57e..6c7d727 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.api.table.sources.StreamTableSource; import org.apache.flink.streaming.util.serialization.DeserializationSchema; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java index 8f51237..0e3791c 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.api.table.sources.StreamTableSource; import org.apache.flink.streaming.util.serialization.DeserializationSchema; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java index 6d0b140..0ac452e 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java @@ -17,7 +17,7 @@ */ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import org.apache.flink.streaming.util.serialization.SerializationSchema; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java index a2d66ac..f9ef2ce 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka; import java.util.Properties; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java index 38ea47c..edbebd0 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java @@ -17,7 +17,7 @@ */ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.SerializationSchema; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java index 975ef58..dfcba5f 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.api.table.sources.StreamTableSource; import org.apache.flink.streaming.util.serialization.DeserializationSchema; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java index 03b5040..f423003 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.api.table.sources.StreamTableSource; import org.apache.flink.streaming.util.serialization.DeserializationSchema; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java index 45f70ac..df84a0f 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java @@ -17,7 +17,7 @@ */ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import org.apache.flink.streaming.util.serialization.SerializationSchema; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java index 4a75f50..10b9acc 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka; import java.util.Properties; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java index ee98783..27c4de7 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java @@ -17,7 +17,7 @@ */ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import org.apache.flink.streaming.util.serialization.SerializationSchema; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java index 714d9cd..6c42943 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -18,9 +18,9 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.api.table.sinks.StreamTableSink; -import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.SerializationSchema; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java index fd423d7..498e918 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -19,9 +19,9 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.api.table.sources.StreamTableSource; -import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.serialization.DeserializationSchema; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java index 4344810..b4b3341 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java @@ -22,8 +22,8 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.types.Row; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.util.Preconditions; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java index 077ff13..1998aa6 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.util.serialization; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; @@ -49,15 +49,15 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> { @Override public byte[] serialize(Row row) { - if (row.productArity() != fieldNames.length) { + if (row.getArity() != fieldNames.length) { throw new IllegalStateException(String.format( "Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length)); } ObjectNode objectNode = mapper.createObjectNode(); - for (int i = 0; i < row.productArity(); i++) { - JsonNode node = mapper.valueToTree(row.productElement(i)); + for (int i = 0; i < row.getArity(); i++) { + JsonNode node = mapper.valueToTree(row.getField(i)); objectNode.set(fieldNames[i], node); } http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java index 68225e2..88f62f0 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; import org.junit.Test; @@ -61,10 +61,10 @@ public class JsonRowDeserializationSchemaTest { Row deserialized = deserializationSchema.deserialize(serializedJson); - assertEquals(3, deserialized.productArity()); - assertEquals(id, deserialized.productElement(0)); - assertEquals(name, deserialized.productElement(1)); - assertArrayEquals(bytes, (byte[]) deserialized.productElement(2)); + assertEquals(3, deserialized.getArity()); + assertEquals(id, deserialized.getField(0)); + assertEquals(name, deserialized.getField(1)); + assertArrayEquals(bytes, (byte[]) deserialized.getField(2)); } /** @@ -85,8 +85,8 @@ public class JsonRowDeserializationSchemaTest { Row row = deserializationSchema.deserialize(serializedJson); - assertEquals(1, row.productArity()); - assertNull("Missing field not null", row.productElement(0)); + assertEquals(1, row.getArity()); + assertNull("Missing field not null", row.getField(0)); deserializationSchema.setFailOnMissingField(true); http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java index 92af15d..78dedf4 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java @@ -16,7 +16,7 @@ */ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import org.junit.Test; @@ -88,10 +88,10 @@ public class JsonRowSerializationSchemaTest { private void assertEqualRows(Row expectedRow, Row resultRow) { assertEquals("Deserialized row should have expected number of fields", - expectedRow.productArity(), resultRow.productArity()); - for (int i = 0; i < expectedRow.productArity(); i++) { + expectedRow.getArity(), resultRow.getArity()); + for (int i = 0; i < expectedRow.getArity(); i++) { assertEquals(String.format("Field number %d should be as in the original row", i), - expectedRow.productElement(i), resultRow.productElement(i)); + expectedRow.getField(i), resultRow.getField(i)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java index ae0af52..cc1c166 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.types.Row; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java index 2a281e8..ad51993 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java @@ -21,7 +21,7 @@ package org.apache.flink.streaming.connectors.kafka; import java.util.Properties; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java index b4246f5..3153f96 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java @@ -37,8 +37,8 @@ import org.apache.flink.api.common.io.RichInputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.types.Row; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.io.InputSplit; @@ -276,7 +276,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements if (!hasNext) { return null; } - for (int pos = 0; pos < row.productArity(); pos++) { + for (int pos = 0; pos < row.getArity(); pos++) { row.setField(pos, resultSet.getObject(pos + 1)); } //update hasNext after we've read the record http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java index da4b1ad..c5585e2 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java @@ -26,7 +26,7 @@ import java.sql.SQLException; import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,22 +108,22 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> { @Override public void writeRecord(Row row) throws IOException { - if (typesArray != null && typesArray.length > 0 && typesArray.length != row.productArity()) { + if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) { LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array..."); } try { if (typesArray == null ) { // no types provided - for (int index = 0; index < row.productArity(); index++) { - LOG.warn("Unknown column type for column %s. Best effort approach to set its value: %s.", index + 1, row.productElement(index)); - upload.setObject(index + 1, row.productElement(index)); + for (int index = 0; index < row.getArity(); index++) { + LOG.warn("Unknown column type for column %s. Best effort approach to set its value: %s.", index + 1, row.getField(index)); + upload.setObject(index + 1, row.getField(index)); } } else { // types provided - for (int index = 0; index < row.productArity(); index++) { + for (int index = 0; index < row.getArity(); index++) { - if (row.productElement(index) == null) { + if (row.getField(index) == null) { upload.setNull(index + 1, typesArray[index]); } else { // casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html @@ -133,56 +133,56 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> { break; case java.sql.Types.BOOLEAN: case java.sql.Types.BIT: - upload.setBoolean(index + 1, (boolean) row.productElement(index)); + upload.setBoolean(index + 1, (boolean) row.getField(index)); break; case java.sql.Types.CHAR: case java.sql.Types.NCHAR: case java.sql.Types.VARCHAR: case java.sql.Types.LONGVARCHAR: case java.sql.Types.LONGNVARCHAR: - upload.setString(index + 1, (String) row.productElement(index)); + upload.setString(index + 1, (String) row.getField(index)); break; case java.sql.Types.TINYINT: - upload.setByte(index + 1, (byte) row.productElement(index)); + upload.setByte(index + 1, (byte) row.getField(index)); break; case java.sql.Types.SMALLINT: - upload.setShort(index + 1, (short) row.productElement(index)); + upload.setShort(index + 1, (short) row.getField(index)); break; case java.sql.Types.INTEGER: - upload.setInt(index + 1, (int) row.productElement(index)); + upload.setInt(index + 1, (int) row.getField(index)); break; case java.sql.Types.BIGINT: - upload.setLong(index + 1, (long) row.productElement(index)); + upload.setLong(index + 1, (long) row.getField(index)); break; case java.sql.Types.REAL: - upload.setFloat(index + 1, (float) row.productElement(index)); + upload.setFloat(index + 1, (float) row.getField(index)); break; case java.sql.Types.FLOAT: case java.sql.Types.DOUBLE: - upload.setDouble(index + 1, (double) row.productElement(index)); + upload.setDouble(index + 1, (double) row.getField(index)); break; case java.sql.Types.DECIMAL: case java.sql.Types.NUMERIC: - upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.productElement(index)); + upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.getField(index)); break; case java.sql.Types.DATE: - upload.setDate(index + 1, (java.sql.Date) row.productElement(index)); + upload.setDate(index + 1, (java.sql.Date) row.getField(index)); break; case java.sql.Types.TIME: - upload.setTime(index + 1, (java.sql.Time) row.productElement(index)); + upload.setTime(index + 1, (java.sql.Time) row.getField(index)); break; case java.sql.Types.TIMESTAMP: - upload.setTimestamp(index + 1, (java.sql.Timestamp) row.productElement(index)); + upload.setTimestamp(index + 1, (java.sql.Timestamp) row.getField(index)); break; case java.sql.Types.BINARY: case java.sql.Types.VARBINARY: case java.sql.Types.LONGVARBINARY: - upload.setBytes(index + 1, (byte[]) row.productElement(index)); + upload.setBytes(index + 1, (byte[]) row.getField(index)); break; default: - upload.setObject(index + 1, row.productElement(index)); + upload.setObject(index + 1, row.getField(index)); LOG.warn("Unmanaged sql type (%s) for column %s. Best effort approach to set its value: %s.", - typesArray[index], index + 1, row.productElement(index)); + typesArray[index], index + 1, row.getField(index)); // case java.sql.Types.SQLXML // case java.sql.Types.ARRAY: // case java.sql.Types.JAVA_OBJECT: http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java index da9469b..88aa4fa 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java @@ -29,7 +29,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder; import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java index efae076..b08aa3a 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java @@ -25,7 +25,7 @@ import java.sql.ResultSet; import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider; import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider; import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.core.io.InputSplit; import org.junit.After; import org.junit.Assert; @@ -116,15 +116,15 @@ public class JDBCInputFormatTest extends JDBCTestBase { break; } - if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());} - if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());} - if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());} - if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());} - if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());} + if(next.getField(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());} + if(next.getField(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass());} + if(next.getField(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass());} + if(next.getField(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass());} + if(next.getField(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass());} for (int x = 0; x < 5; x++) { if(testData[recordCount][x]!=null) { - Assert.assertEquals(testData[recordCount][x], next.productElement(x)); + Assert.assertEquals(testData[recordCount][x], next.getField(x)); } } recordCount++; @@ -162,15 +162,15 @@ public class JDBCInputFormatTest extends JDBCTestBase { if (next == null) { break; } - if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());} - if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());} - if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());} - if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());} - if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());} + if(next.getField(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());} + if(next.getField(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass());} + if(next.getField(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass());} + if(next.getField(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass());} + if(next.getField(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass());} for (int x = 0; x < 5; x++) { if(testData[recordCount][x]!=null) { - Assert.assertEquals(testData[recordCount][x], next.productElement(x)); + Assert.assertEquals(testData[recordCount][x], next.getField(x)); } } recordCount++; @@ -208,11 +208,11 @@ public class JDBCInputFormatTest extends JDBCTestBase { if (next == null) { break; } - if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());} - if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());} - if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());} - if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());} - if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());} + if(next.getField(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass());} + if(next.getField(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass());} + if(next.getField(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass());} + if(next.getField(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass());} + if(next.getField(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass());} recordCount++; } @@ -244,4 +244,4 @@ public class JDBCInputFormatTest extends JDBCTestBase { Assert.assertEquals(0, recordsCnt); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java index 086a84c..8de0c34 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java @@ -26,7 +26,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -137,25 +137,25 @@ public class JDBCOutputFormatTest extends JDBCTestBase { for (int i = 0; i < tuple5.getArity(); i++) { row.setField(i, resultSet.getObject(i + 1)); } - if (row.productElement(0) != null) { - Assert.assertEquals("Field 0 should be int", Integer.class, row.productElement(0).getClass()); + if (row.getField(0) != null) { + Assert.assertEquals("Field 0 should be int", Integer.class, row.getField(0).getClass()); } - if (row.productElement(1) != null) { - Assert.assertEquals("Field 1 should be String", String.class, row.productElement(1).getClass()); + if (row.getField(1) != null) { + Assert.assertEquals("Field 1 should be String", String.class, row.getField(1).getClass()); } - if (row.productElement(2) != null) { - Assert.assertEquals("Field 2 should be String", String.class, row.productElement(2).getClass()); + if (row.getField(2) != null) { + Assert.assertEquals("Field 2 should be String", String.class, row.getField(2).getClass()); } - if (row.productElement(3) != null) { - Assert.assertEquals("Field 3 should be float", Double.class, row.productElement(3).getClass()); + if (row.getField(3) != null) { + Assert.assertEquals("Field 3 should be float", Double.class, row.getField(3).getClass()); } - if (row.productElement(4) != null) { - Assert.assertEquals("Field 4 should be int", Integer.class, row.productElement(4).getClass()); + if (row.getField(4) != null) { + Assert.assertEquals("Field 4 should be int", Integer.class, row.getField(4).getClass()); } for (int x = 0; x < tuple5.getArity(); x++) { if (JDBCTestBase.testData[recordCount][x] != null) { - Assert.assertEquals(JDBCTestBase.testData[recordCount][x], row.productElement(x)); + Assert.assertEquals(JDBCTestBase.testData[recordCount][x], row.getField(x)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java index 69ad693..ffcb26f 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java @@ -25,7 +25,7 @@ import java.sql.Statement; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java new file mode 100644 index 0000000..34233f5 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.Row; +import org.apache.flink.types.parser.FieldParser; + +@PublicEvolving +public class RowCsvInputFormat extends CsvInputFormat<Row> { + + private static final long serialVersionUID = 1L; + + private int arity; + private boolean emptyColumnAsNull; + + public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, String lineDelimiter, String fieldDelimiter, boolean[] includedFieldsMask, boolean emptyColumnAsNull) { + super(filePath); + if (rowTypeInfo.getArity() == 0) { + throw new IllegalArgumentException("Row arity must be greater than 0."); + } + this.arity = rowTypeInfo.getArity(); + + boolean[] fieldsMask; + if (includedFieldsMask != null) { + fieldsMask = includedFieldsMask; + } else { + fieldsMask = createDefaultMask(arity); + } + this.emptyColumnAsNull = emptyColumnAsNull; + setDelimiter(lineDelimiter); + setFieldDelimiter(fieldDelimiter); + setFieldsGeneric(fieldsMask, extractTypeClasses(rowTypeInfo)); + } + + public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, String lineDelimiter, String fieldDelimiter, int[] includedFieldsMask) { + this(filePath, rowTypeInfo, lineDelimiter, fieldDelimiter, toBoolMask(includedFieldsMask), false); + } + + public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, String lineDelimiter, String fieldDelimiter) { + this(filePath, rowTypeInfo, lineDelimiter, fieldDelimiter, null, false); + } + + public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) { + this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, includedFieldsMask, false); + } + + public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) { + this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, includedFieldsMask); + } + + public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean emptyColumnAsNull) { + this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, null, emptyColumnAsNull); + } + + public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) { + this(filePath, rowTypeInfo, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, null); + } + + private static Class<?>[] extractTypeClasses(RowTypeInfo rowTypeInfo) { + Class<?>[] classes = new Class<?>[rowTypeInfo.getArity()]; + for (int i = 0; i < rowTypeInfo.getArity(); i++) { + classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass(); + } + return classes; + } + + private static boolean[] toBoolMask(int[] includedFieldsMask) { + if (includedFieldsMask == null) { + return null; + } else { + return toBooleanMask(includedFieldsMask); + } + } + + @Override + protected Row fillRecord(Row reuse, Object[] parsedValues) { + Row reuseRow; + if (reuse == null) { + reuseRow = new Row(arity); + } else { + reuseRow = reuse; + } + for (int i = 0; i < parsedValues.length; i++) { + reuseRow.setField(i, parsedValues[i]); + } + return reuseRow; + } + + @Override + protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int numBytes) throws ParseException { + byte[] fieldDelimiter = this.getFieldDelimiter(); + boolean[] fieldIncluded = this.fieldIncluded; + + int startPos = offset; + int limit = offset + numBytes; + + int field = 0; + int output = 0; + while (field < fieldIncluded.length) { + + // check valid start position + if (startPos >= limit) { + if (isLenient()) { + return false; + } else { + throw new ParseException("Row too short: " + new String(bytes, offset, numBytes)); + } + } + + if (fieldIncluded[field]) { + // parse field + FieldParser<Object> parser = (FieldParser<Object>) this.getFieldParsers()[output]; + int latestValidPos = startPos; + startPos = parser.resetErrorStateAndParse( + bytes, + startPos, + limit, + fieldDelimiter, + holders[output]); + + if (!isLenient() && (parser.getErrorState() != FieldParser.ParseErrorState.NONE)) { + // the error state EMPTY_COLUMN is ignored + if (parser.getErrorState() != FieldParser.ParseErrorState.EMPTY_COLUMN) { + throw new ParseException(String.format("Parsing error for column %1$s of row '%2$s' originated by %3$s: %4$s.", + field, new String(bytes, offset, numBytes), parser.getClass().getSimpleName(), parser.getErrorState())); + } + } + holders[output] = parser.getLastResult(); + + // check parse result: + // the result is null if it is invalid + // or empty with emptyColumnAsNull enabled + if (startPos < 0 || + (emptyColumnAsNull && (parser.getErrorState().equals(FieldParser.ParseErrorState.EMPTY_COLUMN)))) { + holders[output] = null; + startPos = skipFields(bytes, latestValidPos, limit, fieldDelimiter); + } + output++; + } else { + // skip field + startPos = skipFields(bytes, startPos, limit, fieldDelimiter); + } + + // check if something went wrong + if (startPos < 0) { + throw new ParseException(String.format("Unexpected parser position for column %1$s of row '%2$s'", + field, new String(bytes, offset, numBytes))); + } + + field++; + } + return true; + } +}
