Repository: flink Updated Branches: refs/heads/release-1.3 cc71dec10 -> 571cda729
[FLINK-7571] Add test for TableSource with time indicators. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/571cda72 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/571cda72 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/571cda72 Branch: refs/heads/release-1.3 Commit: 571cda729640df714d68aaf3a124e5437e0c5199 Parents: cc71dec Author: Fabian Hueske <fhue...@apache.org> Authored: Thu Sep 21 15:01:53 2017 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Thu Sep 21 15:02:27 2017 +0200 ---------------------------------------------------------------------- .../datastream/TimeAttributesITCase.scala | 73 +++++++++++++++++++- 1 file changed, 72 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/571cda72/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala index c434f47..bb63abb 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala @@ -19,10 +19,16 @@ package org.apache.flink.table.runtime.datastream import java.math.BigDecimal +import java.lang.{Integer => JInt, Long => JLong} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecEnv} import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase @@ -33,11 +39,13 @@ import org.apache.flink.table.api.{TableEnvironment, TableException, Types, Vali import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit} import org.apache.flink.table.runtime.datastream.TimeAttributesITCase.{TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo} +import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource} import org.apache.flink.types.Row import org.junit.Assert._ import org.junit.Test import scala.collection.mutable +import scala.collection.JavaConverters._ /** * Tests for access and materialization of time attributes. @@ -398,6 +406,32 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { "1970-01-01 00:00:00.043,And me.,13") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testTableSourceWithTimeIndicators(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + tEnv.registerTableSource("testTable", new TestTableSource) + StreamITCase.clear + + val result = tEnv + .scan("testTable") + .where('a % 2 === 1) + .select('rowtime, 'a, 'b, 'c) + .toAppendStream[Row] + + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "1970-01-01 00:00:01.0,1,A,1000", + "1970-01-01 00:00:03.0,3,C,3000", + "1970-01-01 00:00:05.0,5,E,5000") + + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } } object TimeAttributesITCase { @@ -418,7 +452,7 @@ object TimeAttributesITCase { } } - class TimestampWithEqualWatermarkPojo +class TimestampWithEqualWatermarkPojo extends AssignerWithPunctuatedWatermarks[TestPojo] { override def checkAndGetNextWatermark( @@ -442,3 +476,40 @@ object TimeAttributesITCase { var c: String = _ } } + +class TestTableSource + extends StreamTableSource[Row] + with DefinedRowtimeAttribute + with DefinedProctimeAttribute { + + override def getDataStream(env: JStreamExecEnv): DataStream[Row] = { + + def toRow(i: Int, s: String, l: Long) = Row.of(i.asInstanceOf[JInt], s, l.asInstanceOf[JLong]) + + val rows = Seq( + toRow(1, "A", 1000L), + toRow(2, "B", 2000L), + toRow(3, "C", 3000L), + toRow(4, "D", 4000L), + toRow(5, "E", 5000L), + toRow(6, "F", 6000L) + ) + + env + .fromCollection(rows.asJava).returns(getReturnType) + .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Row] { + override def extractAscendingTimestamp(r: Row): Long = r.getField(2).asInstanceOf[Long] + }) + } + + override def getRowtimeAttribute: String = "rowtime" + + override def getProctimeAttribute: String = "proctime" + + override def getReturnType: TypeInformation[Row] = { + new RowTypeInfo( + Array(Types.INT, Types.STRING, Types.LONG).asInstanceOf[Array[TypeInformation[_]]], + Array("a", "b", "c") + ) + } +}