Repository: flink
Updated Branches:
  refs/heads/master 6886f638d -> 2a4ac6600


[FLINK-7571] [table] Fix translation of TableSource with time indicators.

This closes #4635.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a4ac660
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a4ac660
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a4ac660

Branch: refs/heads/master
Commit: 2a4ac66009f276c157fd35710006b2ebaf9bf764
Parents: 6c315be
Author: Fabian Hueske <fhue...@apache.org>
Authored: Fri Sep 1 21:41:21 2017 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Sep 21 14:11:42 2017 +0200

----------------------------------------------------------------------
 .../datastream/StreamTableSourceScan.scala      |   5 +-
 .../plan/schema/StreamTableSourceTable.scala    | 108 +++++++++++++++----
 .../table/plan/schema/TableSourceTable.scala    |  22 +++-
 .../runtime/stream/TimeAttributesITCase.scala   |  71 ++++++++++++
 4 files changed, 178 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2a4ac660/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 663b276..c7a423e 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -25,9 +25,8 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, 
TableEnvironment}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
-import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.schema.{RowSchema, StreamTableSourceTable}
 import org.apache.flink.table.sources._
-import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.sources.{StreamTableSource, TableSource}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
@@ -108,7 +107,7 @@ class StreamTableSourceScan(
     convertToInternalRow(
       new RowSchema(getRowType),
       inputDataStream,
-      new TableSourceTable(tableSource),
+      new StreamTableSourceTable(tableSource),
       config)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a4ac660/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
index dc1f31a..5553797 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.plan.schema
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.{TableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.stats.FlinkStatistic
@@ -28,48 +29,113 @@ import 
org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 class StreamTableSourceTable[T](
     override val tableSource: TableSource[T],
     override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
-  extends TableSourceTable[T](tableSource, statistic) {
-
+  extends TableSourceTable[T](
+    tableSource,
+    StreamTableSourceTable.adjustFieldIndexes(tableSource),
+    StreamTableSourceTable.adjustFieldNames(tableSource),
+    statistic) {
 
   override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+    val fieldTypes = StreamTableSourceTable.adjustFieldTypes(tableSource)
+
     val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
+    flinkTypeFactory.buildLogicalRowType(
+      this.fieldNames,
+      fieldTypes)
+  }
 
-    val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
-    val fieldTypes = 
TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
+}
 
-    val fields = fieldNames.zip(fieldTypes)
+object StreamTableSourceTable {
+
+  private def adjustFieldIndexes(tableSource: TableSource[_]): Array[Int] = {
+    val (rowtime, proctime) = getTimeIndicators(tableSource)
+
+    val original = TableEnvironment.getFieldIndices(tableSource)
+
+    // append rowtime marker
+    val withRowtime = if (rowtime.isDefined) {
+      original :+ TimeIndicatorTypeInfo.ROWTIME_MARKER
+    } else {
+      original
+    }
 
-    val withRowtime = tableSource match {
+    // append proctime marker
+    if (proctime.isDefined) {
+      withRowtime :+ TimeIndicatorTypeInfo.PROCTIME_MARKER
+    } else {
+      withRowtime
+    }
+  }
+
+  private def adjustFieldNames(tableSource: TableSource[_]): Array[String] = {
+    val (rowtime, proctime) = getTimeIndicators(tableSource)
+
+    val original = TableEnvironment.getFieldNames(tableSource)
+
+    // append rowtime field
+    val withRowtime = if (rowtime.isDefined) {
+      original :+ rowtime.get
+    } else {
+      original
+    }
+
+    // append proctime field
+    if (proctime.isDefined) {
+      withRowtime :+ proctime.get
+    } else {
+      withRowtime
+    }
+  }
+
+  private def adjustFieldTypes(tableSource: TableSource[_]): 
Array[TypeInformation[_]] = {
+    val (rowtime, proctime) = 
StreamTableSourceTable.getTimeIndicators(tableSource)
+
+    val original = TableEnvironment.getFieldTypes(tableSource.getReturnType)
+
+    // append rowtime type
+    val withRowtime = if (rowtime.isDefined) {
+      original :+ TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+    } else {
+      original
+    }
+
+    // append proctime type
+    val withProctime = if (proctime.isDefined) {
+      withRowtime :+ TimeIndicatorTypeInfo.PROCTIME_INDICATOR
+    } else {
+      withRowtime
+    }
+
+    withProctime.asInstanceOf[Array[TypeInformation[_]]]
+  }
+
+  private def getTimeIndicators(tableSource: TableSource[_]): (Option[String], 
Option[String]) = {
+
+    val rowtime: Option[String] = tableSource match {
       case timeSource: DefinedRowtimeAttribute if 
timeSource.getRowtimeAttribute == null =>
-        fields
+        None
       case timeSource: DefinedRowtimeAttribute if 
timeSource.getRowtimeAttribute.trim.equals("") =>
         throw TableException("The name of the rowtime attribute must not be 
empty.")
       case timeSource: DefinedRowtimeAttribute =>
         val rowtimeAttribute = timeSource.getRowtimeAttribute
-        fields :+ (rowtimeAttribute, TimeIndicatorTypeInfo.ROWTIME_INDICATOR)
+        Some(rowtimeAttribute)
       case _ =>
-        fields
+        None
     }
 
-    val withProctime = tableSource match {
+    val proctime: Option[String] = tableSource match {
       case timeSource : DefinedProctimeAttribute if 
timeSource.getProctimeAttribute == null =>
-        withRowtime
+        None
       case timeSource: DefinedProctimeAttribute
         if timeSource.getProctimeAttribute.trim.equals("") =>
         throw TableException("The name of the rowtime attribute must not be 
empty.")
       case timeSource: DefinedProctimeAttribute =>
         val proctimeAttribute = timeSource.getProctimeAttribute
-        withRowtime :+ (proctimeAttribute, 
TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
+        Some(proctimeAttribute)
       case _ =>
-        withRowtime
+        None
     }
-
-    val (fieldNamesWithIndicators, fieldTypesWithIndicators) = 
withProctime.unzip
-
-    flinkTypeFactory.buildLogicalRowType(
-      fieldNamesWithIndicators,
-      fieldTypesWithIndicators)
-
+    (rowtime, proctime)
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a4ac660/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index a3851e3..2f0ba1a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -25,9 +25,23 @@ import org.apache.flink.table.sources.TableSource
 /** Table which defines an external table via a [[TableSource]] */
 class TableSourceTable[T](
     val tableSource: TableSource[T],
-    override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
+    fieldIndexes: Array[Int],
+    fieldNames: Array[String],
+    override val statistic: FlinkStatistic)
   extends FlinkTable[T](
     typeInfo = tableSource.getReturnType,
-    fieldIndexes = TableEnvironment.getFieldIndices(tableSource),
-    fieldNames = TableEnvironment.getFieldNames(tableSource),
-    statistic)
+    fieldIndexes,
+    fieldNames,
+    statistic) {
+
+  def this(
+    tableSource: TableSource[T],
+    statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) {
+
+    this(
+      tableSource,
+      TableEnvironment.getFieldIndices(tableSource),
+      TableEnvironment.getFieldNames(tableSource),
+      statistic)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a4ac660/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index ec65cf7..7b8b9e6 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -19,10 +19,16 @@
 package org.apache.flink.table.runtime.stream
 
 import java.math.BigDecimal
+import java.lang.{Long => JLong, Integer => JInt}
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment 
=> JStreamExecEnv}
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import 
org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
@@ -33,11 +39,13 @@ import org.apache.flink.table.api.{TableEnvironment, Types}
 import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit}
 import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{TestPojo, 
TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo}
 import org.apache.flink.table.runtime.utils.StreamITCase
+import org.apache.flink.table.sources.{DefinedProctimeAttribute, 
DefinedRowtimeAttribute, StreamTableSource}
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.mutable
+import scala.collection.JavaConverters._
 
 /**
   * Tests for access and materialization of time attributes.
@@ -369,6 +377,32 @@ class TimeAttributesITCase extends 
StreamingMultipleProgramsTestBase {
       "1970-01-01 00:00:00.043,And me.,13")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testTableSourceWithTimeIndicators(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.registerTableSource("testTable", new TestTableSource)
+    StreamITCase.clear
+
+    val result = tEnv
+      .scan("testTable")
+      .where('a % 2 === 1)
+      .select('rowtime, 'a, 'b, 'c)
+      .toAppendStream[Row]
+
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:01.0,1,A,1000",
+      "1970-01-01 00:00:03.0,3,C,3000",
+      "1970-01-01 00:00:05.0,5,E,5000")
+
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }
 
 object TimeAttributesITCase {
@@ -413,3 +447,40 @@ object TimeAttributesITCase {
     var c: String = _
   }
 }
+
+class TestTableSource
+    extends StreamTableSource[Row]
+    with DefinedRowtimeAttribute
+    with DefinedProctimeAttribute {
+
+  override def getDataStream(env: JStreamExecEnv): DataStream[Row] = {
+
+    def toRow(i: Int, s: String, l: Long) = Row.of(i.asInstanceOf[JInt], s, 
l.asInstanceOf[JLong])
+
+    val rows = Seq(
+      toRow(1, "A", 1000L),
+      toRow(2, "B", 2000L),
+      toRow(3, "C", 3000L),
+      toRow(4, "D", 4000L),
+      toRow(5, "E", 5000L),
+      toRow(6, "F", 6000L)
+    )
+
+    env
+      .fromCollection(rows.asJava).returns(getReturnType)
+      .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Row] {
+        override def extractAscendingTimestamp(r: Row): Long = 
r.getField(2).asInstanceOf[Long]
+      })
+  }
+
+  override def getRowtimeAttribute: String = "rowtime"
+
+  override def getProctimeAttribute: String = "proctime"
+
+  override def getReturnType: TypeInformation[Row] = {
+    new RowTypeInfo(
+      Array(Types.INT, Types.STRING, 
Types.LONG).asInstanceOf[Array[TypeInformation[_]]],
+      Array("a", "b", "c")
+    )
+  }
+}

Reply via email to