Repository: flink Updated Branches: refs/heads/release-1.3 c3289c9d9 -> 168378d98
[FLINK-7939] [table] Fix Table conversion of DataStream of AtomicType. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/168378d9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/168378d9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/168378d9 Branch: refs/heads/release-1.3 Commit: 168378d98ddf591f780a939ee74310ec8d04d517 Parents: c3289c9 Author: Fabian Hueske <[email protected]> Authored: Sat Oct 28 22:13:23 2017 +0200 Committer: Fabian Hueske <[email protected]> Committed: Wed Nov 1 09:43:33 2017 +0100 ---------------------------------------------------------------------- .../flink/table/api/TableEnvironment.scala | 9 +- .../flink/table/plan/schema/FlinkTable.scala | 8 +- .../stream/StreamTableEnvironmentTest.scala | 37 +++++ .../datastream/TimeAttributesITCase.scala | 159 ++++++++++++++----- 4 files changed, 168 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/168378d9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 4ceeece..37bcd0b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -622,10 +622,13 @@ abstract class TableEnvironment(val config: TableConfig) { "Please specify the type of the input with a RowTypeInfo.") case a: AtomicType[_] => exprs.zipWithIndex flatMap { + case (_: TimeAttribute, _) => + None + case (UnresolvedFieldReference(name), idx) if idx > 0 => + // only accept the first field for an atomic type + throw new TableException("Only the first field can reference an atomic type.") case (UnresolvedFieldReference(name), idx) => - if (idx > 0) { - throw new TableException("Table of atomic type can only have a single field.") - } + // first field reference is mapped to atomic type Some((0, name)) case _ => throw new TableException("Field reference expression requested.") } http://git-wip-us.apache.org/repos/asf/flink/blob/168378d9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala index fd992c5..c360a6d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala @@ -67,9 +67,13 @@ abstract class FlinkTable[T]( } fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]]) case aType: AtomicType[_] => - if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) { + if (fieldIndexes.exists(_ > 0)) { throw new TableException( - "Non-composite input type may have only a single field and its index must be 0.") + "Invalid index for table of atomic type encountered. Please report a bug.") + } + if (fieldIndexes.count(_ == 0) > 1) { + throw new TableException( + "Atomic input type may have only be referenced by a single table field.") } Array(aType) } http://git-wip-us.apache.org/repos/asf/flink/blob/168378d9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala index 3c1668f..3612423 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala @@ -50,6 +50,43 @@ class StreamTableEnvironmentTest extends TableTestBase { } @Test + def testProctimeAttributeWithAtomicInput(): Unit = { + val util = streamTestUtil() + // cannot replace an attribute with proctime + util.addTable[String]('s, 'pt.proctime) + } + + @Test + def testReplacingRowtimeAttributeWithAtomicInput(): Unit = { + val util = streamTestUtil() + util.addTable[Long]('rt.rowtime) + } + + @Test + def testAppendedRowtimeAttributeWithAtomicInput(): Unit = { + val util = streamTestUtil() + util.addTable[String]('s, 'rt.rowtime) + } + + @Test + def testRowtimeAndProctimeAttributeWithAtomicInput1(): Unit = { + val util = streamTestUtil() + util.addTable[String]('s, 'rt.rowtime, 'pt.proctime) + } + + @Test + def testRowtimeAndProctimeAttributeWithAtomicInput2(): Unit = { + val util = streamTestUtil() + util.addTable[String]('s, 'pt.proctime, 'rt.rowtime) + } + + @Test + def testRowtimeAndProctimeAttributeWithAtomicInput3(): Unit = { + val util = streamTestUtil() + util.addTable[Long]('rt.rowtime, 'pt.proctime) + } + + @Test def testProctimeAttribute(): Unit = { val util = streamTestUtil() // cannot replace an attribute with proctime http://git-wip-us.apache.org/repos/asf/flink/blob/168378d9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala index bb63abb..835bd77 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala @@ -38,7 +38,7 @@ import org.apache.flink.table.api.scala.stream.utils.StreamITCase import org.apache.flink.table.api.{TableEnvironment, TableException, Types, ValidationException} import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit} -import org.apache.flink.table.runtime.datastream.TimeAttributesITCase.{TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo} +import org.apache.flink.table.runtime.datastream.TimeAttributesITCase._ import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource} import org.apache.flink.types.Row import org.junit.Assert._ @@ -99,6 +99,70 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { } @Test + def testAtomicType1(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val stream = env + .fromCollection(Seq(1L, 2L, 3L, 4L, 7L, 8L, 16L)) + .assignTimestampsAndWatermarks(new AtomicTimestampWithEqualWatermark()) + val table = stream.toTable( + tEnv, 'rowtime.rowtime, 'proctime.proctime) + + val t = table + .where('proctime.cast(Types.LONG) > 0) + .select('rowtime.cast(Types.STRING)) + + val results = t.toAppendStream[Row] + results.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "1970-01-01 00:00:00.001", + "1970-01-01 00:00:00.002", + "1970-01-01 00:00:00.003", + "1970-01-01 00:00:00.004", + "1970-01-01 00:00:00.007", + "1970-01-01 00:00:00.008", + "1970-01-01 00:00:00.016") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testAtomicType2(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val stream = env + .fromCollection(Seq(1L, 2L, 3L, 4L, 7L, 8L, 16L)) + .assignTimestampsAndWatermarks(new AtomicTimestampWithEqualWatermark()) + val table = stream.toTable( + tEnv, 'l, 'rowtime.rowtime, 'proctime.proctime) + + val t = table + .where('proctime.cast(Types.LONG) > 0) + .select('l, 'rowtime.cast(Types.STRING)) + + val results = t.toAppendStream[Row] + results.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "1,1970-01-01 00:00:00.001", + "2,1970-01-01 00:00:00.002", + "3,1970-01-01 00:00:00.003", + "4,1970-01-01 00:00:00.004", + "7,1970-01-01 00:00:00.007", + "8,1970-01-01 00:00:00.008", + "16,1970-01-01 00:00:00.016") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test def testCalcMaterialization(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) @@ -435,13 +499,29 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { } object TimeAttributesITCase { + + class AtomicTimestampWithEqualWatermark + extends AssignerWithPunctuatedWatermarks[Long] { + + override def checkAndGetNextWatermark( + lastElement: Long, + extractedTimestamp: Long): Watermark = { + new Watermark(extractedTimestamp) + } + + override def extractTimestamp( + element: Long, + previousElementTimestamp: Long): Long = { + element + } + } + class TimestampWithEqualWatermark - extends AssignerWithPunctuatedWatermarks[(Long, Int, Double, Float, BigDecimal, String)] { + extends AssignerWithPunctuatedWatermarks[(Long, Int, Double, Float, BigDecimal, String)] { override def checkAndGetNextWatermark( lastElement: (Long, Int, Double, Float, BigDecimal, String), - extractedTimestamp: Long) - : Watermark = { + extractedTimestamp: Long): Watermark = { new Watermark(extractedTimestamp) } @@ -452,13 +532,12 @@ object TimeAttributesITCase { } } -class TimestampWithEqualWatermarkPojo - extends AssignerWithPunctuatedWatermarks[TestPojo] { + class TimestampWithEqualWatermarkPojo + extends AssignerWithPunctuatedWatermarks[TestPojo] { override def checkAndGetNextWatermark( lastElement: TestPojo, - extractedTimestamp: Long) - : Watermark = { + extractedTimestamp: Long): Watermark = { new Watermark(extractedTimestamp) } @@ -475,41 +554,41 @@ class TimestampWithEqualWatermarkPojo var b2: String = "skip me" var c: String = _ } -} - -class TestTableSource - extends StreamTableSource[Row] - with DefinedRowtimeAttribute - with DefinedProctimeAttribute { - override def getDataStream(env: JStreamExecEnv): DataStream[Row] = { - - def toRow(i: Int, s: String, l: Long) = Row.of(i.asInstanceOf[JInt], s, l.asInstanceOf[JLong]) - - val rows = Seq( - toRow(1, "A", 1000L), - toRow(2, "B", 2000L), - toRow(3, "C", 3000L), - toRow(4, "D", 4000L), - toRow(5, "E", 5000L), - toRow(6, "F", 6000L) - ) - - env - .fromCollection(rows.asJava).returns(getReturnType) - .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Row] { - override def extractAscendingTimestamp(r: Row): Long = r.getField(2).asInstanceOf[Long] - }) - } + class TestTableSource + extends StreamTableSource[Row] + with DefinedRowtimeAttribute + with DefinedProctimeAttribute { + + override def getDataStream(env: JStreamExecEnv): DataStream[Row] = { + + def toRow(i: Int, s: String, l: Long) = Row.of(i.asInstanceOf[JInt], s, l.asInstanceOf[JLong]) + + val rows = Seq( + toRow(1, "A", 1000L), + toRow(2, "B", 2000L), + toRow(3, "C", 3000L), + toRow(4, "D", 4000L), + toRow(5, "E", 5000L), + toRow(6, "F", 6000L) + ) + + env + .fromCollection(rows.asJava).returns(getReturnType) + .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Row] { + override def extractAscendingTimestamp(r: Row): Long = r.getField(2).asInstanceOf[Long] + }) + } - override def getRowtimeAttribute: String = "rowtime" + override def getRowtimeAttribute: String = "rowtime" - override def getProctimeAttribute: String = "proctime" + override def getProctimeAttribute: String = "proctime" - override def getReturnType: TypeInformation[Row] = { - new RowTypeInfo( - Array(Types.INT, Types.STRING, Types.LONG).asInstanceOf[Array[TypeInformation[_]]], - Array("a", "b", "c") - ) + override def getReturnType: TypeInformation[Row] = { + new RowTypeInfo( + Array(Types.INT, Types.STRING, Types.LONG).asInstanceOf[Array[TypeInformation[_]]], + Array("a", "b", "c") + ) + } } }
