http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala index 910cbf2..9da2c44 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala @@ -18,11 +18,12 @@ package org.apache.flink.table.api.scala.stream.table +import org.apache.flink.api.common.time.Time import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} -import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment} import org.apache.flink.table.api.scala.stream.utils.StreamITCase.RetractingSink import org.apache.flink.types.Row import org.junit.Assert.assertEquals @@ -34,6 +35,8 @@ import scala.collection.mutable * Tests of groupby (without window) aggregations */ class GroupAggregationsITCase extends StreamingWithStateTestBase { + private val queryConfig = new StreamQueryConfig() + queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2)) @Test def testNonKeyedGroupAggregate(): Unit = { @@ -45,7 +48,7 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase { val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) .select('a.sum, 'b.sum) - val results = t.toRetractStream[Row] + val results = t.toRetractStream[Row](queryConfig) results.addSink(new StreamITCase.RetractingSink).setParallelism(1) env.execute() @@ -64,7 +67,7 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase { .groupBy('b) .select('b, 'a.sum) - val results = t.toRetractStream[Row] + val results = t.toRetractStream[Row](queryConfig) results.addSink(new StreamITCase.RetractingSink) env.execute() @@ -85,7 +88,7 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase { .groupBy('cnt) .select('cnt, 'b.count as 'freq) - val results = t.toRetractStream[Row] + val results = t.toRetractStream[Row](queryConfig) results.addSink(new RetractingSink) env.execute() @@ -104,7 +107,7 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase { .groupBy('e, 'b % 3) .select('c.min, 'e, 'a.avg, 'd.count) - val results = t.toRetractStream[Row] + val results = t.toRetractStream[Row](queryConfig) results.addSink(new RetractingSink) env.execute()
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala deleted file mode 100644 index eadcfc8..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala +++ /dev/null @@ -1,336 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.runtime.aggregate - -import java.util.Comparator -import java.util.concurrent.ConcurrentLinkedQueue -import java.lang.{Integer => JInt, Long => JLong} - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.api.java.functions.KeySelector -import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.streaming.api.operators.KeyedProcessOperator -import org.apache.flink.streaming.api.watermark.Watermark -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord -import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil} -import org.apache.flink.table.codegen.GeneratedAggregationsFunction -import org.apache.flink.table.functions.AggregateFunction -import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction} -import org.apache.flink.table.runtime.aggregate.BoundedProcessingOverRangeProcessFunctionTest._ -import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} -import org.apache.flink.types.Row -import org.junit.Test - -class BoundedProcessingOverRangeProcessFunctionTest { - - @Test - def testProcTimePartitionedOverRange(): Unit = { - - val rT = new CRowTypeInfo(new RowTypeInfo(Array[TypeInformation[_]]( - INT_TYPE_INFO, - LONG_TYPE_INFO, - INT_TYPE_INFO, - STRING_TYPE_INFO, - LONG_TYPE_INFO), - Array("a", "b", "c", "d", "e"))) - - val aggregates = - Array(new LongMinWithRetractAggFunction, - new LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_, _]]] - val aggregationStateType: RowTypeInfo = AggregateUtil.createAccumulatorRowType(aggregates) - - val funcCode = - """ - |public class BoundedOverAggregateHelper$33 - | extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations { - | - | transient org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction - | fmin = null; - | - | transient org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction - | fmax = null; - | - | public BoundedOverAggregateHelper$33() throws Exception { - | - | fmin = (org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction) - | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils - | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" + - | "MuTG9uZ01pbldpdGhSZXRyYWN0QWdnRnVuY3Rpb26oIdX_DaMPxQIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" + - | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWluV2l0aFJldHJhY3RBZ2dGdW5jdGlvbq_ZGuzxtA_S" + - | "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" + - | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" + - | "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" + - | "mluZyRMb25nJOda0iCPo2ukAgAAeHA"); - | - | fmax = (org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction) - | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils - | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" + - | "MuTG9uZ01heFdpdGhSZXRyYWN0QWdnRnVuY3Rpb25RmsI8azNGXwIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" + - | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWF4V2l0aFJldHJhY3RBZ2dGdW5jdGlvbvnwowlX0_Qf" + - | "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" + - | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" + - | "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" + - | "mluZyRMb25nJOda0iCPo2ukAgAAeHA"); - | } - | - | public void setAggregationResults( - | org.apache.flink.types.Row accs, - | org.apache.flink.types.Row output) { - | - | org.apache.flink.table.functions.AggregateFunction baseClass0 = - | (org.apache.flink.table.functions.AggregateFunction) fmin; - | output.setField(5, baseClass0.getValue( - | (org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator) - | accs.getField(0))); - | - | org.apache.flink.table.functions.AggregateFunction baseClass1 = - | (org.apache.flink.table.functions.AggregateFunction) fmax; - | output.setField(6, baseClass1.getValue( - | (org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator) - | accs.getField(1))); - | } - | - | public void accumulate( - | org.apache.flink.types.Row accs, - | org.apache.flink.types.Row input) { - | - | fmin.accumulate( - | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator) - | accs.getField(0)), - | (java.lang.Long) input.getField(4)); - | - | fmax.accumulate( - | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator) - | accs.getField(1)), - | (java.lang.Long) input.getField(4)); - | } - | - | public void retract( - | org.apache.flink.types.Row accs, - | org.apache.flink.types.Row input) { - | - | fmin.retract( - | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator) - | accs.getField(0)), - | (java.lang.Long) input.getField(4)); - | - | fmax.retract( - | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator) - | accs.getField(1)), - | (java.lang.Long) input.getField(4)); - | } - | - | public org.apache.flink.types.Row createAccumulators() { - | - | org.apache.flink.types.Row accs = new org.apache.flink.types.Row(2); - | - | accs.setField( - | 0, - | fmin.createAccumulator()); - | - | accs.setField( - | 1, - | fmax.createAccumulator()); - | - | return accs; - | } - | - | public void setForwardedFields( - | org.apache.flink.types.Row input, - | org.apache.flink.types.Row output) { - | - | output.setField(0, input.getField(0)); - | output.setField(1, input.getField(1)); - | output.setField(2, input.getField(2)); - | output.setField(3, input.getField(3)); - | output.setField(4, input.getField(4)); - | } - | - | public org.apache.flink.types.Row createOutputRow() { - | return new org.apache.flink.types.Row(7); - | } - | - |/******* This test does not use the following methods *******/ - | public org.apache.flink.types.Row mergeAccumulatorsPair( - | org.apache.flink.types.Row a, - | org.apache.flink.types.Row b) { - | return null; - | } - | - | public void resetAccumulator(org.apache.flink.types.Row accs) { - | } - | - | public void setConstantFlags(org.apache.flink.types.Row output) { - | } - |} - """.stripMargin - - val funcName = "BoundedOverAggregateHelper$33" - - val genAggFunction = GeneratedAggregationsFunction(funcName, funcCode) - val processFunction = new KeyedProcessOperator[String, CRow, CRow]( - new ProcTimeBoundedRangeOver( - genAggFunction, - 1000, - aggregationStateType, - rT)) - - val testHarness = new KeyedOneInputStreamOperatorTestHarness[JInt, CRow, CRow]( - processFunction, - new TupleRowSelector(0), - BasicTypeInfo.INT_TYPE_INFO) - - testHarness.open() - - // Time = 3 - testHarness.setProcessingTime(3) - // key = 1 - testHarness.processElement(new StreamRecord( - new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0)) - // key = 2 - testHarness.processElement(new StreamRecord( - new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 0)) - - // Time = 4 - testHarness.setProcessingTime(4) - // key = 1 - testHarness.processElement(new StreamRecord( - new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0)) - testHarness.processElement(new StreamRecord( - new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0)) - // key = 2 - testHarness.processElement(new StreamRecord( - new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 0)) - - // Time = 5 - testHarness.setProcessingTime(5) - testHarness.processElement(new StreamRecord( - new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0)) - - // Time = 6 - testHarness.setProcessingTime(6) - - // Time = 1002 - testHarness.setProcessingTime(1002) - // key = 1 - testHarness.processElement(new StreamRecord( - new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0)) - testHarness.processElement(new StreamRecord( - new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0)) - // key = 2 - testHarness.processElement(new StreamRecord( - new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0)) - - // Time = 1003 - testHarness.setProcessingTime(1003) - testHarness.processElement(new StreamRecord( - new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0)) - - // Time = 1004 - testHarness.setProcessingTime(1004) - testHarness.processElement(new StreamRecord( - new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0)) - - // Time = 1005 - testHarness.setProcessingTime(1005) - // key = 1 - testHarness.processElement(new StreamRecord( - new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 0)) - testHarness.processElement(new StreamRecord( - new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 0)) - // key = 2 - testHarness.processElement(new StreamRecord( - new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 0)) - - testHarness.setProcessingTime(1006) - - val result = testHarness.getOutput - - val expectedOutput = new ConcurrentLinkedQueue[Object]() - - // all elements at the same proc timestamp have the same value - expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 4)) - expectedOutput.add(new StreamRecord(new CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 4)) - expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true), 5)) - expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 5)) - expectedOutput.add(new StreamRecord(new CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 5)) - expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 6)) - expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true), 1003)) - expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 1003)) - expectedOutput.add(new StreamRecord(new CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 1003)) - expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 1004)) - expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true), 1005)) - expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 4L: JLong, 10L: JLong), true), 1006)) - expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 4L: JLong, 10L: JLong), true), 1006)) - expectedOutput.add(new StreamRecord(new CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), true), 1006)) - - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", - expectedOutput, result, new RowResultSortComparator(6)) - - testHarness.close() - - } -} - -object BoundedProcessingOverRangeProcessFunctionTest { - -/** - * Return 0 for equal CRows and non zero for different CRows - */ -class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with Serializable { - - override def compare(o1: Object, o2: Object):Int = { - - if (o1.isInstanceOf[Watermark] || o2.isInstanceOf[Watermark]) { - // watermark is not expected - -1 - } else { - val row1 = o1.asInstanceOf[StreamRecord[CRow]].getValue - val row2 = o2.asInstanceOf[StreamRecord[CRow]].getValue - row1.toString.compareTo(row2.toString) - } - } -} - -/** - * Simple test class that returns a specified field as the selector function - */ -class TupleRowSelector( - private val selectorField:Int) extends KeySelector[CRow, Integer] { - - override def getKey(value: CRow): Integer = { - value.row.getField(selectorField).asInstanceOf[Integer] - } -} - -} http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala index eb5acd5b..77798f9 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala @@ -19,15 +19,294 @@ package org.apache.flink.table.runtime.harness import java.util.{Comparator, Queue => JQueue} +import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.streaming.api.operators.OneInputStreamOperator import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.runtime.streamrecord.StreamRecord import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil} -import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.codegen.GeneratedAggregationsFunction +import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction, IntSumWithRetractAggFunction} +import org.apache.flink.table.runtime.aggregate.AggregateUtil +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} class HarnessTestBase { + + protected val MinMaxRowType = new RowTypeInfo(Array[TypeInformation[_]]( + INT_TYPE_INFO, + LONG_TYPE_INFO, + INT_TYPE_INFO, + STRING_TYPE_INFO, + LONG_TYPE_INFO), + Array("a", "b", "c", "d", "e")) + + protected val SumRowType = new RowTypeInfo(Array[TypeInformation[_]]( + LONG_TYPE_INFO, + INT_TYPE_INFO, + STRING_TYPE_INFO), + Array("a", "b", "c")) + + protected val minMaxCRowType = new CRowTypeInfo(MinMaxRowType) + protected val sumCRowType = new CRowTypeInfo(SumRowType) + + protected val minMaxAggregates = + Array(new LongMinWithRetractAggFunction, + new LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_, _]]] + + protected val sumAggregates = + Array(new IntSumWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_, _]]] + + protected val minMaxAggregationStateType: RowTypeInfo = + AggregateUtil.createAccumulatorRowType(minMaxAggregates) + + protected val sumAggregationStateType: RowTypeInfo = + AggregateUtil.createAccumulatorRowType(sumAggregates) + + val minMaxCode: String = + """ + |public class MinMaxAggregateHelper + | extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations { + | + | transient org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction + | fmin = null; + | + | transient org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction + | fmax = null; + | + | public MinMaxAggregateHelper() throws Exception { + | + | fmin = (org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction) + | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils + | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" + + | "MuTG9uZ01pbldpdGhSZXRyYWN0QWdnRnVuY3Rpb26oIdX_DaMPxQIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" + + | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWluV2l0aFJldHJhY3RBZ2dGdW5jdGlvbq_ZGuzxtA_S" + + | "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" + + | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" + + | "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" + + | "mluZyRMb25nJOda0iCPo2ukAgAAeHA"); + | + | fmax = (org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction) + | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils + | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" + + | "MuTG9uZ01heFdpdGhSZXRyYWN0QWdnRnVuY3Rpb25RmsI8azNGXwIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" + + | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWF4V2l0aFJldHJhY3RBZ2dGdW5jdGlvbvnwowlX0_Qf" + + | "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" + + | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" + + | "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" + + | "mluZyRMb25nJOda0iCPo2ukAgAAeHA"); + | } + | + | public void setAggregationResults( + | org.apache.flink.types.Row accs, + | org.apache.flink.types.Row output) { + | + | org.apache.flink.table.functions.AggregateFunction baseClass0 = + | (org.apache.flink.table.functions.AggregateFunction) fmin; + | output.setField(5, baseClass0.getValue( + | (org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator) + | accs.getField(0))); + | + | org.apache.flink.table.functions.AggregateFunction baseClass1 = + | (org.apache.flink.table.functions.AggregateFunction) fmax; + | output.setField(6, baseClass1.getValue( + | (org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator) + | accs.getField(1))); + | } + | + | public void accumulate( + | org.apache.flink.types.Row accs, + | org.apache.flink.types.Row input) { + | + | fmin.accumulate( + | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator) + | accs.getField(0)), + | (java.lang.Long) input.getField(4)); + | + | fmax.accumulate( + | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator) + | accs.getField(1)), + | (java.lang.Long) input.getField(4)); + | } + | + | public void retract( + | org.apache.flink.types.Row accs, + | org.apache.flink.types.Row input) { + | + | fmin.retract( + | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator) + | accs.getField(0)), + | (java.lang.Long) input.getField(4)); + | + | fmax.retract( + | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator) + | accs.getField(1)), + | (java.lang.Long) input.getField(4)); + | } + | + | public org.apache.flink.types.Row createAccumulators() { + | + | org.apache.flink.types.Row accs = new org.apache.flink.types.Row(2); + | + | accs.setField( + | 0, + | fmin.createAccumulator()); + | + | accs.setField( + | 1, + | fmax.createAccumulator()); + | + | return accs; + | } + | + | public void setForwardedFields( + | org.apache.flink.types.Row input, + | org.apache.flink.types.Row output) { + | + | output.setField(0, input.getField(0)); + | output.setField(1, input.getField(1)); + | output.setField(2, input.getField(2)); + | output.setField(3, input.getField(3)); + | output.setField(4, input.getField(4)); + | } + | + | public org.apache.flink.types.Row createOutputRow() { + | return new org.apache.flink.types.Row(7); + | } + | + |/******* This test does not use the following methods *******/ + | public org.apache.flink.types.Row mergeAccumulatorsPair( + | org.apache.flink.types.Row a, + | org.apache.flink.types.Row b) { + | return null; + | } + | + | public void resetAccumulator(org.apache.flink.types.Row accs) { + | } + | + | public void setConstantFlags(org.apache.flink.types.Row output) { + | } + |} + """.stripMargin + + val sumAggCode: String = + """ + |public final class SumAggregationHelper + | extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations { + | + | + |transient org.apache.flink.table.functions.aggfunctions.IntSumWithRetractAggFunction + |sum = null; + |private final org.apache.flink.table.runtime.aggregate.SingleElementIterable<org.apache + | .flink.table.functions.aggfunctions.SumWithRetractAccumulator> accIt0 = + | new org.apache.flink.table.runtime.aggregate.SingleElementIterable<org.apache.flink + | .table + | .functions.aggfunctions.SumWithRetractAccumulator>(); + | + | public SumAggregationHelper() throws Exception { + | + |sum = (org.apache.flink.table.functions.aggfunctions.IntSumWithRetractAggFunction) + |org.apache.flink.table.functions.utils.UserDefinedFunctionUtils + |.deserialize + |("rO0ABXNyAEpvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuSW50U3VtV2l0a" + + |"FJldHJhY3RBZ2dGdW5jdGlvblkfWkeNZDeDAgAAeHIAR29yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25" + + |"zLmFnZ2Z1bmN0aW9ucy5TdW1XaXRoUmV0cmFjdEFnZ0Z1bmN0aW9ut2oWrOsLrs0CAAFMAAdudW1lcmljdAAUT" + + |"HNjYWxhL21hdGgvTnVtZXJpYzt4cgAyb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuQWdncmVnYXR" + + |"lRnVuY3Rpb25NxhU-0mM1_AIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVma" + + |"W5lZEZ1bmN0aW9uLQH3VDG4DJMCAAB4cHNyACFzY2FsYS5tYXRoLk51bWVyaWMkSW50SXNJbnRlZ3JhbCTw6XA" + + |"59sPAzAIAAHhw"); + | + | + | } + | + | public final void setAggregationResults( + | org.apache.flink.types.Row accs, + | org.apache.flink.types.Row output) { + | + | org.apache.flink.table.functions.AggregateFunction baseClass0 = + | (org.apache.flink.table.functions.AggregateFunction) + | sum; + | + | output.setField( + | 1, + | baseClass0.getValue((org.apache.flink.table.functions.aggfunctions + | .SumWithRetractAccumulator) accs.getField(0))); + | } + | + | public final void accumulate( + | org.apache.flink.types.Row accs, + | org.apache.flink.types.Row input) { + | + | sum.accumulate( + | ((org.apache.flink.table.functions.aggfunctions.SumWithRetractAccumulator) accs + | .getField + | (0)), + | (java.lang.Integer) input.getField(1)); + | } + | + | + | public final void retract( + | org.apache.flink.types.Row accs, + | org.apache.flink.types.Row input) { + | } + | + | public final org.apache.flink.types.Row createAccumulators() + | { + | + | org.apache.flink.types.Row accs = + | new org.apache.flink.types.Row(1); + | + | accs.setField( + | 0, + | sum.createAccumulator()); + | + | return accs; + | } + | + | public final void setForwardedFields( + | org.apache.flink.types.Row input, + | org.apache.flink.types.Row output) + | { + | + | output.setField( + | 0, + | input.getField(0)); + | } + | + | public final void setConstantFlags(org.apache.flink.types.Row output) + | { + | + | } + | + | public final org.apache.flink.types.Row createOutputRow() { + | return new org.apache.flink.types.Row(2); + | } + | + | + | public final org.apache.flink.types.Row mergeAccumulatorsPair( + | org.apache.flink.types.Row a, + | org.apache.flink.types.Row b) + | { + | + | return a; + | + | } + | + | public final void resetAccumulator( + | org.apache.flink.types.Row accs) { + | } + |} + |""".stripMargin + + + protected val minMaxFuncName = "MinMaxAggregateHelper" + protected val sumFuncName = "SumAggregationHelper" + + protected val genMinMaxAggFunction = GeneratedAggregationsFunction(minMaxFuncName, minMaxCode) + protected val genSumAggFunction = GeneratedAggregationsFunction(sumFuncName, sumAggCode) + def createHarnessTester[IN, OUT, KEY]( operator: OneInputStreamOperator[IN, OUT], keySelector: KeySelector[IN, KEY], http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala new file mode 100644 index 0000000..04214f9 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.harness + +import java.lang.{Integer => JInt, Long => JLong} +import java.util.concurrent.ConcurrentLinkedQueue + +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.streaming.api.operators.KeyedProcessOperator +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate._ +import org.apache.flink.table.runtime.harness.HarnessTestBase._ +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.junit.Test + +class NonWindowHarnessTest extends HarnessTestBase { + + protected var queryConfig = + new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3)) + + @Test + def testProcTimeNonWindow(): Unit = { + + val processFunction = new KeyedProcessOperator[String, CRow, CRow]( + new GroupAggProcessFunction( + genSumAggFunction, + sumAggregationStateType, + false, + queryConfig)) + + val testHarness = + createHarnessTester( + processFunction, + new TupleRowKeySelector[String](2), + BasicTypeInfo.STRING_TYPE_INFO) + + testHarness.open() + + // register cleanup timer with 3001 + testHarness.setProcessingTime(1) + + testHarness.processElement(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt, "aaa"), true), 1)) + testHarness.processElement(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt, "bbb"), true), 1)) + // reuse timer 3001 + testHarness.setProcessingTime(1000) + testHarness.processElement(new StreamRecord(CRow(Row.of(3L: JLong, 2: JInt, "aaa"), true), 1)) + testHarness.processElement(new StreamRecord(CRow(Row.of(4L: JLong, 3: JInt, "aaa"), true), 1)) + + // register cleanup timer with 4002 + testHarness.setProcessingTime(1002) + testHarness.processElement(new StreamRecord(CRow(Row.of(5L: JLong, 4: JInt, "aaa"), true), 1)) + testHarness.processElement(new StreamRecord(CRow(Row.of(6L: JLong, 2: JInt, "bbb"), true), 1)) + + // trigger cleanup timer and register cleanup timer with 7003 + testHarness.setProcessingTime(4003) + testHarness.processElement(new StreamRecord(CRow(Row.of(7L: JLong, 5: JInt, "aaa"), true), 1)) + testHarness.processElement(new StreamRecord(CRow(Row.of(8L: JLong, 6: JInt, "aaa"), true), 1)) + testHarness.processElement(new StreamRecord(CRow(Row.of(9L: JLong, 7: JInt, "aaa"), true), 1)) + testHarness.processElement(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt, "bbb"), true), 1)) + + val result = testHarness.getOutput + + val expectedOutput = new ConcurrentLinkedQueue[Object]() + + expectedOutput.add(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt), true), 1)) + expectedOutput.add(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt), true), 1)) + expectedOutput.add(new StreamRecord(CRow(Row.of(3L: JLong, 3: JInt), true), 1)) + expectedOutput.add(new StreamRecord(CRow(Row.of(4L: JLong, 6: JInt), true), 1)) + expectedOutput.add(new StreamRecord(CRow(Row.of(5L: JLong, 10: JInt), true), 1)) + expectedOutput.add(new StreamRecord(CRow(Row.of(6L: JLong, 3: JInt), true), 1)) + expectedOutput.add(new StreamRecord(CRow(Row.of(7L: JLong, 5: JInt), true), 1)) + expectedOutput.add(new StreamRecord(CRow(Row.of(8L: JLong, 11: JInt), true), 1)) + expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, 18: JInt), true), 1)) + expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt), true), 1)) + + verify(expectedOutput, result, new RowResultSortComparator(6)) + + testHarness.close() + } + + @Test + def testProcTimeNonWindowWithRetract(): Unit = { + + val processFunction = new KeyedProcessOperator[String, CRow, CRow]( + new GroupAggProcessFunction( + genSumAggFunction, + sumAggregationStateType, + true, + queryConfig)) + + val testHarness = + createHarnessTester( + processFunction, + new TupleRowKeySelector[String](2), + BasicTypeInfo.STRING_TYPE_INFO) + + testHarness.open() + + // register cleanup timer with 3001 + testHarness.setProcessingTime(1) + + testHarness.processElement(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt, "aaa"), true), 1)) + testHarness.processElement(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt, "bbb"), true), 2)) + testHarness.processElement(new StreamRecord(CRow(Row.of(3L: JLong, 2: JInt, "aaa"), true), 3)) + testHarness.processElement(new StreamRecord(CRow(Row.of(4L: JLong, 3: JInt, "ccc"), true), 4)) + + // trigger cleanup timer and register cleanup timer with 6002 + testHarness.setProcessingTime(3002) + testHarness.processElement(new StreamRecord(CRow(Row.of(5L: JLong, 4: JInt, "aaa"), true), 5)) + testHarness.processElement(new StreamRecord(CRow(Row.of(6L: JLong, 2: JInt, "bbb"), true), 6)) + testHarness.processElement(new StreamRecord(CRow(Row.of(7L: JLong, 5: JInt, "aaa"), true), 7)) + testHarness.processElement(new StreamRecord(CRow(Row.of(8L: JLong, 6: JInt, "eee"), true), 8)) + testHarness.processElement(new StreamRecord(CRow(Row.of(9L: JLong, 7: JInt, "aaa"), true), 9)) + testHarness.processElement(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt, "bbb"), true), 10)) + + val result = testHarness.getOutput + + val expectedOutput = new ConcurrentLinkedQueue[Object]() + + expectedOutput.add(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt), true), 1)) + expectedOutput.add(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt), true), 2)) + expectedOutput.add(new StreamRecord(CRow(Row.of(3L: JLong, 1: JInt), false), 3)) + expectedOutput.add(new StreamRecord(CRow(Row.of(3L: JLong, 3: JInt), true), 3)) + expectedOutput.add(new StreamRecord(CRow(Row.of(4L: JLong, 3: JInt), true), 4)) + expectedOutput.add(new StreamRecord(CRow(Row.of(5L: JLong, 4: JInt), true), 5)) + expectedOutput.add(new StreamRecord(CRow(Row.of(6L: JLong, 2: JInt), true), 6)) + expectedOutput.add(new StreamRecord(CRow(Row.of(7L: JLong, 4: JInt), false), 7)) + expectedOutput.add(new StreamRecord(CRow(Row.of(7L: JLong, 9: JInt), true), 7)) + expectedOutput.add(new StreamRecord(CRow(Row.of(8L: JLong, 6: JInt), true), 8)) + expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, 9: JInt), false), 9)) + expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, 16: JInt), true), 9)) + expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 2: JInt), false), 10)) + expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 5: JInt), true), 10)) + + verify(expectedOutput, result, new RowResultSortComparator(0)) + + testHarness.close() + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala index 56ca85c..786a843 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala @@ -18,180 +18,34 @@ package org.apache.flink.table.runtime.harness import java.lang.{Integer => JInt, Long => JLong} -import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.{ConcurrentLinkedQueue} -import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.streaming.api.operators.KeyedProcessOperator import org.apache.flink.streaming.runtime.streamrecord.StreamRecord -import org.apache.flink.table.codegen.GeneratedAggregationsFunction -import org.apache.flink.table.functions.AggregateFunction -import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction} +import org.apache.flink.table.api.StreamQueryConfig import org.apache.flink.table.runtime.aggregate._ import org.apache.flink.table.runtime.harness.HarnessTestBase._ -import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.table.runtime.types.CRow import org.apache.flink.types.Row import org.junit.Test class OverWindowHarnessTest extends HarnessTestBase{ - private val rT = new RowTypeInfo(Array[TypeInformation[_]]( - INT_TYPE_INFO, - LONG_TYPE_INFO, - INT_TYPE_INFO, - STRING_TYPE_INFO, - LONG_TYPE_INFO), - Array("a", "b", "c", "d", "e")) - - private val cRT = new CRowTypeInfo(rT) - - private val aggregates = - Array(new LongMinWithRetractAggFunction, - new LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_, _]]] - private val aggregationStateType: RowTypeInfo = AggregateUtil.createAccumulatorRowType(aggregates) - - val funcCode: String = - """ - |public class BoundedOverAggregateHelper - | extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations { - | - | transient org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction - | fmin = null; - | - | transient org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction - | fmax = null; - | - | public BoundedOverAggregateHelper() throws Exception { - | - | fmin = (org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction) - | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils - | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" + - | "MuTG9uZ01pbldpdGhSZXRyYWN0QWdnRnVuY3Rpb26oIdX_DaMPxQIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" + - | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWluV2l0aFJldHJhY3RBZ2dGdW5jdGlvbq_ZGuzxtA_S" + - | "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" + - | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" + - | "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" + - | "mluZyRMb25nJOda0iCPo2ukAgAAeHA"); - | - | fmax = (org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction) - | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils - | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" + - | "MuTG9uZ01heFdpdGhSZXRyYWN0QWdnRnVuY3Rpb25RmsI8azNGXwIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" + - | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWF4V2l0aFJldHJhY3RBZ2dGdW5jdGlvbvnwowlX0_Qf" + - | "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" + - | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" + - | "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" + - | "mluZyRMb25nJOda0iCPo2ukAgAAeHA"); - | } - | - | public void setAggregationResults( - | org.apache.flink.types.Row accs, - | org.apache.flink.types.Row output) { - | - | org.apache.flink.table.functions.AggregateFunction baseClass0 = - | (org.apache.flink.table.functions.AggregateFunction) fmin; - | output.setField(5, baseClass0.getValue( - | (org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator) - | accs.getField(0))); - | - | org.apache.flink.table.functions.AggregateFunction baseClass1 = - | (org.apache.flink.table.functions.AggregateFunction) fmax; - | output.setField(6, baseClass1.getValue( - | (org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator) - | accs.getField(1))); - | } - | - | public void accumulate( - | org.apache.flink.types.Row accs, - | org.apache.flink.types.Row input) { - | - | fmin.accumulate( - | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator) - | accs.getField(0)), - | (java.lang.Long) input.getField(4)); - | - | fmax.accumulate( - | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator) - | accs.getField(1)), - | (java.lang.Long) input.getField(4)); - | } - | - | public void retract( - | org.apache.flink.types.Row accs, - | org.apache.flink.types.Row input) { - | - | fmin.retract( - | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator) - | accs.getField(0)), - | (java.lang.Long) input.getField(4)); - | - | fmax.retract( - | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator) - | accs.getField(1)), - | (java.lang.Long) input.getField(4)); - | } - | - | public org.apache.flink.types.Row createAccumulators() { - | - | org.apache.flink.types.Row accs = new org.apache.flink.types.Row(2); - | - | accs.setField( - | 0, - | fmin.createAccumulator()); - | - | accs.setField( - | 1, - | fmax.createAccumulator()); - | - | return accs; - | } - | - | public void setForwardedFields( - | org.apache.flink.types.Row input, - | org.apache.flink.types.Row output) { - | - | output.setField(0, input.getField(0)); - | output.setField(1, input.getField(1)); - | output.setField(2, input.getField(2)); - | output.setField(3, input.getField(3)); - | output.setField(4, input.getField(4)); - | } - | - | public org.apache.flink.types.Row createOutputRow() { - | return new org.apache.flink.types.Row(7); - | } - | - |/******* This test does not use the following methods *******/ - | public org.apache.flink.types.Row mergeAccumulatorsPair( - | org.apache.flink.types.Row a, - | org.apache.flink.types.Row b) { - | return null; - | } - | - | public void resetAccumulator(org.apache.flink.types.Row accs) { - | } - | - | public void setConstantFlags(org.apache.flink.types.Row output) { - | } - |} - """.stripMargin - - - private val funcName = "BoundedOverAggregateHelper" - - private val genAggFunction = GeneratedAggregationsFunction(funcName, funcCode) - + protected var queryConfig = + new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3)) @Test def testProcTimeBoundedRowsOver(): Unit = { val processFunction = new KeyedProcessOperator[String, CRow, CRow]( new ProcTimeBoundedRowsOver( - genAggFunction, + genMinMaxAggFunction, 2, - aggregationStateType, - cRT)) + minMaxAggregationStateType, + minMaxCRowType, + queryConfig)) val testHarness = createHarnessTester(processFunction,new TupleRowKeySelector[Integer](0),BasicTypeInfo @@ -199,6 +53,7 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.open() + // register cleanup timer with 3001 testHarness.setProcessingTime(1) testHarness.processElement(new StreamRecord( @@ -209,6 +64,9 @@ class OverWindowHarnessTest extends HarnessTestBase{ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 1)) testHarness.processElement(new StreamRecord( CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 1)) + + // register cleanup timer with 4100 + testHarness.setProcessingTime(1100) testHarness.processElement(new StreamRecord( CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 1)) testHarness.processElement(new StreamRecord( @@ -220,15 +78,19 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.processElement(new StreamRecord( CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 1)) - testHarness.setProcessingTime(2) + // register cleanup timer with 6001 + testHarness.setProcessingTime(3001) testHarness.processElement(new StreamRecord( CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 2)) testHarness.processElement(new StreamRecord( CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 2)) testHarness.processElement(new StreamRecord( CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 2)) + + // trigger cleanup timer and register cleanup timer with 9002 + testHarness.setProcessingTime(6002) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 2)) + CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 2)) testHarness.processElement(new StreamRecord( CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 2)) @@ -274,10 +136,10 @@ class OverWindowHarnessTest extends HarnessTestBase{ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 8L: JLong, 9L: JLong), true), 2)) expectedOutput.add(new StreamRecord( CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 9L: JLong, 10L: JLong), true), 2)) + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 10L: JLong, 10L: JLong), true), 2)) expectedOutput.add(new StreamRecord( CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), true), 2)) + Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true), 2)) verify(expectedOutput, result, new RowResultSortComparator(6)) @@ -292,10 +154,11 @@ class OverWindowHarnessTest extends HarnessTestBase{ val processFunction = new KeyedProcessOperator[String, CRow, CRow]( new ProcTimeBoundedRangeOver( - genAggFunction, - 1000, - aggregationStateType, - cRT)) + genMinMaxAggFunction, + 4000, + minMaxAggregationStateType, + minMaxCRowType, + queryConfig)) val testHarness = createHarnessTester( @@ -305,6 +168,7 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.open() + // register cleanup timer with 3003 testHarness.setProcessingTime(3) testHarness.processElement(new StreamRecord( CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0)) @@ -314,6 +178,9 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.setProcessingTime(4) testHarness.processElement(new StreamRecord( CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0)) + + // trigger cleanup timer and register cleanup timer with 6003 + testHarness.setProcessingTime(3003) testHarness.processElement(new StreamRecord( CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0)) testHarness.processElement(new StreamRecord( @@ -323,9 +190,10 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.processElement(new StreamRecord( CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0)) - testHarness.setProcessingTime(6) + // register cleanup timer with 9002 + testHarness.setProcessingTime(6002) - testHarness.setProcessingTime(1002) + testHarness.setProcessingTime(7002) testHarness.processElement(new StreamRecord( CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0)) testHarness.processElement(new StreamRecord( @@ -333,15 +201,15 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.processElement(new StreamRecord( CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0)) - testHarness.setProcessingTime(1003) + // register cleanup timer with 14002 + testHarness.setProcessingTime(11002) testHarness.processElement(new StreamRecord( CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0)) - testHarness.setProcessingTime(1004) + testHarness.setProcessingTime(11004) testHarness.processElement(new StreamRecord( CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0)) - testHarness.setProcessingTime(1005) testHarness.processElement(new StreamRecord( CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 0)) testHarness.processElement(new StreamRecord( @@ -349,7 +217,7 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.processElement(new StreamRecord( CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 0)) - testHarness.setProcessingTime(1006) + testHarness.setProcessingTime(11006) val result = testHarness.getOutput @@ -364,40 +232,40 @@ class OverWindowHarnessTest extends HarnessTestBase{ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 4)) expectedOutput.add(new StreamRecord( CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true), 5)) + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 5)) expectedOutput.add(new StreamRecord( CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 5)) + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 3L: JLong, 4L: JLong), true), 3004)) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 5)) + CRow(Row.of( + 2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 20L: JLong, 20L: JLong), true), 3004)) expectedOutput.add(new StreamRecord( CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 6)) + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 4L: JLong, 4L: JLong), true), 6)) expectedOutput.add(new StreamRecord( CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true), 1003)) + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 5L: JLong, 6L: JLong), true), 7003)) expectedOutput.add(new StreamRecord( CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 1003)) + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 5L: JLong, 6L: JLong), true), 7003)) expectedOutput.add(new StreamRecord( CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 1003)) + Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 30L: JLong, 30L: JLong), true), 7003)) expectedOutput.add(new StreamRecord( CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 1004)) + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 7L: JLong, 7L: JLong), true), 11003)) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true), 1005)) + CRow(Row.of( + 1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 7L: JLong, 10L: JLong), true), 11005)) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 4L: JLong, 10L: JLong), true), 1006)) + CRow(Row.of( + 1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 7L: JLong, 10L: JLong), true), 11005)) expectedOutput.add(new StreamRecord( CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 4L: JLong, 10L: JLong), true), 1006)) + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 7L: JLong, 10L: JLong), true), 11005)) expectedOutput.add(new StreamRecord( CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), true), 1006)) + Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true), 11005)) verify(expectedOutput, result, new RowResultSortComparator(6)) @@ -409,8 +277,9 @@ class OverWindowHarnessTest extends HarnessTestBase{ val processFunction = new KeyedProcessOperator[String, CRow, CRow]( new ProcTimeUnboundedPartitionedOver( - genAggFunction, - aggregationStateType)) + genMinMaxAggFunction, + minMaxAggregationStateType, + queryConfig)) val testHarness = createHarnessTester( @@ -420,6 +289,9 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.open() + // register cleanup timer with 4003 + testHarness.setProcessingTime(1003) + testHarness.processElement(new StreamRecord( CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0)) testHarness.processElement(new StreamRecord( @@ -438,18 +310,19 @@ class OverWindowHarnessTest extends HarnessTestBase{ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0)) testHarness.processElement(new StreamRecord( CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0)) - - testHarness.setProcessingTime(1003) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 1003)) + CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0)) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 1003)) + CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0)) + + // trigger cleanup timer and register cleanup timer with 8003 + testHarness.setProcessingTime(5003) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 1003)) + CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 5003)) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 1003)) + CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 5003)) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 1003)) + CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 5003)) val result = testHarness.getOutput @@ -484,19 +357,19 @@ class OverWindowHarnessTest extends HarnessTestBase{ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 0)) expectedOutput.add(new StreamRecord( CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 1003)) + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 0)) expectedOutput.add(new StreamRecord( CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true), 1003)) + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true), 0)) expectedOutput.add(new StreamRecord( CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 1L: JLong, 9L: JLong), true), 1003)) + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 9L: JLong, 9L: JLong), true), 5003)) expectedOutput.add(new StreamRecord( CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true), 1003)) + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 9L: JLong, 10L: JLong), true), 5003)) expectedOutput.add(new StreamRecord( CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 1003)) + Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true), 5003)) verify(expectedOutput, result, new RowResultSortComparator(6)) testHarness.close() @@ -510,10 +383,11 @@ class OverWindowHarnessTest extends HarnessTestBase{ val processFunction = new KeyedProcessOperator[String, CRow, CRow]( new RowTimeBoundedRangeOver( - genAggFunction, - aggregationStateType, - cRT, - 4000)) + genMinMaxAggFunction, + minMaxAggregationStateType, + minMaxCRowType, + 4000, + new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2)))) val testHarness = createHarnessTester( @@ -573,6 +447,40 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.processWatermark(19000) + // test cleanup + testHarness.setProcessingTime(1000) + testHarness.processWatermark(20000) + + // check that state is removed after max retention time + testHarness.processElement(new StreamRecord( + CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20001)) // clean-up 3000 + testHarness.setProcessingTime(2500) + testHarness.processElement(new StreamRecord( + CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 20002)) // clean-up 4500 + testHarness.processWatermark(20010) // compute output + + assert(testHarness.numKeyedStateEntries() > 0) // check that we have state + testHarness.setProcessingTime(4499) + assert(testHarness.numKeyedStateEntries() > 0) // check that we have state + testHarness.setProcessingTime(4500) + assert(testHarness.numKeyedStateEntries() == 0) // check that all state is gone + + // check that state is only removed if all data was processed + testHarness.processElement(new StreamRecord( + CRow(Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong), true), 20011)) // clean-up 6500 + + assert(testHarness.numKeyedStateEntries() > 0) // check that we have state + testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500 + assert(testHarness.numKeyedStateEntries() > 0) // check that we have state + + testHarness.processWatermark(20020) // schedule emission + + assert(testHarness.numKeyedStateEntries() > 0) // check that we have state + testHarness.setProcessingTime(8499) // clean-up + assert(testHarness.numKeyedStateEntries() > 0) // check that we have state + testHarness.setProcessingTime(8500) // clean-up + assert(testHarness.numKeyedStateEntries() == 0) // check that all state is gone + val result = testHarness.getOutput val expectedOutput = new ConcurrentLinkedQueue[Object]() @@ -621,6 +529,16 @@ class OverWindowHarnessTest extends HarnessTestBase{ CRow( Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true), 12001)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true), 20001)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true), 20002)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong, 3L: JLong, 3L: JLong), true), 20011)) + verify(expectedOutput, result, new RowResultSortComparator(6)) testHarness.close() } @@ -630,10 +548,11 @@ class OverWindowHarnessTest extends HarnessTestBase{ val processFunction = new KeyedProcessOperator[String, CRow, CRow]( new RowTimeBoundedRowsOver( - genAggFunction, - aggregationStateType, - cRT, - 3)) + genMinMaxAggFunction, + minMaxAggregationStateType, + minMaxCRowType, + 3, + new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2)))) val testHarness = createHarnessTester( @@ -689,6 +608,41 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.processWatermark(19000) + // test cleanup + testHarness.setProcessingTime(1000) + testHarness.processWatermark(20000) + + // check that state is removed after max retention time + testHarness.processElement(new StreamRecord( + CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20001)) // clean-up 3000 + testHarness.setProcessingTime(2500) + testHarness.processElement(new StreamRecord( + CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 20002)) // clean-up 4500 + testHarness.processWatermark(20010) // compute output + + assert(testHarness.numKeyedStateEntries() > 0) // check that we have state + testHarness.setProcessingTime(4499) + assert(testHarness.numKeyedStateEntries() > 0) // check that we have state + testHarness.setProcessingTime(4500) + assert(testHarness.numKeyedStateEntries() == 0) // check that all state is gone + + // check that state is only removed if all data was processed + testHarness.processElement(new StreamRecord( + CRow(Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong), true), 20011)) // clean-up 6500 + + assert(testHarness.numKeyedStateEntries() > 0) // check that we have state + testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500 + assert(testHarness.numKeyedStateEntries() > 0) // check that we have state + + testHarness.processWatermark(20020) // schedule emission + + assert(testHarness.numKeyedStateEntries() > 0) // check that we have state + testHarness.setProcessingTime(8499) // clean-up + assert(testHarness.numKeyedStateEntries() > 0) // check that we have state + testHarness.setProcessingTime(8500) // clean-up + assert(testHarness.numKeyedStateEntries() == 0) // check that all state is gone + + val result = testHarness.getOutput val expectedOutput = new ConcurrentLinkedQueue[Object]() @@ -736,6 +690,16 @@ class OverWindowHarnessTest extends HarnessTestBase{ CRow( Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 20L: JLong, 40L: JLong), true), 12001)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true), 20001)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true), 20002)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong, 3L: JLong, 3L: JLong), true), 20011)) + verify(expectedOutput, result, new RowResultSortComparator(6)) testHarness.close() } @@ -748,9 +712,10 @@ class OverWindowHarnessTest extends HarnessTestBase{ val processFunction = new KeyedProcessOperator[String, CRow, CRow]( new RowTimeUnboundedRangeOver( - genAggFunction, - aggregationStateType, - cRT)) + genMinMaxAggFunction, + minMaxAggregationStateType, + minMaxCRowType, + new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2)))) val testHarness = createHarnessTester( @@ -760,6 +725,7 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.open() + testHarness.setProcessingTime(1000) testHarness.processWatermark(800) testHarness.processElement(new StreamRecord( CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801)) @@ -806,6 +772,30 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.processWatermark(19000) + // test cleanup + assert(testHarness.numKeyedStateEntries() > 0) + testHarness.setProcessingTime(2999) // clean up timer is 3000, so nothing should happen + assert(testHarness.numKeyedStateEntries() > 0) + testHarness.setProcessingTime(3000) // clean up is triggered + assert(testHarness.numKeyedStateEntries() == 0) + + testHarness.processWatermark(20000) + testHarness.processElement(new StreamRecord( + CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20001)) // clean-up 5000 + testHarness.setProcessingTime(2500) + testHarness.processElement(new StreamRecord( + CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 20002)) // clean-up 5000 + + assert(testHarness.numKeyedStateEntries() > 0) + testHarness.setProcessingTime(5000) // does not clean up, because data left. New timer 7000 + testHarness.processWatermark(20010) // compute output + + assert(testHarness.numKeyedStateEntries() > 0) + testHarness.setProcessingTime(6999) // clean up timer is 3000, so nothing should happen + assert(testHarness.numKeyedStateEntries() > 0) + testHarness.setProcessingTime(7000) // clean up is triggered + assert(testHarness.numKeyedStateEntries() == 0) + val result = testHarness.getOutput val expectedOutput = new ConcurrentLinkedQueue[Object]() @@ -854,6 +844,13 @@ class OverWindowHarnessTest extends HarnessTestBase{ CRow( Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 12001)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true), 20001)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true), 20002)) + verify(expectedOutput, result, new RowResultSortComparator(6)) testHarness.close() } @@ -863,9 +860,10 @@ class OverWindowHarnessTest extends HarnessTestBase{ val processFunction = new KeyedProcessOperator[String, CRow, CRow]( new RowTimeUnboundedRowsOver( - genAggFunction, - aggregationStateType, - cRT)) + genMinMaxAggFunction, + minMaxAggregationStateType, + minMaxCRowType, + new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2)))) val testHarness = createHarnessTester( @@ -875,6 +873,7 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.open() + testHarness.setProcessingTime(1000) testHarness.processWatermark(800) testHarness.processElement(new StreamRecord( CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801)) @@ -921,6 +920,30 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.processWatermark(19000) + // test cleanup + assert(testHarness.numKeyedStateEntries() > 0) + testHarness.setProcessingTime(2999) // clean up timer is 3000, so nothing should happen + assert(testHarness.numKeyedStateEntries() > 0) + testHarness.setProcessingTime(3000) // clean up is triggered + assert(testHarness.numKeyedStateEntries() == 0) + + testHarness.processWatermark(20000) + testHarness.processElement(new StreamRecord( + CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20001)) // clean-up 5000 + testHarness.setProcessingTime(2500) + testHarness.processElement(new StreamRecord( + CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 20002)) // clean-up 5000 + + assert(testHarness.numKeyedStateEntries() > 0) + testHarness.setProcessingTime(5000) // does not clean up, because data left. New timer 7000 + testHarness.processWatermark(20010) // compute output + + assert(testHarness.numKeyedStateEntries() > 0) + testHarness.setProcessingTime(6999) // clean up timer is 3000, so nothing should happen + assert(testHarness.numKeyedStateEntries() > 0) + testHarness.setProcessingTime(7000) // clean up is triggered + assert(testHarness.numKeyedStateEntries() == 0) + val result = testHarness.getOutput val expectedOutput = new ConcurrentLinkedQueue[Object]() @@ -968,6 +991,13 @@ class OverWindowHarnessTest extends HarnessTestBase{ CRow( Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 12001)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true), 20001)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true), 20002)) + verify(expectedOutput, result, new RowResultSortComparator(6)) testHarness.close() } http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala index 3d79e22..c4e2433 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala @@ -28,7 +28,7 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) { override private[flink] def writeToSink[T]( table: Table, sink: TableSink[T], - qConfig: QueryConfig): Unit = ??? + queryConfig: QueryConfig): Unit = ??? override protected def checkValidTableName(name: String): Unit = ???