Myasuka commented on a change in pull request #16969:
URL: https://github.com/apache/flink/pull/16969#discussion_r702391757
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
##########
@@ -0,0 +1,185 @@
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
+import org.apache.flink.util.ResourceGuard;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import static org.apache.flink.core.fs.Path.fromLocalFile;
+import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+/**
+ * Test for verifying the changed logic about notifyCheckpointComplete over
+ * RocksIncrementalSnapshotStrategy is incremental.
+ */
+public class RocksIncrementalSnapshotStrategyTest {
+
+ @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+
+ @Test
+ public void testCheckpointIsIncremental() throws Exception {
+ CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+ // prepare components
+ Tuple2<RocksIncrementalSnapshotStrategy, FsCheckpointStreamFactory>
prepareData =
+ prepareCreateSnapshotStrategy(closeableRegistry);
+ RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy =
prepareData.f0;
+ FsCheckpointStreamFactory checkpointStreamFactory = prepareData.f1;
+
+ // make and notify checkpoint with id 1
+ snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory,
closeableRegistry);
+ checkpointSnapshotStrategy.notifyCheckpointComplete(1L);
+
+ // notify savepoint with id 2
+ checkpointSnapshotStrategy.notifyCheckpointComplete(2L);
+
+ // make checkpoint with id 3
+ IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle3 =
+ snapshot(
+ 3L, checkpointSnapshotStrategy,
checkpointStreamFactory, closeableRegistry);
+
+ // If 3rd checkpoint's placeholderStateHandleCount > 0,it means 3rd
checkpoint is
+ // incremental.
+ Map<StateHandleID, StreamStateHandle> sharedState3 =
+ incrementalRemoteKeyedStateHandle3.getSharedState();
+ long placeholderStateHandleCount =
+ sharedState3.entrySet().stream()
+ .filter(e -> e.getValue() instanceof
PlaceholderStreamStateHandle)
+ .count();
+
+ Assert.assertTrue(placeholderStateHandleCount > 0);
+
+ checkpointSnapshotStrategy.close();
+ closeableRegistry.close();
+ }
+
+ public Tuple2<RocksIncrementalSnapshotStrategy, FsCheckpointStreamFactory>
+ prepareCreateSnapshotStrategy(CloseableRegistry closeableRegistry)
+ throws IOException, RocksDBException {
+
+ // prepare a rocksDB and put data
+ RocksDB rocksDB = RocksDB.open(tmp.newFolder().getAbsolutePath());
+ ColumnFamilyHandle columnFamilyHandle =
+ rocksDB.createColumnFamily(new
ColumnFamilyDescriptor("test".getBytes()));
+ byte[] key = "checkpoint".getBytes();
+ byte[] val = "incrementalTest".getBytes();
+ rocksDB.put(columnFamilyHandle, key, val);
+
+ // construct RocksIncrementalSnapshotStrategy
+ long lastCompletedCheckpointId = -1L;
+ ResourceGuard rocksDBResourceGuard = new ResourceGuard();
+ SortedMap<Long, Set<StateHandleID>> materializedSstFiles = new
TreeMap<>();
+ LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo>
kvStateInformation =
+ new LinkedHashMap<>();
+
+ RocksDBStateUploader rocksDBStateUploader =
+ spy(
+ new RocksDBStateUploader(
+
RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue()));
+
+ int keyGroupPrefixBytes =
+
CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(2);
+
+ RegisteredKeyValueStateBackendMetaInfo<Integer, ArrayList<Integer>>
metaInfo =
+ new RegisteredKeyValueStateBackendMetaInfo<>(
+ StateDescriptor.Type.VALUE,
+ "test",
+ IntSerializer.INSTANCE,
+ new ArrayListSerializer<>(IntSerializer.INSTANCE));
+
+ RocksDBKeyedStateBackend.RocksDbKvStateInfo rocksDbKvStateInfo =
+ new
RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfo);
+ kvStateInformation.putIfAbsent("test", rocksDbKvStateInfo);
+
+ RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy =
+ new RocksIncrementalSnapshotStrategy(
+ rocksDB,
+ rocksDBResourceGuard,
+ IntSerializer.INSTANCE,
+ kvStateInformation,
+ new KeyGroupRange(0, 1),
+ keyGroupPrefixBytes,
+ TestLocalRecoveryConfig.disabled(),
+ closeableRegistry,
+ tmp.newFolder(),
+ UUID.randomUUID(),
+ materializedSstFiles,
+ rocksDBStateUploader,
+ lastCompletedCheckpointId);
+
+ // construct FsCheckpointStreamFactory
+ int threshold = 100;
+ File checkpointsDir = tmp.newFolder("checkpointsDir");
+ File sharedStateDir = tmp.newFolder("sharedStateDir");
+ FsCheckpointStreamFactory checkpointStreamFactory =
+ new FsCheckpointStreamFactory(
+ getSharedInstance(),
+ fromLocalFile(checkpointsDir),
+ fromLocalFile(sharedStateDir),
+ threshold,
+ threshold);
+
+ // register RocksDB Closeable
+ closeableRegistry.registerCloseable(() -> rocksDB.close());
Review comment:
`closeableRegistry` is mainly used for registering and closing output
streams. It's not recommended to register RocksObjects with it.
I suggest you leverage
`org.apache.flink.contrib.streaming.state.RocksDBResource`, which ensures that
native objects are released after tests.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
##########
@@ -0,0 +1,185 @@
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
+import org.apache.flink.util.ResourceGuard;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import static org.apache.flink.core.fs.Path.fromLocalFile;
+import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+/**
+ * Test for verifying the changed logic about notifyCheckpointComplete over
+ * RocksIncrementalSnapshotStrategy is incremental.
Review comment:
The doc could be simplified to `Tests for {@link
RocksIncrementalSnapshotStrategy}`, and you could move the description to the
method `testCheckpointIsIncremental`. Moreover, I think the doc of
`testCheckpointIsIncremental` could be something like `Verify the next
checkpoint is still incremental after a savepoint completed`.
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
##########
@@ -0,0 +1,185 @@
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
+import org.apache.flink.util.ResourceGuard;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import static org.apache.flink.core.fs.Path.fromLocalFile;
+import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+/**
+ * Test for verifying the changed logic about notifyCheckpointComplete over
+ * RocksIncrementalSnapshotStrategy is incremental.
+ */
+public class RocksIncrementalSnapshotStrategyTest {
+
+ @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+
+ @Test
+ public void testCheckpointIsIncremental() throws Exception {
+ CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+ // prepare components
+ Tuple2<RocksIncrementalSnapshotStrategy, FsCheckpointStreamFactory>
prepareData =
+ prepareCreateSnapshotStrategy(closeableRegistry);
+ RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy =
prepareData.f0;
+ FsCheckpointStreamFactory checkpointStreamFactory = prepareData.f1;
+
+ // make and notify checkpoint with id 1
+ snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory,
closeableRegistry);
+ checkpointSnapshotStrategy.notifyCheckpointComplete(1L);
+
+ // notify savepoint with id 2
+ checkpointSnapshotStrategy.notifyCheckpointComplete(2L);
+
+ // make checkpoint with id 3
+ IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle3 =
+ snapshot(
+ 3L, checkpointSnapshotStrategy,
checkpointStreamFactory, closeableRegistry);
+
+ // If 3rd checkpoint's placeholderStateHandleCount > 0,it means 3rd
checkpoint is
+ // incremental.
+ Map<StateHandleID, StreamStateHandle> sharedState3 =
+ incrementalRemoteKeyedStateHandle3.getSharedState();
+ long placeholderStateHandleCount =
+ sharedState3.entrySet().stream()
+ .filter(e -> e.getValue() instanceof
PlaceholderStreamStateHandle)
+ .count();
+
+ Assert.assertTrue(placeholderStateHandleCount > 0);
+
+ checkpointSnapshotStrategy.close();
+ closeableRegistry.close();
Review comment:
These closeables could be closed in a `finally` block to ensure that no
resources are leaked if the test fails.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]