This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new bd4b0aa [FLINK-21027][state] Introduce
KeyedStateBackend#isSafeToReuseKVState for opmitization hint (#17945)
bd4b0aa is described below
commit bd4b0aa2ace713ca81ab3aa1ffe3bd86822d365c
Author: Yun Tang <[email protected]>
AuthorDate: Mon Dec 6 20:20:02 2021 +0800
[FLINK-21027][state] Introduce KeyedStateBackend#isSafeToReuseKVState for
opmitization hint (#17945)
---
.../flink/runtime/state/KeyedStateBackend.java | 15 +++
.../flink/runtime/state/StateBackendTestBase.java | 17 +++
.../changelog/ChangelogKeyedStateBackend.java | 4 +-
...logDelegateEmbeddedRocksDBStateBackendTest.java | 5 +
.../streaming/state/RocksDBKeyedStateBackend.java | 4 +-
.../state/EmbeddedRocksDBStateBackendTest.java | 5 +
.../table/planner/utils/StateConfigUtilTest.java | 124 ---------------------
.../window/buffers/RecordsWindowBuffer.java | 3 +-
.../flink/table/runtime/util/StateConfigUtil.java | 7 --
9 files changed, 47 insertions(+), 137 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index fe5931c..c61406e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -137,10 +137,25 @@ public interface KeyedStateBackend<K>
*/
boolean deregisterKeySelectionListener(KeySelectionListener<K> listener);
+ @Deprecated
default boolean isStateImmutableInStateBackend(CheckpointType
checkpointOptions) {
return false;
}
+ /**
+ * Whether it's safe to reuse key-values from the state-backend, e.g for
the purpose of
+ * optimization.
+ *
+ * <p>NOTE: this method should not be used to check for {@link
InternalPriorityQueue}, as the
+ * priority queue could be stored on different locations, e.g RocksDB
state-backend could store
+ * that on JVM heap if configuring HEAP as the time-service factory.
+ *
+ * @return returns ture if safe to reuse the key-values from the
state-backend.
+ */
+ default boolean isSafeToReuseKVState() {
+ return false;
+ }
+
/** Listener is given a callback when {@link #setCurrentKey} is called
(key context changes). */
@FunctionalInterface
interface KeySelectionListener<K> {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 21b2694..79c72ac 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -314,6 +314,18 @@ public abstract class StateBackendTestBase<B extends
AbstractStateBackend> exten
}
@Test
+ public void testIsSafeToReuseState() throws Exception {
+ CheckpointableKeyedStateBackend<Integer> backend =
+ createKeyedBackend(IntSerializer.INSTANCE);
+ try {
+ Assert.assertEquals(isSafeToReuseKVState(),
backend.isSafeToReuseKVState());
+ } finally {
+ IOUtils.closeQuietly(backend);
+ backend.dispose();
+ }
+ }
+
+ @Test
public void testKeyGroupedInternalPriorityQueue() throws Exception {
testKeyGroupedInternalPriorityQueue(false);
}
@@ -5613,4 +5625,9 @@ public abstract class StateBackendTestBase<B extends
AbstractStateBackend> exten
protected boolean supportsMetaInfoVerification() {
return true;
}
+
+ /** @return true if state backend is safe to reuse state. */
+ protected boolean isSafeToReuseKVState() {
+ return false;
+ }
}
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index 5269bf5..86bca74 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -420,8 +420,8 @@ public class ChangelogKeyedStateBackend<K>
}
@Override
- public boolean isStateImmutableInStateBackend(CheckpointType
checkpointOptions) {
- return
keyedStateBackend.isStateImmutableInStateBackend(checkpointOptions);
+ public boolean isSafeToReuseKVState() {
+ return keyedStateBackend.isSafeToReuseKVState();
}
@Nonnull
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
index b45e3b0..6b34ce1 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
@@ -56,6 +56,11 @@ public class ChangelogDelegateEmbeddedRocksDBStateBackendTest
return false;
}
+ @Override
+ protected boolean isSafeToReuseKVState() {
+ return true;
+ }
+
@Test
@Ignore("The type of handle returned from snapshot() is not incremental")
public void testSharedIncrementalStateDeRegistration() {}
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index ea83250..9d21fcb 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -873,8 +873,8 @@ public class RocksDBKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K> {
}
@Override
- public boolean isStateImmutableInStateBackend(CheckpointType
checkpointType) {
- return !requiresLegacySynchronousTimerSnapshots(checkpointType);
+ public boolean isSafeToReuseKVState() {
+ return true;
}
/** Rocks DB specific information about the k/v states. */
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
index cb67a07..65302ca 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
@@ -186,6 +186,11 @@ public class EmbeddedRocksDBStateBackendTest
return true;
}
+ @Override
+ protected boolean isSafeToReuseKVState() {
+ return true;
+ }
+
// small safety net for instance cleanups, so that no native objects are
left
@After
public void cleanupRocksDB() {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StateConfigUtilTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StateConfigUtilTest.java
deleted file mode 100644
index 15c6df2..0000000
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StateConfigUtilTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.utils;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.runtime.util.StateConfigUtil;
-
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-/** Tests for {@link StateConfigUtil}. */
-public class StateConfigUtilTest {
-
- @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Test
- public void testRocksDBWithHeapTimer() throws Exception {
- File tempDir = tempFolder.newFolder().getAbsoluteFile();
- Configuration conf = new Configuration();
- conf.setString("state.backend", "rocksdb");
- conf.setString("state.backend.rocksdb.timer-service.factory", "HEAP");
- conf.setString("state.checkpoints.dir", "file://" +
tempDir.toString());
- assertIsStateImmutable(false, conf);
- }
-
- @Test
- public void testRocksDBWithDefaultTimer() throws Exception {
- File tempDir = tempFolder.newFolder().getAbsoluteFile();
- Configuration conf = new Configuration();
- conf.setString("state.backend", "rocksdb");
- conf.setString("state.checkpoints.dir", "file://" +
tempDir.toString());
- assertIsStateImmutable(true, conf);
- }
-
- @Test
- public void testHeapState() throws Exception {
- Configuration conf = new Configuration();
- assertIsStateImmutable(false, conf);
- }
-
- private void assertIsStateImmutable(boolean result, Configuration conf)
throws Exception {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(conf);
- env.setParallelism(1);
- env.fromElements("a", "b", "c")
- .keyBy(s -> s)
- .transform(
- "testing",
- BasicTypeInfo.BOOLEAN_TYPE_INFO,
- new TestingStateBackendOperator())
- .addSink(new VerifyingSink());
- env.execute();
- assertEquals(Arrays.asList(result, result, result),
VerifyingSink.RESULT);
- }
-
- @After
- public void before() {
- VerifyingSink.RESULT.clear();
- }
-
- private static final class TestingStateBackendOperator extends
AbstractStreamOperator<Boolean>
- implements OneInputStreamOperator<String, Boolean> {
-
- private static final long serialVersionUID = 1L;
-
- private transient Boolean result = null;
-
- @Override
- public void open() throws Exception {
- super.open();
- this.result =
StateConfigUtil.isStateImmutableInStateBackend(getKeyedStateBackend());
- }
-
- @Override
- public void processElement(StreamRecord<String> element) throws
Exception {
- if (result != null) {
- output.collect(new StreamRecord<>(result));
- }
- }
- }
-
- private static final class VerifyingSink implements SinkFunction<Boolean> {
- private static final long serialVersionUID = 1L;
-
- private static final List<Boolean> RESULT = new ArrayList<>();
-
- @Override
- public void invoke(Boolean value, Context context) throws Exception {
- synchronized (RESULT) {
- RESULT.add(value);
- }
- }
- }
-}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
index 9ca57f4..2281c94 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
@@ -38,7 +38,6 @@ import java.io.EOFException;
import java.time.ZoneId;
import java.util.Iterator;
-import static
org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
/**
@@ -161,7 +160,7 @@ public final class RecordsWindowBuffer implements
WindowBuffer {
RecordsCombiner combiner =
factory.createRecordsCombiner(
runtimeContext, timerService, stateBackend,
windowState, isEventTime);
- boolean requiresCopy =
!isStateImmutableInStateBackend(stateBackend);
+ boolean requiresCopy = !stateBackend.isSafeToReuseKVState();
return new RecordsWindowBuffer(
operatorOwner,
memoryManager,
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
index d594a7c..0b8b5cc 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
@@ -20,8 +20,6 @@ package org.apache.flink.table.runtime.util;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
-import org.apache.flink.runtime.state.KeyedStateBackend;
/** Utility to create a {@link StateTtlConfig} object. */
public class StateConfigUtil {
@@ -40,9 +38,4 @@ public class StateConfigUtil {
return StateTtlConfig.DISABLED;
}
}
-
- public static boolean isStateImmutableInStateBackend(KeyedStateBackend<?>
keyedStateBackend) {
- // TODO: remove the hard code check once FLINK-21027 is supported
- return
keyedStateBackend.isStateImmutableInStateBackend(CheckpointType.CHECKPOINT);
- }
}