This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch release-2.1 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9faac10f84f56029afbbf6e6a4459762428d2160 Author: Rui Fan <1996fan...@gmail.com> AuthorDate: Thu Jul 24 21:41:24 2025 +0200 [FLINK-38137][state] RocksDB and ForSt state backend are compatible with the case where the user value of the map state is null --- .../apache/flink/api/common/state/MapState.java | 3 + .../apache/flink/api/common/state/v2/MapState.java | 3 + .../flink/runtime/state/StateBackendTestBase.java | 93 +++++++ .../state/forst/sync/AbstractForStSyncState.java | 5 +- .../flink/state/rocksdb/AbstractRocksDBState.java | 5 +- .../MapStateNullValueCheckpointingITCase.java | 266 +++++++++++++++++++++ 6 files changed, 373 insertions(+), 2 deletions(-) diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/MapState.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/MapState.java index 24054298fdb..739788a3241 100644 --- a/flink-core-api/src/main/java/org/apache/flink/api/common/state/MapState.java +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/MapState.java @@ -35,6 +35,9 @@ import java.util.Map; * the current element. That way, the system can handle stream and state partitioning consistently * together. * + * <p>The user value could be null, but change log state backend is not compatible with the user + * value is null, see FLINK-38144 for more details. + * * @param <UK> Type of the keys in the state. * @param <UV> Type of the values in the state. */ diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MapState.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MapState.java index 3f71e99cd7e..d698f7f9dc8 100644 --- a/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MapState.java +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MapState.java @@ -35,6 +35,9 @@ import java.util.Map; * the current element. That way, the system can handle stream and state partitioning consistently * together. * + * <p>The user value could be null, but change log state backend is not compatible with the user + * value is null, see FLINK-38144 for more details. + * * @param <UK> Type of the keys in the state. * @param <UV> Type of the values in the state. */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 10e07350f9f..92468cb6880 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -36,11 +36,13 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.base.DoubleSerializer; import org.apache.flink.api.common.typeutils.base.FloatSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -51,6 +53,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.StateLatencyTrackOptions; import org.apache.flink.configuration.StateSizeTrackOptions; import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.OneShotLatch; @@ -5435,6 +5439,95 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { return jobManagerOwnedSnapshot; } + @TestTemplate + public void testMapStateWithNullValue() throws Exception { + CheckpointableKeyedStateBackend<String> keyedBackend = + createKeyedBackend(new NullUnsafeTypeSerializer()); + MapStateDescriptor<Integer, String> kvId = + new MapStateDescriptor<>( + "id", IntSerializer.INSTANCE, new NullUnsafeTypeSerializer()); + MapState<Integer, String> state = + keyedBackend.getPartitionedState( + VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + String key = "key"; + int userKey = 1; + keyedBackend.setCurrentKey(key); + // this should not throw an exception + state.put(userKey, null); + assertThat(state.contains(userKey)).isTrue(); + assertThat(state.get(userKey)).isNull(); + } + + /** + * A test utility serializer that simulates the behavior of built-in Flink serializers which do + * not support null values. + * + * <p>Many core serializers in Flink, such as {@link + * org.apache.flink.api.common.typeutils.base.IntSerializer} and {@link + * org.apache.flink.api.common.typeutils.base.BooleanSerializer}, are designed to be null-unsafe + * and will throw a {@link NullPointerException} if they encounter a null value. + */ + private static class NullUnsafeTypeSerializer extends TypeSerializerSingleton<String> { + + private static final long serialVersionUID = 1L; + + private final StringSerializer stringSerializer = new StringSerializer(); + + @Override + public boolean isImmutableType() { + return stringSerializer.isImmutableType(); + } + + @Override + public String createInstance() { + return stringSerializer.createInstance(); + } + + @Override + public String copy(String from) { + return stringSerializer.copy(from); + } + + @Override + public String copy(String from, String reuse) { + return stringSerializer.copy(from, reuse); + } + + @Override + public int getLength() { + return stringSerializer.getLength(); + } + + @Override + public void serialize(String record, DataOutputView target) throws IOException { + if (record == null) { + throw new NullPointerException("This serializer cannot serialize nulls."); + } + stringSerializer.serialize(record, target); + } + + @Override + public String deserialize(DataInputView source) throws IOException { + return stringSerializer.deserialize(source); + } + + @Override + public String deserialize(String reuse, DataInputView source) throws IOException { + return stringSerializer.deserialize(reuse, source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + stringSerializer.copy(source, target); + } + + @Override + public TypeSerializerSnapshot<String> snapshotConfiguration() { + return stringSerializer.snapshotConfiguration(); + } + } + public static class TestPojo implements Serializable { private String strField; private Integer intField; diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/AbstractForStSyncState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/AbstractForStSyncState.java index f526d8facba..6ec7aced356 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/AbstractForStSyncState.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/AbstractForStSyncState.java @@ -172,7 +172,10 @@ public abstract class AbstractForStSyncState<K, N, V> implements InternalKvState throws IOException { dataOutputView.clear(); dataOutputView.writeBoolean(value == null); - return serializeValueInternal(value, serializer); + if (value != null) { + serializer.serialize(value, dataOutputView); + } + return dataOutputView.getCopyOfBuffer(); } <T> byte[] serializeValue(T value, TypeSerializer<T> serializer) throws IOException { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java index faee84a9cdb..b60347ff208 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java @@ -176,7 +176,10 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K throws IOException { dataOutputView.clear(); dataOutputView.writeBoolean(value == null); - return serializeValueInternal(value, serializer); + if (value != null) { + serializer.serialize(value, dataOutputView); + } + return dataOutputView.getCopyOfBuffer(); } <T> byte[] serializeValue(T value, TypeSerializer<T> serializer) throws IOException { diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MapStateNullValueCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MapStateNullValueCheckpointingITCase.java new file mode 100644 index 00000000000..6200b0d518c --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MapStateNullValueCheckpointingITCase.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateBackendOptions; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.core.execution.CheckpointType; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.types.Either; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.configuration.ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration test for checkpointing and restoring a job with a {@link MapState} that contains null + * user values. + */ +@RunWith(Parameterized.class) +public class MapStateNullValueCheckpointingITCase extends TestLogger { + + @Parameterized.Parameters(name = "stateBackend : {0}, snapshotType : {1}") + public static Collection<Object[]> data() { + return Arrays.asList( + new Object[][] { + {"rocksdb", Either.Left(CheckpointType.FULL)}, + {"rocksdb", Either.Left(CheckpointType.INCREMENTAL)}, + {"rocksdb", Either.Right(SavepointFormatType.CANONICAL)}, + {"rocksdb", Either.Right(SavepointFormatType.NATIVE)}, + {"hashmap", Either.Left(CheckpointType.FULL)}, + {"hashmap", Either.Left(CheckpointType.INCREMENTAL)}, + {"hashmap", Either.Right(SavepointFormatType.CANONICAL)}, + {"hashmap", Either.Right(SavepointFormatType.NATIVE)}, + {"forst", Either.Left(CheckpointType.FULL)}, + {"forst", Either.Left(CheckpointType.INCREMENTAL)}, + {"forst", Either.Right(SavepointFormatType.NATIVE)} + }); + } + + @Parameterized.Parameter public String stateBackend; + + @Parameterized.Parameter(1) + public Either<CheckpointType, SavepointFormatType> snapshotType; + + @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder(); + + private static MiniClusterWithClientResource cluster; + + @Before + public void before() throws Exception { + cluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(new Configuration()) + .setNumberTaskManagers(10) + .setNumberSlotsPerTaskManager(1) + .build()); + cluster.before(); + + StatefulMapper.firstRunFuture = new CompletableFuture<>(); + StatefulMapper.secondRunFuture = new CompletableFuture<>(); + } + + @After + public void after() { + if (cluster != null) { + cluster.after(); + cluster = null; + } + } + + @Test + public void testMapStateWithNullValueCheckpointingAndRestore() throws Exception { + final String savepointPath = runJobAndTakeSnapshot(); + assertThat(savepointPath).isNotEmpty(); + restoreAndVerify(savepointPath); + } + + private String runJobAndTakeSnapshot() throws Exception { + Configuration conf = new Configuration(); + conf.set( + CheckpointingOptions.CHECKPOINTS_DIRECTORY, + tmpFolder.newFolder().toURI().toString()); + conf.set(CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION, RETAIN_ON_CANCELLATION); + conf.set( + CheckpointingOptions.SAVEPOINT_DIRECTORY, tmpFolder.newFolder().toURI().toString()); + conf.set(StateBackendOptions.STATE_BACKEND, stateBackend); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); + + env.setParallelism(1); + + env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Data Generator Source") + .keyBy(v -> 0) + .map(new StatefulMapper(true)) + .sinkTo(new DiscardingSink<>()); + + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + MiniCluster miniCluster = cluster.getMiniCluster(); + miniCluster.submitJob(jobGraph).get(); + + JobID jobID = jobGraph.getJobID(); + + // Wait for the job to be running and the state to be populated. + StatefulMapper.firstRunFuture.get(2, TimeUnit.MINUTES); + + if (snapshotType.isLeft()) { + // Trigger a checkpoint + cluster.getClusterClient() + .triggerCheckpoint(jobID, snapshotType.left()) + .get(2, TimeUnit.MINUTES); + String checkpointPath = + CommonTestUtils.getLatestCompletedCheckpointPath(jobID, miniCluster) + .<NoSuchElementException>orElseThrow( + () -> { + throw new NoSuchElementException( + "No checkpoint was created yet"); + }); + cluster.getClusterClient().cancel(jobID); + return checkpointPath; + } else { + // Trigger a savepoint + return cluster.getClusterClient() + .stopWithSavepoint(jobID, false, null, snapshotType.right()) + .get(2, TimeUnit.MINUTES); + } + } + + private static DataGeneratorSource<Long> createSource() { + return new DataGeneratorSource<>( + (GeneratorFunction<Long, Long>) value -> value, + Long.MAX_VALUE, + RateLimiterStrategy.perSecond(5), + BasicTypeInfo.LONG_TYPE_INFO); + } + + private void restoreAndVerify(String savepointPath) throws Exception { + Configuration conf = new Configuration(); + conf.set( + CheckpointingOptions.CHECKPOINTS_DIRECTORY, + tmpFolder.newFolder().toURI().toString()); + conf.set( + CheckpointingOptions.SAVEPOINT_DIRECTORY, tmpFolder.newFolder().toURI().toString()); + conf.set(StateBackendOptions.STATE_BACKEND, stateBackend); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); + env.setParallelism(1); + + env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Data Generator Source") + .keyBy(v -> 0) + .map(new StatefulMapper(false)) + .sinkTo(new DiscardingSink<>()); + + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath)); + + MiniCluster miniCluster = cluster.getMiniCluster(); + miniCluster.submitJob(jobGraph).get(); + + Map<String, String> restoredState = StatefulMapper.secondRunFuture.get(2, TimeUnit.MINUTES); + + assertThat(restoredState.get("key")).isEqualTo("value"); + assertThat(restoredState.get("null-key")).isNull(); + assertThat(restoredState.containsKey("null-key")).isTrue(); + } + + private static class StatefulMapper extends RichMapFunction<Long, Long> { + + static CompletableFuture<Void> firstRunFuture; + static CompletableFuture<Map<String, String>> secondRunFuture; + + private final boolean isFirstRun; + private boolean hasPopulated; + private transient MapState<String, String> mapState; + + StatefulMapper(boolean isFirstRun) { + this.isFirstRun = isFirstRun; + } + + @Override + public void open(OpenContext context) { + MapStateDescriptor<String, String> mapStateDescriptor = + new MapStateDescriptor<>( + "map-state", + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO); + mapState = getRuntimeContext().getMapState(mapStateDescriptor); + + ValueStateDescriptor<Boolean> hasPopulatedStateDescriptor = + new ValueStateDescriptor<>("has-populated", BasicTypeInfo.BOOLEAN_TYPE_INFO); + hasPopulated = false; + } + + @Override + public Long map(Long value) throws Exception { + if (hasPopulated) { + return value; + } + if (isFirstRun) { + mapState.put("key", "value"); + mapState.put("null-key", null); + firstRunFuture.complete(null); + } else { + // This is the first record for this key after restore. + // Verify that the state is correctly restored. + Map<String, String> restoredState = new HashMap<>(); + restoredState.put("key", mapState.get("key")); + restoredState.put("null-key", mapState.get("null-key")); + + secondRunFuture.complete(restoredState); + } + hasPopulated = true; + return value; + } + } +}