This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 076fa80e3147d2d19f2f54af6b85fd63038fb6a0
Author: fredia <fredia...@gmail.com>
AuthorDate: Thu Apr 3 12:32:20 2025 +0800

    [FLINK-37158][tests] Introduce ForSt to existing ITCases

    This reverts commit 0d0c185c7c29bef472c3755a4e395748dec5eaec.
---
 .../snapshot/ForStNativeFullSnapshotStrategy.java |  22 ++--
 .../forst/sync/ForStSyncKeyedStateBackend.java    |   3 +-
 .../test/checkpointing/AutoRescalingITCase.java   |   4 +-
 .../EventTimeWindowCheckpointingITCase.java       |  34 ++++++
 .../KeyedStateCheckpointingITCase.java            |  18 ++++
 .../RescaleCheckpointManuallyITCase.java          | 118 +++++++++++++++++++--
 6 files changed, 180 insertions(+), 19 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java
index a23293568f7..e4ba427f389 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java
@@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -153,14 +152,13 @@ public class ForStNativeFullSnapshotStrategy<K>
         final PreviousSnapshot previousSnapshot =
                 snapshotMetaData(checkpointId, stateMetaInfoSnapshots);
 
-        try (ResourceGuard.Lease ignoredLease = resourceGuard.acquireResource()) {
-
-            // Disable file deletion for file transformation. ForSt will decide whether to allow
-            // file
-            // deletion based on the number of calls to disableFileDeletions() and
-            // enableFileDeletions(), so disableFileDeletions() should be call only once.
-            db.disableFileDeletions();
-
+        ResourceGuard.Lease lease = resourceGuard.acquireResource();
+        // Disable file deletion for file transformation. ForSt will decide whether to allow
+        // file
+        // deletion based on the number of calls to disableFileDeletions() and
+        // enableFileDeletions(), so disableFileDeletions() should be called only once.
+        db.disableFileDeletions();
+        try {
             // get live files with flush memtable
             RocksDB.LiveFiles liveFiles = db.getLiveFiles(true);
             List<Path> liveFilesPath =
@@ -186,13 +184,14 @@ public class ForStNativeFullSnapshotStrategy<K>
                 manifestFile,
                 previousSnapshot,
                 () -> {
-                    try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) {
+                    try {
                         db.enableFileDeletions(false);
+                        lease.close();
                         LOG.info(
                                 "Release one file deletion lock with ForStNativeSnapshotResources, backendUID:{}, checkpointId:{}.",
                                 backendUID,
                                 checkpointId);
-                    } catch (RocksDBException | IOException e) {
+                    } catch (RocksDBException e) {
                         LOG.error(
                                 "Enable file deletion failed, backendUID:{}, checkpointId:{}.",
                                 backendUID,
@@ -206,6 +205,7 @@ public class ForStNativeFullSnapshotStrategy<K>
                     backendUID,
                     checkpointId);
             db.enableFileDeletions(false);
+            lease.close();
             throw e;
         }
     }
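The change above replaces a try-with-resources lease, which was released as soon as the snapshot strategy returned, with a lease that is held until the snapshot resources are cleaned up (or until the failure path runs). A minimal, self-contained sketch of that lifecycle follows; the classes here are illustrative stand-ins, not Flink's actual ResourceGuard or snapshot types:

    import java.io.Closeable;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class LeaseLifecycleSketch {

        // Stand-in for ResourceGuard.Lease: close() is idempotent.
        static final class Lease implements Closeable {
            private final AtomicBoolean open = new AtomicBoolean(true);

            @Override
            public void close() {
                if (open.compareAndSet(true, false)) {
                    System.out.println("lease released");
                }
            }
        }

        // Stand-in for the snapshot resources whose cleanup releases the lease.
        static final class SnapshotResources {
            private final Runnable cleanup;

            SnapshotResources(Runnable cleanup) {
                this.cleanup = cleanup;
            }

            void release() {
                cleanup.run();
            }
        }

        static SnapshotResources syncPrepareResources() {
            Lease lease = new Lease(); // acquired once, before disabling file deletions
            try {
                // ... flush memtables, collect live files, read the manifest ...
                return new SnapshotResources(lease::close); // released only on cleanup
            } catch (RuntimeException e) {
                lease.close(); // on failure, release immediately and rethrow
                throw e;
            }
        }

        public static void main(String[] args) {
            syncPrepareResources().release(); // prints "lease released" exactly once
        }
    }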
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
index 63bdb9eda78..da26ae0d937 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
@@ -608,7 +608,8 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K>
     @Nonnull
     @Override
     public SavepointResources<K> savepoint() throws Exception {
-        throw new UnsupportedOperationException("This method is not supported.");
+        throw new UnsupportedOperationException(
+                "Canonical savepoints are not supported by ForSt State Backend.");
     }
 
     @Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
index 374e53963c9..42eb9992f89 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
@@ -116,7 +116,9 @@ public class AutoRescalingITCase extends TestLogger {
                 new Object[][] {
                     {"rocksdb", false},
                     {"rocksdb", true},
-                    {"hashmap", false}
+                    {"hashmap", false},
+                    {"forst", false},
+                    {"forst", true}
                 });
     }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index d3369ff7598..ca07e3a6ab2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -35,6 +35,8 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
+import org.apache.flink.state.forst.ForStOptions;
+import org.apache.flink.state.forst.ForStStateBackend;
 import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
 import org.apache.flink.state.rocksdb.RocksDBOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -110,6 +112,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
         ROCKSDB_FULL,
         ROCKSDB_INCREMENTAL,
         ROCKSDB_INCREMENTAL_ZK,
+        FORST_INCREMENTAL
     }
 
     @Parameterized.Parameters(name = "statebackend type ={0}")
@@ -191,6 +194,14 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
                     setupRocksDB(config, 16, true);
                     break;
                 }
+            case FORST_INCREMENTAL:
+                {
+                    config.set(
+                            ForStOptions.TIMER_SERVICE_FACTORY,
+                            ForStStateBackend.PriorityQueueStateType.ForStDB);
+                    setupForSt(config, 16);
+                    break;
+                }
             default:
                 throw new IllegalStateException("No backend selected.");
         }
@@ -229,6 +240,29 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
         config.set(RocksDBOptions.LOCAL_DIRECTORIES, rocksDb);
     }
 
+    private void setupForSt(Configuration config, int fileSizeThreshold) throws IOException {
+        // Configure the managed memory size as 64MB per slot for the ForSt state backend.
+        config.set(
+                TaskManagerOptions.MANAGED_MEMORY_SIZE,
+                MemorySize.ofMebiBytes(PARALLELISM / NUM_OF_TASK_MANAGERS * 64));
+
+        final String forstdb = tempFolder.newFolder().getAbsolutePath();
+        final File backups = tempFolder.newFolder().getAbsoluteFile();
+        // we use the fs backend with small threshold here to test the behaviour with file
+        // references, not self contained byte handles
+        config.set(StateBackendOptions.STATE_BACKEND, "forst");
+        config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
+        config.set(
+                CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+                Path.fromLocalFile(backups).toUri().toString());
+        if (fileSizeThreshold != -1) {
+            config.set(
+                    CheckpointingOptions.FS_SMALL_FILE_THRESHOLD,
+                    MemorySize.parse(fileSizeThreshold + "b"));
+        }
+        config.set(ForStOptions.LOCAL_DIRECTORIES, forstdb);
+    }
+
     protected Configuration createClusterConfig() throws IOException {
         TemporaryFolder temporaryFolder = new TemporaryFolder();
         temporaryFolder.create();
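For readers unfamiliar with the configuration surface setupForSt() exercises: all option names below appear in the diff above; the checkpoint directory is a placeholder here, whereas the test points it at a JUnit TemporaryFolder. A minimal standalone sketch:

    import org.apache.flink.configuration.CheckpointingOptions;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.MemorySize;
    import org.apache.flink.configuration.StateBackendOptions;

    public class ForStTestConfigSketch {
        public static void main(String[] args) {
            Configuration config = new Configuration();
            // Select ForSt and enable incremental checkpoints, as setupForSt() does.
            config.set(StateBackendOptions.STATE_BACKEND, "forst");
            config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
            // Placeholder directory; the test uses a TemporaryFolder URI instead.
            config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///tmp/checkpoints");
            // A tiny threshold forces file references instead of inline byte handles.
            config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.parse("16b"));
            System.out.println(config);
        }
    }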
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
index 150096ab43a..be9ba1eeac8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
@@ -25,10 +25,13 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.state.forst.ForStOptions;
 import org.apache.flink.state.rocksdb.RocksDBOptions;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -143,6 +146,21 @@ public class KeyedStateCheckpointingITCase extends TestLogger {
         testProgramWithBackend(env);
     }
 
+    @Test
+    public void testWithForStBackendIncremental() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.configure(
+                new Configuration()
+                        .set(StateBackendOptions.STATE_BACKEND, "forst")
+                        .set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true)
+                        .set(
+                                ForStOptions.LOCAL_DIRECTORIES,
+                                tmpFolder.newFolder().getAbsolutePath()));
+        CheckpointStorageUtils.configureFileSystemCheckpointStorage(
+                env, tmpFolder.newFolder().toURI().toString());
+        testProgramWithBackend(env);
+    }
+
     // ------------------------------------------------------------------------
 
     protected void testProgramWithBackend(StreamExecutionEnvironment env) throws Exception {
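The next diff parameterizes RescaleCheckpointManuallyITCase over the backend and the async-state mode. A standalone sketch of that JUnit4 wiring, with an illustrative test body in place of the real rescaling logic:

    import static org.junit.Assert.assertNotNull;

    import java.util.Arrays;
    import java.util.Collection;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    @RunWith(Parameterized.class)
    public class BackendMatrixSketch {

        @Parameterized.Parameter(0)
        public String statebackendType;

        @Parameterized.Parameter(1)
        public boolean enableAsyncState;

        @Parameterized.Parameters(name = "statebackend type ={0}, enableAsyncState={1}")
        public static Collection<Object[]> parameter() {
            return Arrays.asList(
                    new Object[][] {
                        {"forst", true}, {"forst", false}, {"rocksdb", true}, {"rocksdb", false}
                    });
        }

        @Test
        public void runsOncePerCombination() {
            // Each of the four parameter rows produces one test instance.
            assertNotNull(statebackendType);
        }
    }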
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
index a4f05362348..f811c267404 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
@@ -19,9 +19,12 @@ package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
@@ -40,8 +43,10 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.testutils.junit.SharedObjects;
 import org.apache.flink.testutils.junit.SharedReference;
@@ -56,8 +61,12 @@ import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
@@ -74,6 +83,7 @@ import static org.junit.Assert.assertNotNull;
  * NotifyingDefiniteKeySource, SubtaskIndexFlatMapper and CollectionSink refer to RescalingITCase,
  * because the static fields in these classes can not be shared.
  */
+@RunWith(Parameterized.class)
 public class RescaleCheckpointManuallyITCase extends TestLogger {
 
     private static final int NUM_TASK_MANAGERS = 2;
@@ -84,10 +94,24 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
 
     @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+    @Parameterized.Parameter(0)
+    public String statebackendType;
+
+    @Parameterized.Parameter(1)
+    public boolean enableAsyncState;
+
+    @Parameterized.Parameters(name = "statebackend type ={0}, enableAsyncState={1}")
+    public static Collection<Object[]> parameter() {
+        return Arrays.asList(
+                new Object[][] {
+                    {"forst", true}, {"forst", false}, {"rocksdb", true}, {"rocksdb", false}
+                });
+    }
+
     @Before
     public void setup() throws Exception {
         Configuration config = new Configuration();
-        config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
+        config.set(StateBackendOptions.STATE_BACKEND, statebackendType);
         config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
 
         cluster =
@@ -263,7 +287,7 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
         SharedReference<JobID> jobID = sharedObjects.add(new JobID());
         SharedReference<MiniCluster> miniClusterRef = sharedObjects.add(miniCluster);
 
-        DataStream<Integer> input =
+        KeyedStream<Integer, Integer> input =
                 env.addSource(
                                 new NotifyingDefiniteKeySource(
                                         numberKeys, numberElements, failAfterEmission) {
@@ -300,10 +324,18 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
                                 return value;
                             }
                         });
-        DataStream<Tuple2<Integer, Integer>> result =
-                input.flatMap(new SubtaskIndexFlatMapper(numberElementsExpect));
+        if (enableAsyncState) {
+            input.enableAsyncState();
+            DataStream<Tuple2<Integer, Integer>> result =
+                    input.flatMap(new AsyncSubtaskIndexFlatMapper(numberElementsExpect));
 
-        result.sinkTo(new CollectionSink<>());
+            result.sinkTo(new CollectionSink<>());
+        } else {
+            DataStream<Tuple2<Integer, Integer>> result =
+                    input.flatMap(new SubtaskIndexFlatMapper(numberElementsExpect));
+
+            result.sinkTo(new CollectionSink<>());
+        }
 
         return env.getStreamGraph().getJobGraph(env.getClass().getClassLoader(), jobID.get());
     }
@@ -349,8 +381,9 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
             } else {
                 boolean newCheckpoint = false;
                 long waited = 0L;
+                running = false;
                 // maximum wait 5min
-                while (!newCheckpoint && waited < 30000L) {
+                while (!newCheckpoint && waited < 300000L) {
                     synchronized (ctx.getCheckpointLock()) {
                         newCheckpoint = waitCheckpointCompleted();
                     }
@@ -423,6 +456,79 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
         }
     }
 
+    private static class AsyncSubtaskIndexFlatMapper
+            extends RichFlatMapFunction<Integer, Tuple2<Integer, Integer>>
+            implements CheckpointedFunction {
+
+        private static final long serialVersionUID = 1L;
+
+        private transient org.apache.flink.api.common.state.v2.ValueState<Integer> counter;
+        private transient org.apache.flink.api.common.state.v2.ValueState<Integer> sum;
+
+        private final int numberElements;
+
+        public AsyncSubtaskIndexFlatMapper(int numberElements) {
+            this.numberElements = numberElements;
+        }
+
+        @Override
+        public void flatMap(Integer value, Collector<Tuple2<Integer, Integer>> out)
+                throws Exception {
+            StateFuture<Integer> counterFuture =
+                    counter.asyncValue()
+                            .thenCompose(
+                                    (Integer c) -> {
+                                        int updated = c == null ? 1 : c + 1;
+                                        return counter.asyncUpdate(updated)
+                                                .thenApply(nothing -> updated);
+                                    });
+            StateFuture<Integer> sumFuture =
+                    sum.asyncValue()
+                            .thenCompose(
+                                    (Integer s) -> {
+                                        int updated = s == null ? value : s + value;
+                                        return sum.asyncUpdate(updated)
+                                                .thenApply(nothing -> updated);
+                                    });
+
+            counterFuture.thenCombine(
+                    sumFuture,
+                    (c, s) -> {
+                        if (c == numberElements) {
+                            out.collect(
+                                    Tuple2.of(
+                                            getRuntimeContext()
+                                                    .getTaskInfo()
+                                                    .getIndexOfThisSubtask(),
+                                            s));
+                        }
+                        return null;
+                    });
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            // all managed, nothing to do.
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {}
+
+        @Override
+        public void open(OpenContext openContext) throws Exception {
+            counter =
+                    ((StreamingRuntimeContext) getRuntimeContext())
+                            .getValueState(
+                                    new org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
+                                            "counter", BasicTypeInfo.INT_TYPE_INFO));
+            sum =
+                    ((StreamingRuntimeContext) getRuntimeContext())
+                            .getValueState(
+                                    new org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
+                                            "sum", BasicTypeInfo.INT_TYPE_INFO));
+        }
+    }
+
     private static class CollectionSink<IN> implements Sink<IN> {
 
         private static final ConcurrentHashMap<JobID, CollectionSinkWriter<?>> writers =
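The AsyncSubtaskIndexFlatMapper above chains StateFuture results much like java.util.concurrent.CompletableFuture. The standalone analog below shows the same composition shape with plain JDK futures; the completed-future reads and the hardcoded value/numberElements are illustrative only, not Flink's async state runtime:

    import java.util.concurrent.CompletableFuture;

    public class AsyncStateChainingSketch {
        public static void main(String[] args) {
            // Simulated "previous state" reads; null means no value stored yet.
            CompletableFuture<Integer> counterRead = CompletableFuture.completedFuture(null);
            CompletableFuture<Integer> sumRead = CompletableFuture.completedFuture(null);

            int value = 7;
            int numberElements = 1;

            CompletableFuture<Integer> counterUpdated =
                    counterRead.thenCompose(
                            c -> {
                                int updated = c == null ? 1 : c + 1;
                                // Stands in for counter.asyncUpdate(updated).thenApply(...)
                                return CompletableFuture.completedFuture(updated);
                            });
            CompletableFuture<Integer> sumUpdated =
                    sumRead.thenCompose(
                            s -> {
                                int updated = s == null ? value : s + value;
                                return CompletableFuture.completedFuture(updated);
                            });

            // Combine both updates and emit once the counter reaches the target.
            counterUpdated.thenCombine(
                    sumUpdated,
                    (c, s) -> {
                        if (c == numberElements) {
                            System.out.println("emit sum = " + s);
                        }
                        return null;
                    });
        }
    }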