This is an automated email from the ASF dual-hosted git repository.
jingzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d46548c [FLINK-25614][table-runtime] Update LocalWindowAggregate chain strategy to ALWAYS
d46548c is described below
commit d46548c5024c4c47d6e52168e1d92c68d3ce58f8
Author: lmagic233 <[email protected]>
AuthorDate: Tue Jan 11 23:24:48 2022 +0800
[FLINK-25614][table-runtime] Update LocalWindowAggregate chain strategy to ALWAYS
This closes #18331.
---
.../window/LocalSlicingWindowAggOperator.java | 2 +
.../operators/window/slicing/SliceAssigners.java | 9 +-
.../window/SlicingWindowAggOperatorTest.java | 114 +++++++++++----------
3 files changed, 66 insertions(+), 59 deletions(-)
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
index fbdfe48..1742767 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
@@ -21,6 +21,7 @@ package
org.apache.flink.table.runtime.operators.aggregate.window;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -80,6 +81,7 @@ public class LocalSlicingWindowAggOperator extends
AbstractStreamOperator<RowDat
SliceAssigner sliceAssigner,
WindowBuffer.LocalFactory windowBufferFactory,
ZoneId shiftTimezone) {
+ chainingStrategy = ChainingStrategy.ALWAYS;
this.keySelector = keySelector;
this.sliceAssigner = sliceAssigner;
this.windowInterval = sliceAssigner.getSliceEndInterval();
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java
index 5b71b35..b568c48 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java
@@ -20,6 +20,7 @@ package
org.apache.flink.table.runtime.operators.window.slicing;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.util.IterableIterator;
import org.apache.flink.util.MathUtils;
@@ -401,7 +402,7 @@ public final class SliceAssigners {
@Override
public long assignSliceEnd(RowData element, ClockService clock) {
- return element.getLong(windowEndIndex);
+ return element.getTimestamp(windowEndIndex, 3).getMillisecond();
}
@Override
@@ -507,7 +508,7 @@ public final class SliceAssigners {
@Override
public long assignSliceEnd(RowData element, ClockService clock) {
- return element.getLong(sliceEndIndex);
+ return element.getTimestamp(sliceEndIndex, 3).getMillisecond();
}
@Override
@@ -552,7 +553,9 @@ public final class SliceAssigners {
public final long assignSliceEnd(RowData element, ClockService clock) {
final long timestamp;
if (rowtimeIndex >= 0) {
- timestamp = toUtcTimestampMills(element.getLong(rowtimeIndex),
shiftTimeZone);
+ // Precision for row timestamp is always 3
+ TimestampData rowTime = element.getTimestamp(rowtimeIndex, 3);
+ timestamp = toUtcTimestampMills(rowTime.getMillisecond(),
shiftTimeZone);
} else {
// in processing time mode
timestamp = toUtcTimestampMills(clock.currentProcessingTime(),
shiftTimeZone);
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
index 1989701..58c384a 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
@@ -58,6 +59,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
+import static org.apache.flink.table.data.TimestampData.fromEpochMillis;
import static
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
import static
org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;
import static org.junit.Assert.assertEquals;
@@ -82,7 +84,7 @@ public class SlicingWindowAggOperatorTest {
Arrays.asList(
new RowType.RowField("f0", new
VarCharType(Integer.MAX_VALUE)),
new RowType.RowField("f1", new IntType()),
- new RowType.RowField("f2", new BigIntType())));
+ new RowType.RowField("f2", new TimestampType())));
private static final RowDataSerializer INPUT_ROW_SER = new
RowDataSerializer(INPUT_ROW_TYPE);
@@ -138,16 +140,16 @@ public class SlicingWindowAggOperatorTest {
ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
// add elements out-of-order
- testHarness.processElement(insertRecord("key2", 1, 3999L));
- testHarness.processElement(insertRecord("key2", 1, 3000L));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(3999L)));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(3000L)));
- testHarness.processElement(insertRecord("key1", 1, 20L));
- testHarness.processElement(insertRecord("key1", 1, 0L));
- testHarness.processElement(insertRecord("key1", 1, 999L));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(20L)));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(0L)));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(999L)));
- testHarness.processElement(insertRecord("key2", 1, 1998L));
- testHarness.processElement(insertRecord("key2", 1, 1999L));
- testHarness.processElement(insertRecord("key2", 1, 1000L));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(1998L)));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(1999L)));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(1000L)));
testHarness.processWatermark(new Watermark(999));
expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(-2000L),
localMills(1000L)));
@@ -189,7 +191,7 @@ public class SlicingWindowAggOperatorTest {
"Output was not correct.", expectedOutput,
testHarness.getOutput());
// late element for [1K, 4K), but should be accumulated into [2K, 5K),
[3K, 6K)
- testHarness.processElement(insertRecord("key2", 1, 3500L));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(3500L)));
testHarness.processWatermark(new Watermark(4999));
expectedOutput.add(insertRecord("key2", 3L, 3L, localMills(2000L),
localMills(5000L)));
@@ -198,7 +200,7 @@ public class SlicingWindowAggOperatorTest {
"Output was not correct.", expectedOutput,
testHarness.getOutput());
// late for all assigned windows, should be dropped
- testHarness.processElement(insertRecord("key1", 1, 2999L));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(2999L)));
testHarness.processWatermark(new Watermark(5999));
expectedOutput.add(insertRecord("key2", 3L, 3L, localMills(3000L),
localMills(6000L)));
@@ -246,7 +248,7 @@ public class SlicingWindowAggOperatorTest {
// timestamp is ignored in processing time
testHarness.setProcessingTime(epochMills(shiftTimeZone,
"1970-01-01T00:00:00.003"));
- testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(Long.MAX_VALUE)));
testHarness.setProcessingTime(epochMills(shiftTimeZone,
"1970-01-01T01:00:00"));
@@ -261,8 +263,8 @@ public class SlicingWindowAggOperatorTest {
ASSERTER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput,
testHarness.getOutput());
- testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
- testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(Long.MAX_VALUE)));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(Long.MAX_VALUE)));
testHarness.setProcessingTime(epochMills(shiftTimeZone,
"1970-01-01T02:00:00"));
@@ -276,8 +278,8 @@ public class SlicingWindowAggOperatorTest {
ASSERTER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput,
testHarness.getOutput());
- testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
- testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(Long.MAX_VALUE)));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(Long.MAX_VALUE)));
testHarness.setProcessingTime(epochMills(shiftTimeZone,
"1970-01-01T03:00:00"));
@@ -299,9 +301,9 @@ public class SlicingWindowAggOperatorTest {
ASSERTER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput,
testHarness.getOutput());
- testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
- testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
- testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(Long.MAX_VALUE)));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(Long.MAX_VALUE)));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(Long.MAX_VALUE)));
testHarness.setProcessingTime(epochMills(shiftTimeZone,
"1970-01-01T07:00:00"));
@@ -366,16 +368,16 @@ public class SlicingWindowAggOperatorTest {
ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
// add elements out-of-order
- testHarness.processElement(insertRecord("key2", 1, 2999L));
- testHarness.processElement(insertRecord("key2", 1, 3000L));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(2999L)));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(3000L)));
- testHarness.processElement(insertRecord("key1", 1, 20L));
- testHarness.processElement(insertRecord("key1", 1, 0L));
- testHarness.processElement(insertRecord("key1", 1, 999L));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(20L)));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(0L)));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(999L)));
- testHarness.processElement(insertRecord("key2", 1, 1998L));
- testHarness.processElement(insertRecord("key2", 1, 1999L));
- testHarness.processElement(insertRecord("key2", 1, 1000L));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(1998L)));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(1999L)));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(1000L)));
testHarness.processWatermark(new Watermark(999));
expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(0L),
localMills(1000L)));
@@ -404,7 +406,7 @@ public class SlicingWindowAggOperatorTest {
testHarness.open();
// the late event would not trigger window [0, 2000L) again even if
the job restore from
// savepoint
- testHarness.processElement(insertRecord("key2", 1, 1000L));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(1000L)));
testHarness.processWatermark(new Watermark(1999));
expectedOutput.add(new Watermark(1999));
@@ -424,7 +426,7 @@ public class SlicingWindowAggOperatorTest {
"Output was not correct.", expectedOutput,
testHarness.getOutput());
// late element for [3K, 4K), but should be accumulated into [3K, 5K)
[3K, 6K)
- testHarness.processElement(insertRecord("key1", 2, 3500L));
+ testHarness.processElement(insertRecord("key1", 2,
fromEpochMillis(3500L)));
testHarness.processWatermark(new Watermark(4999));
expectedOutput.add(insertRecord("key2", 1L, 1L, localMills(3000L),
localMills(5000L)));
@@ -434,7 +436,7 @@ public class SlicingWindowAggOperatorTest {
"Output was not correct.", expectedOutput,
testHarness.getOutput());
// late for all assigned windows, should be dropped
- testHarness.processElement(insertRecord("key1", 1, 2999L));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(2999L)));
testHarness.processWatermark(new Watermark(5999));
expectedOutput.add(insertRecord("key2", 1L, 1L, localMills(3000L),
localMills(6000L)));
@@ -483,7 +485,7 @@ public class SlicingWindowAggOperatorTest {
// timestamp is ignored in processing time
testHarness.setProcessingTime(epochMills(shiftTimeZone,
"1970-01-01T00:00:00.003"));
- testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(Long.MAX_VALUE)));
testHarness.setProcessingTime(epochMills(shiftTimeZone,
"1970-01-01T08:00:00"));
@@ -498,8 +500,8 @@ public class SlicingWindowAggOperatorTest {
ASSERTER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput,
testHarness.getOutput());
- testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
- testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(Long.MAX_VALUE)));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(Long.MAX_VALUE)));
testHarness.setProcessingTime(epochMills(shiftTimeZone,
"1970-01-01T16:00:00"));
@@ -513,8 +515,8 @@ public class SlicingWindowAggOperatorTest {
ASSERTER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput,
testHarness.getOutput());
- testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
- testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(Long.MAX_VALUE)));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(Long.MAX_VALUE)));
testHarness.setProcessingTime(epochMills(shiftTimeZone,
"1970-01-02T00:00:00"));
@@ -536,9 +538,9 @@ public class SlicingWindowAggOperatorTest {
ASSERTER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput,
testHarness.getOutput());
- testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
- testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
- testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(Long.MAX_VALUE)));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(Long.MAX_VALUE)));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(Long.MAX_VALUE)));
testHarness.setProcessingTime(epochMills(shiftTimeZone,
"1970-01-03T08:00:00"));
@@ -616,16 +618,16 @@ public class SlicingWindowAggOperatorTest {
ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
// add elements out-of-order
- testHarness.processElement(insertRecord("key2", 1, 3999L));
- testHarness.processElement(insertRecord("key2", 1, 3000L));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(3999L)));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(3000L)));
- testHarness.processElement(insertRecord("key1", 1, 20L));
- testHarness.processElement(insertRecord("key1", 1, 0L));
- testHarness.processElement(insertRecord("key1", 1, 999L));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(20L)));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(0L)));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(999L)));
- testHarness.processElement(insertRecord("key2", 1, 1998L));
- testHarness.processElement(insertRecord("key2", 1, 1999L));
- testHarness.processElement(insertRecord("key2", 1, 1000L));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(1998L)));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(1999L)));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(1000L)));
testHarness.processWatermark(new Watermark(999));
expectedOutput.add(new Watermark(999));
@@ -663,7 +665,7 @@ public class SlicingWindowAggOperatorTest {
"Output was not correct.", expectedOutput,
testHarness.getOutput());
// late element, should be dropped
- testHarness.processElement(insertRecord("key1", 1, 2500L));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(2500L)));
testHarness.processWatermark(new Watermark(4999));
expectedOutput.add(new Watermark(4999));
@@ -671,7 +673,7 @@ public class SlicingWindowAggOperatorTest {
"Output was not correct.", expectedOutput,
testHarness.getOutput());
// late element, should be dropped
- testHarness.processElement(insertRecord("key2", 1, 2999L));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(2999L)));
testHarness.processWatermark(new Watermark(5999));
expectedOutput.add(insertRecord("key2", 2L, 2L, localMills(3000L),
localMills(6000L)));
@@ -725,12 +727,12 @@ public class SlicingWindowAggOperatorTest {
testHarness.setProcessingTime(epochMills(shiftTimeZone,
"1970-01-01T00:00:00.003"));
// timestamp is ignored in processing time
- testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
- testHarness.processElement(insertRecord("key2", 1, 7000L));
- testHarness.processElement(insertRecord("key2", 1, 7000L));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(Long.MAX_VALUE)));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(7000L)));
+ testHarness.processElement(insertRecord("key2", 1,
fromEpochMillis(7000L)));
- testHarness.processElement(insertRecord("key1", 1, 7000L));
- testHarness.processElement(insertRecord("key1", 1, 7000L));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(7000L)));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(7000L)));
testHarness.setProcessingTime(epochMills(shiftTimeZone,
"1970-01-01T05:00:00"));
@@ -752,9 +754,9 @@ public class SlicingWindowAggOperatorTest {
ASSERTER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput,
testHarness.getOutput());
- testHarness.processElement(insertRecord("key1", 1, 7000L));
- testHarness.processElement(insertRecord("key1", 1, 7000L));
- testHarness.processElement(insertRecord("key1", 1, 7000L));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(7000L)));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(7000L)));
+ testHarness.processElement(insertRecord("key1", 1,
fromEpochMillis(7000L)));
testHarness.setProcessingTime(epochMills(shiftTimeZone,
"1970-01-01T10:00:01"));