[FLINK-4103] [table] Add CsvTableSource docs and Java accessibility

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/110bba38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/110bba38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/110bba38

Branch: refs/heads/master
Commit: 110bba386a3ac89a8f312cd4d1405571eee9d4b5
Parents: 7e309ee
Author: twalthr <twal...@apache.org>
Authored: Tue Jul 26 10:44:24 2016 +0200
Committer: twalthr <twal...@apache.org>
Committed: Tue Jul 26 11:03:29 2016 +0200

----------------------------------------------------------------------
 docs/apis/table.md                              | 83 ++++++++++++++++++++
 .../api/table/sources/CsvTableSource.scala      | 20 ++++-
 2 files changed, 99 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/110bba38/docs/apis/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/table.md b/docs/apis/table.md
index cf9133c..14439ed 100644
--- a/docs/apis/table.md
+++ b/docs/apis/table.md
@@ -252,6 +252,89 @@ tableEnvironment.registerTableSource("kafka-source", kafkaTableSource);
 Table result = tableEnvironment.ingest("kafka-source");
 ```
 
+#### CsvTableSource
+
+The `CsvTableSource` is already included in `flink-table` without additional dependencies.
+
+It can be configured with the following properties:
+
+ - `path` The path to the CSV file, required.
+ - `fieldNames` The names of the table fields, required.
+ - `fieldTypes` The types of the table fields, required.
+ - `fieldDelim` The field delimiter, `","` by default.
+ - `rowDelim` The row delimiter, `"\n"` by default.
+ - `quoteCharacter` An optional quote character for String values, `null` by default.
+ - `ignoreFirstLine` Flag to ignore the first line, `false` by default.
+ - `ignoreComments` An optional prefix to indicate comments, `null` by default.
+ - `lenient` Flag to skip records with parse errors instead of failing, `false` by default.
+
+You can create the source as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+CsvTableSource csvTableSource = new CsvTableSource(
+    "/path/to/your/file.csv",
+    new String[] { "name", "id", "score", "comments" },
+    new TypeInformation<?>[] {
+      Types.STRING(),
+      Types.INT(),
+      Types.DOUBLE(),
+      Types.STRING()
+    },
+    "#",    // fieldDelim
+    "$",    // rowDelim
+    null,   // quoteCharacter
+    true,   // ignoreFirstLine
+    "%",    // ignoreComments
+    false); // lenient
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val csvTableSource = new CsvTableSource(
+    "/path/to/your/file.csv",
+    Array("name", "id", "score", "comments"),
+    Array(
+      Types.STRING,
+      Types.INT,
+      Types.DOUBLE,
+      Types.STRING
+    ),
+    fieldDelim = "#",
+    rowDelim = "$",
+    ignoreFirstLine = true,
+    ignoreComments = "%")
+{% endhighlight %}
+</div>
+</div>
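+
+If the defaults are sufficient, only the three required properties need to be given.
+A minimal sketch using the new three-argument constructor added by this commit; all
+optional properties fall back to their default values:
+
+{% highlight java %}
+CsvTableSource csvTableSource = new CsvTableSource(
+    "/path/to/your/file.csv",
+    new String[] { "name", "id", "score", "comments" },
+    new TypeInformation<?>[] {
+      Types.STRING(),
+      Types.INT(),
+      Types.DOUBLE(),
+      Types.STRING()
+    }); // fieldDelim, rowDelim, etc. keep their defaults
+{% endhighlight %}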
+
+You can work with the Table as explained in the rest of the Table API guide in both stream and batch `TableEnvironment`s:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+tableEnvironment.registerTableSource("mycsv", csvTableSource);
+
+Table streamTable = streamTableEnvironment.ingest("mycsv");
+
+Table batchTable = batchTableEnvironment.scan("mycsv");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+tableEnvironment.registerTableSource("mycsv", csvTableSource)
+
+val streamTable = streamTableEnvironment.ingest("mycsv")
+
+val batchTable = batchTableEnvironment.scan("mycsv")
+{% endhighlight %}
+</div>
+</div>
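+
+Once registered, the table can be queried with the usual Table API operators.
+A minimal sketch, assuming the Java Table API's string expression syntax and the
+field names of the example source above:
+
+{% highlight java %}
+Table result = batchTableEnvironment
+    .scan("mycsv")
+    .filter("score > 90")
+    .select("name, score");
+{% endhighlight %}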
+
+
 Table API
 ----------
 The Table API provides methods to apply relational operations on DataSets and DataStreams both in Scala and Java.

http://git-wip-us.apache.org/repos/asf/flink/blob/110bba38/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
index fc3cf7e..40fdf82 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
@@ -35,11 +35,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
   * @param path The path to the CSV file.
   * @param fieldNames The names of the table fields.
   * @param fieldTypes The types of the table fields.
-  * @param fieldDelim The field delimiter, ',' by default.
-  * @param rowDelim The row delimiter, '\n' by default.
-  * @param quoteCharacter An optional quote character for String values, disabled by default.
+  * @param fieldDelim The field delimiter, "," by default.
+  * @param rowDelim The row delimiter, "\n" by default.
+  * @param quoteCharacter An optional quote character for String values, null by default.
   * @param ignoreFirstLine Flag to ignore the first line, false by default.
-  * @param ignoreComments An optional prefix to indicate comments, disabled by default.
+  * @param ignoreComments An optional prefix to indicate comments, null by default.
   * @param lenient Flag to skip records with parse errors instead of failing, false by default.
   */
 class CsvTableSource(
@@ -55,6 +55,18 @@ class CsvTableSource(
   extends BatchTableSource[Row]
   with StreamTableSource[Row] {
 
+  /**
+    * Creates a [[CsvTableSource]] with default settings: "," as field delimiter,
+    * "\n" as row delimiter, no quote character, first line not ignored, no comment
+    * prefix, and non-lenient parsing.
+    *
+    * @param path The path to the CSV file.
+    * @param fieldNames The names of the table fields.
+    * @param fieldTypes The types of the table fields.
+    */
+  def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) =
+    this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
+      CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
+
   if (fieldNames.length != fieldTypes.length) {
     throw TableException("Number of field names and field types must be 
equal.")
   }
