Repository: flink
Updated Branches:
  refs/heads/master 3a45a796e -> 808e0f9a6


[FLINK-7802] [table] Fix projection of all fields in CsvTableSource.

This closes #4815.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/808e0f9a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/808e0f9a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/808e0f9a

Branch: refs/heads/master
Commit: 808e0f9a6d258fb404f81a2144c93719732802f0
Parents: 95f8630
Author: godfreyhe <[email protected]>
Authored: Thu Oct 12 19:12:47 2017 +0800
Committer: Fabian Hueske <[email protected]>
Committed: Thu Oct 19 15:08:31 2017 +0200

----------------------------------------------------------------------
 .../flink/table/sources/CsvTableSource.scala    | 12 ++++++++---
 .../flink/table/api/TableSourceTest.scala       | 21 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/808e0f9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
index 8a458ef..cfc8ada 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
@@ -105,8 +105,14 @@ class CsvTableSource(
   /** Returns a copy of [[TableSource]] with ability to project fields */
   override def projectFields(fields: Array[Int]): CsvTableSource = {
 
-    val newFieldNames: Array[String] = fields.map(fieldNames(_))
-    val newFieldTypes: Array[TypeInformation[_]] = fields.map(fieldTypes(_))
+    val (newFields, newFieldNames, newFieldTypes) = if (fields.nonEmpty) {
+      (fields, fields.map(fieldNames(_)), fields.map(fieldTypes(_)))
+    } else {
+      // reporting number of records only, we must read some columns to get row count.
+      // (e.g. SQL: select count(1) from csv_table)
+      // We choose the first column here.
+      (Array(0), Array(fieldNames.head), Array[TypeInformation[_]](fieldTypes.head))
+    }
 
     val source = new CsvTableSource(path,
       newFieldNames,
@@ -117,7 +123,7 @@ class CsvTableSource(
       ignoreFirstLine,
       ignoreComments,
       lenient)
-    source.selectedFields = fields
+    source.selectedFields = newFields
     source
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/808e0f9a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
index 486b078..6321e09 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
@@ -118,6 +118,27 @@ class TableSourceTest extends TableTestBase {
   }
 
   @Test
+  def testBatchProjectableSourceFullProjection(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = batchTestUtil()
+    val tableEnv = util.tableEnv
+
+    tableEnv.registerTableSource(tableName, tableSource)
+
+    val result = tableEnv
+      .scan(tableName)
+      .select(1)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchSourceTableNode(tableName, Array("first")),
+      term("select", "1 AS _c0")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
   def testBatchFilterableWithoutPushDown(): Unit = {
     val (tableSource, tableName) = filterableTableSource
     val util = batchTestUtil()

Reply via email to