[FLINK-4103] [table] Add CsvTableSource docs and Java accessibility
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/110bba38 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/110bba38 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/110bba38 Branch: refs/heads/master Commit: 110bba386a3ac89a8f312cd4d1405571eee9d4b5 Parents: 7e309ee Author: twalthr <twal...@apache.org> Authored: Tue Jul 26 10:44:24 2016 +0200 Committer: twalthr <twal...@apache.org> Committed: Tue Jul 26 11:03:29 2016 +0200 ---------------------------------------------------------------------- docs/apis/table.md | 83 ++++++++++++++++++++ .../api/table/sources/CsvTableSource.scala | 20 ++++- 2 files changed, 99 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/110bba38/docs/apis/table.md ---------------------------------------------------------------------- diff --git a/docs/apis/table.md b/docs/apis/table.md index cf9133c..14439ed 100644 --- a/docs/apis/table.md +++ b/docs/apis/table.md @@ -252,6 +252,89 @@ tableEnvironment.registerTableSource("kafka-source", kafkaTableSource); Table result = tableEnvironment.ingest("kafka-source"); ``` +#### CsvTableSource + +The `CsvTableSource` is already included in `flink-table` without additional dependencies. + +It can be configured with the following properties: + + - `path` The path to the CSV file, required. + - `fieldNames` The names of the table fields, required. + - `fieldTypes` The types of the table fields, required. + - `fieldDelim` The field delimiter, `","` by default. + - `rowDelim` The row delimiter, `"\n"` by default. + - `quoteCharacter` An optional quote character for String values, `null` by default. + - `ignoreFirstLine` Flag to ignore the first line, `false` by default. + - `ignoreComments` An optional prefix to indicate comments, `null` by default. 
+ - `lenient` Flag to skip records with parse error instead to fail, `false` by default. + +You can create the source as follows: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +CsvTableSource csvTableSource = new CsvTableSource( + "/path/to/your/file.csv", + new String[] { "name", "id", "score", "comments" }, + new TypeInformation<?>[] { + Types.STRING(), + Types.INT(), + Types.DOUBLE(), + Types.STRING() + }, + "#", // fieldDelim + "$", // rowDelim + null, // quoteCharacter + true, // ignoreFirstLine + "%", // ignoreComments + false); // lenient +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val csvTableSource = new CsvTableSource( + "/path/to/your/file.csv", + Array("name", "id", "score", "comments"), + Array( + Types.STRING, + Types.INT, + Types.DOUBLE, + Types.STRING + ), + fieldDelim = "#", + rowDelim = "$", + ignoreFirstLine = true, + ignoreComments = "%") +{% endhighlight %} +</div> +</div> + +You can work with the Table as explained in the rest of the Table API guide in both stream and batch `TableEnvironment`s: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +tableEnvironment.registerTableSource("mycsv", csvTableSource); + +Table streamTable = streamTableEnvironment.ingest("mycsv"); + +Table batchTable = batchTableEnvironment.scan("mycsv"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +tableEnvironment.registerTableSource("mycsv", csvTableSource) + +val streamTable = streamTableEnvironment.ingest("mycsv") + +val batchTable = batchTableEnvironment.scan("mycsv") +{% endhighlight %} +</div> +</div> + + Table API ---------- The Table API provides methods to apply relational operations on DataSets and Datastreams both in Scala and Java. 
http://git-wip-us.apache.org/repos/asf/flink/blob/110bba38/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala index fc3cf7e..40fdf82 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala @@ -35,11 +35,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment * @param path The path to the CSV file. * @param fieldNames The names of the table fields. * @param fieldTypes The types of the table fields. - * @param fieldDelim The field delimiter, ',' by default. - * @param rowDelim The row delimiter, '\n' by default. - * @param quoteCharacter An optional quote character for String values, disabled by default. + * @param fieldDelim The field delimiter, "," by default. + * @param rowDelim The row delimiter, "\n" by default. + * @param quoteCharacter An optional quote character for String values, null by default. * @param ignoreFirstLine Flag to ignore the first line, false by default. - * @param ignoreComments An optional prefix to indicate comments, disabled by default. + * @param ignoreComments An optional prefix to indicate comments, null by default. * @param lenient Flag to skip records with parse error instead to fail, false by default. */ class CsvTableSource( @@ -55,6 +55,18 @@ class CsvTableSource( extends BatchTableSource[Row] with StreamTableSource[Row] { + /** + * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a + * (logically) unlimited number of fields. + * + * @param path The path to the CSV file. 
+ * @param fieldNames The names of the table fields. + * @param fieldTypes The types of the table fields. + */ + def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) = + this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER, + CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false) + if (fieldNames.length != fieldTypes.length) { throw TableException("Number of field names and field types must be equal.") }