This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 076fa80e3147d2d19f2f54af6b85fd63038fb6a0
Author: fredia <fredia...@gmail.com>
AuthorDate: Thu Apr 3 12:32:20 2025 +0800

    [FLINK-37158][tests] Introduce ForSt to existing ITCases
    
    This reverts commit 0d0c185c7c29bef472c3755a4e395748dec5eaec.
---
 .../snapshot/ForStNativeFullSnapshotStrategy.java  |  22 ++--
 .../forst/sync/ForStSyncKeyedStateBackend.java     |   3 +-
 .../test/checkpointing/AutoRescalingITCase.java    |   4 +-
 .../EventTimeWindowCheckpointingITCase.java        |  34 ++++++
 .../KeyedStateCheckpointingITCase.java             |  18 ++++
 .../RescaleCheckpointManuallyITCase.java           | 118 +++++++++++++++++++--
 6 files changed, 180 insertions(+), 19 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java
index a23293568f7..e4ba427f389 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java
@@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -153,14 +152,13 @@ public class ForStNativeFullSnapshotStrategy<K>
         final PreviousSnapshot previousSnapshot =
                 snapshotMetaData(checkpointId, stateMetaInfoSnapshots);
 
-        try (ResourceGuard.Lease ignoredLease = resourceGuard.acquireResource()) {
-
-            // Disable file deletion for file transformation. ForSt will decide whether to allow
-            // file
-            // deletion based on the number of calls to disableFileDeletions() and
-            // enableFileDeletions(), so disableFileDeletions() should be call only once.
-            db.disableFileDeletions();
-
+        ResourceGuard.Lease lease = resourceGuard.acquireResource();
+        // Disable file deletion for file transformation. ForSt will decide whether to allow
+        // file
+        // deletion based on the number of calls to disableFileDeletions() and
+        // enableFileDeletions(), so disableFileDeletions() should be called only once.
+        db.disableFileDeletions();
+        try {
             // get live files with flush memtable
             RocksDB.LiveFiles liveFiles = db.getLiveFiles(true);
             List<Path> liveFilesPath =
@@ -186,13 +184,14 @@ public class ForStNativeFullSnapshotStrategy<K>
                     manifestFile,
                     previousSnapshot,
                     () -> {
-                        try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) {
+                        try {
                             db.enableFileDeletions(false);
+                            lease.close();
                             LOG.info(
                                     "Release one file deletion lock with 
ForStNativeSnapshotResources, backendUID:{}, checkpointId:{}.",
                                     backendUID,
                                     checkpointId);
-                        } catch (RocksDBException | IOException e) {
+                        } catch (RocksDBException e) {
                             LOG.error(
                                     "Enable file deletion failed, 
backendUID:{}, checkpointId:{}.",
                                     backendUID,
@@ -206,6 +205,7 @@ public class ForStNativeFullSnapshotStrategy<K>
                     backendUID,
                     checkpointId);
             db.enableFileDeletions(false);
+            lease.close();
             throw e;
         }
     }
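
A note on the hunk above: the old code released the lease at the end of the
try-with-resources block, before the snapshot resources built inside it were
consumed, and the completion callback acquired (and immediately closed) a
second lease. The patch instead keeps a single lease alive until the snapshot
resources are released. Below is a minimal sketch of that deferred-release
pattern, assuming hypothetical Db and Lease types; it is an illustration, not
the actual ForSt classes.

    // Sketch only: Db and Lease stand in for the ForSt handle and
    // ResourceGuard.Lease. The cleanup callback owns the lease from here on.
    interface Lease extends AutoCloseable {}

    interface Db {
        void disableFileDeletions() throws Exception;
        void enableFileDeletions(boolean force) throws Exception;
    }

    final class SnapshotLeaseSketch {
        static Runnable startSnapshot(Db db, Lease lease) throws Exception {
            // ForSt counts disable/enable calls, so this must be paired with
            // exactly one enableFileDeletions() on every path.
            db.disableFileDeletions();
            try {
                // ... collect live files, build snapshot resources ...
                return () -> { // runs when the snapshot resources are released
                    try {
                        db.enableFileDeletions(false);
                        lease.close(); // lease lives as long as the resources
                    } catch (Exception e) {
                        // log only; cleanup callbacks must not throw
                    }
                };
            } catch (Exception e) {
                // error path: re-enable deletions and release the lease eagerly
                db.enableFileDeletions(false);
                lease.close();
                throw e;
            }
        }
    }
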
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
index 63bdb9eda78..da26ae0d937 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
@@ -608,7 +608,8 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K>
     @Nonnull
     @Override
     public SavepointResources<K> savepoint() throws Exception {
-        throw new UnsupportedOperationException("This method is not supported.");
+        throw new UnsupportedOperationException(
+                "Canonical savepoints are not supported by ForSt State Backend.");
     }
 
     @Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
index 374e53963c9..42eb9992f89 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
@@ -116,7 +116,9 @@ public class AutoRescalingITCase extends TestLogger {
                 new Object[][] {
                     {"rocksdb", false},
                     {"rocksdb", true},
-                    {"hashmap", false}
+                    {"hashmap", false},
+                    {"forst", false},
+                    {"forst", true}
                 });
     }
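
For readers unfamiliar with the harness: each row added above becomes an
independent test instance under JUnit 4's Parameterized runner, so the two
forst rows reuse every existing rescaling test unchanged. A self-contained
sketch of that mechanism (class and field names here are placeholders, not
those of AutoRescalingITCase):

    import java.util.Arrays;
    import java.util.Collection;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    @RunWith(Parameterized.class)
    public class BackendMatrixSketch {
        @Parameterized.Parameters(name = "backend={0}, flag={1}")
        public static Collection<Object[]> parameters() {
            return Arrays.asList(
                    new Object[][] {
                        {"rocksdb", false}, {"rocksdb", true},
                        {"hashmap", false},
                        {"forst", false}, {"forst", true}
                    });
        }

        @Parameterized.Parameter(0)
        public String backend;

        @Parameterized.Parameter(1)
        public boolean flag;

        @Test
        public void runsOncePerRow() {
            // Executed once per matrix row above.
        }
    }
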
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index d3369ff7598..ca07e3a6ab2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -35,6 +35,8 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
+import org.apache.flink.state.forst.ForStOptions;
+import org.apache.flink.state.forst.ForStStateBackend;
 import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
 import org.apache.flink.state.rocksdb.RocksDBOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -110,6 +112,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
         ROCKSDB_FULL,
         ROCKSDB_INCREMENTAL,
         ROCKSDB_INCREMENTAL_ZK,
+        FORST_INCREMENTAL
     }
 
     @Parameterized.Parameters(name = "statebackend type ={0}")
@@ -191,6 +194,14 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
                     setupRocksDB(config, 16, true);
                     break;
                 }
+            case FORST_INCREMENTAL:
+                {
+                    config.set(
+                            ForStOptions.TIMER_SERVICE_FACTORY,
+                            ForStStateBackend.PriorityQueueStateType.ForStDB);
+                    setupForSt(config, 16);
+                    break;
+                }
             default:
                 throw new IllegalStateException("No backend selected.");
         }
@@ -229,6 +240,29 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
         config.set(RocksDBOptions.LOCAL_DIRECTORIES, rocksDb);
     }
 
+    private void setupForSt(Configuration config, int fileSizeThreshold) throws IOException {
+        // Configure the managed memory size as 64MB per slot for the ForSt state backend.
+        config.set(
+                TaskManagerOptions.MANAGED_MEMORY_SIZE,
+                MemorySize.ofMebiBytes(PARALLELISM / NUM_OF_TASK_MANAGERS * 64));
+
+        final String forstdb = tempFolder.newFolder().getAbsolutePath();
+        final File backups = tempFolder.newFolder().getAbsoluteFile();
+        // we use the fs backend with small threshold here to test the behaviour with file
+        // references, not self-contained byte handles
+        config.set(StateBackendOptions.STATE_BACKEND, "forst");
+        config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
+        config.set(
+                CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+                Path.fromLocalFile(backups).toUri().toString());
+        if (fileSizeThreshold != -1) {
+            config.set(
+                    CheckpointingOptions.FS_SMALL_FILE_THRESHOLD,
+                    MemorySize.parse(fileSizeThreshold + "b"));
+        }
+        config.set(ForStOptions.LOCAL_DIRECTORIES, forstdb);
+    }
+
     protected Configuration createClusterConfig() throws IOException {
         TemporaryFolder temporaryFolder = new TemporaryFolder();
         temporaryFolder.create();
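
Taken together, the FORST_INCREMENTAL case and setupForSt() reduce to a small
set of configuration keys. A standalone sketch using only the options visible
in this diff (both directory arguments are placeholders):

    import org.apache.flink.configuration.CheckpointingOptions;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.StateBackendOptions;
    import org.apache.flink.state.forst.ForStOptions;
    import org.apache.flink.state.forst.ForStStateBackend;

    public final class ForStConfigSketch {
        public static Configuration forStConfig(String localDir, String checkpointDir) {
            Configuration config = new Configuration();
            config.set(StateBackendOptions.STATE_BACKEND, "forst");
            config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
            config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
            // Keep timers in ForStDB itself, as the FORST_INCREMENTAL case does.
            config.set(
                    ForStOptions.TIMER_SERVICE_FACTORY,
                    ForStStateBackend.PriorityQueueStateType.ForStDB);
            config.set(ForStOptions.LOCAL_DIRECTORIES, localDir);
            return config;
        }
    }
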
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
index 150096ab43a..be9ba1eeac8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
@@ -25,10 +25,13 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.state.forst.ForStOptions;
 import org.apache.flink.state.rocksdb.RocksDBOptions;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -143,6 +146,21 @@ public class KeyedStateCheckpointingITCase extends TestLogger {
         testProgramWithBackend(env);
     }
 
+    @Test
+    public void testWithForStBackendIncremental() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.configure(
+                new Configuration()
+                        .set(StateBackendOptions.STATE_BACKEND, "forst")
+                        .set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true)
+                        .set(
+                                ForStOptions.LOCAL_DIRECTORIES,
+                                tmpFolder.newFolder().getAbsolutePath()));
+        CheckpointStorageUtils.configureFileSystemCheckpointStorage(
+                env, tmpFolder.newFolder().toURI().toString());
+        testProgramWithBackend(env);
+    }
+
     // ------------------------------------------------------------------------
 
     protected void testProgramWithBackend(StreamExecutionEnvironment env) throws Exception {
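
Note that, unlike the cluster-level setup in EventTimeWindowCheckpointingITCase,
this test selects ForSt per job through env.configure(). A minimal sketch of
that approach (the local directory argument and the 500 ms interval are
placeholders):

    import org.apache.flink.configuration.CheckpointingOptions;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.StateBackendOptions;
    import org.apache.flink.state.forst.ForStOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public final class PerJobForStSketch {
        public static StreamExecutionEnvironment forStEnv(String localDir) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // Overrides the cluster-wide state backend for this job only.
            env.configure(
                    new Configuration()
                            .set(StateBackendOptions.STATE_BACKEND, "forst")
                            .set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true)
                            .set(ForStOptions.LOCAL_DIRECTORIES, localDir));
            env.enableCheckpointing(500);
            return env;
        }
    }
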
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
index a4f05362348..f811c267404 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
@@ -19,9 +19,12 @@
 package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
@@ -40,8 +43,10 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.testutils.junit.SharedObjects;
 import org.apache.flink.testutils.junit.SharedReference;
@@ -56,8 +61,12 @@ import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
@@ -74,6 +83,7 @@ import static org.junit.Assert.assertNotNull;
  * NotifyingDefiniteKeySource, SubtaskIndexFlatMapper and CollectionSink refer to RescalingITCase,
  * because the static fields in these classes can not be shared.
  */
+@RunWith(Parameterized.class)
 public class RescaleCheckpointManuallyITCase extends TestLogger {
 
     private static final int NUM_TASK_MANAGERS = 2;
@@ -84,10 +94,24 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
 
     @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+    @Parameterized.Parameter(0)
+    public String statebackendType;
+
+    @Parameterized.Parameter(1)
+    public boolean enableAsyncState;
+
+    @Parameterized.Parameters(name = "statebackend type ={0}, enableAsyncState={1}")
+    public static Collection<Object[]> parameter() {
+        return Arrays.asList(
+                new Object[][] {
+                    {"forst", true}, {"forst", false}, {"rocksdb", true}, 
{"rocksdb", false}
+                });
+    }
+
     @Before
     public void setup() throws Exception {
         Configuration config = new Configuration();
-        config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
+        config.set(StateBackendOptions.STATE_BACKEND, statebackendType);
         config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
 
         cluster =
@@ -263,7 +287,7 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
 
         SharedReference<JobID> jobID = sharedObjects.add(new JobID());
         SharedReference<MiniCluster> miniClusterRef = sharedObjects.add(miniCluster);
-        DataStream<Integer> input =
+        KeyedStream<Integer, Integer> input =
                 env.addSource(
                                 new NotifyingDefiniteKeySource(
                                         numberKeys, numberElements, failAfterEmission) {
@@ -300,10 +324,18 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
                                         return value;
                                     }
                                 });
-        DataStream<Tuple2<Integer, Integer>> result =
-                input.flatMap(new SubtaskIndexFlatMapper(numberElementsExpect));
+        if (enableAsyncState) {
+            input.enableAsyncState();
+            DataStream<Tuple2<Integer, Integer>> result =
+                    input.flatMap(new AsyncSubtaskIndexFlatMapper(numberElementsExpect));
 
-        result.sinkTo(new CollectionSink<>());
+            result.sinkTo(new CollectionSink<>());
+        } else {
+            DataStream<Tuple2<Integer, Integer>> result =
+                    input.flatMap(new SubtaskIndexFlatMapper(numberElementsExpect));
+
+            result.sinkTo(new CollectionSink<>());
+        }
 
         return env.getStreamGraph().getJobGraph(env.getClass().getClassLoader(), jobID.get());
     }
@@ -349,8 +381,9 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
                 } else {
                     boolean newCheckpoint = false;
                     long waited = 0L;
+                    running = false;
                     // maximum wait 5min
-                    while (!newCheckpoint && waited < 30000L) {
+                    while (!newCheckpoint && waited < 300000L) {
                         synchronized (ctx.getCheckpointLock()) {
                             newCheckpoint = waitCheckpointCompleted();
                         }
@@ -423,6 +456,79 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
         }
     }
 
+    private static class AsyncSubtaskIndexFlatMapper
+            extends RichFlatMapFunction<Integer, Tuple2<Integer, Integer>>
+            implements CheckpointedFunction {
+
+        private static final long serialVersionUID = 1L;
+
+        private transient org.apache.flink.api.common.state.v2.ValueState<Integer> counter;
+        private transient org.apache.flink.api.common.state.v2.ValueState<Integer> sum;
+
+        private final int numberElements;
+
+        public AsyncSubtaskIndexFlatMapper(int numberElements) {
+            this.numberElements = numberElements;
+        }
+
+        @Override
+        public void flatMap(Integer value, Collector<Tuple2<Integer, Integer>> out)
+                throws Exception {
+            StateFuture<Integer> counterFuture =
+                    counter.asyncValue()
+                            .thenCompose(
+                                    (Integer c) -> {
+                                        int updated = c == null ? 1 : c + 1;
+                                        return counter.asyncUpdate(updated)
+                                                .thenApply(nothing -> updated);
+                                    });
+            StateFuture<Integer> sumFuture =
+                    sum.asyncValue()
+                            .thenCompose(
+                                    (Integer s) -> {
+                                        int updated = s == null ? value : s + value;
+                                        return sum.asyncUpdate(updated)
+                                                .thenApply(nothing -> updated);
+                                    });
+
+            counterFuture.thenCombine(
+                    sumFuture,
+                    (c, s) -> {
+                        if (c == numberElements) {
+                            out.collect(
+                                    Tuple2.of(
+                                            getRuntimeContext()
+                                                    .getTaskInfo()
+                                                    .getIndexOfThisSubtask(),
+                                            s));
+                        }
+                        return null;
+                    });
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            // all managed, nothing to do.
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {}
+
+        @Override
+        public void open(OpenContext openContext) throws Exception {
+            counter =
+                    ((StreamingRuntimeContext) getRuntimeContext())
+                            .getValueState(
+                                    new org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
+                                            "counter", BasicTypeInfo.INT_TYPE_INFO));
+            sum =
+                    ((StreamingRuntimeContext) getRuntimeContext())
+                            .getValueState(
+                                    new org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
+                                            "sum", BasicTypeInfo.INT_TYPE_INFO));
+        }
+    }
+
     private static class CollectionSink<IN> implements Sink<IN> {
 
         private static final ConcurrentHashMap<JobID, CollectionSinkWriter<?>> writers =
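
A closing note on AsyncSubtaskIndexFlatMapper: it requires the upstream
KeyedStream to have called enableAsyncState(), and every State V2 access
returns a StateFuture rather than a plain value. Stripped of the test
scaffolding, the read-modify-write chain looks like this (only methods already
used in the diff appear; the class name is a placeholder):

    import org.apache.flink.api.common.state.v2.StateFuture;
    import org.apache.flink.api.common.state.v2.ValueState;

    final class AsyncStateSketch {
        /** Increments the counter; the returned future completes with the new value. */
        static StateFuture<Integer> increment(ValueState<Integer> counter) {
            return counter.asyncValue()
                    .thenCompose(
                            c -> {
                                int updated = c == null ? 1 : c + 1;
                                // Chain the write after the read, then map the
                                // write's Void future back to the new value.
                                return counter.asyncUpdate(updated)
                                        .thenApply(ignored -> updated);
                            });
        }
    }
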
