This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 227e61d2e01 [FLINK-26840][table-runtime] refactor topN functions 
adding a general TopNBufferCacheRemovalListener
227e61d2e01 is described below

commit 227e61d2e010ec8123dcda78675f18deadf7df1b
Author: lincoln lee <[email protected]>
AuthorDate: Tue Aug 13 11:25:24 2024 +0800

    [FLINK-26840][table-runtime] refactor topN functions adding a general 
TopNBufferCacheRemovalListener
    
    This closes #19220
---
 .../runtime/operators/rank/FastTop1Function.java   | 31 ++--------
 .../rank/TopNBufferCacheRemovalListener.java       | 67 ++++++++++++++++++++++
 .../operators/rank/UpdatableTopNFunction.java      | 45 +++------------
 3 files changed, 80 insertions(+), 63 deletions(-)

diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java
index 7a947b81908..7af4b8cab28 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java
@@ -35,9 +35,6 @@ import org.apache.flink.util.Collector;
 
 import org.apache.flink.shaded.guava32.com.google.common.cache.Cache;
 import org.apache.flink.shaded.guava32.com.google.common.cache.CacheBuilder;
-import org.apache.flink.shaded.guava32.com.google.common.cache.RemovalCause;
-import org.apache.flink.shaded.guava32.com.google.common.cache.RemovalListener;
-import 
org.apache.flink.shaded.guava32.com.google.common.cache.RemovalNotification;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,7 +99,9 @@ public class FastTop1Function extends AbstractTopNFunction 
implements Checkpoint
         kvCache =
                 cacheBuilder
                         .maximumSize(lruCacheSize)
-                        .removalListener(new CacheRemovalListener())
+                        .removalListener(
+                                new TopNBufferCacheRemovalListener<>(
+                                        keyContext, this::flushBufferToState))
                         .build();
         LOG.info("Top-1 operator is using LRU caches key-size: {}", 
lruCacheSize);
 
@@ -163,7 +162,7 @@ public class FastTop1Function extends AbstractTopNFunction 
implements Checkpoint
     public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
         for (Map.Entry<RowData, RowData> entry : kvCache.asMap().entrySet()) {
             keyContext.setCurrentKey(entry.getKey());
-            dataState.update(entry.getValue());
+            flushBufferToState(entry.getValue());
         }
     }
 
@@ -172,25 +171,7 @@ public class FastTop1Function extends AbstractTopNFunction 
implements Checkpoint
         // nothing to do
     }
 
-    private class CacheRemovalListener implements RemovalListener<RowData, 
RowData> {
-        @Override
-        public void onRemoval(RemovalNotification<RowData, RowData> 
notification) {
-            if (notification.getCause() != RemovalCause.SIZE || 
notification.getValue() == null) {
-                // Don't flush values to state if cause is ttl expired
-                return;
-            }
-
-            RowData previousKey = (RowData) keyContext.getCurrentKey();
-            RowData partitionKey = notification.getKey();
-            keyContext.setCurrentKey(partitionKey);
-            try {
-                dataState.update(notification.getValue());
-            } catch (Throwable e) {
-                LOG.error("Fail to synchronize state!", e);
-                throw new RuntimeException(e);
-            } finally {
-                keyContext.setCurrentKey(previousKey);
-            }
-        }
+    private void flushBufferToState(RowData rowData) throws Exception {
+        dataState.update(rowData);
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBufferCacheRemovalListener.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBufferCacheRemovalListener.java
new file mode 100644
index 00000000000..33eedbe3a12
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBufferCacheRemovalListener.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.rank;
+
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.apache.flink.shaded.guava32.com.google.common.cache.RemovalCause;
+import org.apache.flink.shaded.guava32.com.google.common.cache.RemovalListener;
+import 
org.apache.flink.shaded.guava32.com.google.common.cache.RemovalNotification;
+
+/**
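+ * A common cache removal listener for rank operators: on size-based eviction it passes the evicted value to a callback under the evicted entry's key.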
+ *
+ * @param <V> is the value type of the cache.
+ */
+public class TopNBufferCacheRemovalListener<V> implements 
RemovalListener<RowData, V> {
+    // The keyContext is used instead of the executionContext because AbstractTopNFunction relies on the keyContext.
+    private final KeyContext keyContext;
+    private final ThrowingConsumer<V, Exception> callBack;
+
+    public TopNBufferCacheRemovalListener(
+            KeyContext keyContext, ThrowingConsumer<V, Exception> callBack) {
+        this.keyContext = keyContext;
+        this.callBack = callBack;
+    }
+
+    @Override
+    public void onRemoval(RemovalNotification<RowData, V> removalNotification) 
{
+        if (removalNotification.getCause() != RemovalCause.SIZE
+                || removalNotification.getValue() == null) {
+            // Only flush values evicted because of cache size; skip other removal causes (e.g. ttl expiry) and null values
+            return;
+        }
+        RowData previousKey = (RowData) keyContext.getCurrentKey();
+        RowData partitionKey = removalNotification.getKey();
+        V value = removalNotification.getValue();
+        if (partitionKey == null || value == null) {
+            return;
+        }
+        keyContext.setCurrentKey(partitionKey);
+        try {
+            callBack.accept(value);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to execute callback", e);
+        } finally {
+            keyContext.setCurrentKey(previousKey);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
index a2009680cd9..8f3820a0c17 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
@@ -41,9 +41,6 @@ import org.apache.flink.util.Collector;
 
 import org.apache.flink.shaded.guava32.com.google.common.cache.Cache;
 import org.apache.flink.shaded.guava32.com.google.common.cache.CacheBuilder;
-import org.apache.flink.shaded.guava32.com.google.common.cache.RemovalCause;
-import org.apache.flink.shaded.guava32.com.google.common.cache.RemovalListener;
-import 
org.apache.flink.shaded.guava32.com.google.common.cache.RemovalNotification;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -139,7 +136,9 @@ public class UpdatableTopNFunction extends 
AbstractTopNFunction implements Check
         kvRowKeyMap =
                 cacheBuilder
                         .maximumSize(lruCacheSize)
-                        .removalListener(new CacheRemovalListener())
+                        .removalListener(
+                                new TopNBufferCacheRemovalListener<>(
+                                        keyContext, this::flushBufferToState))
                         .build();
 
         LOG.info(
@@ -192,9 +191,8 @@ public class UpdatableTopNFunction extends 
AbstractTopNFunction implements Check
         for (Map.Entry<RowData, Tuple2<TopNBuffer, Map<RowData, RankRow>>> 
entry :
                 kvRowKeyMap.asMap().entrySet()) {
             RowData partitionKey = entry.getKey();
-            Map<RowData, RankRow> currentRowKeyMap = entry.getValue().f1;
             keyContext.setCurrentKey(partitionKey);
-            flushBufferToState(currentRowKeyMap);
+            flushBufferToState(entry.getValue());
         }
     }
 
@@ -543,7 +541,9 @@ public class UpdatableTopNFunction extends 
AbstractTopNFunction implements Check
         }
     }
 
-    private void flushBufferToState(Map<RowData, RankRow> curRowKeyMap) throws 
Exception {
+    private void flushBufferToState(Tuple2<TopNBuffer, Map<RowData, RankRow>> 
bufferEntry)
+            throws Exception {
+        Map<RowData, RankRow> curRowKeyMap = bufferEntry.f1;
         for (Map.Entry<RowData, RankRow> entry : curRowKeyMap.entrySet()) {
             RowData key = entry.getKey();
             RankRow rankRow = entry.getValue();
@@ -572,37 +572,6 @@ public class UpdatableTopNFunction extends 
AbstractTopNFunction implements Check
         }
     }
 
-    private class CacheRemovalListener
-            implements RemovalListener<RowData, Tuple2<TopNBuffer, 
Map<RowData, RankRow>>> {
-
-        @Override
-        public void onRemoval(
-                RemovalNotification<RowData, Tuple2<TopNBuffer, Map<RowData, 
RankRow>>>
-                        notification) {
-            if (notification.getCause() != RemovalCause.SIZE) {
-                // Don't flush values to state if cause is ttl expired
-                return;
-            }
-
-            RowData partitionKey = notification.getKey();
-            Tuple2<TopNBuffer, Map<RowData, RankRow>> value = 
notification.getValue();
-            if (partitionKey == null || value == null) {
-                return;
-            }
-
-            RowData previousKey = (RowData) keyContext.getCurrentKey();
-            keyContext.setCurrentKey(partitionKey);
-            try {
-                flushBufferToState(value.f1);
-            } catch (Throwable e) {
-                LOG.error("Fail to synchronize state!", e);
-                throw new RuntimeException(e);
-            } finally {
-                keyContext.setCurrentKey(previousKey);
-            }
-        }
-    }
-
     private static class RankRow {
         private final RowData row;
         private int innerRank;

Reply via email to