This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bd4b0aa  [FLINK-21027][state] Introduce 
KeyedStateBackend#isSafeToReuseKVState for opmitization hint (#17945)
bd4b0aa is described below

commit bd4b0aa2ace713ca81ab3aa1ffe3bd86822d365c
Author: Yun Tang <[email protected]>
AuthorDate: Mon Dec 6 20:20:02 2021 +0800

    [FLINK-21027][state] Introduce KeyedStateBackend#isSafeToReuseKVState for 
opmitization hint (#17945)
---
 .../flink/runtime/state/KeyedStateBackend.java     |  15 +++
 .../flink/runtime/state/StateBackendTestBase.java  |  17 +++
 .../changelog/ChangelogKeyedStateBackend.java      |   4 +-
 ...logDelegateEmbeddedRocksDBStateBackendTest.java |   5 +
 .../streaming/state/RocksDBKeyedStateBackend.java  |   4 +-
 .../state/EmbeddedRocksDBStateBackendTest.java     |   5 +
 .../table/planner/utils/StateConfigUtilTest.java   | 124 ---------------------
 .../window/buffers/RecordsWindowBuffer.java        |   3 +-
 .../flink/table/runtime/util/StateConfigUtil.java  |   7 --
 9 files changed, 47 insertions(+), 137 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index fe5931c..c61406e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -137,10 +137,25 @@ public interface KeyedStateBackend<K>
      */
     boolean deregisterKeySelectionListener(KeySelectionListener<K> listener);
 
+    @Deprecated
     default boolean isStateImmutableInStateBackend(CheckpointType 
checkpointOptions) {
         return false;
     }
 
+    /**
+     * Whether it's safe to reuse key-values from the state-backend, e.g for 
the purpose of
+     * optimization.
+     *
+     * <p>NOTE: this method should not be used to check for {@link 
InternalPriorityQueue}, as the
+     * priority queue could be stored on different locations, e.g RocksDB 
state-backend could store
+     * that on JVM heap if configuring HEAP as the time-service factory.
+     *
+     * @return returns ture if safe to reuse the key-values from the 
state-backend.
+     */
+    default boolean isSafeToReuseKVState() {
+        return false;
+    }
+
     /** Listener is given a callback when {@link #setCurrentKey} is called 
(key context changes). */
     @FunctionalInterface
     interface KeySelectionListener<K> {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 21b2694..79c72ac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -314,6 +314,18 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
     }
 
     @Test
+    public void testIsSafeToReuseState() throws Exception {
+        CheckpointableKeyedStateBackend<Integer> backend =
+                createKeyedBackend(IntSerializer.INSTANCE);
+        try {
+            Assert.assertEquals(isSafeToReuseKVState(), 
backend.isSafeToReuseKVState());
+        } finally {
+            IOUtils.closeQuietly(backend);
+            backend.dispose();
+        }
+    }
+
+    @Test
     public void testKeyGroupedInternalPriorityQueue() throws Exception {
         testKeyGroupedInternalPriorityQueue(false);
     }
@@ -5613,4 +5625,9 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
     protected boolean supportsMetaInfoVerification() {
         return true;
     }
+
+    /** @return true if state backend is safe to reuse state. */
+    protected boolean isSafeToReuseKVState() {
+        return false;
+    }
 }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index 5269bf5..86bca74 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -420,8 +420,8 @@ public class ChangelogKeyedStateBackend<K>
     }
 
     @Override
-    public boolean isStateImmutableInStateBackend(CheckpointType 
checkpointOptions) {
-        return 
keyedStateBackend.isStateImmutableInStateBackend(checkpointOptions);
+    public boolean isSafeToReuseKVState() {
+        return keyedStateBackend.isSafeToReuseKVState();
     }
 
     @Nonnull
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
index b45e3b0..6b34ce1 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
@@ -56,6 +56,11 @@ public class ChangelogDelegateEmbeddedRocksDBStateBackendTest
         return false;
     }
 
+    @Override
+    protected boolean isSafeToReuseKVState() {
+        return true;
+    }
+
     @Test
     @Ignore("The type of handle returned from snapshot() is not incremental")
     public void testSharedIncrementalStateDeRegistration() {}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index ea83250..9d21fcb 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -873,8 +873,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
     }
 
     @Override
-    public boolean isStateImmutableInStateBackend(CheckpointType 
checkpointType) {
-        return !requiresLegacySynchronousTimerSnapshots(checkpointType);
+    public boolean isSafeToReuseKVState() {
+        return true;
     }
 
     /** Rocks DB specific information about the k/v states. */
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
index cb67a07..65302ca 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
@@ -186,6 +186,11 @@ public class EmbeddedRocksDBStateBackendTest
         return true;
     }
 
