Repository: flink Updated Branches: refs/heads/master 3a45a796e -> 808e0f9a6
[FLINK-7802] [table] Fix projection of all fields in CsvTableSource. This closes #4815. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/808e0f9a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/808e0f9a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/808e0f9a Branch: refs/heads/master Commit: 808e0f9a6d258fb404f81a2144c93719732802f0 Parents: 95f8630 Author: godfreyhe <[email protected]> Authored: Thu Oct 12 19:12:47 2017 +0800 Committer: Fabian Hueske <[email protected]> Committed: Thu Oct 19 15:08:31 2017 +0200 ---------------------------------------------------------------------- .../flink/table/sources/CsvTableSource.scala | 12 ++++++++--- .../flink/table/api/TableSourceTest.scala | 21 ++++++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/808e0f9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala index 8a458ef..cfc8ada 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala @@ -105,8 +105,14 @@ class CsvTableSource( /** Returns a copy of [[TableSource]] with ability to project fields */ override def projectFields(fields: Array[Int]): CsvTableSource = { - val newFieldNames: Array[String] = fields.map(fieldNames(_)) - val newFieldTypes: Array[TypeInformation[_]] = fields.map(fieldTypes(_)) + val (newFields, newFieldNames, newFieldTypes) = if (fields.nonEmpty) { + (fields, fields.map(fieldNames(_)), fields.map(fieldTypes(_))) + } else { + // reporting number of records only, we must read some columns to get row count. + // (e.g. SQL: select count(1) from csv_table) + // We choose the first column here. + (Array(0), Array(fieldNames.head), Array[TypeInformation[_]](fieldTypes.head)) + } val source = new CsvTableSource(path, newFieldNames, @@ -117,7 +123,7 @@ class CsvTableSource( ignoreFirstLine, ignoreComments, lenient) - source.selectedFields = fields + source.selectedFields = newFields source } http://git-wip-us.apache.org/repos/asf/flink/blob/808e0f9a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala index 486b078..6321e09 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala @@ -118,6 +118,27 @@ class TableSourceTest extends TableTestBase { } @Test + def testBatchProjectableSourceFullProjection(): Unit = { + val (tableSource, tableName) = csvTable + val util = batchTestUtil() + val tableEnv = util.tableEnv + + tableEnv.registerTableSource(tableName, tableSource) + + val result = tableEnv + .scan(tableName) + .select(1) + + val expected = unaryNode( + "DataSetCalc", + batchSourceTableNode(tableName, Array("first")), + term("select", "1 AS _c0") + ) + + util.verifyTable(result, expected) + } + + @Test def testBatchFilterableWithoutPushDown(): Unit = { val (tableSource, tableName) = filterableTableSource val util = batchTestUtil()
