[
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221805#comment-16221805
]
ASF GitHub Bot commented on FLINK-7548:
---------------------------------------
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/4894#discussion_r146587166
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
---
@@ -22,24 +22,30 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptTable, RelTraitSet}
import org.apache.calcite.rel.RelWriter
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.TableScan
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.TableException
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable}
+import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
import scala.collection.JavaConverters._
abstract class PhysicalTableSourceScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
- val tableSource: TableSource[_])
+ val tableSource: TableSource[_],
+ val selectedFields: Option[Array[Int]])
extends TableScan(cluster, traitSet, table) {
override def deriveRowType(): RelDataType = {
val flinkTypeFactory =
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
- flinkTypeFactory.buildLogicalRowType(
- TableEnvironment.getFieldNames(tableSource),
- TableEnvironment.getFieldTypes(tableSource.getReturnType))
+ val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match {
+ case _: StreamTableSourceTable[_] => true
+ case _: BatchTableSourceTable[_] => false
+ case t => throw TableException(s"Unknown Table type ${t.getClass}.")
+ }
+
+ TableSourceUtil.getTableSchema(tableSource, selectedFields, streamingTable, flinkTypeFactory)
--- End diff --
I would rename this method to `getRelDataType`, because it does not return
our `TableSchema`.
> Support watermark generation for TableSource
> --------------------------------------------
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Fabian Hueske
> Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, the TableSource currently only supports defining a
> rowtime field, but does not support extracting watermarks from the rowtime field.
> We can provide a new interface called {{DefinedWatermark}}, which has two
> methods: {{getRowtimeAttribute}} (which can only refer to an existing field) and
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked
> deprecated.
> How to support periodic and punctuated watermarks, and which built-in
> strategies to provide, needs further discussion.
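For illustration only, the interface proposed in the issue description could look roughly like the sketch below. The two method names on `DefinedWatermark` come from the issue text. Everything else is an assumption made for this example and is not Flink's actual API: the `WatermarkGenerator` trait and its `onRow` method, the `BoundedDelayGenerator` strategy, and the `OrdersSource` class with its `orderTime` field. The sketch does not settle the open question of periodic versus punctuated watermarks.

```scala
// Hypothetical sketch: none of these types are taken from the Flink code base.

/** Assumed generator shape: sees each rowtime value and returns the current watermark. */
trait WatermarkGenerator {
  def onRow(rowtime: Long): Long
}

/** The interface proposed in the issue, with the two methods it names. */
trait DefinedWatermark {
  /** Name of an existing field that carries the row time. */
  def getRowtimeAttribute: String

  /** Strategy used to derive watermarks from the rowtime field. */
  def getWatermarkGenerator: WatermarkGenerator
}

/** Assumed example strategy: the watermark trails the largest timestamp seen by a fixed delay. */
class BoundedDelayGenerator(delayMs: Long) extends WatermarkGenerator {
  private var maxTimestamp: Long = Long.MinValue

  override def onRow(rowtime: Long): Long = {
    // Out-of-order rows never move the watermark backwards.
    maxTimestamp = math.max(maxTimestamp, rowtime)
    maxTimestamp - delayMs
  }
}

/** Hypothetical table source that declares its rowtime field and watermark strategy. */
class OrdersSource extends DefinedWatermark {
  override def getRowtimeAttribute: String = "orderTime"
  override def getWatermarkGenerator: WatermarkGenerator = new BoundedDelayGenerator(5000L)
}
```

With this shape, a planner could ask a source for its rowtime field and generator in one place, instead of relying on {{DefinedRowtimeAttribute}} alone.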