+    @Override
+    protected boolean isSafeToReuseKVState() {
+        return true;
+    }
+
     // small safety net for instance cleanups, so that no native objects are 
left
     @After
     public void cleanupRocksDB() {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StateConfigUtilTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StateConfigUtilTest.java
deleted file mode 100644
index 15c6df2..0000000
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StateConfigUtilTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.utils;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.runtime.util.StateConfigUtil;
-
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-/** Tests for {@link StateConfigUtil}. */
-public class StateConfigUtilTest {
-
-    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
-
-    @Test
-    public void testRocksDBWithHeapTimer() throws Exception {
-        File tempDir = tempFolder.newFolder().getAbsoluteFile();
-        Configuration conf = new Configuration();
-        conf.setString("state.backend", "rocksdb");
-        conf.setString("state.backend.rocksdb.timer-service.factory", "HEAP");
-        conf.setString("state.checkpoints.dir", "file://" + 
tempDir.toString());
-        assertIsStateImmutable(false, conf);
-    }
-
-    @Test
-    public void testRocksDBWithDefaultTimer() throws Exception {
-        File tempDir = tempFolder.newFolder().getAbsoluteFile();
-        Configuration conf = new Configuration();
-        conf.setString("state.backend", "rocksdb");
-        conf.setString("state.checkpoints.dir", "file://" + 
tempDir.toString());
-        assertIsStateImmutable(true, conf);
-    }
-
-    @Test
-    public void testHeapState() throws Exception {
-        Configuration conf = new Configuration();
-        assertIsStateImmutable(false, conf);
-    }
-
-    private void assertIsStateImmutable(boolean result, Configuration conf) 
throws Exception {
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(conf);
-        env.setParallelism(1);
-        env.fromElements("a", "b", "c")
-                .keyBy(s -> s)
-                .transform(
-                        "testing",
-                        BasicTypeInfo.BOOLEAN_TYPE_INFO,
-                        new TestingStateBackendOperator())
-                .addSink(new VerifyingSink());
-        env.execute();
-        assertEquals(Arrays.asList(result, result, result), 
VerifyingSink.RESULT);
-    }
-
-    @After
-    public void before() {
-        VerifyingSink.RESULT.clear();
-    }
-
-    private static final class TestingStateBackendOperator extends 
AbstractStreamOperator<Boolean>
-            implements OneInputStreamOperator<String, Boolean> {
-
-        private static final long serialVersionUID = 1L;
-
-        private transient Boolean result = null;
-
-        @Override
-        public void open() throws Exception {
-            super.open();
-            this.result = 
StateConfigUtil.isStateImmutableInStateBackend(getKeyedStateBackend());
-        }
-
-        @Override
-        public void processElement(StreamRecord<String> element) throws 
Exception {
-            if (result != null) {
-                output.collect(new StreamRecord<>(result));
-            }
-        }
-    }
-
-    private static final class VerifyingSink implements SinkFunction<Boolean> {
-        private static final long serialVersionUID = 1L;
-
-        private static final List<Boolean> RESULT = new ArrayList<>();
-
-        @Override
-        public void invoke(Boolean value, Context context) throws Exception {
-            synchronized (RESULT) {
-                RESULT.add(value);
-            }
-        }
-    }
-}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
index 9ca57f4..2281c94 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
@@ -38,7 +38,6 @@ import java.io.EOFException;
 import java.time.ZoneId;
 import java.util.Iterator;
 
-import static 
org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend;
 import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
 
 /**
@@ -161,7 +160,7 @@ public final class RecordsWindowBuffer implements 
WindowBuffer {
             RecordsCombiner combiner =
                     factory.createRecordsCombiner(
                             runtimeContext, timerService, stateBackend, 
windowState, isEventTime);
-            boolean requiresCopy = 
!isStateImmutableInStateBackend(stateBackend);
+            boolean requiresCopy = !stateBackend.isSafeToReuseKVState();
             return new RecordsWindowBuffer(
                     operatorOwner,
                     memoryManager,
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
index d594a7c..0b8b5cc 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
@@ -20,8 +20,6 @@ package org.apache.flink.table.runtime.util;
 
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
-import org.apache.flink.runtime.state.KeyedStateBackend;
 
 /** Utility to create a {@link StateTtlConfig} object. */
 public class StateConfigUtil {
@@ -40,9 +38,4 @@ public class StateConfigUtil {
             return StateTtlConfig.DISABLED;
         }
     }
-
-    public static boolean isStateImmutableInStateBackend(KeyedStateBackend<?> 
keyedStateBackend) {
-        // TODO: remove the hard code check once FLINK-21027 is supported
-        return 
keyedStateBackend.isStateImmutableInStateBackend(CheckpointType.CHECKPOINT);
-    }
 }

Reply via email to