[FLINK-4247] [table] CsvTableSource.getDataSet() expects Java 
ExecutionEnvironment

This closes #2298.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0975d9f1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0975d9f1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0975d9f1

Branch: refs/heads/flip-6
Commit: 0975d9f11dc09f8b1ea420d660175874d423cac3
Parents: 5901bf3
Author: twalthr <twal...@apache.org>
Authored: Tue Jul 26 17:27:02 2016 +0200
Committer: twalthr <twal...@apache.org>
Committed: Tue Sep 20 16:02:37 2016 +0200

----------------------------------------------------------------------
 .../api/table/sources/BatchTableSource.scala    |  7 ++++-
 .../api/table/sources/CsvTableSource.scala      | 28 +++++++++++++-------
 .../api/table/sources/StreamTableSource.scala   |  7 ++++-
 .../connectors/kafka/KafkaTableSource.java      |  4 +++
 4 files changed, 35 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0975d9f1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala
index 7abb03c..74e4cd6 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala
@@ -26,6 +26,11 @@ import org.apache.flink.api.java.{ExecutionEnvironment, 
DataSet}
   */
 trait BatchTableSource[T] extends TableSource[T] {
 
-  /** Returns the data of the table as a [[DataSet]]. */
+  /**
+    * Returns the data of the table as a [[DataSet]].
+    *
+    * NOTE: This method is for internal use only for defining a 
[[TableSource]].
+    *       Do not use it in Table API programs.
+    */
   def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0975d9f1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
index 40fdf82..9cf4397 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
@@ -56,13 +56,13 @@ class CsvTableSource(
   with StreamTableSource[Row] {
 
   /**
-  * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with 
a
-  * (logically) unlimited number of fields.
-  *
-  * @param path The path to the CSV file.
-  * @param fieldNames The names of the table fields.
-  * @param fieldTypes The types of the table fields.
-  */
+    * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files 
with a
+    * (logically) unlimited number of fields.
+    *
+    * @param path The path to the CSV file.
+    * @param fieldNames The names of the table fields.
+    * @param fieldTypes The types of the table fields.
+    */
   def this(path: String, fieldNames: Array[String], fieldTypes: 
Array[TypeInformation[_]]) =
     this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
       CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
@@ -73,7 +73,12 @@ class CsvTableSource(
 
   private val returnType = new RowTypeInfo(fieldTypes)
 
-  /** Returns the data of the table as a [[DataSet]] of [[Row]]. */
+  /**
+    * Returns the data of the table as a [[DataSet]] of [[Row]].
+    *
+    * NOTE: This method is for internal use only for defining a 
[[TableSource]].
+    *       Do not use it in Table API programs.
+    */
   override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
     execEnv.createInput(createCsvInput(), returnType)
   }
@@ -90,7 +95,12 @@ class CsvTableSource(
   /** Returns the [[RowTypeInfo]] for the return type of the 
[[CsvTableSource]]. */
   override def getReturnType: RowTypeInfo = returnType
 
-  /** Returns the data of the table as a [[DataStream]] of [[Row]]. */
+  /**
+    * Returns the data of the table as a [[DataStream]] of [[Row]].
+    *
+    * NOTE: This method is for internal use only for defining a 
[[TableSource]].
+    *       Do not use it in Table API programs.
+    */
   override def getDataStream(streamExecEnv: StreamExecutionEnvironment): 
DataStream[Row] = {
     streamExecEnv.createInput(createCsvInput(), returnType)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/0975d9f1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala
index c30d9aa..cdae0b3 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala
@@ -27,6 +27,11 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
   */
 trait StreamTableSource[T] extends TableSource[T] {
 
-  /** Returns the data of the table as a [[DataStream]]. */
+  /**
+    * Returns the data of the table as a [[DataStream]].
+    *
+    * NOTE: This method is for internal use only for defining a 
[[TableSource]].
+    *       Do not use it in Table API programs.
+    */
   def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0975d9f1/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index fc6bf44..446f203 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -99,6 +99,10 @@ abstract class KafkaTableSource implements 
StreamTableSource<Row> {
                                "Number of provided field names and types does 
not match.");
        }
 
+       /**
+        * NOTE: This method is for internal use only for defining a 
TableSource.
+        *       Do not use it in Table API programs.
+        */
        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
                // Version-specific Kafka consumer

Reply via email to