KurtYoung commented on a change in pull request #12680:
URL: https://github.com/apache/flink/pull/12680#discussion_r441942432



##########
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunctionTest.java
##########
@@ -0,0 +1,76 @@
+package org.apache.flink.table.runtime.operators.over;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.AggsHandleFunction;
+import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.Test;
+
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link RowTimeRangeBoundedPrecedingFunction}.
+ */
+public class RowTimeRangeBoundedPrecedingFunctionTest {
+
+       private static GeneratedAggsHandleFunction aggsHandleFunction =
+               new GeneratedAggsHandleFunction("Function", "", new Object[0]) {
+                       @Override
+                       public AggsHandleFunction newInstance(ClassLoader 
classLoader) {
+                               return new SumAggsHandleFunction(1);
+                       }
+               };
+
+       private LogicalType[] inputFieldTypes = new LogicalType[]{
+               new VarCharType(VarCharType.MAX_LENGTH),
+               new BigIntType(),
+               new BigIntType()
+       };
+       private LogicalType[] accTypes = new LogicalType[]{ new BigIntType() };
+
+       private BinaryRowDataKeySelector keySelector = new 
BinaryRowDataKeySelector(new int[]{ 0 }, inputFieldTypes);
+       private TypeInformation<RowData> keyType = 
keySelector.getProducedType();
+
+       @Test
+       public void testRecordRetraction() throws Exception {
+               RowTimeRangeBoundedPrecedingFunction<RowData> function = new 
RowTimeRangeBoundedPrecedingFunction<>(0, 0, aggsHandleFunction, accTypes, 
inputFieldTypes, 2000, 2);
+               KeyedProcessOperator<RowData, RowData, RowData> operator = new 
KeyedProcessOperator<>(function);
+
+               OneInputStreamOperatorTestHarness<RowData, RowData> testHarness 
= createTestHarness(operator);
+
+               testHarness.open();
+
+               HeapKeyedStateBackend stateBackend = (HeapKeyedStateBackend) 
operator.getKeyedStateBackend();

Review comment:
       use `AbstractKeyedStateBackend` instead, so the cast does not depend on the concrete state backend implementation

##########
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunctionTest.java
##########
@@ -0,0 +1,76 @@
+package org.apache.flink.table.runtime.operators.over;

Review comment:
       missing Apache license header

##########
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java
##########
@@ -0,0 +1,76 @@
+package org.apache.flink.table.runtime.operators.over;

Review comment:
       missing Apache license header

##########
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java
##########
@@ -0,0 +1,76 @@
+package org.apache.flink.table.runtime.operators.over;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.AggsHandleFunction;
+import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.Test;
+
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link ProcTimeRangeBoundedPrecedingFunction}.
+ */
+public class ProcTimeRangeBoundedPrecedingFunctionTest {
+
+       private static GeneratedAggsHandleFunction aggsHandleFunction =
+               new GeneratedAggsHandleFunction("Function", "", new Object[0]) {
+                       @Override
+                       public AggsHandleFunction newInstance(ClassLoader 
classLoader) {
+                               return new SumAggsHandleFunction(1);
+                       }
+               };
+
+       private LogicalType[] inputFieldTypes = new LogicalType[]{
+               new VarCharType(VarCharType.MAX_LENGTH),
+               new BigIntType(),
+       };
+       private LogicalType[] accTypes = new LogicalType[]{ new BigIntType() };
+
+       private BinaryRowDataKeySelector keySelector = new 
BinaryRowDataKeySelector(new int[]{ 0 }, inputFieldTypes);
+       private TypeInformation<RowData> keyType = 
keySelector.getProducedType();
+
+       @Test
+       public void testRecordRetraction() throws Exception {
+               ProcTimeRangeBoundedPrecedingFunction<RowData> function = new 
ProcTimeRangeBoundedPrecedingFunction<>(0, 0, aggsHandleFunction, accTypes, 
inputFieldTypes, 2000);
+               KeyedProcessOperator<RowData, RowData, RowData> operator = new 
KeyedProcessOperator<>(function);
+
+               OneInputStreamOperatorTestHarness<RowData, RowData> testHarness 
= createTestHarness(operator);
+
+               testHarness.open();
+
+               HeapKeyedStateBackend stateBackend = (HeapKeyedStateBackend) 
operator.getKeyedStateBackend();

Review comment:
       Use `AbstractKeyedStateBackend` here so that the test will still work if the 
type of state backend changes to RocksDB.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to