Myasuka commented on a change in pull request #16969:
URL: https://github.com/apache/flink/pull/16969#discussion_r702391757



##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
##########
@@ -0,0 +1,185 @@
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
+import org.apache.flink.util.ResourceGuard;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import static org.apache.flink.core.fs.Path.fromLocalFile;
+import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+/**
+ * Test for verifying the changed logic about notifyCheckpointComplete over
+ * RocksIncrementalSnapshotStrategy is incremental.
+ */
+public class RocksIncrementalSnapshotStrategyTest {
+
+    @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+
+    @Test
+    public void testCheckpointIsIncremental() throws Exception {
+        CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+        // prepare components
+        Tuple2<RocksIncrementalSnapshotStrategy, FsCheckpointStreamFactory> 
prepareData =
+                prepareCreateSnapshotStrategy(closeableRegistry);
+        RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy = 
prepareData.f0;
+        FsCheckpointStreamFactory checkpointStreamFactory = prepareData.f1;
+
+        // make and notify checkpoint with id 1
+        snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory, 
closeableRegistry);
+        checkpointSnapshotStrategy.notifyCheckpointComplete(1L);
+
+        // notify savepoint with id 2
+        checkpointSnapshotStrategy.notifyCheckpointComplete(2L);
+
+        // make checkpoint with id 3
+        IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle3 =
+                snapshot(
+                        3L, checkpointSnapshotStrategy, 
checkpointStreamFactory, closeableRegistry);
+
+        // If 3rd checkpoint's placeholderStateHandleCount > 0,it means 3rd 
checkpoint is
+        // incremental.
+        Map<StateHandleID, StreamStateHandle> sharedState3 =
+                incrementalRemoteKeyedStateHandle3.getSharedState();
+        long placeholderStateHandleCount =
+                sharedState3.entrySet().stream()
+                        .filter(e -> e.getValue() instanceof 
PlaceholderStreamStateHandle)
+                        .count();
+
+        Assert.assertTrue(placeholderStateHandleCount > 0);
+
+        checkpointSnapshotStrategy.close();
+        closeableRegistry.close();
+    }
+
+    public Tuple2<RocksIncrementalSnapshotStrategy, FsCheckpointStreamFactory>
+    prepareCreateSnapshotStrategy(CloseableRegistry closeableRegistry)
+            throws IOException, RocksDBException {
+
+        // prepare a rocksDB and put data
+        RocksDB rocksDB = RocksDB.open(tmp.newFolder().getAbsolutePath());
+        ColumnFamilyHandle columnFamilyHandle =
+                rocksDB.createColumnFamily(new 
ColumnFamilyDescriptor("test".getBytes()));
+        byte[] key = "checkpoint".getBytes();
+        byte[] val = "incrementalTest".getBytes();
+        rocksDB.put(columnFamilyHandle, key, val);
+
+        // construct RocksIncrementalSnapshotStrategy
+        long lastCompletedCheckpointId = -1L;
+        ResourceGuard rocksDBResourceGuard = new ResourceGuard();
+        SortedMap<Long, Set<StateHandleID>> materializedSstFiles = new 
TreeMap<>();
+        LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> 
kvStateInformation =
+                new LinkedHashMap<>();
+
+        RocksDBStateUploader rocksDBStateUploader =
+                spy(
+                        new RocksDBStateUploader(
+                                
RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue()));
+
+        int keyGroupPrefixBytes =
+                
CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(2);
+
+        RegisteredKeyValueStateBackendMetaInfo<Integer, ArrayList<Integer>> 
metaInfo =
+                new RegisteredKeyValueStateBackendMetaInfo<>(
+                        StateDescriptor.Type.VALUE,
+                        "test",
+                        IntSerializer.INSTANCE,
+                        new ArrayListSerializer<>(IntSerializer.INSTANCE));
+
+        RocksDBKeyedStateBackend.RocksDbKvStateInfo rocksDbKvStateInfo =
+                new 
RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfo);
+        kvStateInformation.putIfAbsent("test", rocksDbKvStateInfo);
+
+        RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy =
+                new RocksIncrementalSnapshotStrategy(
+                        rocksDB,
+                        rocksDBResourceGuard,
+                        IntSerializer.INSTANCE,
+                        kvStateInformation,
+                        new KeyGroupRange(0, 1),
+                        keyGroupPrefixBytes,
+                        TestLocalRecoveryConfig.disabled(),
+                        closeableRegistry,
+                        tmp.newFolder(),
+                        UUID.randomUUID(),
+                        materializedSstFiles,
+                        rocksDBStateUploader,
+                        lastCompletedCheckpointId);
+
+        // construct FsCheckpointStreamFactory
+        int threshold = 100;
+        File checkpointsDir = tmp.newFolder("checkpointsDir");
+        File sharedStateDir = tmp.newFolder("sharedStateDir");
+        FsCheckpointStreamFactory checkpointStreamFactory =
+                new FsCheckpointStreamFactory(
+                        getSharedInstance(),
+                        fromLocalFile(checkpointsDir),
+                        fromLocalFile(sharedStateDir),
+                        threshold,
+                        threshold);
+
+        // register RocksDB Closeable
+        closeableRegistry.registerCloseable(() -> rocksDB.close());

