This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9faac10f84f56029afbbf6e6a4459762428d2160
Author: Rui Fan <1996fan...@gmail.com>
AuthorDate: Thu Jul 24 21:41:24 2025 +0200

    [FLINK-38137][state] RocksDB and ForSt state backends are compatible with the case where the user value of the map state is null
---
 .../apache/flink/api/common/state/MapState.java    |   3 +
 .../apache/flink/api/common/state/v2/MapState.java |   3 +
 .../flink/runtime/state/StateBackendTestBase.java  |  93 +++++++
 .../state/forst/sync/AbstractForStSyncState.java   |   5 +-
 .../flink/state/rocksdb/AbstractRocksDBState.java  |   5 +-
 .../MapStateNullValueCheckpointingITCase.java      | 266 +++++++++++++++++++++
 6 files changed, 373 insertions(+), 2 deletions(-)
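
The gist of the change: the null-sensitive value layout used by the RocksDB and ForSt
(sync) map state already begins with a boolean null flag, but the value was previously
handed to the serializer even when it was null, which fails for null-unsafe serializers.
After this patch the serializer is only invoked for non-null values. Below is a minimal,
self-contained sketch of the resulting encode/decode pattern; the class and method names
are illustrative, not taken from the patch:

    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.core.memory.DataInputDeserializer;
    import org.apache.flink.core.memory.DataOutputSerializer;

    import java.io.IOException;

    final class NullSensitiveValueCodec {

        /** Encodes a possibly-null value as [nullFlag][valueBytes if non-null]. */
        static <T> byte[] encode(T value, TypeSerializer<T> serializer) throws IOException {
            DataOutputSerializer out = new DataOutputSerializer(32);
            out.writeBoolean(value == null);
            if (value != null) {
                // Only call the serializer for non-null values, so null-unsafe
                // serializers (e.g. IntSerializer) are never asked to write null.
                serializer.serialize(value, out);
            }
            return out.getCopyOfBuffer();
        }

        /** Symmetric decode: the null flag short-circuits deserialization. */
        static <T> T decode(byte[] bytes, TypeSerializer<T> serializer) throws IOException {
            DataInputDeserializer in = new DataInputDeserializer(bytes);
            return in.readBoolean() ? null : serializer.deserialize(in);
        }
    }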

diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/MapState.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/MapState.java
index 24054298fdb..739788a3241 100644
--- a/flink-core-api/src/main/java/org/apache/flink/api/common/state/MapState.java
+++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/MapState.java
@@ -35,6 +35,9 @@ import java.util.Map;
 * the current element. That way, the system can handle stream and state partitioning consistently
  * together.
  *
+ * <p>The user value may be null. Note that the changelog state backend is not compatible with
+ * null user values; see FLINK-38144 for more details.
+ *
  * @param <UK> Type of the keys in the state.
  * @param <UV> Type of the values in the state.
  */
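
For reference, the contract this paragraph documents can be exercised from user code
roughly as follows. This is a minimal sketch (the class, state, and key names are made
up for illustration), assuming a keyed stream and one of the state backends covered by
this fix:

    import org.apache.flink.api.common.functions.OpenContext;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class NullValueMapStateExample extends KeyedProcessFunction<String, String, String> {

        private transient MapState<String, String> mapState;

        @Override
        public void open(OpenContext context) {
            mapState =
                    getRuntimeContext()
                            .getMapState(
                                    new MapStateDescriptor<>("example", Types.STRING, Types.STRING));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out)
                throws Exception {
            // A null user value is legal: contains() returns true while get() returns null.
            mapState.put(value, null);
            out.collect(value + ": contains=" + mapState.contains(value)
                    + ", get=" + mapState.get(value));
        }
    }
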
diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MapState.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MapState.java
index 3f71e99cd7e..d698f7f9dc8 100644
--- a/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MapState.java
+++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MapState.java
@@ -35,6 +35,9 @@ import java.util.Map;
 * the current element. That way, the system can handle stream and state partitioning consistently
  * together.
  *
+ * <p>The user value may be null. Note that the changelog state backend is not compatible with
+ * null user values; see FLINK-38144 for more details.
+ *
  * @param <UK> Type of the keys in the state.
  * @param <UV> Type of the values in the state.
  */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 10e07350f9f..92468cb6880 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -36,11 +36,13 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -51,6 +53,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.StateLatencyTrackOptions;
 import org.apache.flink.configuration.StateSizeTrackOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -5435,6 +5439,95 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
         return jobManagerOwnedSnapshot;
     }
 
+    @TestTemplate
+    public void testMapStateWithNullValue() throws Exception {
+        CheckpointableKeyedStateBackend<String> keyedBackend =
+                createKeyedBackend(new NullUnsafeTypeSerializer());
+        MapStateDescriptor<Integer, String> kvId =
+                new MapStateDescriptor<>(
+                        "id", IntSerializer.INSTANCE, new NullUnsafeTypeSerializer());
+        MapState<Integer, String> state =
+                keyedBackend.getPartitionedState(
+                        VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+        String key = "key";
+        int userKey = 1;
+        keyedBackend.setCurrentKey(key);
+        // this should not throw an exception
+        state.put(userKey, null);
+        assertThat(state.contains(userKey)).isTrue();
+        assertThat(state.get(userKey)).isNull();
+    }
+
+    /**
+     * A test utility serializer that simulates the behavior of built-in Flink serializers which do
+     * not support null values.
+     *
+     * <p>Many core serializers in Flink, such as {@link
+     * org.apache.flink.api.common.typeutils.base.IntSerializer} and {@link
+     * org.apache.flink.api.common.typeutils.base.BooleanSerializer}, are designed to be null-unsafe
+     * and will throw a {@link NullPointerException} if they encounter a null value.
+     */
+    private static class NullUnsafeTypeSerializer extends TypeSerializerSingleton<String> {
+
+        private static final long serialVersionUID = 1L;
+
+        private final StringSerializer stringSerializer = new StringSerializer();
+
+        @Override
+        public boolean isImmutableType() {
+            return stringSerializer.isImmutableType();
+        }
+
+        @Override
+        public String createInstance() {
+            return stringSerializer.createInstance();
+        }
+
+        @Override
+        public String copy(String from) {
+            return stringSerializer.copy(from);
+        }
+
+        @Override
+        public String copy(String from, String reuse) {
+            return stringSerializer.copy(from, reuse);
+        }
+
+        @Override
+        public int getLength() {
+            return stringSerializer.getLength();
+        }
+
+        @Override
+        public void serialize(String record, DataOutputView target) throws IOException {
+            if (record == null) {
+                throw new NullPointerException("This serializer cannot serialize nulls.");
+            }
+            stringSerializer.serialize(record, target);
+        }
+
+        @Override
+        public String deserialize(DataInputView source) throws IOException {
+            return stringSerializer.deserialize(source);
+        }
+
+        @Override
+        public String deserialize(String reuse, DataInputView source) throws IOException {
+            return stringSerializer.deserialize(reuse, source);
+        }
+
+        @Override
+        public void copy(DataInputView source, DataOutputView target) throws IOException {
+            stringSerializer.copy(source, target);
+        }
+
+        @Override
+        public TypeSerializerSnapshot<String> snapshotConfiguration() {
+            return stringSerializer.snapshotConfiguration();
+        }
+    }
+
     public static class TestPojo implements Serializable {
         private String strField;
         private Integer intField;
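
The NullUnsafeTypeSerializer above mirrors a property of Flink's built-in serializers
that is easy to reproduce directly. A tiny standalone sketch of the behavior it
simulates (the demo class is hypothetical, not part of the patch):

    import org.apache.flink.api.common.typeutils.base.IntSerializer;
    import org.apache.flink.core.memory.DataOutputSerializer;

    public class NullUnsafeSerializerDemo {
        public static void main(String[] args) throws Exception {
            DataOutputSerializer out = new DataOutputSerializer(16);
            IntSerializer.INSTANCE.serialize(42, out); // fine
            // Unboxing the null Integer inside serialize() throws a NullPointerException,
            // which is why map state must guard null user values before serializing.
            IntSerializer.INSTANCE.serialize(null, out);
        }
    }
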
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/AbstractForStSyncState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/AbstractForStSyncState.java
index f526d8facba..6ec7aced356 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/AbstractForStSyncState.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/AbstractForStSyncState.java
@@ -172,7 +172,10 @@ public abstract class AbstractForStSyncState<K, N, V> implements InternalKvState
             throws IOException {
         dataOutputView.clear();
         dataOutputView.writeBoolean(value == null);
-        return serializeValueInternal(value, serializer);
+        if (value != null) {
+            serializer.serialize(value, dataOutputView);
+        }
+        return dataOutputView.getCopyOfBuffer();
     }
 
     <T> byte[] serializeValue(T value, TypeSerializer<T> serializer) throws IOException {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java
index faee84a9cdb..b60347ff208 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java
@@ -176,7 +176,10 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
             throws IOException {
         dataOutputView.clear();
         dataOutputView.writeBoolean(value == null);
-        return serializeValueInternal(value, serializer);
+        if (value != null) {
+            serializer.serialize(value, dataOutputView);
+        }
+        return dataOutputView.getCopyOfBuffer();
     }
 
     <T> byte[] serializeValue(T value, TypeSerializer<T> serializer) throws IOException {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MapStateNullValueCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MapStateNullValueCheckpointingITCase.java
new file mode 100644
index 00000000000..6200b0d518c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MapStateNullValueCheckpointingITCase.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.execution.CheckpointType;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.configuration.ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test for checkpointing and restoring a job with a {@link MapState} that contains null
+ * user values.
+ */
+@RunWith(Parameterized.class)
+public class MapStateNullValueCheckpointingITCase extends TestLogger {
+
+    @Parameterized.Parameters(name = "stateBackend : {0}, snapshotType : {1}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(
+                new Object[][] {
+                    {"rocksdb", Either.Left(CheckpointType.FULL)},
+                    {"rocksdb", Either.Left(CheckpointType.INCREMENTAL)},
+                    {"rocksdb", Either.Right(SavepointFormatType.CANONICAL)},
+                    {"rocksdb", Either.Right(SavepointFormatType.NATIVE)},
+                    {"hashmap", Either.Left(CheckpointType.FULL)},
+                    {"hashmap", Either.Left(CheckpointType.INCREMENTAL)},
+                    {"hashmap", Either.Right(SavepointFormatType.CANONICAL)},
+                    {"hashmap", Either.Right(SavepointFormatType.NATIVE)},
+                    {"forst", Either.Left(CheckpointType.FULL)},
+                    {"forst", Either.Left(CheckpointType.INCREMENTAL)},
+                    {"forst", Either.Right(SavepointFormatType.NATIVE)}
+                });
+    }
+
+    @Parameterized.Parameter public String stateBackend;
+
+    @Parameterized.Parameter(1)
+    public Either<CheckpointType, SavepointFormatType> snapshotType;
+
+    @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
+    private static MiniClusterWithClientResource cluster;
+
+    @Before
+    public void before() throws Exception {
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(new Configuration())
+                                .setNumberTaskManagers(10)
+                                .setNumberSlotsPerTaskManager(1)
+                                .build());
+        cluster.before();
+
+        StatefulMapper.firstRunFuture = new CompletableFuture<>();
+        StatefulMapper.secondRunFuture = new CompletableFuture<>();
+    }
+
+    @After
+    public void after() {
+        if (cluster != null) {
+            cluster.after();
+            cluster = null;
+        }
+    }
+
+    @Test
+    public void testMapStateWithNullValueCheckpointingAndRestore() throws Exception {
+        final String savepointPath = runJobAndTakeSnapshot();
+        assertThat(savepointPath).isNotEmpty();
+        restoreAndVerify(savepointPath);
+    }
+
+    private String runJobAndTakeSnapshot() throws Exception {
+        Configuration conf = new Configuration();
+        conf.set(
+                CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+                tmpFolder.newFolder().toURI().toString());
+        conf.set(CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION, RETAIN_ON_CANCELLATION);
+        conf.set(
+                CheckpointingOptions.SAVEPOINT_DIRECTORY, tmpFolder.newFolder().toURI().toString());
+        conf.set(StateBackendOptions.STATE_BACKEND, stateBackend);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+
+        env.setParallelism(1);
+
+        env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Data Generator Source")
+                .keyBy(v -> 0)
+                .map(new StatefulMapper(true))
+                .sinkTo(new DiscardingSink<>());
+
+        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        miniCluster.submitJob(jobGraph).get();
+
+        JobID jobID = jobGraph.getJobID();
+
+        // Wait for the job to be running and the state to be populated.
+        StatefulMapper.firstRunFuture.get(2, TimeUnit.MINUTES);
+
+        if (snapshotType.isLeft()) {
+            // Trigger a checkpoint
+            cluster.getClusterClient()
+                    .triggerCheckpoint(jobID, snapshotType.left())
+                    .get(2, TimeUnit.MINUTES);
+            String checkpointPath =
+                    CommonTestUtils.getLatestCompletedCheckpointPath(jobID, miniCluster)
+                            .<NoSuchElementException>orElseThrow(
+                                    () -> {
+                                        throw new NoSuchElementException(
+                                                "No checkpoint was created yet");
+                                    });
+            cluster.getClusterClient().cancel(jobID);
+            return checkpointPath;
+        } else {
+            // Trigger a savepoint
+            return cluster.getClusterClient()
+                    .stopWithSavepoint(jobID, false, null, snapshotType.right())
+                    .get(2, TimeUnit.MINUTES);
+        }
+    }
+
+    private static DataGeneratorSource<Long> createSource() {
+        return new DataGeneratorSource<>(
+                (GeneratorFunction<Long, Long>) value -> value,
+                Long.MAX_VALUE,
+                RateLimiterStrategy.perSecond(5),
+                BasicTypeInfo.LONG_TYPE_INFO);
+    }
+
+    private void restoreAndVerify(String savepointPath) throws Exception {
+        Configuration conf = new Configuration();
+        conf.set(
+                CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+                tmpFolder.newFolder().toURI().toString());
+        conf.set(
+                CheckpointingOptions.SAVEPOINT_DIRECTORY, tmpFolder.newFolder().toURI().toString());
+        conf.set(StateBackendOptions.STATE_BACKEND, stateBackend);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+        env.setParallelism(1);
+
+        env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Data Generator Source")
+                .keyBy(v -> 0)
+                .map(new StatefulMapper(false))
+                .sinkTo(new DiscardingSink<>());
+
+        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
+
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        miniCluster.submitJob(jobGraph).get();
+
+        Map<String, String> restoredState = StatefulMapper.secondRunFuture.get(2, TimeUnit.MINUTES);
+
+        assertThat(restoredState.get("key")).isEqualTo("value");
+        assertThat(restoredState.get("null-key")).isNull();
+        assertThat(restoredState.containsKey("null-key")).isTrue();
+    }
+
+    private static class StatefulMapper extends RichMapFunction<Long, Long> {
+
+        static CompletableFuture<Void> firstRunFuture;
+        static CompletableFuture<Map<String, String>> secondRunFuture;
+
+        private final boolean isFirstRun;
+        private boolean hasPopulated;
+        private transient MapState<String, String> mapState;
+
+        StatefulMapper(boolean isFirstRun) {
+            this.isFirstRun = isFirstRun;
+        }
+
+        @Override
+        public void open(OpenContext context) {
+            MapStateDescriptor<String, String> mapStateDescriptor =
+                    new MapStateDescriptor<>(
+                            "map-state",
+                            BasicTypeInfo.STRING_TYPE_INFO,
+                            BasicTypeInfo.STRING_TYPE_INFO);
+            mapState = getRuntimeContext().getMapState(mapStateDescriptor);
+
+            hasPopulated = false;
+        }
+
+        @Override
+        public Long map(Long value) throws Exception {
+            if (hasPopulated) {
+                return value;
+            }
+            if (isFirstRun) {
+                mapState.put("key", "value");
+                mapState.put("null-key", null);
+                firstRunFuture.complete(null);
+            } else {
+                // This is the first record for this key after restore.
+                // Verify that the state is correctly restored.
+                Map<String, String> restoredState = new HashMap<>();
+                restoredState.put("key", mapState.get("key"));
+                restoredState.put("null-key", mapState.get("null-key"));
+
+                secondRunFuture.complete(restoredState);
+            }
+            hasPopulated = true;
+            return value;
+        }
+    }
+}
