This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push: new 29edd60f85a [FLINK-36831][table] Introduce AppendOnlyTopNFunction in Rank with Async State API 29edd60f85a is described below commit 29edd60f85a38a3286a8c6da2f6400affb04ce6f Author: Xuyang <xyzhong...@163.com> AuthorDate: Tue Jan 14 20:39:42 2025 +0800 [FLINK-36831][table] Introduce AppendOnlyTopNFunction in Rank with Async State API This closes #25723 --- .../plan/nodes/exec/stream/StreamExecRank.java | 38 ++-- .../planner/runtime/harness/RankHarnessTest.scala | 165 +++++++++++++++-- .../rank/AbstractSyncStateTopNFunction.java | 32 ++++ .../operators/rank/AbstractTopNFunction.java | 91 ++++++---- .../operators/rank/AppendOnlyTopNFunction.java | 122 +++---------- .../rank/async/AbstractAsyncStateTopNFunction.java | 63 ++++++- .../async/AsyncStateAppendOnlyTopNFunction.java | 199 +++++++++++++++++++++ .../operators/rank/utils/AppendOnlyTopNHelper.java | 171 ++++++++++++++++++ .../operators/rank/utils/FastTop1Helper.java | 8 +- .../operators/rank/AppendOnlyTopNFunctionTest.java | 38 ++-- .../operators/rank/FastTop1FunctionTest.java | 3 - .../operators/rank/TopNFunctionTestBase.java | 13 -- 12 files changed, 754 insertions(+), 189 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java index 469270f0c5f..f2a0dd40e40 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java @@ -60,6 +60,7 @@ import org.apache.flink.table.runtime.operators.rank.RankType; import org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction; import 
org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction; import org.apache.flink.table.runtime.operators.rank.async.AbstractAsyncStateTopNFunction; +import org.apache.flink.table.runtime.operators.rank.async.AsyncStateAppendOnlyTopNFunction; import org.apache.flink.table.runtime.operators.rank.async.AsyncStateFastTop1Function; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.runtime.typeutils.TypeCheckUtils; @@ -283,17 +284,31 @@ public class StreamExecRank extends ExecNodeBase<RowData> cacheSize); } } else { - processFunction = - new AppendOnlyTopNFunction( - ttlConfig, - inputRowTypeInfo, - sortKeyComparator, - sortKeySelector, - rankType, - rankRange, - generateUpdateBefore, - outputRankNumber, - cacheSize); + if (isAsyncStateEnabled) { + processFunction = + new AsyncStateAppendOnlyTopNFunction( + ttlConfig, + inputRowTypeInfo, + sortKeyComparator, + sortKeySelector, + rankType, + rankRange, + generateUpdateBefore, + outputRankNumber, + cacheSize); + } else { + processFunction = + new AppendOnlyTopNFunction( + ttlConfig, + inputRowTypeInfo, + sortKeyComparator, + sortKeySelector, + rankType, + rankRange, + generateUpdateBefore, + outputRankNumber, + cacheSize); + } } } else if (rankStrategy instanceof RankProcessStrategy.UpdateFastStrategy) { if (RankUtil.isTop1(rankRange)) { @@ -344,7 +359,6 @@ public class StreamExecRank extends ExecNodeBase<RowData> outputRankNumber, cacheSize); } - // TODO Use UnaryUpdateTopNFunction after SortedMapState is merged } else if (rankStrategy instanceof RankProcessStrategy.RetractStrategy) { EqualiserCodeGenerator equaliserCodeGen = new EqualiserCodeGenerator( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala index 032eba0d47a..4766668af92 100644 --- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala @@ -28,6 +28,7 @@ import org.apache.flink.table.planner.runtime.utils.{JavaUserDefinedTableFunctio import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode} import org.apache.flink.table.runtime.util.RowDataHarnessAssertor import org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord +import org.apache.flink.table.types.logical.LogicalType import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters} import org.apache.flink.types.Row import org.apache.flink.types.RowKind._ @@ -468,7 +469,10 @@ class RankHarnessTest(mode: StateBackendMode, enableAsyncState: Boolean) testHarness.close() } - def prepareTop1Tester(query: String, operatorNameIdentifier: String) + def prepareRankTester( + query: String, + operatorNameIdentifier: String, + operatorOutputLogicalTypes: Array[LogicalType]) : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], RowDataHarnessAssertor) = { val sourceDDL = s""" @@ -486,11 +490,7 @@ class RankHarnessTest(mode: StateBackendMode, enableAsyncState: Boolean) val testHarness = createHarnessTester(t1.toRetractStream[Row], operatorNameIdentifier) - val assertor = new RowDataHarnessAssertor( - Array( - DataTypes.STRING().getLogicalType, - DataTypes.BIGINT().getLogicalType, - DataTypes.BIGINT().getLogicalType)) + val assertor = new RowDataHarnessAssertor(operatorOutputLogicalTypes) (testHarness, assertor) } @@ -500,7 +500,7 @@ class RankHarnessTest(mode: StateBackendMode, enableAsyncState: Boolean) tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1)) val query = """ - |SELECT a, b, rn + |SELECT a, b |FROM |( | SELECT a, b, @@ -510,7 +510,11 @@ class RankHarnessTest(mode: 
StateBackendMode, enableAsyncState: Boolean) |WHERE rn <= 1 """.stripMargin val (testHarness, assertor) = - prepareTop1Tester(query, "Rank(strategy=[AppendFastStrategy") + prepareRankTester( + query, + "Rank(strategy=[AppendFastStrategy", + Array(DataTypes.STRING().getLogicalType, DataTypes.BIGINT().getLogicalType) + ) if (enableAsyncState) { assertThat(isAsyncStateOperator(testHarness)).isTrue @@ -541,7 +545,7 @@ class RankHarnessTest(mode: StateBackendMode, enableAsyncState: Boolean) tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1)) val query = """ - |SELECT a, b, rn + |SELECT a, b |FROM |( | SELECT a, b, @@ -553,7 +557,11 @@ class RankHarnessTest(mode: StateBackendMode, enableAsyncState: Boolean) |WHERE rn <= 1 """.stripMargin val (testHarness, assertor) = - prepareTop1Tester(query, "Rank(strategy=[UpdateFastStrategy") + prepareRankTester( + query, + "Rank(strategy=[UpdateFastStrategy", + Array(DataTypes.STRING().getLogicalType, DataTypes.BIGINT().getLogicalType) + ) if (enableAsyncState) { assertThat(isAsyncStateOperator(testHarness)).isTrue @@ -580,6 +588,143 @@ class RankHarnessTest(mode: StateBackendMode, enableAsyncState: Boolean) testHarness.close() } + + @TestTemplate + def testAppendOnlyTopNWithRowNumber(): Unit = { + tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1)) + val query = + """ + |SELECT a, b, rn + |FROM + |( + | SELECT a, b, + | ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) AS rn + | FROM T + |) t1 + |WHERE rn <= 3 + """.stripMargin + val (testHarness, assertor) = + prepareRankTester( + query, + "Rank(strategy=[AppendFastStrategy", + Array( + DataTypes.STRING().getLogicalType, + DataTypes.BIGINT().getLogicalType, + DataTypes.BIGINT().getLogicalType) + ) + + if (enableAsyncState) { + assertThat(isAsyncStateOperator(testHarness)).isTrue + } else { + assertThat(isAsyncStateOperator(testHarness)).isFalse + } + + testHarness.open() + + val expectedOutput = new ConcurrentLinkedQueue[Object]() + + // a,2 - top1 + 
testHarness.processElement(binaryRecord(INSERT, "a", 2L: JLong)) + expectedOutput.add(binaryRecord(INSERT, "a", 2L: JLong, 1L: JLong)) + + // a,2 - top1 + // a,1 - top2 + testHarness.processElement(binaryRecord(INSERT, "a", 1L: JLong)) + expectedOutput.add(binaryRecord(INSERT, "a", 1L: JLong, 2L: JLong)) + + // a,3 - top1 + // a,2 - top2 + // a,1 - top3 + testHarness.processElement(binaryRecord(INSERT, "a", 3L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_BEFORE, "a", 2L: JLong, 1L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_AFTER, "a", 3L: JLong, 1L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_BEFORE, "a", 1L: JLong, 2L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_AFTER, "a", 2L: JLong, 2L: JLong)) + expectedOutput.add(binaryRecord(INSERT, "a", 1L: JLong, 3L: JLong)) + + // a,3 - top1 + // a,2 - top2 + // a,1 - top3 + testHarness.processElement(binaryRecord(INSERT, "a", 0L: JLong)) + + // a,3 - top1 + // a,3 - top2 + // a,2 - top3 + testHarness.processElement(binaryRecord(INSERT, "a", 3L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_BEFORE, "a", 2L: JLong, 2L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_AFTER, "a", 3L: JLong, 2L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_BEFORE, "a", 1L: JLong, 3L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_AFTER, "a", 2L: JLong, 3L: JLong)) + + val result = dropWatermarks(testHarness.getOutput.toArray) + assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result) + + testHarness.close() + } + + @TestTemplate + def testAppendOnlyTopNWithoutRowNumber(): Unit = { + tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1)) + val query = + """ + |SELECT a, b + |FROM + |( + | SELECT a, b, + | ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) AS rn + | FROM T + |) t1 + |WHERE rn <= 3 + """.stripMargin + val (testHarness, assertor) = + prepareRankTester( + query, + "Rank(strategy=[AppendFastStrategy", + Array(DataTypes.STRING().getLogicalType, 
DataTypes.BIGINT().getLogicalType) + ) + + if (enableAsyncState) { + assertThat(isAsyncStateOperator(testHarness)).isTrue + } else { + assertThat(isAsyncStateOperator(testHarness)).isFalse + } + + testHarness.open() + + val expectedOutput = new ConcurrentLinkedQueue[Object]() + + // a,2 - top1 + testHarness.processElement(binaryRecord(INSERT, "a", 2L: JLong)) + expectedOutput.add(binaryRecord(INSERT, "a", 2L: JLong)) + + // a,2 - top1 + // a,1 - top2 + testHarness.processElement(binaryRecord(INSERT, "a", 1L: JLong)) + expectedOutput.add(binaryRecord(INSERT, "a", 1L: JLong)) + + // a,3 - top1 + // a,2 - top2 + // a,1 - top3 + testHarness.processElement(binaryRecord(INSERT, "a", 3L: JLong)) + expectedOutput.add(binaryRecord(INSERT, "a", 3L: JLong)) + + // a,3 - top1 + // a,2 - top2 + // a,1 - top3 + testHarness.processElement(binaryRecord(INSERT, "a", 0L: JLong)) + + // a,3 - top1 + // a,3 - top2 + // a,2 - top3 + testHarness.processElement(binaryRecord(INSERT, "a", 3L: JLong)) + expectedOutput.add(binaryRecord(DELETE, "a", 1L: JLong)) + expectedOutput.add(binaryRecord(INSERT, "a", 3L: JLong)) + + val result = dropWatermarks(testHarness.getOutput.toArray) + assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result) + + testHarness.close() + } } object RankHarnessTest { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractSyncStateTopNFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractSyncStateTopNFunction.java index a2cc38ca69b..a42b4c1e7d6 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractSyncStateTopNFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractSyncStateTopNFunction.java @@ -27,12 +27,17 @@ import org.apache.flink.table.data.RowData; import 
org.apache.flink.table.runtime.generated.GeneratedRecordComparator; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.util.Collector; + +import java.util.Objects; /** Base class for TopN Function with sync state api. */ public abstract class AbstractSyncStateTopNFunction extends AbstractTopNFunction { private ValueState<Long> rankEndState; + protected long rankEnd; + public AbstractSyncStateTopNFunction( StateTtlConfig ttlConfig, InternalTypeInfo<RowData> inputRowType, @@ -76,6 +81,7 @@ public abstract class AbstractSyncStateTopNFunction extends AbstractTopNFunction */ protected long initRankEnd(RowData row) throws Exception { if (isConstantRankEnd) { + rankEnd = Objects.requireNonNull(constantRankEnd); return rankEnd; } else { Long rankEndValue = rankEndState.value(); @@ -95,4 +101,30 @@ public abstract class AbstractSyncStateTopNFunction extends AbstractTopNFunction } } } + + // ====== utility methods that omit the specified rank end ====== + + protected boolean isInRankEnd(long rank) { + return rank <= rankEnd; + } + + protected boolean isInRankRange(long rank) { + return rank <= rankEnd && rank >= rankStart; + } + + protected void collectInsert(Collector<RowData> out, RowData inputRow, long rank) { + collectInsert(out, inputRow, rank, rankEnd); + } + + protected void collectDelete(Collector<RowData> out, RowData inputRow, long rank) { + collectDelete(out, inputRow, rank, rankEnd); + } + + protected void collectUpdateAfter(Collector<RowData> out, RowData inputRow, long rank) { + collectUpdateAfter(out, inputRow, rank, rankEnd); + } + + protected void collectUpdateBefore(Collector<RowData> out, RowData inputRow, long rank) { + collectUpdateBefore(out, inputRow, rank, rankEnd); + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractTopNFunction.java 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractTopNFunction.java index 9c0c62b058e..6be70356116 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractTopNFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractTopNFunction.java @@ -38,7 +38,10 @@ import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.util.Comparator; +import java.util.Objects; import java.util.function.Function; /** @@ -65,20 +68,33 @@ public abstract class AbstractTopNFunction extends KeyedProcessFunction<RowData, protected final StateTtlConfig ttlConfig; - // The util to compare two sortKey equals to each other. - private GeneratedRecordComparator generatedSortKeyComparator; - protected Comparator<RowData> sortKeyComparator; - private final boolean generateUpdateBefore; + protected final boolean outputRankNumber; + protected final InternalTypeInfo<RowData> inputRowType; + protected final KeySelector<RowData, RowData> sortKeySelector; - protected KeyContext keyContext; protected final boolean isConstantRankEnd; - private final long rankStart; + + protected final long rankStart; + + // constant rank end + // if rank end is variable, this var is null + @Nullable protected final Long constantRankEnd; + + // variable rank end index private final int rankEndIndex; - protected long rankEnd; + + // The util to compare two sortKey equals to each other. 
+ private GeneratedRecordComparator generatedSortKeyComparator; + + protected Comparator<RowData> sortKeyComparator; + + protected KeyContext keyContext; + + // variable rank end fetcher protected transient Function<RowData, Long> rankEndFetcher; protected Counter invalidCounter; @@ -118,15 +134,14 @@ public abstract class AbstractTopNFunction extends KeyedProcessFunction<RowData, ConstantRankRange constantRankRange = (ConstantRankRange) rankRange; isConstantRankEnd = true; rankStart = constantRankRange.getRankStart(); - rankEnd = constantRankRange.getRankEnd(); rankEndIndex = -1; + constantRankEnd = constantRankRange.getRankEnd(); } else if (rankRange instanceof VariableRankRange) { VariableRankRange variableRankRange = (VariableRankRange) rankRange; rankEndIndex = variableRankRange.getRankEndIndex(); isConstantRankEnd = false; rankStart = -1; - rankEnd = -1; - + constantRankEnd = null; } else { LOG.error(WITHOUT_RANK_END_UNSUPPORTED_MSG); throw new UnsupportedOperationException(WITHOUT_RANK_END_UNSUPPORTED_MSG); @@ -180,7 +195,7 @@ public abstract class AbstractTopNFunction extends KeyedProcessFunction<RowData, * @return default topN size */ protected long getDefaultTopNSize() { - return isConstantRankEnd ? rankEnd : DEFAULT_TOPN_SIZE; + return isConstantRankEnd ? 
Objects.requireNonNull(constantRankEnd) : DEFAULT_TOPN_SIZE; } /** @@ -213,8 +228,9 @@ public abstract class AbstractTopNFunction extends KeyedProcessFunction<RowData, .<Long, Gauge<Long>>gauge("topn.cache.size", () -> heapSize); } - protected void collectInsert(Collector<RowData> out, RowData inputRow, long rank) { - if (isInRankRange(rank)) { + protected void collectInsert( + Collector<RowData> out, RowData inputRow, long rank, long rankEnd) { + if (isInRankRange(rank, rankEnd)) { out.collect(createOutputRow(inputRow, rank, RowKind.INSERT)); } } @@ -224,8 +240,9 @@ public abstract class AbstractTopNFunction extends KeyedProcessFunction<RowData, out.collect(inputRow); } - protected void collectDelete(Collector<RowData> out, RowData inputRow, long rank) { - if (isInRankRange(rank)) { + protected void collectDelete( + Collector<RowData> out, RowData inputRow, long rank, long rankEnd) { + if (isInRankRange(rank, rankEnd)) { out.collect(createOutputRow(inputRow, rank, RowKind.DELETE)); } } @@ -235,8 +252,9 @@ public abstract class AbstractTopNFunction extends KeyedProcessFunction<RowData, out.collect(inputRow); } - protected void collectUpdateAfter(Collector<RowData> out, RowData inputRow, long rank) { - if (isInRankRange(rank)) { + protected void collectUpdateAfter( + Collector<RowData> out, RowData inputRow, long rank, long rankEnd) { + if (isInRankRange(rank, rankEnd)) { out.collect(createOutputRow(inputRow, rank, RowKind.UPDATE_AFTER)); } } @@ -246,8 +264,9 @@ public abstract class AbstractTopNFunction extends KeyedProcessFunction<RowData, out.collect(inputRow); } - protected void collectUpdateBefore(Collector<RowData> out, RowData inputRow, long rank) { - if (generateUpdateBefore && isInRankRange(rank)) { + protected void collectUpdateBefore( + Collector<RowData> out, RowData inputRow, long rank, long rankEnd) { + if (generateUpdateBefore && isInRankRange(rank, rankEnd)) { out.collect(createOutputRow(inputRow, rank, RowKind.UPDATE_BEFORE)); } } @@ -259,11 +278,7 
@@ public abstract class AbstractTopNFunction extends KeyedProcessFunction<RowData, } } - protected boolean isInRankEnd(long rank) { - return rank <= rankEnd; - } - - protected boolean isInRankRange(long rank) { + protected boolean isInRankRange(long rank, long rankEnd) { return rank <= rankEnd && rank >= rankStart; } @@ -323,30 +338,46 @@ public abstract class AbstractTopNFunction extends KeyedProcessFunction<RowData, this.keyContext = topNFunction.keyContext; } - protected void collectInsert(Collector<RowData> out, RowData inputRow, long rank) { - topNFunction.collectInsert(out, inputRow, rank); + protected void collectInsert( + Collector<RowData> out, RowData inputRow, long rank, long rankEnd) { + topNFunction.collectInsert(out, inputRow, rank, rankEnd); } protected void collectInsert(Collector<RowData> out, RowData inputRow) { topNFunction.collectInsert(out, inputRow); } - protected void collectUpdateAfter(Collector<RowData> out, RowData inputRow, long rank) { - topNFunction.collectUpdateAfter(out, inputRow, rank); + protected void collectDelete( + Collector<RowData> out, RowData inputRow, long rank, long rankEnd) { + topNFunction.collectDelete(out, inputRow, rank, rankEnd); + } + + protected void collectDelete(Collector<RowData> out, RowData inputRow) { + topNFunction.collectDelete(out, inputRow); + } + + protected void collectUpdateAfter( + Collector<RowData> out, RowData inputRow, long rank, long rankEnd) { + topNFunction.collectUpdateAfter(out, inputRow, rank, rankEnd); } protected void collectUpdateAfter(Collector<RowData> out, RowData inputRow) { topNFunction.collectUpdateAfter(out, inputRow); } - protected void collectUpdateBefore(Collector<RowData> out, RowData inputRow, long rank) { - topNFunction.collectUpdateBefore(out, inputRow, rank); + protected void collectUpdateBefore( + Collector<RowData> out, RowData inputRow, long rank, long rankEnd) { + topNFunction.collectUpdateBefore(out, inputRow, rank, rankEnd); } protected void 
collectUpdateBefore(Collector<RowData> out, RowData inputRow) { topNFunction.collectUpdateBefore(out, inputRow); } + protected boolean isInRankEnd(long rank, long rankEnd) { + return rank <= rankEnd; + } + public void accRequestCount() { requestCount++; } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java index abcaa881014..a5a031c16ea 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java @@ -28,21 +28,15 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.operators.rank.utils.AppendOnlyTopNHelper; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.util.Collector; -import org.apache.flink.shaded.guava32.com.google.common.cache.Cache; -import org.apache.flink.shaded.guava32.com.google.common.cache.CacheBuilder; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; /** * A TopN function could handle insert-only stream. 
@@ -53,8 +47,6 @@ public class AppendOnlyTopNFunction extends AbstractSyncStateTopNFunction { private static final long serialVersionUID = -4708453213104128011L; - private static final Logger LOG = LoggerFactory.getLogger(AppendOnlyTopNFunction.class); - private final InternalTypeInfo<RowData> sortKeyType; private final TypeSerializer<RowData> inputRowSer; private final long cacheSize; @@ -65,8 +57,7 @@ public class AppendOnlyTopNFunction extends AbstractSyncStateTopNFunction { // the buffer stores mapping from sort key to records list, a heap mirror to dataState private transient TopNBuffer buffer; - // the kvSortedMap stores mapping from partition key to it's buffer - private transient Cache<RowData, TopNBuffer> kvSortedMap; + private transient SyncStateAppendOnlyTopNHelper helper; public AppendOnlyTopNFunction( StateTtlConfig ttlConfig, @@ -95,17 +86,6 @@ public class AppendOnlyTopNFunction extends AbstractSyncStateTopNFunction { @Override public void open(OpenContext openContext) throws Exception { super.open(openContext); - int lruCacheSize = Math.max(1, (int) (cacheSize / getDefaultTopNSize())); - CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder(); - if (ttlConfig.isEnabled()) { - cacheBuilder.expireAfterWrite( - ttlConfig.getTimeToLive().toMillis(), TimeUnit.MILLISECONDS); - } - kvSortedMap = cacheBuilder.maximumSize(lruCacheSize).build(); - LOG.info( - "Top{} operator is using LRU caches key-size: {}", - getDefaultTopNSize(), - lruCacheSize); ListTypeInfo<RowData> valueTypeInfo = new ListTypeInfo<>(inputRowType); MapStateDescriptor<RowData, List<RowData>> mapStateDescriptor = @@ -115,8 +95,9 @@ public class AppendOnlyTopNFunction extends AbstractSyncStateTopNFunction { } dataState = getRuntimeContext().getMapState(mapStateDescriptor); - // metrics - registerMetric(kvSortedMap.size() * getDefaultTopNSize()); + helper = new SyncStateAppendOnlyTopNHelper(); + + helper.registerMetric(); } @Override @@ -139,20 +120,20 @@ public class 
AppendOnlyTopNFunction extends AbstractSyncStateTopNFunction { if (outputRankNumber || hasOffset()) { // the without-number-algorithm can't handle topN with offset, // so use the with-number-algorithm to handle offset - processElementWithRowNumber(sortKey, input, out); + helper.processElementWithRowNumber(buffer, sortKey, input, rankEnd, out); } else { - processElementWithoutRowNumber(input, out); + helper.processElementWithoutRowNumber(buffer, input, rankEnd, out); } } } private void initHeapStates() throws Exception { - requestCount += 1; + helper.accRequestCount(); RowData currentKey = (RowData) keyContext.getCurrentKey(); - buffer = kvSortedMap.getIfPresent(currentKey); + buffer = helper.getTopNBufferFromCache(currentKey); if (buffer == null) { buffer = new TopNBuffer(sortKeyComparator, ArrayList::new); - kvSortedMap.put(currentKey, buffer); + helper.saveTopNBufferToCache(currentKey, buffer); // restore buffer Iterator<Map.Entry<RowData, List<RowData>>> iter = dataState.iterator(); if (iter != null) { @@ -165,84 +146,27 @@ public class AppendOnlyTopNFunction extends AbstractSyncStateTopNFunction { } } } else { - hitCount += 1; + helper.accHitCount(); } } - private void processElementWithRowNumber(RowData sortKey, RowData input, Collector<RowData> out) - throws Exception { - Iterator<Map.Entry<RowData, Collection<RowData>>> iterator = buffer.entrySet().iterator(); - long currentRank = 0L; - boolean findsSortKey = false; - RowData currentRow = null; - while (iterator.hasNext() && isInRankEnd(currentRank)) { - Map.Entry<RowData, Collection<RowData>> entry = iterator.next(); - Collection<RowData> records = entry.getValue(); - // meet its own sort key - if (!findsSortKey && entry.getKey().equals(sortKey)) { - currentRank += records.size(); - currentRow = input; - findsSortKey = true; - } else if (findsSortKey) { - Iterator<RowData> recordsIter = records.iterator(); - while (recordsIter.hasNext() && isInRankEnd(currentRank)) { - RowData prevRow = recordsIter.next(); 
- collectUpdateBefore(out, prevRow, currentRank); - collectUpdateAfter(out, currentRow, currentRank); - currentRow = prevRow; - currentRank += 1; - } - } else { - currentRank += records.size(); - } - } - if (isInRankEnd(currentRank)) { - // there is no enough elements in Top-N, emit INSERT message for the new record. - collectInsert(out, currentRow, currentRank); + private class SyncStateAppendOnlyTopNHelper extends AppendOnlyTopNHelper { + + public SyncStateAppendOnlyTopNHelper() { + super( + AppendOnlyTopNFunction.this, + cacheSize, + AppendOnlyTopNFunction.this.getDefaultTopNSize()); } - // remove the records associated to the sort key which is out of topN - List<RowData> toDeleteSortKeys = new ArrayList<>(); - while (iterator.hasNext()) { - Map.Entry<RowData, Collection<RowData>> entry = iterator.next(); - RowData key = entry.getKey(); + @Override + protected void removeFromState(RowData key) throws Exception { dataState.remove(key); - toDeleteSortKeys.add(key); - } - for (RowData toDeleteKey : toDeleteSortKeys) { - buffer.removeAll(toDeleteKey); } - } - private void processElementWithoutRowNumber(RowData input, Collector<RowData> out) - throws Exception { - // remove retired element - if (buffer.getCurrentTopNum() > rankEnd) { - Map.Entry<RowData, Collection<RowData>> lastEntry = buffer.lastEntry(); - RowData lastKey = lastEntry.getKey(); - Collection<RowData> lastList = lastEntry.getValue(); - RowData lastElement = buffer.lastElement(); - int size = lastList.size(); - // remove last one - if (size <= 1) { - buffer.removeAll(lastKey); - dataState.remove(lastKey); - } else { - buffer.removeLast(); - // last element has been removed from lastList, we have to copy a new collection - // for lastList to avoid mutating state values, see CopyOnWriteStateMap, - // otherwise, the result might be corrupt. 
- // don't need to perform a deep copy, because RowData elements will not be updated - dataState.put(lastKey, new ArrayList<>(lastList)); - } - if (size == 0 || input.equals(lastElement)) { - return; - } else { - // lastElement shouldn't be null - collectDelete(out, lastElement); - } + @Override + protected void updateState(RowData key, List<RowData> value) throws Exception { + dataState.put(key, value); } - // it first appears in the TopN, send INSERT message - collectInsert(out, input); } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java index 4e8c884b5c2..3fb63d8a282 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java @@ -18,7 +18,14 @@ package org.apache.flink.table.runtime.operators.rank.async; +import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.core.state.StateFutureUtils; +import org.apache.flink.runtime.state.v2.ValueStateDescriptor; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; @@ -27,13 +34,13 @@ import org.apache.flink.table.runtime.operators.rank.RankRange; import org.apache.flink.table.runtime.operators.rank.RankType; import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo; -/** - * Base class for TopN Function with async state api. - * - * <p>TODO FLINK-36831 support variable rank end in async state rank later. - */ +import java.util.Objects; + +/** Base class for TopN Function with async state api. */ public abstract class AbstractAsyncStateTopNFunction extends AbstractTopNFunction { + private ValueState<Long> rankEndState; + public AbstractAsyncStateTopNFunction( StateTtlConfig ttlConfig, InternalTypeInfo<RowData> inputRowType, @@ -52,9 +59,51 @@ public abstract class AbstractAsyncStateTopNFunction extends AbstractTopNFunctio rankRange, generateUpdateBefore, outputRankNumber); + } + + @Override + public void open(OpenContext openContext) throws Exception { + super.open(openContext); + if (!isConstantRankEnd) { - throw new UnsupportedOperationException( - "Variable rank end is not supported in rank with async state api."); + ValueStateDescriptor<Long> rankStateDesc = + new ValueStateDescriptor<>("rankEnd", Types.LONG); + if (ttlConfig.isEnabled()) { + rankStateDesc.enableTimeToLive(ttlConfig); + } + rankEndState = + ((StreamingRuntimeContext) getRuntimeContext()).getValueState(rankStateDesc); + } + } + + /** + * Initialize rank end. 
+ * + * @param row input record + * @return rank end + */ + protected StateFuture<Long> initRankEnd(RowData row) { + if (isConstantRankEnd) { + return StateFutureUtils.completedFuture(Objects.requireNonNull(constantRankEnd)); + } else { + return rankEndState + .asyncValue() + .thenApply( + rankEndInState -> { + long curRankEnd = rankEndFetcher.apply(row); + if (rankEndInState == null) { + // no need to wait this future + rankEndState.asyncUpdate(curRankEnd); + return curRankEnd; + } else { + if (rankEndInState != curRankEnd) { + // increment the invalid counter when the current rank end + // not equal to previous rank end + invalidCounter.inc(); + } + return rankEndInState; + } + }); } } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateAppendOnlyTopNFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateAppendOnlyTopNFunction.java new file mode 100644 index 00000000000..962298bb989 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateAppendOnlyTopNFunction.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.rank.async; + +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.core.state.StateFutureUtils; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; +import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction; +import org.apache.flink.table.runtime.operators.rank.RankRange; +import org.apache.flink.table.runtime.operators.rank.RankType; +import org.apache.flink.table.runtime.operators.rank.TopNBuffer; +import org.apache.flink.table.runtime.operators.rank.utils.AppendOnlyTopNHelper; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * A TopN function could handle insert-only stream. + * + * <p>The input stream should only contain INSERT messages. + * + * <p>Different with {@link AppendOnlyTopNFunction}, this function is used with async state api. 
+ */ +public class AsyncStateAppendOnlyTopNFunction extends AbstractAsyncStateTopNFunction { + + private static final long serialVersionUID = 1L; + + private final InternalTypeInfo<RowData> sortKeyType; + private final TypeSerializer<RowData> inputRowSer; + private final long cacheSize; + + // a map state stores mapping from sort key to records list which is in topN + private transient MapState<RowData, List<RowData>> dataState; + + private transient AsyncStateAppendOnlyTopNHelper helper; + + public AsyncStateAppendOnlyTopNFunction( + StateTtlConfig ttlConfig, + InternalTypeInfo<RowData> inputRowType, + GeneratedRecordComparator sortKeyGeneratedRecordComparator, + RowDataKeySelector sortKeySelector, + RankType rankType, + RankRange rankRange, + boolean generateUpdateBefore, + boolean outputRankNumber, + long cacheSize) { + super( + ttlConfig, + inputRowType, + sortKeyGeneratedRecordComparator, + sortKeySelector, + rankType, + rankRange, + generateUpdateBefore, + outputRankNumber); + this.sortKeyType = sortKeySelector.getProducedType(); + this.inputRowSer = inputRowType.createSerializer(new SerializerConfigImpl()); + this.cacheSize = cacheSize; + } + + @Override + public void open(OpenContext openContext) throws Exception { + super.open(openContext); + + ListTypeInfo<RowData> valueTypeInfo = new ListTypeInfo<>(inputRowType); + MapStateDescriptor<RowData, List<RowData>> mapStateDescriptor = + new MapStateDescriptor<>("data-state-with-append", sortKeyType, valueTypeInfo); + if (ttlConfig.isEnabled()) { + mapStateDescriptor.enableTimeToLive(ttlConfig); + } + dataState = ((StreamingRuntimeContext) getRuntimeContext()).getMapState(mapStateDescriptor); + + helper = new AsyncStateAppendOnlyTopNHelper(); + + helper.registerMetric(); + } + + @Override + public void processElement(RowData input, Context context, Collector<RowData> out) + throws Exception { + StateFuture<TopNBuffer> topNBufferFuture = initHeapStates(); + StateFuture<Long> rankEndFuture = initRankEnd(input); + 
+ RowData sortKey = sortKeySelector.getKey(input); + topNBufferFuture.thenCombine( + rankEndFuture, + (buffer, rankEnd) -> { + if (checkSortKeyInBufferRange(sortKey, buffer)) { + // insert sort key into buffer + buffer.put(sortKey, inputRowSer.copy(input)); + Collection<RowData> inputs = buffer.get(sortKey); + + // update data state + // copy a new collection to avoid mutating state values, see + // CopyOnWriteStateMap, + // otherwise, the result might be corrupt. + // don't need to perform a deep copy, because RowData elements will not be + // updated + dataState + .asyncPut(sortKey, new ArrayList<>(inputs)) + .thenAccept( + VOID -> { + if (outputRankNumber || hasOffset()) { + // the without-number-algorithm can't handle topN + // with offset, + // so use the with-number-algorithm to handle offset + helper.processElementWithRowNumber( + buffer, sortKey, input, rankEnd, out); + } else { + helper.processElementWithoutRowNumber( + buffer, input, rankEnd, out); + } + }); + } + + return null; + }); + } + + private StateFuture<TopNBuffer> initHeapStates() { + helper.accRequestCount(); + RowData currentKey = (RowData) keyContext.getCurrentKey(); + TopNBuffer bufferFromCache = helper.getTopNBufferFromCache(currentKey); + if (bufferFromCache != null) { + helper.accHitCount(); + return StateFutureUtils.completedFuture(bufferFromCache); + } + + TopNBuffer bufferFromState = new TopNBuffer(sortKeyComparator, ArrayList::new); + // restore buffer + return dataState + .asyncEntries() + .thenCompose( + iter -> + iter.onNext( + entry -> { + RowData sortKey = entry.getKey(); + List<RowData> values = entry.getValue(); + // the order is preserved + bufferFromState.putAll(sortKey, values); + })) + .thenApply( + VOID -> { + helper.saveTopNBufferToCache(currentKey, bufferFromState); + return bufferFromState; + }); + } + + private class AsyncStateAppendOnlyTopNHelper extends AppendOnlyTopNHelper { + + public AsyncStateAppendOnlyTopNHelper() { + super( + 
AsyncStateAppendOnlyTopNFunction.this, + cacheSize, + AsyncStateAppendOnlyTopNFunction.this.getDefaultTopNSize()); + } + + @Override + protected void removeFromState(RowData key) throws Exception { + // no need to wait this async request to end + dataState.asyncRemove(key); + } + + @Override + protected void updateState(RowData key, List<RowData> value) throws Exception { + // no need to wait this async request to end + dataState.asyncPut(key, value); + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/AppendOnlyTopNHelper.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/AppendOnlyTopNHelper.java new file mode 100644 index 00000000000..e05a9934fe5 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/AppendOnlyTopNHelper.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.rank.utils; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction; +import org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction; +import org.apache.flink.table.runtime.operators.rank.TopNBuffer; +import org.apache.flink.table.runtime.operators.rank.async.AsyncStateAppendOnlyTopNFunction; +import org.apache.flink.util.Collector; + +import org.apache.flink.shaded.guava32.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava32.com.google.common.cache.CacheBuilder; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * A helper to help do the logic 'Top-n' for append-only stream in {@link AppendOnlyTopNFunction} + * and {@link AsyncStateAppendOnlyTopNFunction}. 
+ */ +public abstract class AppendOnlyTopNHelper extends AbstractTopNFunction.AbstractTopNHelper { + + private static final Logger LOG = LoggerFactory.getLogger(AppendOnlyTopNHelper.class); + + // the kvSortedMap stores mapping from partition key to it's buffer + private final Cache<RowData, TopNBuffer> kvSortedMap; + + private final long topNSize; + + public AppendOnlyTopNHelper(AbstractTopNFunction topNFunction, long cacheSize, long topNSize) { + super(topNFunction); + + this.topNSize = topNSize; + + int lruCacheSize = Math.max(1, (int) (cacheSize / topNSize)); + CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder(); + if (ttlConfig.isEnabled()) { + cacheBuilder.expireAfterWrite( + ttlConfig.getTimeToLive().toMillis(), TimeUnit.MILLISECONDS); + } + kvSortedMap = cacheBuilder.maximumSize(lruCacheSize).build(); + + LOG.info("Top{} operator is using LRU caches key-size: {}", topNSize, lruCacheSize); + } + + public void registerMetric() { + registerMetric(kvSortedMap.size() * topNSize); + } + + @Nullable + public TopNBuffer getTopNBufferFromCache(RowData currentKey) { + return kvSortedMap.getIfPresent(currentKey); + } + + public void saveTopNBufferToCache(RowData currentKey, TopNBuffer topNBuffer) { + kvSortedMap.put(currentKey, topNBuffer); + } + + /** + * The without-number-algorithm can't handle topN with offset, so use the with-number-algorithm + * to handle offset. 
+ */ + public void processElementWithRowNumber( + TopNBuffer buffer, RowData sortKey, RowData input, long rankEnd, Collector<RowData> out) + throws Exception { + Iterator<Map.Entry<RowData, Collection<RowData>>> iterator = buffer.entrySet().iterator(); + long currentRank = 0L; + boolean findsSortKey = false; + RowData currentRow = null; + while (iterator.hasNext() && isInRankEnd(currentRank, rankEnd)) { + Map.Entry<RowData, Collection<RowData>> entry = iterator.next(); + Collection<RowData> records = entry.getValue(); + // meet its own sort key + if (!findsSortKey && entry.getKey().equals(sortKey)) { + currentRank += records.size(); + currentRow = input; + findsSortKey = true; + } else if (findsSortKey) { + Iterator<RowData> recordsIter = records.iterator(); + while (recordsIter.hasNext() && isInRankEnd(currentRank, rankEnd)) { + RowData prevRow = recordsIter.next(); + collectUpdateBefore(out, prevRow, currentRank, rankEnd); + collectUpdateAfter(out, currentRow, currentRank, rankEnd); + currentRow = prevRow; + currentRank += 1; + } + } else { + currentRank += records.size(); + } + } + if (isInRankEnd(currentRank, rankEnd)) { + // there is no enough elements in Top-N, emit INSERT message for the new record. 
+ collectInsert(out, currentRow, currentRank, rankEnd); + } + + // remove the records associated to the sort key which is out of topN + List<RowData> toDeleteSortKeys = new ArrayList<>(); + while (iterator.hasNext()) { + Map.Entry<RowData, Collection<RowData>> entry = iterator.next(); + RowData key = entry.getKey(); + removeFromState(key); + toDeleteSortKeys.add(key); + } + for (RowData toDeleteKey : toDeleteSortKeys) { + buffer.removeAll(toDeleteKey); + } + } + + public void processElementWithoutRowNumber( + TopNBuffer buffer, RowData input, long rankEnd, Collector<RowData> out) + throws Exception { + // remove retired element + if (buffer.getCurrentTopNum() > rankEnd) { + Map.Entry<RowData, Collection<RowData>> lastEntry = buffer.lastEntry(); + RowData lastKey = lastEntry.getKey(); + Collection<RowData> lastList = lastEntry.getValue(); + RowData lastElement = buffer.lastElement(); + int size = lastList.size(); + // remove last one + if (size <= 1) { + buffer.removeAll(lastKey); + removeFromState(lastKey); + } else { + buffer.removeLast(); + // last element has been removed from lastList, we have to copy a new collection + // for lastList to avoid mutating state values, see CopyOnWriteStateMap, + // otherwise, the result might be corrupt. 
+ // don't need to perform a deep copy, because RowData elements will not be updated + updateState(lastKey, new ArrayList<>(lastList)); + } + if (size == 0 || input.equals(lastElement)) { + return; + } else { + // lastElement shouldn't be null + collectDelete(out, lastElement); + } + } + // it first appears in the TopN, send INSERT message + collectInsert(out, input); + } + + protected abstract void removeFromState(RowData key) throws Exception; + + protected abstract void updateState(RowData key, List<RowData> value) throws Exception; +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/FastTop1Helper.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/FastTop1Helper.java index 3ee0a381952..0d8ce3154fb 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/FastTop1Helper.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/FastTop1Helper.java @@ -88,7 +88,8 @@ public abstract class FastTop1Helper extends AbstractTopNFunction.AbstractTopNHe public void processAsFirstRow(RowData input, RowData currentKey, Collector<RowData> out) { kvCache.put(currentKey, inputRowSer.copy(input)); if (outputRankNumber) { - collectInsert(out, input, 1); + // the rank end of top-1 is always 1L + collectInsert(out, input, 1, 1); } else { collectInsert(out, input); } @@ -106,8 +107,9 @@ public abstract class FastTop1Helper extends AbstractTopNFunction.AbstractTopNHe // Note: partition key is unique key if only top-1 is desired, // thus emitting UB and UA here if (outputRankNumber) { - collectUpdateBefore(out, prevRow, 1); - collectUpdateAfter(out, input, 1); + // the rank end of top-1 is always 1L + collectUpdateBefore(out, prevRow, 1, 1); + collectUpdateAfter(out, input, 1, 1); } else { collectUpdateBefore(out, prevRow); collectUpdateAfter(out, input); diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunctionTest.java index c6800fe6e94..f124e9700e5 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunctionTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunctionTest.java @@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.operators.rank; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.rank.async.AsyncStateAppendOnlyTopNFunction; import org.junit.jupiter.api.TestTemplate; @@ -29,7 +30,7 @@ import java.util.List; import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; -/** Tests for {@link AppendOnlyTopNFunction}. */ +/** Tests for {@link AppendOnlyTopNFunction} and {@link AsyncStateAppendOnlyTopNFunction}. 
*/ class AppendOnlyTopNFunctionTest extends TopNFunctionTestBase { @Override @@ -39,21 +40,34 @@ class AppendOnlyTopNFunctionTest extends TopNFunctionTestBase { boolean generateUpdateBefore, boolean outputRankNumber, boolean enableAsyncState) { - return new AppendOnlyTopNFunction( - ttlConfig, - inputRowType, - generatedSortKeyComparator, - sortKeySelector, - rankType, - rankRange, - generateUpdateBefore, - outputRankNumber, - cacheSize); + if (enableAsyncState) { + return new AsyncStateAppendOnlyTopNFunction( + ttlConfig, + inputRowType, + generatedSortKeyComparator, + sortKeySelector, + rankType, + rankRange, + generateUpdateBefore, + outputRankNumber, + cacheSize); + } else { + return new AppendOnlyTopNFunction( + ttlConfig, + inputRowType, + generatedSortKeyComparator, + sortKeySelector, + rankType, + rankRange, + generateUpdateBefore, + outputRankNumber, + cacheSize); + } } @Override boolean supportedAsyncState() { - return false; + return true; } @TestTemplate diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/FastTop1FunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/FastTop1FunctionTest.java index 31205dd3ec8..90784dcfd34 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/FastTop1FunctionTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/FastTop1FunctionTest.java @@ -22,10 +22,8 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.operators.rank.async.AsyncStateFastTop1Function; -import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; 
import java.util.ArrayList; import java.util.List; @@ -35,7 +33,6 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterR import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord; /** Tests for {@link FastTop1Function} and {@link AsyncStateFastTop1Function}. */ -@ExtendWith(ParameterizedTestExtension.class) public class FastTop1FunctionTest extends TopNFunctionTestBase { @Override diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java index b831709ec8f..2a444117c60 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java @@ -60,7 +60,6 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterR import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assumptions.assumeFalse; -import static org.junit.jupiter.api.Assumptions.assumeTrue; /** Base Tests for all subclass of {@link AbstractTopNFunction}. */ @ExtendWith(ParameterizedTestExtension.class) @@ -155,8 +154,6 @@ abstract class TopNFunctionTestBase { /** RankEnd column must be long, int or short type, but could not be string type yet. 
*/ @TestTemplate void testInvalidVariableRankRangeWithIntType() throws Exception { - // rank with async state does not support variable rank range yet - assumeFalse(enableAsyncState); AbstractTopNFunction func = createFunction(RankType.ROW_NUMBER, new VariableRankRange(0), true, false); OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(func); @@ -393,16 +390,6 @@ abstract class TopNFunctionTestBase { "output wrong.", expectedOutput, testHarness.getOutput()); } - @TestTemplate - void testNotSupportConstantRankRangeWithAsyncState() { - assumeTrue(enableAsyncState); - assertThatThrownBy( - () -> - createFunction( - RankType.ROW_NUMBER, new VariableRankRange(0), true, false)) - .isInstanceOf(UnsupportedOperationException.class); - } - OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness( AbstractTopNFunction rankFunction) throws Exception { if (enableAsyncState) {