Review comment:
       `closeableRegistry` is mainly used for registering and closing output 
streams. It's not recommended to register RocksObjects.
   
   I suggest leveraging 
`org.apache.flink.contrib.streaming.state.RocksDBResource`, which ensures that 
native objects are released after tests.

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
##########
@@ -0,0 +1,185 @@
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
+import org.apache.flink.util.ResourceGuard;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import static org.apache.flink.core.fs.Path.fromLocalFile;
+import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+/**
+ * Test for verifying the changed logic about notifyCheckpointComplete over
+ * RocksIncrementalSnapshotStrategy is incremental.

Review comment:
       The doc could be simplified to `Tests for {@link 
RocksIncrementalSnapshotStrategy}`, and you could move the description to the 
method `testCheckpointIsIncremental`. Moreover, I think the doc of 
`testCheckpointIsIncremental` could be like `Verify the next checkpoint is 
still incremental after a savepoint completed`.

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
##########
@@ -0,0 +1,185 @@
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
+import org.apache.flink.util.ResourceGuard;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import static org.apache.flink.core.fs.Path.fromLocalFile;
+import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+/**
+ * Test for verifying the changed logic about notifyCheckpointComplete over
+ * RocksIncrementalSnapshotStrategy is incremental.
+ */
+public class RocksIncrementalSnapshotStrategyTest {
+
+    @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+
+    @Test
+    public void testCheckpointIsIncremental() throws Exception {
+        CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+        // prepare components
+        Tuple2<RocksIncrementalSnapshotStrategy, FsCheckpointStreamFactory> 
prepareData =
+                prepareCreateSnapshotStrategy(closeableRegistry);
+        RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy = 
prepareData.f0;
+        FsCheckpointStreamFactory checkpointStreamFactory = prepareData.f1;
+
+        // make and notify checkpoint with id 1
+        snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory, 
closeableRegistry);
+        checkpointSnapshotStrategy.notifyCheckpointComplete(1L);
+
+        // notify savepoint with id 2
+        checkpointSnapshotStrategy.notifyCheckpointComplete(2L);
+
+        // make checkpoint with id 3
+        IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle3 =
+                snapshot(
+                        3L, checkpointSnapshotStrategy, 
checkpointStreamFactory, closeableRegistry);
+
+        // If 3rd checkpoint's placeholderStateHandleCount > 0,it means 3rd 
checkpoint is
+        // incremental.
+        Map<StateHandleID, StreamStateHandle> sharedState3 =
+                incrementalRemoteKeyedStateHandle3.getSharedState();
+        long placeholderStateHandleCount =
+                sharedState3.entrySet().stream()
+                        .filter(e -> e.getValue() instanceof 
PlaceholderStreamStateHandle)
+                        .count();
+
+        Assert.assertTrue(placeholderStateHandleCount > 0);
+
+        checkpointSnapshotStrategy.close();
+        closeableRegistry.close();

Review comment:
       These closeables could be closed in a `finally` block to ensure there 
is no resource leak.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to