Repository: flink
Updated Branches:
  refs/heads/master 6886f638d -> 2a4ac6600
[FLINK-7571] [table] Fix translation of TableSource with time indicators.

This closes #4635.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a4ac660
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a4ac660
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a4ac660

Branch: refs/heads/master
Commit: 2a4ac66009f276c157fd35710006b2ebaf9bf764
Parents: 6c315be
Author: Fabian Hueske <fhue...@apache.org>
Authored: Fri Sep 1 21:41:21 2017 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Sep 21 14:11:42 2017 +0200

----------------------------------------------------------------------
 .../datastream/StreamTableSourceScan.scala      |   5 +-
 .../plan/schema/StreamTableSourceTable.scala    | 108 +++++++++++++++----
 .../table/plan/schema/TableSourceTable.scala    |  22 +++-
 .../runtime/stream/TimeAttributesITCase.scala   |  71 ++++++++++++
 4 files changed, 178 insertions(+), 28 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2a4ac660/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 663b276..c7a423e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -25,9 +25,8 @@ import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableEnvironment} import
org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan -import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.plan.schema.{RowSchema, StreamTableSourceTable} import org.apache.flink.table.sources._ -import org.apache.flink.table.plan.schema.TableSourceTable import org.apache.flink.table.runtime.types.CRow import org.apache.flink.table.sources.{StreamTableSource, TableSource} import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo @@ -108,7 +107,7 @@ class StreamTableSourceScan( convertToInternalRow( new RowSchema(getRowType), inputDataStream, - new TableSourceTable(tableSource), + new StreamTableSourceTable(tableSource), config) } } http://git-wip-us.apache.org/repos/asf/flink/blob/2a4ac660/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala index dc1f31a..5553797 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.plan.schema import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api.{TableEnvironment, TableException} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.stats.FlinkStatistic @@ -28,48 +29,113 @@ import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo class StreamTableSourceTable[T]( override val tableSource: TableSource[T], override val 
statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) - extends TableSourceTable[T](tableSource, statistic) { - + extends TableSourceTable[T]( + tableSource, + StreamTableSourceTable.adjustFieldIndexes(tableSource), + StreamTableSourceTable.adjustFieldNames(tableSource), + statistic) { override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { + val fieldTypes = StreamTableSourceTable.adjustFieldTypes(tableSource) + val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] + flinkTypeFactory.buildLogicalRowType( + this.fieldNames, + fieldTypes) + } - val fieldNames = TableEnvironment.getFieldNames(tableSource).toList - val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList +} - val fields = fieldNames.zip(fieldTypes) +object StreamTableSourceTable { + + private def adjustFieldIndexes(tableSource: TableSource[_]): Array[Int] = { + val (rowtime, proctime) = getTimeIndicators(tableSource) + + val original = TableEnvironment.getFieldIndices(tableSource) + + // append rowtime marker + val withRowtime = if (rowtime.isDefined) { + original :+ TimeIndicatorTypeInfo.ROWTIME_MARKER + } else { + original + } - val withRowtime = tableSource match { + // append proctime marker + if (proctime.isDefined) { + withRowtime :+ TimeIndicatorTypeInfo.PROCTIME_MARKER + } else { + withRowtime + } + } + + private def adjustFieldNames(tableSource: TableSource[_]): Array[String] = { + val (rowtime, proctime) = getTimeIndicators(tableSource) + + val original = TableEnvironment.getFieldNames(tableSource) + + // append rowtime field + val withRowtime = if (rowtime.isDefined) { + original :+ rowtime.get + } else { + original + } + + // append proctime field + if (proctime.isDefined) { + withRowtime :+ proctime.get + } else { + withRowtime + } + } + + private def adjustFieldTypes(tableSource: TableSource[_]): Array[TypeInformation[_]] = { + val (rowtime, proctime) = StreamTableSourceTable.getTimeIndicators(tableSource) + + val original = 
TableEnvironment.getFieldTypes(tableSource.getReturnType) + + // append rowtime type + val withRowtime = if (rowtime.isDefined) { + original :+ TimeIndicatorTypeInfo.ROWTIME_INDICATOR + } else { + original + } + + // append proctime type + val withProctime = if (proctime.isDefined) { + withRowtime :+ TimeIndicatorTypeInfo.PROCTIME_INDICATOR + } else { + withRowtime + } + + withProctime.asInstanceOf[Array[TypeInformation[_]]] + } + + private def getTimeIndicators(tableSource: TableSource[_]): (Option[String], Option[String]) = { + + val rowtime: Option[String] = tableSource match { case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute == null => - fields + None case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute.trim.equals("") => throw TableException("The name of the rowtime attribute must not be empty.") case timeSource: DefinedRowtimeAttribute => val rowtimeAttribute = timeSource.getRowtimeAttribute - fields :+ (rowtimeAttribute, TimeIndicatorTypeInfo.ROWTIME_INDICATOR) + Some(rowtimeAttribute) case _ => - fields + None } - val withProctime = tableSource match { + val proctime: Option[String] = tableSource match { case timeSource : DefinedProctimeAttribute if timeSource.getProctimeAttribute == null => - withRowtime + None case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute.trim.equals("") => throw TableException("The name of the rowtime attribute must not be empty.") case timeSource: DefinedProctimeAttribute => val proctimeAttribute = timeSource.getProctimeAttribute - withRowtime :+ (proctimeAttribute, TimeIndicatorTypeInfo.PROCTIME_INDICATOR) + Some(proctimeAttribute) case _ => - withRowtime + None } - - val (fieldNamesWithIndicators, fieldTypesWithIndicators) = withProctime.unzip - - flinkTypeFactory.buildLogicalRowType( - fieldNamesWithIndicators, - fieldTypesWithIndicators) - + (rowtime, proctime) } - } 
http://git-wip-us.apache.org/repos/asf/flink/blob/2a4ac660/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala index a3851e3..2f0ba1a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala @@ -25,9 +25,23 @@ import org.apache.flink.table.sources.TableSource /** Table which defines an external table via a [[TableSource]] */ class TableSourceTable[T]( val tableSource: TableSource[T], - override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) + fieldIndexes: Array[Int], + fieldNames: Array[String], + override val statistic: FlinkStatistic) extends FlinkTable[T]( typeInfo = tableSource.getReturnType, - fieldIndexes = TableEnvironment.getFieldIndices(tableSource), - fieldNames = TableEnvironment.getFieldNames(tableSource), - statistic) + fieldIndexes, + fieldNames, + statistic) { + + def this( + tableSource: TableSource[T], + statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) { + + this( + tableSource, + TableEnvironment.getFieldIndices(tableSource), + TableEnvironment.getFieldNames(tableSource), + statistic) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2a4ac660/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala index ec65cf7..7b8b9e6 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala @@ -19,10 +19,16 @@ package org.apache.flink.table.runtime.stream import java.math.BigDecimal +import java.lang.{Long => JLong, Integer => JInt} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecEnv} import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase @@ -33,11 +39,13 @@ import org.apache.flink.table.api.{TableEnvironment, Types} import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit} import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo} import org.apache.flink.table.runtime.utils.StreamITCase +import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource} import org.apache.flink.types.Row import org.junit.Assert._ import org.junit.Test import scala.collection.mutable +import scala.collection.JavaConverters._ /** * Tests for access and materialization of time attributes. 
@@ -369,6 +377,32 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { "1970-01-01 00:00:00.043,And me.,13") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testTableSourceWithTimeIndicators(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + tEnv.registerTableSource("testTable", new TestTableSource) + StreamITCase.clear + + val result = tEnv + .scan("testTable") + .where('a % 2 === 1) + .select('rowtime, 'a, 'b, 'c) + .toAppendStream[Row] + + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "1970-01-01 00:00:01.0,1,A,1000", + "1970-01-01 00:00:03.0,3,C,3000", + "1970-01-01 00:00:05.0,5,E,5000") + + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } } object TimeAttributesITCase { @@ -413,3 +447,40 @@ object TimeAttributesITCase { var c: String = _ } } + +class TestTableSource + extends StreamTableSource[Row] + with DefinedRowtimeAttribute + with DefinedProctimeAttribute { + + override def getDataStream(env: JStreamExecEnv): DataStream[Row] = { + + def toRow(i: Int, s: String, l: Long) = Row.of(i.asInstanceOf[JInt], s, l.asInstanceOf[JLong]) + + val rows = Seq( + toRow(1, "A", 1000L), + toRow(2, "B", 2000L), + toRow(3, "C", 3000L), + toRow(4, "D", 4000L), + toRow(5, "E", 5000L), + toRow(6, "F", 6000L) + ) + + env + .fromCollection(rows.asJava).returns(getReturnType) + .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Row] { + override def extractAscendingTimestamp(r: Row): Long = r.getField(2).asInstanceOf[Long] + }) + } + + override def getRowtimeAttribute: String = "rowtime" + + override def getProctimeAttribute: String = "proctime" + + override def getReturnType: TypeInformation[Row] = { + new RowTypeInfo( + Array(Types.INT, Types.STRING, Types.LONG).asInstanceOf[Array[TypeInformation[_]]], + 
Array("a", "b", "c") + ) + } +}