[FLINK-6583] [table] Add state cleanup for counting GroupWindows. This closes #3919.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d85d9693 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d85d9693 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d85d9693 Branch: refs/heads/master Commit: d85d969334e89d83aec60f9bb3d2c69a4701eb54 Parents: 64d3ce8 Author: sunjincheng121 <sunjincheng...@gmail.com> Authored: Tue May 16 11:58:37 2017 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Wed May 17 15:24:23 2017 +0200 ---------------------------------------------------------------------- .../DataStreamGroupWindowAggregate.scala | 32 +++- .../triggers/StateCleaningCountTrigger.scala | 136 +++++++++++++++++ .../table/GroupWindowAggregationsITCase.scala | 10 +- .../StateCleaningCountTriggerHarnessTest.scala | 147 +++++++++++++++++++ 4 files changed, 316 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d85d9693/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index c158579..1ac013a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -25,7 +25,8 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, 
WindowedStream} import org.apache.flink.streaming.api.windowing.assigners._ -import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} +import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger +import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window => DataStreamWindow} import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException} import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory @@ -40,7 +41,8 @@ import org.apache.flink.table.runtime.aggregate.AggregateUtil._ import org.apache.flink.table.runtime.aggregate._ import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval -import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo} +import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger +import org.slf4j.LoggerFactory class DataStreamGroupWindowAggregate( window: LogicalWindow, @@ -54,6 +56,8 @@ class DataStreamGroupWindowAggregate( grouping: Array[Int]) extends SingleRel(cluster, traitSet, inputNode) with CommonAggregate with DataStreamRel { + private val LOG = LoggerFactory.getLogger(this.getClass) + override def deriveRowType(): RelDataType = schema.logicalType override def needsUpdatesAsRetraction = true @@ -131,6 +135,19 @@ class DataStreamGroupWindowAggregate( "non-windowed GroupBy aggregation.") } + val isCountWindow = window match { + case TumblingGroupWindow(_, _, size) if isRowCountLiteral(size) => true + case SlidingGroupWindow(_, _, size, _) if isRowCountLiteral(size) => true + case _ => false + } + + if (isCountWindow && grouping.length > 0 && queryConfig.getMinIdleStateRetentionTime < 0) { + LOG.warn( + "No state retention interval configured for a query which accumulates state. 
" + + "Please provide a query configuration with valid retention interval to prevent excessive " + + "state size. You may specify a retention time of 0 to not clean up the state.") + } + val outRowType = CRowTypeInfo(schema.physicalTypeInfo) val aggString = aggregationToString( @@ -167,7 +184,7 @@ class DataStreamGroupWindowAggregate( val keyedStream = inputDS.keyBy(physicalGrouping: _*) val windowedStream = - createKeyedWindowedStream(window, keyedStream) + createKeyedWindowedStream(queryConfig, window, keyedStream) .asInstanceOf[WindowedStream[CRow, Tuple, DataStreamWindow]] val (aggFunction, accumulatorRowType, aggResultRowType) = @@ -192,7 +209,7 @@ class DataStreamGroupWindowAggregate( physicalNamedProperties) val windowedStream = - createNonKeyedWindowedStream(window, inputDS) + createNonKeyedWindowedStream(queryConfig, window, inputDS) .asInstanceOf[AllWindowedStream[CRow, DataStreamWindow]] val (aggFunction, accumulatorRowType, aggResultRowType) = @@ -215,6 +232,7 @@ class DataStreamGroupWindowAggregate( object DataStreamGroupWindowAggregate { private def createKeyedWindowedStream( + queryConfig: StreamQueryConfig, groupWindow: LogicalWindow, stream: KeyedStream[CRow, Tuple]): WindowedStream[CRow, Tuple, _ <: DataStreamWindow] = groupWindow match { @@ -226,6 +244,7 @@ object DataStreamGroupWindowAggregate { case TumblingGroupWindow(_, timeField, size) if isProctimeAttribute(timeField) && isRowCountLiteral(size)=> stream.countWindow(toLong(size)) + .trigger(PurgingTrigger.of(StateCleaningCountTrigger.of(queryConfig, toLong(size)))); case TumblingGroupWindow(_, timeField, size) if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size) => @@ -245,6 +264,7 @@ object DataStreamGroupWindowAggregate { case SlidingGroupWindow(_, timeField, size, slide) if isProctimeAttribute(timeField) && isRowCountLiteral(size) => stream.countWindow(toLong(size), toLong(slide)) + .trigger(StateCleaningCountTrigger.of(queryConfig, toLong(slide))); case SlidingGroupWindow(_, 
timeField, size, slide) if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size)=> @@ -267,6 +287,7 @@ object DataStreamGroupWindowAggregate { } private def createNonKeyedWindowedStream( + queryConfig: StreamQueryConfig, groupWindow: LogicalWindow, stream: DataStream[CRow]): AllWindowedStream[CRow, _ <: DataStreamWindow] = groupWindow match { @@ -278,6 +299,7 @@ object DataStreamGroupWindowAggregate { case TumblingGroupWindow(_, timeField, size) if isProctimeAttribute(timeField) && isRowCountLiteral(size)=> stream.countWindowAll(toLong(size)) + .trigger(PurgingTrigger.of(StateCleaningCountTrigger.of(queryConfig, toLong(size)))); case TumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) => stream.windowAll(TumblingEventTimeWindows.of(toTime(size))) @@ -296,6 +318,7 @@ object DataStreamGroupWindowAggregate { case SlidingGroupWindow(_, timeField, size, slide) if isProctimeAttribute(timeField) && isRowCountLiteral(size)=> stream.countWindowAll(toLong(size), toLong(slide)) + .trigger(StateCleaningCountTrigger.of(queryConfig, toLong(slide))); case SlidingGroupWindow(_, timeField, size, slide) if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size)=> @@ -317,6 +340,5 @@ object DataStreamGroupWindowAggregate { stream.windowAll(EventTimeSessionWindows.withGap(toTime(gap))) } - } http://git-wip-us.apache.org/repos/asf/flink/blob/d85d9693/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala new file mode 100644 index 0000000..f3f9246 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala @@ -0,0 
+1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.triggers + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.functions.ReduceFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext +import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult} +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger.Sum + +/** + * A [[Trigger]] that fires once the count of elements in a pane reaches the given count + * or the cleanup timer is triggered. 
+ */ +class StateCleaningCountTrigger(queryConfig: StreamQueryConfig, maxCount: Long) + extends Trigger[Any, GlobalWindow] { + + protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + protected val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val stateDesc = + new ReducingStateDescriptor[JLong]("count", new Sum, Types.LONG) + + private val cleanupStateDesc = + new ValueStateDescriptor[JLong]("countCleanup", Types.LONG) + + override def canMerge = false + + override def toString: String = "CountTriggerGlobalWindowithCleanupState(" + + "minIdleStateRetentionTime=" + queryConfig.getMinIdleStateRetentionTime + ", " + + "maxIdleStateRetentionTime=" + queryConfig.getMaxIdleStateRetentionTime + ", " + + "maxCount=" + maxCount + ")" + + override def onElement( + element: Any, + timestamp: Long, + window: GlobalWindow, + ctx: TriggerContext): TriggerResult = { + + val currentTime = ctx.getCurrentProcessingTime + + // register cleanup timer + if (stateCleaningEnabled) { + // last registered timer + val curCleanupTime = ctx.getPartitionedState(cleanupStateDesc).value() + + // check if a cleanup timer is registered and + // that the current cleanup timer won't delete state we need to keep + if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) { + // we need to register a new (later) timer + val cleanupTime = currentTime + maxRetentionTime + // register timer and remember clean-up time + ctx.registerProcessingTimeTimer(cleanupTime) + + if (null != curCleanupTime) { + ctx.deleteProcessingTimeTimer(curCleanupTime) + } + + ctx.getPartitionedState(cleanupStateDesc).update(cleanupTime) + } + } + + val count = ctx.getPartitionedState(stateDesc) + count.add(1L) + + if (count.get >= maxCount) { + count.clear() + TriggerResult.FIRE + } else { + TriggerResult.CONTINUE + } + } + + override def onProcessingTime( + time: Long, + window: 
GlobalWindow, + ctx: TriggerContext): TriggerResult = { + + if (stateCleaningEnabled) { + val cleanupTime = ctx.getPartitionedState(cleanupStateDesc).value() + // check that the triggered timer is the last registered processing time timer. + if (null != cleanupTime && time == cleanupTime) { + clear(window, ctx) + return TriggerResult.FIRE_AND_PURGE + } + } + TriggerResult.CONTINUE + } + + override def onEventTime(time: Long, window: GlobalWindow, ctx: TriggerContext): TriggerResult = { + TriggerResult.CONTINUE + } + + override def clear(window: GlobalWindow, ctx: TriggerContext): Unit = { + ctx.getPartitionedState(stateDesc).clear() + ctx.getPartitionedState(cleanupStateDesc).clear() + } + +} + +object StateCleaningCountTrigger { + + /** + * Create a [[StateCleaningCountTrigger]] instance. + * + * @param queryConfig query configuration. + * @param maxCount The count of elements at which to fire. + */ + def of(queryConfig: StreamQueryConfig, maxCount: Long): StateCleaningCountTrigger = + new StateCleaningCountTrigger(queryConfig, maxCount) + + class Sum extends ReduceFunction[JLong] { + override def reduce(value1: JLong, value2: JLong): JLong = value1 + value2 + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/d85d9693/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala index 846fe3e..81d3577 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala 
@@ -18,13 +18,14 @@ package org.apache.flink.table.api.scala.stream.table +import org.apache.flink.api.common.time.Time import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase -import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment} import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge} import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.scala.stream.table.GroupWindowAggregationsITCase.TimestampAndWatermarkWithOffset @@ -41,7 +42,8 @@ import scala.collection.mutable * programs is possible. */ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase { - + private val queryConfig = new StreamQueryConfig() + queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2)) val data = List( (1L, 1, "Hi"), (2L, 2, "Hello"), @@ -68,7 +70,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase { .select('string, countFun('int), 'int.avg, weightAvgFun('long, 'int), weightAvgFun('int, 'int)) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toDataStream[Row](queryConfig) results.addSink(new StreamITCase.StringSink) env.execute() @@ -136,7 +138,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase { .select(countFun('string), 'int.avg, weightAvgFun('long, 'int), weightAvgFun('int, 'int)) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toDataStream[Row](queryConfig) results.addSink(new StreamITCase.StringSink) env.execute() 
http://git-wip-us.apache.org/repos/asf/flink/blob/d85d9693/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala new file mode 100644 index 0000000..96601fb --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.harness + +import org.apache.flink.api.common.time.Time +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow +import org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger +import org.junit.Assert.assertEquals +import org.junit.Test + +class StateCleaningCountTriggerHarnessTest { + protected var queryConfig = + new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3)) + + @Test + def testFiringAndFiringWithPurging(): Unit = { + val testHarness = new TriggerTestHarness[Any, GlobalWindow]( + StateCleaningCountTrigger.of(queryConfig, 10), new GlobalWindow.Serializer) + + // try to trigger the onProcessingTime method via 1, but no timer is triggered + assertEquals(0, testHarness.advanceProcessingTime(1).size()) + + // register cleanup timer with 3001 + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + + // try to trigger the onProcessingTime method via 1000, but no timer is triggered + assertEquals(0, testHarness.advanceProcessingTime(1000).size()) + + // 1000 + 2000 <= 3001 reuse timer 3001 + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + + // there are two state entries, one is the timer(3001), the other is the counter(2) + assertEquals(2, testHarness.numStateEntries) + + // try to trigger the onProcessingTime method via 3001, and timer(3001) is triggered + assertEquals( + TriggerResult.FIRE_AND_PURGE, + testHarness.advanceProcessingTime(3001).iterator().next().f1) + + assertEquals(0, testHarness.numStateEntries) + + // 3001 + 2000 >= 3001 register cleanup timer with 6001, and
remove timer 3001 + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + + // try to trigger the onProcessingTime method via 4002, but no timer is triggered + assertEquals(0, testHarness.advanceProcessingTime(4002).size()) + + // 4002 + 2000 >= 6001 register cleanup timer with 7002, and remove timer 6001 + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + + // 4002 + 2000 <= 7002 reuse timer 7002 + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + + // have one timer 7002 + assertEquals(1, testHarness.numProcessingTimeTimers) + assertEquals(0, testHarness.numEventTimeTimers) + assertEquals(2, testHarness.numStateEntries) + assertEquals(2, testHarness.numStateEntries(GlobalWindow.get)) + + // 4002 + 2000 <= 7002 reuse timer 7002 + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + assertEquals( + TriggerResult.FIRE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + + // the counter of the window is cleared + assertEquals(1, testHarness.numStateEntries) + assertEquals(1, testHarness.numStateEntries(GlobalWindow.get)) + + // try to trigger the onProcessingTime method via 7002, and all states are cleared + assertEquals( + TriggerResult.FIRE_AND_PURGE, + 
testHarness.advanceProcessingTime(7002).iterator().next().f1) + + assertEquals(0, testHarness.numStateEntries) + } + + /** + * Verify that clear() does not leak across windows. + */ + @Test + def testClear() { + val testHarness = new TriggerTestHarness[Any, GlobalWindow]( + StateCleaningCountTrigger.of(queryConfig, 3), + new GlobalWindow.Serializer) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + // have 1 timer + assertEquals(1, testHarness.numProcessingTimeTimers) + assertEquals(0, testHarness.numEventTimeTimers) + assertEquals(2, testHarness.numStateEntries) + assertEquals(2, testHarness.numStateEntries(GlobalWindow.get)) + + testHarness.clearTriggerState(GlobalWindow.get) + + assertEquals(0, testHarness.numStateEntries) + assertEquals(0, testHarness.numStateEntries(GlobalWindow.get)) + } +}