Repository: flink
Updated Branches:
  refs/heads/release-1.3 cc71dec10 -> 571cda729


[FLINK-7571] Add test for TableSource with time indicators.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/571cda72
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/571cda72
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/571cda72

Branch: refs/heads/release-1.3
Commit: 571cda729640df714d68aaf3a124e5437e0c5199
Parents: cc71dec
Author: Fabian Hueske <fhue...@apache.org>
Authored: Thu Sep 21 15:01:53 2017 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Sep 21 15:02:27 2017 +0200

----------------------------------------------------------------------
 .../datastream/TimeAttributesITCase.scala       | 73 +++++++++++++++++++-
 1 file changed, 72 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/571cda72/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
index c434f47..bb63abb 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
@@ -19,10 +19,16 @@
 package org.apache.flink.table.runtime.datastream
 
 import java.math.BigDecimal
+import java.lang.{Integer => JInt, Long => JLong}
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment 
=> JStreamExecEnv}
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import 
org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
@@ -33,11 +39,13 @@ import org.apache.flink.table.api.{TableEnvironment, 
TableException, Types, Vali
 import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc
 import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit}
 import 
org.apache.flink.table.runtime.datastream.TimeAttributesITCase.{TestPojo, 
TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo}
+import org.apache.flink.table.sources.{DefinedProctimeAttribute, 
DefinedRowtimeAttribute, StreamTableSource}
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.mutable
+import scala.collection.JavaConverters._
 
 /**
   * Tests for access and materialization of time attributes.
@@ -398,6 +406,32 @@ class TimeAttributesITCase extends 
StreamingMultipleProgramsTestBase {
       "1970-01-01 00:00:00.043,And me.,13")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testTableSourceWithTimeIndicators(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.registerTableSource("testTable", new TestTableSource)
+    StreamITCase.clear
+
+    val result = tEnv
+      .scan("testTable")
+      .where('a % 2 === 1)
+      .select('rowtime, 'a, 'b, 'c)
+      .toAppendStream[Row]
+
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:01.0,1,A,1000",
+      "1970-01-01 00:00:03.0,3,C,3000",
+      "1970-01-01 00:00:05.0,5,E,5000")
+
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }
 
 object TimeAttributesITCase {
@@ -418,7 +452,7 @@ object TimeAttributesITCase {
     }
   }
 
-  class TimestampWithEqualWatermarkPojo
+class TimestampWithEqualWatermarkPojo
   extends AssignerWithPunctuatedWatermarks[TestPojo] {
 
     override def checkAndGetNextWatermark(
@@ -442,3 +476,40 @@ object TimeAttributesITCase {
     var c: String = _
   }
 }
+
+class TestTableSource
+  extends StreamTableSource[Row]
+    with DefinedRowtimeAttribute
+    with DefinedProctimeAttribute {
+
+  override def getDataStream(env: JStreamExecEnv): DataStream[Row] = {
+
+    def toRow(i: Int, s: String, l: Long) = Row.of(i.asInstanceOf[JInt], s, 
l.asInstanceOf[JLong])
+
+    val rows = Seq(
+      toRow(1, "A", 1000L),
+      toRow(2, "B", 2000L),
+      toRow(3, "C", 3000L),
+      toRow(4, "D", 4000L),
+      toRow(5, "E", 5000L),
+      toRow(6, "F", 6000L)
+    )
+
+    env
+      .fromCollection(rows.asJava).returns(getReturnType)
+      .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Row] {
+        override def extractAscendingTimestamp(r: Row): Long = 
r.getField(2).asInstanceOf[Long]
+      })
+  }
+
+  override def getRowtimeAttribute: String = "rowtime"
+
+  override def getProctimeAttribute: String = "proctime"
+
+  override def getReturnType: TypeInformation[Row] = {
+    new RowTypeInfo(
+      Array(Types.INT, Types.STRING, 
Types.LONG).asInstanceOf[Array[TypeInformation[_]]],
+      Array("a", "b", "c")
+    )
+  }
+}

Reply via email to