Repository: flink Updated Branches: refs/heads/master ca681101f -> d4665a00a
[FLINK-5655] [table] Add event-time OVER RANGE BETWEEN x PRECEDING aggregation to SQL. This closes #3629. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d4665a00 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d4665a00 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d4665a00 Branch: refs/heads/master Commit: d4665a00a4262f89b166895f73a54daab2f25e1c Parents: ca68110 Author: 金竹 <[email protected]> Authored: Tue Mar 28 12:36:03 2017 +0800 Committer: Fabian Hueske <[email protected]> Committed: Thu Mar 30 09:25:17 2017 +0200 ---------------------------------------------------------------------- .../datastream/DataStreamOverAggregate.scala | 17 +- .../table/runtime/aggregate/AggregateUtil.scala | 34 ++- .../RangeClauseBoundedOverProcessFunction.scala | 221 +++++++++++++++++++ .../table/api/scala/stream/sql/SqlITCase.scala | 144 ++++++++++++ .../scala/stream/sql/WindowAggregateTest.scala | 55 +++++ 5 files changed, 455 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d4665a00/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala index 01e5a9a..7b744f1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala @@ -139,11 +139,16 @@ class DataStreamOverAggregate( // bounded OVER window if (overWindow.isRows) { // 
ROWS clause bounded OVER window - createRowsClauseBoundedAndCurrentRowOverWindow(inputDS, isRowTimeType = true) + createBoundedAndCurrentRowOverWindow( + inputDS, + isRangeClause = false, + isRowTimeType = true) } else { // RANGE clause bounded OVER window - throw new TableException( - "row-time OVER RANGE PRECEDING window is not supported yet.") + createBoundedAndCurrentRowOverWindow( + inputDS, + isRangeClause = true, + isRowTimeType = true) } } else { throw new TableException( @@ -195,8 +200,9 @@ class DataStreamOverAggregate( result } - def createRowsClauseBoundedAndCurrentRowOverWindow( + def createBoundedAndCurrentRowOverWindow( inputDS: DataStream[Row], + isRangeClause: Boolean = false, isRowTimeType: Boolean = false): DataStream[Row] = { val overWindow: Group = logicWindow.groups.get(0) @@ -209,10 +215,11 @@ class DataStreamOverAggregate( // get the output types val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] - val processFunction = AggregateUtil.createRowsClauseBoundedOverProcessFunction( + val processFunction = AggregateUtil.createBoundedOverProcessFunction( namedAggregates, inputType, precedingOffset, + isRangeClause, isRowTimeType ) val result: DataStream[Row] = http://git-wip-us.apache.org/repos/asf/flink/blob/d4665a00/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index fdac692..cbb2e53 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -91,20 +91,21 @@ object AggregateUtil { } /** - * 
Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for ROWS clause + * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for * bounded OVER window to evaluate final aggregate value. * * @param namedAggregates List of calls to aggregate functions and their output field names * @param inputType Input row type - * @param inputFields All input fields * @param precedingOffset the preceding offset + * @param isRangeClause It is a tag that indicates whether the OVER clause is rangeClause * @param isRowTimeType It is a tag that indicates whether the time type is rowTimeType * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]] */ - private[flink] def createRowsClauseBoundedOverProcessFunction( + private[flink] def createBoundedOverProcessFunction( namedAggregates: Seq[CalcitePair[AggregateCall, String]], inputType: RelDataType, precedingOffset: Long, + isRangeClause: Boolean, isRowTimeType: Boolean): ProcessFunction[Row, Row] = { val (aggFields, aggregates) = @@ -117,14 +118,25 @@ object AggregateUtil { val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo] if (isRowTimeType) { - new RowsClauseBoundedOverProcessFunction( - aggregates, - aggFields, - inputType.getFieldCount, - aggregationStateType, - inputRowType, - precedingOffset - ) + if (isRangeClause) { + new RangeClauseBoundedOverProcessFunction( + aggregates, + aggFields, + inputType.getFieldCount, + aggregationStateType, + inputRowType, + precedingOffset + ) + } else { + new RowsClauseBoundedOverProcessFunction( + aggregates, + aggFields, + inputType.getFieldCount, + aggregationStateType, + inputRowType, + precedingOffset + ) + } } else { throw TableException( "Bounded partitioned proc-time OVER aggregation is not supported yet.") http://git-wip-us.apache.org/repos/asf/flink/blob/d4665a00/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala new file mode 100644 index 0000000..0c8555b --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util.{List => JList, ArrayList => JArrayList} + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.types.Row +import org.apache.flink.util.{Collector, Preconditions} + +/** + * Process Function for RANGE clause event-time bounded OVER window + * + * @param aggregates the list of all [[AggregateFunction]] used for this aggregation + * @param aggFields the position (in the input Row) of the input value for each aggregate + * @param forwardedFieldCount the count of forwarded fields. + * @param aggregationStateType the row type info of aggregation + * @param inputRowType the row type info of input row + * @param precedingOffset the preceding offset + */ +class RangeClauseBoundedOverProcessFunction( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int, + private val aggregationStateType: RowTypeInfo, + private val inputRowType: RowTypeInfo, + private val precedingOffset: Long) + extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + Preconditions.checkNotNull(forwardedFieldCount) + Preconditions.checkNotNull(aggregationStateType) + Preconditions.checkNotNull(precedingOffset) + + private var output: Row = _ + + // the state which keeps the last triggering timestamp + private var lastTriggeringTsState: ValueState[Long] = _ + + // the state which used to materialize the accumulator for incremental calculation + private var accumulatorState: 
ValueState[Row] = _ + + // the state which keeps all the data that are not expired. + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp, + // the second element of tuple is a list that contains the entire data of all the rows belonging + // to this time stamp. + private var dataState: MapState[Long, JList[Row]] = _ + + override def open(config: Configuration) { + + output = new Row(forwardedFieldCount + aggregates.length) + + val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long]) + lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor) + + val accumulatorStateDescriptor = + new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType) + accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor) + + val keyTypeInformation: TypeInformation[Long] = + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]] + val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType) + + val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]( + "dataState", + keyTypeInformation, + valueTypeInformation) + + dataState = getRuntimeContext.getMapState(mapStateDescriptor) + + } + + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + + // triggering timestamp for trigger calculation + val triggeringTs = ctx.timestamp + + val lastTriggeringTs = lastTriggeringTsState.value + + // check if the data is expired, if not, save the data and register event time timer + if (triggeringTs > lastTriggeringTs) { + val data = dataState.get(triggeringTs) + if (null != data) { + data.add(input) + dataState.put(triggeringTs, data) + } else { + val data = new JArrayList[Row] + data.add(input) + dataState.put(triggeringTs, data) + // register event time timer + 
ctx.timerService.registerEventTimeTimer(triggeringTs) + } + } + } + + override def onTimer( + timestamp: Long, + ctx: ProcessFunction[Row, Row]#OnTimerContext, + out: Collector[Row]): Unit = { + // gets all window data from state for the calculation + val inputs: JList[Row] = dataState.get(timestamp) + + if (null != inputs) { + + var accumulators = accumulatorState.value + var dataListIndex = 0 + var aggregatesIndex = 0 + + // initialize when first run or failover recovery per key + if (null == accumulators) { + accumulators = new Row(aggregates.length) + aggregatesIndex = 0 + while (aggregatesIndex < aggregates.length) { + accumulators.setField(aggregatesIndex, aggregates(aggregatesIndex).createAccumulator()) + aggregatesIndex += 1 + } + } + + // keep up timestamps of retract data + val retractTsList: JList[Long] = new JArrayList[Long] + + // do retraction + val dataTimestampIt = dataState.keys.iterator + while (dataTimestampIt.hasNext) { + val dataTs: Long = dataTimestampIt.next() + val offset = timestamp - dataTs + if (offset > precedingOffset) { + val retractDataList = dataState.get(dataTs) + dataListIndex = 0 + while (dataListIndex < retractDataList.size()) { + aggregatesIndex = 0 + while (aggregatesIndex < aggregates.length) { + val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator] + aggregates(aggregatesIndex) + .retract(accumulator, retractDataList.get(dataListIndex) + .getField(aggFields(aggregatesIndex))) + aggregatesIndex += 1 + } + dataListIndex += 1 + } + retractTsList.add(dataTs) + } + } + + // do accumulation + dataListIndex = 0 + while (dataListIndex < inputs.size()) { + // accumulate current row + aggregatesIndex = 0 + while (aggregatesIndex < aggregates.length) { + val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator] + aggregates(aggregatesIndex).accumulate(accumulator, inputs.get(dataListIndex) + .getField(aggFields(aggregatesIndex))) + aggregatesIndex += 1 + } + dataListIndex += 1 + } + 
+ // set aggregate in output row + aggregatesIndex = 0 + while (aggregatesIndex < aggregates.length) { + val index = forwardedFieldCount + aggregatesIndex + val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator] + output.setField(index, aggregates(aggregatesIndex).getValue(accumulator)) + aggregatesIndex += 1 + } + + // copy forwarded fields to output row and emit output row + dataListIndex = 0 + while (dataListIndex < inputs.size()) { + aggregatesIndex = 0 + while (aggregatesIndex < forwardedFieldCount) { + output.setField(aggregatesIndex, inputs.get(dataListIndex).getField(aggregatesIndex)) + aggregatesIndex += 1 + } + out.collect(output) + dataListIndex += 1 + } + + // remove the data that has been retracted + dataListIndex = 0 + while (dataListIndex < retractTsList.size) { + dataState.remove(retractTsList.get(dataListIndex)) + dataListIndex += 1 + } + + // update state + accumulatorState.update(accumulators) + lastTriggeringTsState.update(timestamp) + } + } +} + + http://git-wip-us.apache.org/repos/asf/flink/blob/d4665a00/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala index 80ff42a..b8285a1 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala @@ -411,6 +411,150 @@ class SqlITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + @Test + def testBoundPartitionedEventTimeWindowWithRange(): Unit = { + val data = Seq( + Left((1500L, (1L, 15, "Hello"))), + Left((1600L, (1L, 
16, "Hello"))), + Left((1000L, (1L, 1, "Hello"))), + Left((2000L, (2L, 2, "Hello"))), + Right(1000L), + Left((2000L, (2L, 2, "Hello"))), + Left((2000L, (2L, 3, "Hello"))), + Left((3000L, (3L, 3, "Hello"))), + Right(2000L), + Left((4000L, (4L, 4, "Hello"))), + Right(3000L), + Left((5000L, (5L, 5, "Hello"))), + Right(5000L), + Left((6000L, (6L, 6, "Hello"))), + Left((6500L, (6L, 65, "Hello"))), + Right(7000L), + Left((9000L, (6L, 9, "Hello"))), + Left((9500L, (6L, 18, "Hello"))), + Left((9000L, (6L, 9, "Hello"))), + Right(10000L), + Left((10000L, (7L, 7, "Hello World"))), + Left((11000L, (7L, 17, "Hello World"))), + Left((11000L, (7L, 77, "Hello World"))), + Right(12000L), + Left((14000L, (7L, 18, "Hello World"))), + Right(14000L), + Left((15000L, (8L, 8, "Hello World"))), + Right(17000L), + Left((20000L, (20L, 20, "Hello World"))), + Right(19000L)) + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setStateBackend(getStateBackend) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + + val t1 = env + .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data)) + .toTable(tEnv).as('a, 'b, 'c) + + tEnv.registerTable("T1", t1) + + val sqlQuery = "SELECT " + + "c, b, " + + "count(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " + + "preceding AND CURRENT ROW)" + + ", sum(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " + + " preceding AND CURRENT ROW)" + + " from T1" + + val result = tEnv.sql(sqlQuery).toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3", + "Hello,2,6,9", "Hello,3,6,9","Hello,2,6,9", + "Hello,3,4,9", + "Hello,4,2,7", + "Hello,5,2,9", + "Hello,6,2,11","Hello,65,2,12", + "Hello,9,2,12","Hello,9,2,12","Hello,18,3,18", + "Hello World,7,1,7", "Hello 
World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7", + "Hello World,8,2,15", + "Hello World,20,1,20") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testBoundNonPartitionedEventTimeWindowWithRange(): Unit = { + val data = Seq( + Left((1500L, (1L, 15, "Hello"))), + Left((1600L, (1L, 16, "Hello"))), + Left((1000L, (1L, 1, "Hello"))), + Left((2000L, (2L, 2, "Hello"))), + Right(1000L), + Left((2000L, (2L, 2, "Hello"))), + Left((2000L, (2L, 3, "Hello"))), + Left((3000L, (3L, 3, "Hello"))), + Right(2000L), + Left((4000L, (4L, 4, "Hello"))), + Right(3000L), + Left((5000L, (5L, 5, "Hello"))), + Right(5000L), + Left((6000L, (6L, 6, "Hello"))), + Left((6500L, (6L, 65, "Hello"))), + Right(7000L), + Left((9000L, (6L, 9, "Hello"))), + Left((9500L, (6L, 18, "Hello"))), + Left((9000L, (6L, 9, "Hello"))), + Right(10000L), + Left((10000L, (7L, 7, "Hello World"))), + Left((11000L, (7L, 17, "Hello World"))), + Left((11000L, (7L, 77, "Hello World"))), + Right(12000L), + Left((14000L, (7L, 18, "Hello World"))), + Right(14000L), + Left((15000L, (8L, 8, "Hello World"))), + Right(17000L), + Left((20000L, (20L, 20, "Hello World"))), + Right(19000L)) + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setStateBackend(getStateBackend) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + + val t1 = env + .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data)) + .toTable(tEnv).as('a, 'b, 'c) + + tEnv.registerTable("T1", t1) + + val sqlQuery = "SELECT " + + "c, b, " + + "count(a) OVER (ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " + + "preceding AND CURRENT ROW)" + + ", sum(a) OVER (ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " + + " preceding AND CURRENT ROW)" + + " from T1" + + val result = tEnv.sql(sqlQuery).toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + 
val expected = mutable.MutableList( + "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3", + "Hello,2,6,9", "Hello,3,6,9","Hello,2,6,9", + "Hello,3,4,9", + "Hello,4,2,7", + "Hello,5,2,9", + "Hello,6,2,11","Hello,65,2,12", + "Hello,9,2,12","Hello,9,2,12","Hello,18,3,18", + "Hello World,7,4,25", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7", + "Hello World,8,2,15", + "Hello World,20,1,20") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + /** * All aggregates must be computed on the same window. */ http://git-wip-us.apache.org/repos/asf/flink/blob/d4665a00/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala index 7b8b2df..45d204a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala @@ -350,4 +350,59 @@ class WindowAggregateTest extends TableTestBase { streamUtil.verifySql(sql, expected) } + @Test + def testBoundPartitionedRowTimeWindowWithRange() = { + val sql = "SELECT " + + "c, " + + "count(a) OVER (PARTITION BY c ORDER BY RowTime() " + + "RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) as cnt1 " + + "from MyTable" + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "ROWTIME() AS $2") + ), + term("partitionBy", "c"), + term("orderBy", "ROWTIME"), + term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"), + term("select", "a", "c", "ROWTIME", 
"COUNT(a) AS w0$o0") + ), + term("select", "c", "w0$o0 AS $1") + ) + streamUtil.verifySql(sql, expected) + } + + @Test + def testBoundNonPartitionedRowTimeWindowWithRange() = { + val sql = "SELECT " + + "c, " + + "count(a) OVER (ORDER BY RowTime() " + + "RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) as cnt1 " + + "from MyTable" + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "ROWTIME() AS $2") + ), + term("orderBy", "ROWTIME"), + term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"), + term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0") + ), + term("select", "c", "w0$o0 AS $1") + ) + streamUtil.verifySql(sql, expected) + } + }
