This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 227e61d2e01 [FLINK-26840][table-runtime] refactor topN functions
adding a general TopNBufferCacheRemovalListener
227e61d2e01 is described below
commit 227e61d2e010ec8123dcda78675f18deadf7df1b
Author: lincoln lee <[email protected]>
AuthorDate: Tue Aug 13 11:25:24 2024 +0800
[FLINK-26840][table-runtime] refactor topN functions adding a general
TopNBufferCacheRemovalListener
This closes #19220
---
.../runtime/operators/rank/FastTop1Function.java | 31 ++--------
.../rank/TopNBufferCacheRemovalListener.java | 67 ++++++++++++++++++++++
.../operators/rank/UpdatableTopNFunction.java | 45 +++------------
3 files changed, 80 insertions(+), 63 deletions(-)
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java
index 7a947b81908..7af4b8cab28 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java
@@ -35,9 +35,6 @@ import org.apache.flink.util.Collector;
import org.apache.flink.shaded.guava32.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava32.com.google.common.cache.CacheBuilder;
-import org.apache.flink.shaded.guava32.com.google.common.cache.RemovalCause;
-import org.apache.flink.shaded.guava32.com.google.common.cache.RemovalListener;
-import
org.apache.flink.shaded.guava32.com.google.common.cache.RemovalNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,7 +99,9 @@ public class FastTop1Function extends AbstractTopNFunction
implements Checkpoint
kvCache =
cacheBuilder
.maximumSize(lruCacheSize)
- .removalListener(new CacheRemovalListener())
+ .removalListener(
+ new TopNBufferCacheRemovalListener<>(
+ keyContext, this::flushBufferToState))
.build();
LOG.info("Top-1 operator is using LRU caches key-size: {}",
lruCacheSize);
@@ -163,7 +162,7 @@ public class FastTop1Function extends AbstractTopNFunction
implements Checkpoint
public void snapshotState(FunctionSnapshotContext context) throws
Exception {
for (Map.Entry<RowData, RowData> entry : kvCache.asMap().entrySet()) {
keyContext.setCurrentKey(entry.getKey());
- dataState.update(entry.getValue());
+ flushBufferToState(entry.getValue());
}
}
@@ -172,25 +171,7 @@ public class FastTop1Function extends AbstractTopNFunction
implements Checkpoint
// nothing to do
}
- private class CacheRemovalListener implements RemovalListener<RowData,
RowData> {
- @Override
- public void onRemoval(RemovalNotification<RowData, RowData>
notification) {
- if (notification.getCause() != RemovalCause.SIZE ||
notification.getValue() == null) {
- // Don't flush values to state if cause is ttl expired
- return;
- }
-
- RowData previousKey = (RowData) keyContext.getCurrentKey();
- RowData partitionKey = notification.getKey();
- keyContext.setCurrentKey(partitionKey);
- try {
- dataState.update(notification.getValue());
- } catch (Throwable e) {
- LOG.error("Fail to synchronize state!", e);
- throw new RuntimeException(e);
- } finally {
- keyContext.setCurrentKey(previousKey);
- }
- }
+ private void flushBufferToState(RowData rowData) throws Exception {
+ dataState.update(rowData);
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBufferCacheRemovalListener.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBufferCacheRemovalListener.java
new file mode 100644
index 00000000000..33eedbe3a12
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBufferCacheRemovalListener.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.rank;
+
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.apache.flink.shaded.guava32.com.google.common.cache.RemovalCause;
+import org.apache.flink.shaded.guava32.com.google.common.cache.RemovalListener;
+import org.apache.flink.shaded.guava32.com.google.common.cache.RemovalNotification;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A common Guava cache {@link RemovalListener} for rank (Top-N) operators that hands an evicted
+ * cache entry to a callback, typically to flush the buffered value back to keyed state.
+ *
+ * <p>Only entries evicted because the cache exceeded its maximum size ({@link RemovalCause#SIZE})
+ * are passed to the callback; entries removed for any other cause (e.g. expiry, explicit
+ * invalidation or replacement) are ignored. While the callback runs, the {@link KeyContext} is
+ * switched to the evicted partition key; the previous current key is always restored afterwards.
+ *
+ * @param <V> is the value type of the cache.
+ */
+public class TopNBufferCacheRemovalListener<V> implements RemovalListener<RowData, V> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(TopNBufferCacheRemovalListener.class);
+
+    // The executionContext is not used here because AbstractTopNFunction relies on the keyContext.
+    private final KeyContext keyContext;
+
+    // Receives the evicted value while the evicted partition key is set as the current key.
+    private final ThrowingConsumer<V, Exception> callBack;
+
+    /**
+     * Creates a removal listener.
+     *
+     * @param keyContext key context whose current key is switched to the evicted partition key
+     *     while {@code callBack} runs, and restored afterwards
+     * @param callBack invoked with the evicted value (e.g. to write it to state)
+     */
+    public TopNBufferCacheRemovalListener(
+            KeyContext keyContext, ThrowingConsumer<V, Exception> callBack) {
+        this.keyContext = keyContext;
+        this.callBack = callBack;
+    }
+
+    @Override
+    public void onRemoval(RemovalNotification<RowData, V> removalNotification) {
+        if (removalNotification.getCause() != RemovalCause.SIZE) {
+            // Only size-based evictions are flushed; any other removal cause (e.g. ttl expiry)
+            // is intentionally skipped.
+            return;
+        }
+        RowData partitionKey = removalNotification.getKey();
+        V value = removalNotification.getValue();
+        if (partitionKey == null || value == null) {
+            // Nothing to flush without both the partition key and its value.
+            return;
+        }
+        RowData previousKey = (RowData) keyContext.getCurrentKey();
+        keyContext.setCurrentKey(partitionKey);
+        try {
+            callBack.accept(value);
+        } catch (Exception e) {
+            // Guava does not propagate exceptions thrown by a removal listener to the cache
+            // user; it only logs them via its own logger. Log via SLF4J as well so that a failed
+            // state synchronization on eviction is visible in the job logs.
+            LOG.error("Fail to synchronize state!", e);
+            throw new RuntimeException("Failed to execute callback", e);
+        } finally {
+            keyContext.setCurrentKey(previousKey);
+        }
+    }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
index a2009680cd9..8f3820a0c17 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
@@ -41,9 +41,6 @@ import org.apache.flink.util.Collector;
import org.apache.flink.shaded.guava32.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava32.com.google.common.cache.CacheBuilder;
-import org.apache.flink.shaded.guava32.com.google.common.cache.RemovalCause;
-import org.apache.flink.shaded.guava32.com.google.common.cache.RemovalListener;
-import
org.apache.flink.shaded.guava32.com.google.common.cache.RemovalNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -139,7 +136,9 @@ public class UpdatableTopNFunction extends
AbstractTopNFunction implements Check
kvRowKeyMap =
cacheBuilder
.maximumSize(lruCacheSize)
- .removalListener(new CacheRemovalListener())
+ .removalListener(
+ new TopNBufferCacheRemovalListener<>(
+ keyContext, this::flushBufferToState))
.build();
LOG.info(
@@ -192,9 +191,8 @@ public class UpdatableTopNFunction extends
AbstractTopNFunction implements Check
for (Map.Entry<RowData, Tuple2<TopNBuffer, Map<RowData, RankRow>>>
entry :
kvRowKeyMap.asMap().entrySet()) {
RowData partitionKey = entry.getKey();
- Map<RowData, RankRow> currentRowKeyMap = entry.getValue().f1;
keyContext.setCurrentKey(partitionKey);
- flushBufferToState(currentRowKeyMap);
+ flushBufferToState(entry.getValue());
}
}
@@ -543,7 +541,9 @@ public class UpdatableTopNFunction extends
AbstractTopNFunction implements Check
}
}
- private void flushBufferToState(Map<RowData, RankRow> curRowKeyMap) throws
Exception {
+ private void flushBufferToState(Tuple2<TopNBuffer, Map<RowData, RankRow>>
bufferEntry)
+ throws Exception {
+ Map<RowData, RankRow> curRowKeyMap = bufferEntry.f1;
for (Map.Entry<RowData, RankRow> entry : curRowKeyMap.entrySet()) {
RowData key = entry.getKey();
RankRow rankRow = entry.getValue();
@@ -572,37 +572,6 @@ public class UpdatableTopNFunction extends
AbstractTopNFunction implements Check
}
}
- private class CacheRemovalListener
- implements RemovalListener<RowData, Tuple2<TopNBuffer,
Map<RowData, RankRow>>> {
-
- @Override
- public void onRemoval(
- RemovalNotification<RowData, Tuple2<TopNBuffer, Map<RowData,
RankRow>>>
- notification) {
- if (notification.getCause() != RemovalCause.SIZE) {
- // Don't flush values to state if cause is ttl expired
- return;
- }
-
- RowData partitionKey = notification.getKey();
- Tuple2<TopNBuffer, Map<RowData, RankRow>> value =
notification.getValue();
- if (partitionKey == null || value == null) {
- return;
- }
-
- RowData previousKey = (RowData) keyContext.getCurrentKey();
- keyContext.setCurrentKey(partitionKey);
- try {
- flushBufferToState(value.f1);
- } catch (Throwable e) {
- LOG.error("Fail to synchronize state!", e);
- throw new RuntimeException(e);
- } finally {
- keyContext.setCurrentKey(previousKey);
- }
- }
- }
-
private static class RankRow {
private final RowData row;
private int innerRank;