[FLINK-4247] [table] CsvTableSource.getDataSet() expects Java ExecutionEnvironment
This closes #2298. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0975d9f1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0975d9f1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0975d9f1 Branch: refs/heads/flip-6 Commit: 0975d9f11dc09f8b1ea420d660175874d423cac3 Parents: 5901bf3 Author: twalthr <twal...@apache.org> Authored: Tue Jul 26 17:27:02 2016 +0200 Committer: twalthr <twal...@apache.org> Committed: Tue Sep 20 16:02:37 2016 +0200 ---------------------------------------------------------------------- .../api/table/sources/BatchTableSource.scala | 7 ++++- .../api/table/sources/CsvTableSource.scala | 28 +++++++++++++------- .../api/table/sources/StreamTableSource.scala | 7 ++++- .../connectors/kafka/KafkaTableSource.java | 4 +++ 4 files changed, 35 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0975d9f1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala index 7abb03c..74e4cd6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala @@ -26,6 +26,11 @@ import org.apache.flink.api.java.{ExecutionEnvironment, DataSet} */ trait BatchTableSource[T] extends TableSource[T] { - /** Returns the data of the table as a [[DataSet]]. */ + /** + * Returns the data of the table as a [[DataSet]]. + * + * NOTE: This method is for internal use only for defining a [[TableSource]]. + * Do not use it in Table API programs. + */ def getDataSet(execEnv: ExecutionEnvironment): DataSet[T] } http://git-wip-us.apache.org/repos/asf/flink/blob/0975d9f1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala index 40fdf82..9cf4397 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala @@ -56,13 +56,13 @@ class CsvTableSource( with StreamTableSource[Row] { /** - * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a - * (logically) unlimited number of fields. - * - * @param path The path to the CSV file. - * @param fieldNames The names of the table fields. - * @param fieldTypes The types of the table fields. - */ + * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a + * (logically) unlimited number of fields. + * + * @param path The path to the CSV file. + * @param fieldNames The names of the table fields. + * @param fieldTypes The types of the table fields. + */ def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) = this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER, CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false) @@ -73,7 +73,12 @@ class CsvTableSource( private val returnType = new RowTypeInfo(fieldTypes) - /** Returns the data of the table as a [[DataSet]] of [[Row]]. */ + /** + * Returns the data of the table as a [[DataSet]] of [[Row]]. + * + * NOTE: This method is for internal use only for defining a [[TableSource]]. + * Do not use it in Table API programs. + */ override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = { execEnv.createInput(createCsvInput(), returnType) } @@ -90,7 +95,12 @@ class CsvTableSource( /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */ override def getReturnType: RowTypeInfo = returnType - /** Returns the data of the table as a [[DataStream]] of [[Row]]. */ + /** + * Returns the data of the table as a [[DataStream]] of [[Row]]. + * + * NOTE: This method is for internal use only for defining a [[TableSource]]. + * Do not use it in Table API programs. + */ override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = { streamExecEnv.createInput(createCsvInput(), returnType) } http://git-wip-us.apache.org/repos/asf/flink/blob/0975d9f1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala index c30d9aa..cdae0b3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala @@ -27,6 +27,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment */ trait StreamTableSource[T] extends TableSource[T] { - /** Returns the data of the table as a [[DataStream]]. */ + /** + * Returns the data of the table as a [[DataStream]]. + * + * NOTE: This method is for internal use only for defining a [[TableSource]]. + * Do not use it in Table API programs. + */ def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] } http://git-wip-us.apache.org/repos/asf/flink/blob/0975d9f1/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java index fc6bf44..446f203 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -99,6 +99,10 @@ abstract class KafkaTableSource implements StreamTableSource<Row> { "Number of provided field names and types does not match."); } + /** + * NOTE: This method is for internal use only for defining a TableSource. + * Do not use it in Table API programs. + */ @Override public DataStream<Row> getDataStream(StreamExecutionEnvironment env) { // Version-specific Kafka consumer