Repository: flink Updated Branches: refs/heads/master 36b663f45 -> 0e92b6632
[FLINK-7939] [table] Fix Table conversion of DataStream of AtomicType. This closes #4917. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/505d478d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/505d478d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/505d478d Branch: refs/heads/master Commit: 505d478d55c93e07a7227e375939eca19ec4d082 Parents: 36b663f Author: Fabian Hueske <[email protected]> Authored: Sat Oct 28 22:13:23 2017 +0200 Committer: Fabian Hueske <[email protected]> Committed: Tue Oct 31 21:40:33 2017 +0100 ---------------------------------------------------------------------- .../flink/table/api/TableEnvironment.scala | 9 +- .../api/stream/StreamTableEnvironmentTest.scala | 37 ++++++++ .../runtime/stream/TimeAttributesITCase.scala | 88 +++++++++++++++++++- 3 files changed, 128 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/505d478d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index fec7f1a..c3cab13 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -810,10 +810,13 @@ abstract class TableEnvironment(val config: TableConfig) { "Please specify the type of the input with a RowTypeInfo.") case a: AtomicType[_] => exprs.zipWithIndex flatMap { + case (_: TimeAttribute, _) => + None + case (UnresolvedFieldReference(name), idx) if idx > 0 => + // only accept the first field for an atomic type + throw new TableException("Only the first field can reference an atomic type.") case (UnresolvedFieldReference(name), idx) => - if (idx > 0) { - throw new TableException("Table of atomic type can only have a single field.") - } + // first field reference is mapped to atomic type Some((0, name)) case _ => throw new TableException("Field reference expression requested.") } http://git-wip-us.apache.org/repos/asf/flink/blob/505d478d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala index 1b99679..863d07b 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala @@ -66,6 +66,43 @@ class StreamTableEnvironmentTest extends TableTestBase { } @Test + def testProctimeAttributeWithAtomicInput(): Unit = { + val util = streamTestUtil() + // cannot replace an attribute with proctime + util.addTable[String]('s, 'pt.proctime) + } + + @Test + def testReplacingRowtimeAttributeWithAtomicInput(): Unit = { + val util = streamTestUtil() + util.addTable[Long]('rt.rowtime) + } + + @Test + def testAppendedRowtimeAttributeWithAtomicInput(): Unit = { + val util = streamTestUtil() + util.addTable[String]('s, 'rt.rowtime) + } + + @Test + def testRowtimeAndProctimeAttributeWithAtomicInput1(): Unit = { + val util = streamTestUtil() + util.addTable[String]('s, 'rt.rowtime, 'pt.proctime) + } + + @Test + def testRowtimeAndProctimeAttributeWithAtomicInput2(): Unit = { + val util = streamTestUtil() + util.addTable[String]('s, 'pt.proctime, 'rt.rowtime) + } + + @Test + def testRowtimeAndProctimeAttributeWithAtomicInput3(): Unit = { + val util = streamTestUtil() + util.addTable[Long]('rt.rowtime, 'pt.proctime) + } + + @Test def testProctimeAttribute(): Unit = { val util = streamTestUtil() // cannot replace an attribute with proctime http://git-wip-us.apache.org/repos/asf/flink/blob/505d478d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala index b7f97f9..5086601 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala @@ -34,7 +34,7 @@ import org.apache.flink.table.api.scala._ import org.apache.flink.table.plan.TimeIndicatorConversionTest.TableFunc import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types} import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit} -import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo} +import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{AtomicTimestampWithEqualWatermark, TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo} import org.apache.flink.table.runtime.utils.StreamITCase import org.apache.flink.table.utils.TestTableSourceWithTime import org.apache.flink.types.Row @@ -58,6 +58,70 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world")) @Test + def testAtomicType1(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val stream = env + .fromCollection(Seq(1L, 2L, 3L, 4L, 7L, 8L, 16L)) + .assignTimestampsAndWatermarks(new AtomicTimestampWithEqualWatermark()) + val table = stream.toTable( + tEnv, 'rowtime.rowtime, 'proctime.proctime) + + val t = table + .where('proctime.cast(Types.LONG) > 0) + .select('rowtime.cast(Types.STRING)) + + val results = t.toAppendStream[Row] + results.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "1970-01-01 00:00:00.001", + "1970-01-01 00:00:00.002", + "1970-01-01 00:00:00.003", + "1970-01-01 00:00:00.004", + "1970-01-01 00:00:00.007", + "1970-01-01 00:00:00.008", + "1970-01-01 00:00:00.016") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testAtomicType2(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val stream = env + .fromCollection(Seq(1L, 2L, 3L, 4L, 7L, 8L, 16L)) + .assignTimestampsAndWatermarks(new AtomicTimestampWithEqualWatermark()) + val table = stream.toTable( + tEnv, 'l, 'rowtime.rowtime, 'proctime.proctime) + + val t = table + .where('proctime.cast(Types.LONG) > 0) + .select('l, 'rowtime.cast(Types.STRING)) + + val results = t.toAppendStream[Row] + results.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "1,1970-01-01 00:00:00.001", + "2,1970-01-01 00:00:00.002", + "3,1970-01-01 00:00:00.003", + "4,1970-01-01 00:00:00.004", + "7,1970-01-01 00:00:00.007", + "8,1970-01-01 00:00:00.008", + "16,1970-01-01 00:00:00.016") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test def testCalcMaterialization(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) @@ -566,8 +630,26 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { } object TimeAttributesITCase { + + class AtomicTimestampWithEqualWatermark + extends AssignerWithPunctuatedWatermarks[Long] { + + override def checkAndGetNextWatermark( + lastElement: Long, + extractedTimestamp: Long) + : Watermark = { + new Watermark(extractedTimestamp) + } + + override def extractTimestamp( + element: Long, + previousElementTimestamp: Long): Long = { + element + } + } + class TimestampWithEqualWatermark - extends AssignerWithPunctuatedWatermarks[(Long, Int, Double, Float, BigDecimal, String)] { + extends AssignerWithPunctuatedWatermarks[(Long, Int, Double, Float, BigDecimal, String)] { override def checkAndGetNextWatermark( lastElement: (Long, Int, Double, Float, BigDecimal, String), @@ -584,7 +666,7 @@ object TimeAttributesITCase { } class TimestampWithEqualWatermarkPojo - extends AssignerWithPunctuatedWatermarks[TestPojo] { + extends AssignerWithPunctuatedWatermarks[TestPojo] { override def checkAndGetNextWatermark( lastElement: TestPojo,
