masteryhx commented on code in PR #23765:
URL: https://github.com/apache/flink/pull/23765#discussion_r1403408187


##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java:
##########
@@ -80,7 +86,8 @@ public RocksNativeFullSnapshotStrategy(
             @Nonnull LocalRecoveryConfig localRecoveryConfig,
             @Nonnull File instanceBasePath,
             @Nonnull UUID backendUID,
-            @Nonnull RocksDBStateUploader rocksDBStateUploader) {
+            @Nonnull RocksDBStateUploader rocksDBStateUploader,
+            RocksDBStateFileVerifier stateFileVerifier) {

Review Comment:
   nit: please add the `@Nullable` annotation to the `stateFileVerifier` parameter.



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java:
##########
@@ -86,6 +86,18 @@ public class RocksDBOptions {
                     .withDescription(
                             "The number of threads (per stateful operator) 
used to transfer (download and upload) files in RocksDBStateBackend.");
 
+    /**
+     * Whether to verify the Checksum of the incremental sst file during 
Checkpoint in
+     * RocksDBStateBackend.
+     */
+    @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
+    public static final ConfigOption<Boolean> 
CHECKPOINT_VERIFY_CHECKSUM_ENABLE =
+            
ConfigOptions.key("state.backend.rocksdb.checkpoint.verify.checksum.enable")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to verify the Checksum of the incremental 
sst file during Checkpoint in RocksDBStateBackend");

Review Comment:
   Could we also add some important notes to the description here:
   1. It may introduce some overhead to the checkpoint procedure if enabled.
   2. If the checksum verification fails, the checkpoint will fail.



##########
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksdbStateFileVerifierTest.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.Checkpoint;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
+import static org.junit.Assert.fail;
+
+/** {@link RocksDBStateFileVerifier} test. */
+public class RocksdbStateFileVerifierTest {
+
+    @Rule public TemporaryFolder folder = new TemporaryFolder();
+
+    @Test
+    public void rocksdbStateFileVerifierTest() throws Exception {
+        ArrayList columnFamilyHandles = new ArrayList<>(1);
+        String rootPath = folder.newFolder().getAbsolutePath();
+        File dbPath = new File(rootPath, "db");
+        File cpPath = new File(rootPath, "cp");
+
+        try (DBOptions dbOptions = new DBOptions().setCreateIfMissing(true);
+                ColumnFamilyOptions colOptions = new ColumnFamilyOptions();
+                Options sstFileReaderOptions = new Options(dbOptions, 
colOptions);
+                WriteOptions writeOptions = new 
WriteOptions().setDisableWAL(true);
+                RocksDB db =
+                        RocksDB.open(
+                                dbOptions,
+                                dbPath.toString(),
+                                Collections.singletonList(
+                                        new ColumnFamilyDescriptor(
+                                                "default".getBytes(), 
colOptions)),
+                                columnFamilyHandles);
+                RocksDBStateFileVerifier rocksDBStateFileVerifier =
+                        new RocksDBStateFileVerifier(sstFileReaderOptions)) {
+
+            byte[] key = "checkpoint".getBytes();
+            byte[] val = "incrementalTest".getBytes();
+            db.put(writeOptions, key, val);
+
+            try (Checkpoint checkpoint = Checkpoint.create(db)) {
+                checkpoint.createCheckpoint(cpPath.toString());
+            }
+
+            List<Path> sstFiles =
+                    Arrays.stream(FileUtils.listDirectory(cpPath.toPath()))
+                            .filter(file -> 
file.getFileName().toString().endsWith(SST_FILE_SUFFIX))
+                            .collect(Collectors.toList());
+
+            Assert.assertEquals(sstFiles.isEmpty(), false);

Review Comment:
   ```suggestion
               Assert.assertFalse(sstFiles.isEmpty());
   ```



##########
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksdbStateFileVerifierTest.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.Checkpoint;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
+import static org.junit.Assert.fail;
+
+/** {@link RocksDBStateFileVerifier} test. */
+public class RocksdbStateFileVerifierTest {
+
+    @Rule public TemporaryFolder folder = new TemporaryFolder();
+
+    @Test
+    public void rocksdbStateFileVerifierTest() throws Exception {
+        ArrayList columnFamilyHandles = new ArrayList<>(1);
+        String rootPath = folder.newFolder().getAbsolutePath();
+        File dbPath = new File(rootPath, "db");
+        File cpPath = new File(rootPath, "cp");
+
+        try (DBOptions dbOptions = new DBOptions().setCreateIfMissing(true);
+                ColumnFamilyOptions colOptions = new ColumnFamilyOptions();
+                Options sstFileReaderOptions = new Options(dbOptions, 
colOptions);
+                WriteOptions writeOptions = new 
WriteOptions().setDisableWAL(true);
+                RocksDB db =
+                        RocksDB.open(
+                                dbOptions,
+                                dbPath.toString(),
+                                Collections.singletonList(
+                                        new ColumnFamilyDescriptor(
+                                                "default".getBytes(), 
colOptions)),
+                                columnFamilyHandles);
+                RocksDBStateFileVerifier rocksDBStateFileVerifier =

Review Comment:
   How about also making it `AutoCloseable`?



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java:
##########
@@ -137,6 +145,7 @@ protected PreviousSnapshot snapshotMetaData(
     @Override
     public void close() {
         stateUploader.close();
+        stateFileVerifier.close();

Review Comment:
   `stateFileVerifier` could be null here, right?



##########
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksdbStateFileVerifierTest.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.Checkpoint;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
+import static org.junit.Assert.fail;
+
+/** {@link RocksDBStateFileVerifier} test. */
+public class RocksdbStateFileVerifierTest {
+
+    @Rule public TemporaryFolder folder = new TemporaryFolder();
+
+    @Test
+    public void rocksdbStateFileVerifierTest() throws Exception {
+        ArrayList columnFamilyHandles = new ArrayList<>(1);

Review Comment:
   nit: declare this as `List` (the interface) rather than `ArrayList`.



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java:
##########
@@ -86,6 +86,18 @@ public class RocksDBOptions {
                     .withDescription(
                             "The number of threads (per stateful operator) 
used to transfer (download and upload) files in RocksDBStateBackend.");
 
+    /**
+     * Whether to verify the Checksum of the incremental sst file during 
Checkpoint in
+     * RocksDBStateBackend.
+     */
+    @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
+    public static final ConfigOption<Boolean> 
CHECKPOINT_VERIFY_CHECKSUM_ENABLE =

Review Comment:
   The docs should be regenerated, as you can see from the failed case in CI.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to