This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 795fcbe79a7 [FLINK-37554][state/forst] Add UT for state compatibility between ForStKeyedStateBackend and ForStSyncKeyedStateBackend (#26440) 795fcbe79a7 is described below commit 795fcbe79a76effd9fb310671bf868c27161341c Author: mayuehappy <mayue.fi...@bytedance.com> AuthorDate: Mon Apr 14 19:45:40 2025 +0800 [FLINK-37554][state/forst] Add UT for state compatibility between ForStKeyedStateBackend and ForStSyncKeyedStateBackend (#26440) --- .../forst/ForStAsyncAndSyncCompatibilityTest.java | 232 +++++++++++++++++++++ .../flink/state/forst/ForStStateMigrationTest.java | 2 +- .../apache/flink/state/forst/ForStTestUtils.java | 26 +++ 3 files changed, 259 insertions(+), 1 deletion(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStAsyncAndSyncCompatibilityTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStAsyncAndSyncCompatibilityTest.java new file mode 100644 index 00000000000..749f4ef621c --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStAsyncAndSyncCompatibilityTest.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.runtime.state.v2.StateDescriptorUtils; +import org.apache.flink.state.forst.sync.ForStSyncKeyedStateBackend; +import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl; +import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl; +import org.apache.flink.util.IOUtils; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.RunnableFuture; + +import static org.apache.flink.state.forst.ForStStateTestBase.getMockEnvironment; +import static 
org.apache.flink.state.forst.ForStTestUtils.createKeyedStateBackend; +import static org.apache.flink.state.forst.ForStTestUtils.createSyncKeyedStateBackend; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +/** Compatibility test for {@link ForStKeyedStateBackend} and {@link ForStSyncKeyedStateBackend}. */ +class ForStAsyncAndSyncCompatibilityTest { + protected ForStStateBackend forStStateBackend; + protected AsyncExecutionController<String> aec; + protected MailboxExecutor mailboxExecutor; + + protected RecordContext<String> context; + + protected MockEnvironment env; + + @BeforeEach + public void setup(@TempDir File temporaryFolder) throws IOException { + FileSystem.initialize(new Configuration(), null); + Configuration configuration = new Configuration(); + configuration.set(ForStOptions.PRIMARY_DIRECTORY, temporaryFolder.toURI().toString()); + forStStateBackend = new ForStStateBackend().configure(configuration, null); + + env = getMockEnvironment(temporaryFolder); + + mailboxExecutor = + new MailboxExecutorImpl( + new TaskMailboxImpl(), 0, StreamTaskActionExecutor.IMMEDIATE); + } + + @Test + void testForStTransFromAsyncToSync() throws Exception { + ForStKeyedStateBackend<String> keyedBackend = + setUpAsyncKeyedStateBackend(Collections.emptyList()); + MapStateDescriptor<Integer, String> descriptor = + new MapStateDescriptor<>( + "testState", IntSerializer.INSTANCE, StringSerializer.INSTANCE); + + MapState<Integer, String> asyncMapState = + keyedBackend.createState(1, IntSerializer.INSTANCE, descriptor); + + context = aec.buildContext("testRecord", "testKey"); + context.retain(); + aec.setCurrentContext(context); + asyncMapState.asyncPut(1, "1"); + context.release(); + aec.drainInflightRecords(0); + + RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = + keyedBackend.snapshot( + 1L, + System.currentTimeMillis(), + env.getCheckpointStorageAccess() + .resolveCheckpointStorageLocation( + 1L, 
CheckpointStorageLocationReference.getDefault()), + CheckpointOptions.forCheckpointWithDefaultLocation()); + + if (!snapshot.isDone()) { + snapshot.run(); + } + SnapshotResult<KeyedStateHandle> snapshotResult = snapshot.get(); + KeyedStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot(); + IOUtils.closeQuietly(keyedBackend); + ForStSyncKeyedStateBackend<String> syncKeyedStateBackend = + createSyncKeyedStateBackend( + forStStateBackend, + env, + StringSerializer.INSTANCE, + Collections.singletonList(stateHandle)); + + try { + org.apache.flink.api.common.state.MapState<Integer, String> syncMapState = + syncKeyedStateBackend.getOrCreateKeyedState( + IntSerializer.INSTANCE, + StateDescriptorUtils.transformFromV2ToV1(descriptor)); + fail(); + + syncKeyedStateBackend.setCurrentKey("testKey"); + ((InternalKvState) syncKeyedStateBackend).setCurrentNamespace(1); + assertThat(syncMapState.get(1)).isEqualTo("1"); + } catch (Exception e) { + // Currently, ForStStateBackend does not support switching from Async to Sync, so this + // exception will be caught here + assertThat(e).isInstanceOf(ClassCastException.class); + assertThat(e.getMessage()) + .contains( + "org.apache.flink.runtime.state.v2.RegisteredKeyAndUserKeyValueStateBackendMetaInfo cannot be cast to class org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo"); + + } finally { + IOUtils.closeQuietly(syncKeyedStateBackend); + } + } + + @Test + void testForStTransFromSyncToAsync() throws Exception { + ForStSyncKeyedStateBackend<String> keyedBackend = + createSyncKeyedStateBackend( + forStStateBackend, env, StringSerializer.INSTANCE, Collections.emptyList()); + org.apache.flink.api.common.state.MapStateDescriptor<Integer, String> descriptor = + new org.apache.flink.api.common.state.MapStateDescriptor<>( + "testState", IntSerializer.INSTANCE, StringSerializer.INSTANCE); + org.apache.flink.api.common.state.MapState<Integer, String> mapState = + 
keyedBackend.getOrCreateKeyedState(IntSerializer.INSTANCE, descriptor); + keyedBackend.setCurrentKey("testKey"); + ((InternalKvState) mapState).setCurrentNamespace(1); + mapState.put(1, "1"); + + RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = + keyedBackend.snapshot( + 1L, + System.currentTimeMillis(), + env.getCheckpointStorageAccess() + .resolveCheckpointStorageLocation( + 1L, CheckpointStorageLocationReference.getDefault()), + CheckpointOptions.forCheckpointWithDefaultLocation()); + + if (!snapshot.isDone()) { + snapshot.run(); + } + SnapshotResult<KeyedStateHandle> snapshotResult = snapshot.get(); + KeyedStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot(); + IOUtils.closeQuietly(keyedBackend); + + ForStKeyedStateBackend<String> asyncKeyedStateBackend = + setUpAsyncKeyedStateBackend(Collections.singletonList(stateHandle)); + + MapStateDescriptor<Integer, String> newStateDescriptor = + new MapStateDescriptor<>( + "testState", IntSerializer.INSTANCE, StringSerializer.INSTANCE); + try { + MapState<Integer, String> asyncMapState = + asyncKeyedStateBackend.createState( + 1, IntSerializer.INSTANCE, newStateDescriptor); + fail(); + + context = aec.buildContext("testRecord", "testKey"); + context.retain(); + aec.setCurrentContext(context); + asyncMapState + .asyncGet(1) + .thenAccept( + value -> { + assertThat(value).isEqualTo("1"); + }); + context.release(); + aec.drainInflightRecords(0); + } catch (Exception e) { + // Currently, ForStStateBackend does not support switching from Sync to Async, so this + // exception will be caught here + assertThat(e).isInstanceOf(ClassCastException.class); + assertThat(e.getMessage()) + .contains( + "org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo cannot be cast to class org.apache.flink.runtime.state.v2.RegisteredKeyValueStateBackendMetaInfo"); + } finally { + IOUtils.closeQuietly(asyncKeyedStateBackend); + } + } + + private ForStKeyedStateBackend setUpAsyncKeyedStateBackend( + 
Collection<KeyedStateHandle> stateHandles) throws IOException { + ForStKeyedStateBackend<String> keyedStateBackend = + createKeyedStateBackend( + forStStateBackend, env, StringSerializer.INSTANCE, stateHandles); + aec = + new AsyncExecutionController<>( + mailboxExecutor, + (a, b) -> {}, + keyedStateBackend.createStateExecutor(), + new DeclarationManager(), + 1, + 100, + 0, + 1, + null, + null); + keyedStateBackend.setup(aec); + return keyedStateBackend; + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateMigrationTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateMigrationTest.java index 32841170a12..aef6116a804 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateMigrationTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateMigrationTest.java @@ -51,7 +51,7 @@ import static org.apache.flink.state.forst.ForStTestUtils.createKeyedStateBacken import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.fail; -/** Tests for {@link ForStListState}. */ +/** Tests for the State Migration of {@link ForStKeyedStateBackend}. 
*/ public class ForStStateMigrationTest extends ForStStateTestBase { @Test diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStTestUtils.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStTestUtils.java index 62cbca93015..89e31baee08 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStTestUtils.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStTestUtils.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.state.forst.sync.ForStSyncKeyedStateBackend; import java.io.IOException; import java.util.Collection; @@ -58,6 +59,31 @@ public final class ForStTestUtils { 1.0)); } + /** Test helper: calls {@code forStStateBackend.createKeyedStateBackend} with parameters built from {@code env}, the literal {@code "test_op"}, {@code keySerializer}, {@code new KeyGroupRange(0, 0)} and the given {@code stateHandles}, then casts the result to {@link ForStSyncKeyedStateBackend}. NOTE(review): the cast presumably only succeeds when the backend is configured to hand out the sync variant - confirm against ForStStateBackend. */ public static <K> ForStSyncKeyedStateBackend<K> createSyncKeyedStateBackend( + ForStStateBackend forStStateBackend, + Environment env, + TypeSerializer<K> keySerializer, + Collection<KeyedStateHandle> stateHandles) + throws IOException { + + return (ForStSyncKeyedStateBackend<K>) + forStStateBackend.createKeyedStateBackend( + new KeyedStateBackendParametersImpl<>( + env, + env.getJobID(), + "test_op", + keySerializer, + 1, + new KeyGroupRange(0, 0), + env.getTaskKvStateRegistry(), + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + (name, value) -> {}, + stateHandles, + new CloseableRegistry(), + 1.0)); + } + public static <K> ForStKeyedStateBackend<K> createKeyedStateBackend( ForStStateBackend forStStateBackend, Environment env, TypeSerializer<K> keySerializer) throws IOException {