hemantk-12 commented on code in PR #3981: URL: https://github.com/apache/ozone/pull/3981#discussion_r1037491451
########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionNode.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ozone.rocksdiff; + +/** + * Node in the compaction DAG that represents an SST file. + */ +public class CompactionNode { + // Name of the SST file + private final String fileName; + // The last snapshot created before this node came into existence + private final String snapshotId; Review Comment: I'm skeptical about this. I don't understand its usage and it might not provide correct information. Let's say node A gets added because of snapshot-1 and it is used by snapshot-1, snapshot-2, ... snapshot-10. Now DAG pruning kicks in and deletes Snapshot-1 to Snapshot-5, but node A can't be deleted because it is used by Snapshot-6 to Snapshot-10 (that's my understanding). What would be the SnapshotID of the node in this case? Snapshot-1's ID or Snapshot-6's ID? I don't think Snapshot-1's ID is correct because Snapshot-1 doesn't even exist in the DAG. ########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionNode.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ozone.rocksdiff; + +/** + * Node in the compaction DAG that represents an SST file. + */ +public class CompactionNode { + // Name of the SST file + private final String fileName; + // The last snapshot created before this node came into existence + private final String snapshotId; + private final long snapshotGeneration; + private final long totalNumberOfKeys; + private long cumulativeKeysReverseTraversal; + + CompactionNode(String file, String ssId, long numKeys, long seqNum) { + fileName = file; + // snapshotId field added here only for improved debuggability Review Comment: This comment doesn't make any sense here. If it is really needed, it should be moved to the field declaration at line # 27.
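For illustration, a minimal sketch of what moving that note to the field declaration could look like (field names taken from the diff above; this only spells out the suggestion, it is not the final code):

```java
public class CompactionNode {
  // Name of the SST file
  private final String fileName;
  // The last snapshot created before this node came into existence.
  // Kept mainly for improved debuggability.
  private final String snapshotId;
  // ...
}
```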
########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionNode.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ozone.rocksdiff; + +/** + * Node in the compaction DAG that represents an SST file. + */ +public class CompactionNode { + // Name of the SST file + private final String fileName; + // The last snapshot created before this node came into existence + private final String snapshotId; + private final long snapshotGeneration; + private final long totalNumberOfKeys; + private long cumulativeKeysReverseTraversal; + + CompactionNode(String file, String ssId, long numKeys, long seqNum) { + fileName = file; + // snapshotId field added here only for improved debuggability + snapshotId = ssId; + totalNumberOfKeys = numKeys; + snapshotGeneration = seqNum; + cumulativeKeysReverseTraversal = 0L; + } + + @Override + public String toString() { + return String.format("Node{%s}", fileName); + } + + public String getFileName() { + return fileName; + } + + public String getSnapshotId() { + return snapshotId; + } + + public long getSnapshotGeneration() { + return snapshotGeneration; + } + + public long getTotalNumberOfKeys() { + return totalNumberOfKeys; + } + + public long getCumulativeKeysReverseTraversal() { + return cumulativeKeysReverseTraversal; + } + + public void setCumulativeKeysReverseTraversal( + long cumulativeKeysReverseTraversal) { + this.cumulativeKeysReverseTraversal = cumulativeKeysReverseTraversal; + } + + public void addCumulativeKeysReverseTraversal(long diff) { Review Comment: Curious if it needs to be thread safe? ########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionNode.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ozone.rocksdiff; + +/** + * Node in the compaction DAG that represents an SST file. 
+ */ +public class CompactionNode { + // Name of the SST file + private final String fileName; + // The last snapshot created before this node came into existence + private final String snapshotId; + private final long snapshotGeneration; + private final long totalNumberOfKeys; + private long cumulativeKeysReverseTraversal; + + CompactionNode(String file, String ssId, long numKeys, long seqNum) { Review Comment: Why is it package-private? ########## hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java: ########## @@ -510,4 +509,139 @@ void readRocksDBInstance(String dbPathArg, RocksDB rocksDB, FileWriter file, private String toStr(byte[] bytes) { return new String(bytes, UTF_8); } + + /** + * Helper that traverses the graphs for testing. + * @param compactionNodeMap + * @param reverseMutableGraph + * @param fwdMutableGraph + */ + void traverseGraph( Review Comment: ```suggestion private void traverseGraph( ConcurrentMap<String, CompactionNode> compactionNodeMap, MutableGraph<CompactionNode> reverseMutableGraph, MutableGraph<CompactionNode> fwdMutableGraph ) { ``` ########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/DifferSnapshotInfo.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ozone.rocksdiff; + +/** + * Snapshot information node class for the differ. + */ +public class DifferSnapshotInfo { + private final String dbPath; + private final String snapshotID; + private final long snapshotGeneration; + + public DifferSnapshotInfo(String db, String id, long gen) { + dbPath = db; + snapshotID = id; + snapshotGeneration = gen; + } + + public String getDbPath() { + return dbPath; + } + + public String getSnapshotID() { + return snapshotID; + } + + public long getSnapshotGeneration() { + return snapshotGeneration; + } + + @Override + public String toString() { + return "DifferSnapshotInfo{" + "dbPath='" + dbPath + '\'' Review Comment: Maybe use `String.format()` or `StringBuilder`. ########## hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java: ########## @@ -90,8 +101,145 @@ public static void init() { GenericTestUtils.setLogLevel(TestRocksDBCheckpointDiffer.LOG, Level.INFO); } + /** + * Test cases for testGetSSTDiffListWithoutDB.
+ */ + private static Stream<Arguments> testGetSSTDiffListWithoutDBCases() { + + DifferSnapshotInfo snapshotInfo1 = new DifferSnapshotInfo( + "/path/to/dbcp1", "ssUUID1", 3008L); + DifferSnapshotInfo snapshotInfo2 = new DifferSnapshotInfo( + "/path/to/dbcp2", "ssUUID2", 14980L); + DifferSnapshotInfo snapshotInfo3 = new DifferSnapshotInfo( + "/path/to/dbcp3", "ssUUID3", 17975L); + DifferSnapshotInfo snapshotInfo4 = new DifferSnapshotInfo( + "/path/to/dbcp4", "ssUUID4", 18000L); + + Set<String> snapshotSstFiles1 = new HashSet<>(asList( + "000059", "000053")); + Set<String> snapshotSstFiles2 = new HashSet<>(asList( + "000088", "000059", "000053", "000095")); + Set<String> snapshotSstFiles3 = new HashSet<>(asList( + "000088", "000105", "000059", "000053", "000095")); + Set<String> snapshotSstFiles4 = new HashSet<>(asList( + "000088", "000105", "000059", "000053", "000095", "000108")); + Set<String> snapshotSstFiles1Alt1 = new HashSet<>(asList( + "000059", "000053", "000066")); + Set<String> snapshotSstFiles1Alt2 = new HashSet<>(asList( + "000059", "000053", "000052")); + Set<String> snapshotSstFiles2Alt2 = new HashSet<>(asList( + "000088", "000059", "000053", "000095", "000099")); + Set<String> snapshotSstFiles2Alt3 = new HashSet<>(asList( + "000088", "000059", "000053", "000062")); + + return Stream.of( + Arguments.of("Test 1: Regular case. Expands expandable " + + "SSTs in the initial diff.", + snapshotInfo3, + snapshotInfo1, + snapshotSstFiles3, + snapshotSstFiles1, + new HashSet<>(asList("000059", "000053")), + new HashSet<>(asList( + "000066", "000105", "000080", "000087", "000073", "000095"))), + Arguments.of("Test 2: Crafted input: One source " + + "('to' snapshot) SST file is never compacted (newly flushed)", + snapshotInfo4, + snapshotInfo3, + snapshotSstFiles4, + snapshotSstFiles3, + new HashSet<>(asList( + "000088", "000105", "000059", "000053", "000095")), + new HashSet<>(asList("000108"))), + Arguments.of("Test 3: Crafted input: Same SST files " + + "found during SST expansion", + snapshotInfo2, + snapshotInfo1, + snapshotSstFiles2, + snapshotSstFiles1Alt1, + new HashSet<>(asList("000066", "000059", "000053")), + new HashSet<>(asList( + "000080", "000087", "000073", "000095"))), + Arguments.of("Test 4: Crafted input: Skipping known " + + "processed SST.", + snapshotInfo2, + snapshotInfo1, + snapshotSstFiles2Alt2, + snapshotSstFiles1Alt2, + new HashSet<>(), + new HashSet<>()), + Arguments.of("Test 5: Hit snapshot generation early exit " + + "condition", + snapshotInfo2, + snapshotInfo1, + snapshotSstFiles2Alt3, + snapshotSstFiles1, + new HashSet<>(asList("000059", "000053")), + new HashSet<>(asList( + "000066", "000080", "000087", "000073", "000062"))) + ); + } + + /** + * Tests core SST diff list logic. Does not involve DB. + * Focuses on testing edge cases in internalGetSSTDiffList(). 
+ */ + @ParameterizedTest(name = "{0}") + @MethodSource("testGetSSTDiffListWithoutDBCases") + public void testGetSSTDiffListWithoutDB(String description, + DifferSnapshotInfo srcSnapshot, + DifferSnapshotInfo destSnapshot, + Set<String> srcSnapshotSstFiles, + Set<String> destSnapshotSstFiles, + Set<String> expectedSameSstFiles, + Set<String> expectedDiffSstFiles) { + + RocksDBCheckpointDiffer differ = new RocksDBCheckpointDiffer(); + + String compactionLog = "" + + "S 1000 df6410c7-151b-4e90-870e-5ef12875acd5\n" // Snapshot 0 + + "C 000001,000002:000062\n" + // Additional "compaction" to trigger and test early exit condition + + "S 3008 ef6410c7-151b-4e90-870e-5ef12875acd5\n" // Snapshot 1 + + "C 000068,000062:000069\n" // Regular compaction + + "C 000071,000064,000060,000052:000071,000064,000060,000052\n" + // Trivial move + + "C 000073,000066:000074\n" + + "C 000082,000076,000069:000083\n" + + "C 000087,000080,000074:000088\n" + + "C 000093,000090,000083:\n" // Deletion? + + "S 14980 e7ad72f8-52df-4430-93f6-0ee91d4a47fd\n" // Snapshot 2 + + "C 000098,000096,000085,000078,000071,000064,000060,000052:000099\n" + + "C 000105,000095,000088:000107\n" + + "S 17975 4f084f6e-ed3d-4780-8362-f832303309ea\n"; // Snapshot 3 + + // Construct DAG from compaction log input + Arrays.stream(compactionLog.split("\n")).forEach( + differ::processCompactionLogLine); + + Set<String> actualSameSstFiles = new HashSet<>(); + Set<String> actualDiffSstFiles = new HashSet<>(); + differ.internalGetSSTDiffList( + srcSnapshot, + destSnapshot, + srcSnapshotSstFiles, + destSnapshotSstFiles, + differ.getForwardCompactionDAG(), + actualSameSstFiles, + actualDiffSstFiles); + // Check same and different SST files result + Assertions.assertEquals(expectedSameSstFiles, actualSameSstFiles); + Assertions.assertEquals(expectedDiffSstFiles, actualDiffSstFiles); + } + + /** + * Tests DB listener (compaction log generation, SST backup), + * SST file list diff. + * <p> + * Does actual DB write, flush, compaction. + */ @Test - void testMain() throws Exception { + void testWithDB() throws Exception { Review Comment: nit: `testGetSSTDiffListWithDB()`. `testWithDB()` doesn't say what we are actually testing. ########## hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java: ########## @@ -510,4 +509,139 @@ void readRocksDBInstance(String dbPathArg, RocksDB rocksDB, FileWriter file, private String toStr(byte[] bytes) { return new String(bytes, UTF_8); } + + /** + * Helper that traverses the graphs for testing. + * @param compactionNodeMap + * @param reverseMutableGraph + * @param fwdMutableGraph + */ + void traverseGraph( + ConcurrentHashMap<String, CompactionNode> compactionNodeMap, + MutableGraph<CompactionNode> reverseMutableGraph, + MutableGraph<CompactionNode> fwdMutableGraph) { + + List<CompactionNode> nodeList = compactionNodeMap.values().stream() + .sorted(new NodeComparator()).collect(Collectors.toList()); + + for (CompactionNode infileNode : nodeList) { + // fist go through fwdGraph to find nodes that don't have successors. + // These nodes will be the top level nodes in reverse graph + Set<CompactionNode> successors = fwdMutableGraph.successors(infileNode); + if (successors.size() == 0) { + LOG.debug("No successors. 
Cumulative keys: {}, total keys: {}", + infileNode.getCumulativeKeysReverseTraversal(), + infileNode.getTotalNumberOfKeys()); + infileNode.setCumulativeKeysReverseTraversal( + infileNode.getTotalNumberOfKeys()); + } + } + + HashSet<CompactionNode> visited = new HashSet<>(); Review Comment: ```suggestion Set<CompactionNode> visited = new HashSet<>(); ``` ########## hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java: ########## @@ -194,27 +199,51 @@ void testZeroSizeKey() DifferSnapshotInfo snap1 = getDifferSnapshotInfo(omMetadataManager, volumeName, bucketName, "snap1"); - DifferSnapshotInfo snap3 = getDifferSnapshotInfo(omMetadataManager, - volumeName, bucketName, "snap3"); + DifferSnapshotInfo snap2 = getDifferSnapshotInfo(omMetadataManager, + volumeName, bucketName, "snap2"); // RocksDB does checkpointing in a separate thread, wait for it final File checkpointSnap1 = new File(snap1.getDbPath()); GenericTestUtils.waitFor(checkpointSnap1::exists, 2000, 20000); + final File checkpointSnap2 = new File(snap2.getDbPath()); + GenericTestUtils.waitFor(checkpointSnap2::exists, 2000, 20000); + + List<String> sstDiffList21 = differ.getSSTDiffList(snap2, snap1); + LOG.debug("Got diff list: {}", sstDiffList21); + + // Delete 1000 keys, take a 3rd snapshot, and do another diff + for (int i = 0; i < 1000; i++) { + bucket.deleteKey("b_" + i); + } + + resp = store.createSnapshot(volumeName, bucketName, "snap3"); + LOG.debug("Snapshot created: {}", resp); + + DifferSnapshotInfo snap3 = getDifferSnapshotInfo(omMetadataManager, + volumeName, bucketName, "snap3"); final File checkpointSnap3 = new File(snap3.getDbPath()); GenericTestUtils.waitFor(checkpointSnap3::exists, 2000, 20000); - List<String> actualDiffList = differ.getSSTDiffList(snap3, snap1); - LOG.debug("Got diff list: {}", actualDiffList); - // Hard-coded expected output. - // The result is deterministic. Retrieved from a successful run. - final List<String> expectedDiffList = Collections.singletonList("000059"); - Assertions.assertEquals(expectedDiffList, actualDiffList); - - // TODO: Use smaller DB write buffer size (currently it is set to 128 MB - // in DBProfile), or generate enough keys (in the millions) to trigger - // RDB compaction. Take another snapshot and do the diff again. - // Then restart OM, do the same diff again to see if DAG reconstruction - // works. + List<String> sstDiffList32 = differ.getSSTDiffList(snap3, snap2); Review Comment: 1. I believe these test cases can be changed to parameterized tests. 2. Why is the test package-private and not public? And what is `testZeroSizeKey` actually testing? The name `testZeroSizeKey` doesn't give me any insight into what it is testing. ########## hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java: ########## @@ -510,4 +509,139 @@ void readRocksDBInstance(String dbPathArg, RocksDB rocksDB, FileWriter file, private String toStr(byte[] bytes) { return new String(bytes, UTF_8); } + + /** + * Helper that traverses the graphs for testing.
+ * @param compactionNodeMap + * @param reverseMutableGraph + * @param fwdMutableGraph + */ + void traverseGraph( + ConcurrentHashMap<String, CompactionNode> compactionNodeMap, + MutableGraph<CompactionNode> reverseMutableGraph, + MutableGraph<CompactionNode> fwdMutableGraph) { + + List<CompactionNode> nodeList = compactionNodeMap.values().stream() + .sorted(new NodeComparator()).collect(Collectors.toList()); + + for (CompactionNode infileNode : nodeList) { + // fist go through fwdGraph to find nodes that don't have successors. + // These nodes will be the top level nodes in reverse graph + Set<CompactionNode> successors = fwdMutableGraph.successors(infileNode); + if (successors.size() == 0) { + LOG.debug("No successors. Cumulative keys: {}, total keys: {}", + infileNode.getCumulativeKeysReverseTraversal(), + infileNode.getTotalNumberOfKeys()); + infileNode.setCumulativeKeysReverseTraversal( + infileNode.getTotalNumberOfKeys()); + } + } + + HashSet<CompactionNode> visited = new HashSet<>(); + for (CompactionNode infileNode : nodeList) { + if (visited.contains(infileNode)) { + continue; + } + visited.add(infileNode); + LOG.debug("Visiting node '{}'", infileNode.getFileName()); + Set<CompactionNode> currentLevel = new HashSet<>(); + currentLevel.add(infileNode); + int level = 1; + while (!currentLevel.isEmpty()) { + LOG.debug("BFS Level: {}. Current level has {} nodes", + level++, currentLevel.size()); + final Set<CompactionNode> nextLevel = new HashSet<>(); + for (CompactionNode current : currentLevel) { + LOG.debug("Expanding node: {}", current.getFileName()); + Set<CompactionNode> successors = + reverseMutableGraph.successors(current); + if (successors.isEmpty()) { + LOG.debug("No successors. Cumulative keys: {}", + current.getCumulativeKeysReverseTraversal()); + continue; + } + for (CompactionNode node : successors) { + LOG.debug("Adding to the next level: {}", node.getFileName()); + LOG.debug("'{}' cumulative keys: {}. parent '{}' total keys: {}", + node.getFileName(), node.getCumulativeKeysReverseTraversal(), + current.getFileName(), current.getTotalNumberOfKeys()); + node.addCumulativeKeysReverseTraversal( + current.getCumulativeKeysReverseTraversal()); + nextLevel.add(node); + } + } + currentLevel = nextLevel; + } + } + } + + void printMutableGraphFromAGivenNode( + ConcurrentHashMap<String, CompactionNode> compactionNodeMap, + String fileName, + int sstLevel, + MutableGraph<CompactionNode> mutableGraph) { + + CompactionNode infileNode = compactionNodeMap.get(fileName); + if (infileNode == null) { + return; + } + LOG.debug("Expanding file: {}. 
SST compaction level: {}", + fileName, sstLevel); + Set<CompactionNode> currentLevel = new HashSet<>(); + currentLevel.add(infileNode); + int levelCounter = 1; + while (!currentLevel.isEmpty()) { + LOG.debug("DAG Level: {}", levelCounter++); + final Set<CompactionNode> nextLevel = new HashSet<>(); + StringBuilder sb = new StringBuilder(); + for (CompactionNode current : currentLevel) { + Set<CompactionNode> successors = mutableGraph.successors(current); + for (CompactionNode succNode : successors) { + sb.append(succNode.getFileName()).append(" "); + nextLevel.add(succNode); + } + } + LOG.debug("{}", sb); + currentLevel = nextLevel; + } + } + + void printMutableGraph(String srcSnapId, String destSnapId, Review Comment: ```suggestion private void printMutableGraph(String srcSnapId, String destSnapId, MutableGraph<CompactionNode> mutableGraph) { ``` ########## hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java: ########## @@ -194,27 +199,51 @@ void testZeroSizeKey() DifferSnapshotInfo snap1 = getDifferSnapshotInfo(omMetadataManager, volumeName, bucketName, "snap1"); - DifferSnapshotInfo snap3 = getDifferSnapshotInfo(omMetadataManager, - volumeName, bucketName, "snap3"); + DifferSnapshotInfo snap2 = getDifferSnapshotInfo(omMetadataManager, + volumeName, bucketName, "snap2"); // RocksDB does checkpointing in a separate thread, wait for it final File checkpointSnap1 = new File(snap1.getDbPath()); GenericTestUtils.waitFor(checkpointSnap1::exists, 2000, 20000); + final File checkpointSnap2 = new File(snap2.getDbPath()); + GenericTestUtils.waitFor(checkpointSnap2::exists, 2000, 20000); + + List<String> sstDiffList21 = differ.getSSTDiffList(snap2, snap1); + LOG.debug("Got diff list: {}", sstDiffList21); + + // Delete 1000 keys, take a 3rd snapshot, and do another diff + for (int i = 0; i < 1000; i++) { + bucket.deleteKey("b_" + i); + } + + resp = store.createSnapshot(volumeName, bucketName, "snap3"); + LOG.debug("Snapshot created: {}", resp); + + DifferSnapshotInfo snap3 = getDifferSnapshotInfo(omMetadataManager, + volumeName, bucketName, "snap3"); final File checkpointSnap3 = new File(snap3.getDbPath()); GenericTestUtils.waitFor(checkpointSnap3::exists, 2000, 20000); - List<String> actualDiffList = differ.getSSTDiffList(snap3, snap1); - LOG.debug("Got diff list: {}", actualDiffList); - // Hard-coded expected output. - // The result is deterministic. Retrieved from a successful run. - final List<String> expectedDiffList = Collections.singletonList("000059"); - Assertions.assertEquals(expectedDiffList, actualDiffList); - - // TODO: Use smaller DB write buffer size (currently it is set to 128 MB - // in DBProfile), or generate enough keys (in the millions) to trigger - // RDB compaction. Take another snapshot and do the diff again. - // Then restart OM, do the same diff again to see if DAG reconstruction - // works. + List<String> sstDiffList32 = differ.getSSTDiffList(snap3, snap2); + + // snap3-snap1 diff result is a combination of snap3-snap2 and snap2-snap1 + List<String> sstDiffList31 = differ.getSSTDiffList(snap3, snap1); + + // Same snapshot. Result should be empty list + List<String> sstDiffList22 = differ.getSSTDiffList(snap2, snap2); + Assertions.assertTrue(sstDiffList22.isEmpty()); + + // Restart OM, do the same diffs again. 
See if DAG reconstruction works + cluster.restartOzoneManager(); Review Comment: This is the important line for the tests on lines 238-246, and nobody can tell from the test name that you are testing DAG reconstruction and diff after OM restart. I think the OM restart scenario deserves its own test. ########## hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java: ########## @@ -510,4 +509,139 @@ void readRocksDBInstance(String dbPathArg, RocksDB rocksDB, FileWriter file, private String toStr(byte[] bytes) { return new String(bytes, UTF_8); } + + /** + * Helper that traverses the graphs for testing. + * @param compactionNodeMap + * @param reverseMutableGraph + * @param fwdMutableGraph + */ + void traverseGraph( + ConcurrentHashMap<String, CompactionNode> compactionNodeMap, + MutableGraph<CompactionNode> reverseMutableGraph, + MutableGraph<CompactionNode> fwdMutableGraph) { + + List<CompactionNode> nodeList = compactionNodeMap.values().stream() + .sorted(new NodeComparator()).collect(Collectors.toList()); + + for (CompactionNode infileNode : nodeList) { + // fist go through fwdGraph to find nodes that don't have successors. + // These nodes will be the top level nodes in reverse graph + Set<CompactionNode> successors = fwdMutableGraph.successors(infileNode); + if (successors.size() == 0) { + LOG.debug("No successors. Cumulative keys: {}, total keys: {}", + infileNode.getCumulativeKeysReverseTraversal(), + infileNode.getTotalNumberOfKeys()); + infileNode.setCumulativeKeysReverseTraversal( + infileNode.getTotalNumberOfKeys()); + } + } + + HashSet<CompactionNode> visited = new HashSet<>(); + for (CompactionNode infileNode : nodeList) { + if (visited.contains(infileNode)) { + continue; + } + visited.add(infileNode); + LOG.debug("Visiting node '{}'", infileNode.getFileName()); + Set<CompactionNode> currentLevel = new HashSet<>(); + currentLevel.add(infileNode); + int level = 1; + while (!currentLevel.isEmpty()) { + LOG.debug("BFS Level: {}. Current level has {} nodes", + level++, currentLevel.size()); + final Set<CompactionNode> nextLevel = new HashSet<>(); + for (CompactionNode current : currentLevel) { + LOG.debug("Expanding node: {}", current.getFileName()); + Set<CompactionNode> successors = + reverseMutableGraph.successors(current); + if (successors.isEmpty()) { + LOG.debug("No successors. Cumulative keys: {}", + current.getCumulativeKeysReverseTraversal()); + continue; + } + for (CompactionNode node : successors) { + LOG.debug("Adding to the next level: {}", node.getFileName()); + LOG.debug("'{}' cumulative keys: {}. parent '{}' total keys: {}", + node.getFileName(), node.getCumulativeKeysReverseTraversal(), + current.getFileName(), current.getTotalNumberOfKeys()); + node.addCumulativeKeysReverseTraversal( + current.getCumulativeKeysReverseTraversal()); + nextLevel.add(node); + } + } + currentLevel = nextLevel; + } + } + } + + void printMutableGraphFromAGivenNode( Review Comment: ```suggestion private void printMutableGraphFromAGivenNode( ConcurrentMap<String, CompactionNode> compactionNodeMap, String fileName, int sstLevel, MutableGraph<CompactionNode> mutableGraph ) { ``` ########## hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java: ########## @@ -90,8 +101,145 @@ public static void init() { GenericTestUtils.setLogLevel(TestRocksDBCheckpointDiffer.LOG, Level.INFO); } + /** + * Test cases for testGetSSTDiffListWithoutDB.
+ */ + private static Stream<Arguments> testGetSSTDiffListWithoutDBCases() { Review Comment: I wouldn't use the `test...` prefix for test scenario providers because it is supposed to be used for actual tests. ########## hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java: ########## @@ -298,6 +308,18 @@ private ManagedColumnFamilyOptions getDefaultCfOptions() { .orElseGet(defaultCfProfile::getColumnFamilyOptions); } + /** + * Get default column family options, but with column family write buffer + * size limit overridden. + * @param writeBufferSize Specify column family write buffer size. + * @return ManagedColumnFamilyOptions + */ + private ManagedColumnFamilyOptions getDefaultCfOptions(long writeBufferSize) { Review Comment: 1. I think `getDefaultCfOption()` and `getDefaultCfOptions(long writeBufferSize)` are unnecessary one-liner functions. Also, the `CfOptions` returned by `getDefaultCfOptions(long writeBufferSize)` is not default anymore. 2. This function gets invoked multiple times unnecessarily, at line # 287 and then inside the loop at line # 297. The `CfOptions` can be stored in a variable and used in both places. I'd prefer: ``` private Set<TableConfig> makeTableConfigs() { Set<TableConfig> tableConfigs = new HashSet<>(); ManagedColumnFamilyOptions columnFamilyOptions = Optional.ofNullable(defaultCfOptions) .orElseGet(defaultCfProfile::getColumnFamilyOptions); columnFamilyOptions.setWriteBufferSize(rocksDbCfWriteBufferSize); // If default column family was not added, add it with the default options. cfOptions.putIfAbsent(DEFAULT_COLUMN_FAMILY_NAME, columnFamilyOptions); for (Map.Entry<String, ManagedColumnFamilyOptions> entry: cfOptions.entrySet()) { String name = entry.getKey(); ManagedColumnFamilyOptions options = entry.getValue(); if (options == null) { LOG.debug("using default column family options for table: {}", name); tableConfigs.add(new TableConfig(name, columnFamilyOptions)); } else { tableConfigs.add(new TableConfig(name, options)); } } return tableConfigs; } ``` If you really want a separate function, it could be this one function only: ``` private Set<TableConfig> makeTableConfigs() { Set<TableConfig> tableConfigs = new HashSet<>(); ManagedColumnFamilyOptions columnFamilyOptions = getCfOptions(rocksDbCfWriteBufferSize); // If default column family was not added, add it with the default options. cfOptions.putIfAbsent(DEFAULT_COLUMN_FAMILY_NAME, columnFamilyOptions); for (Map.Entry<String, ManagedColumnFamilyOptions> entry: cfOptions.entrySet()) { String name = entry.getKey(); ManagedColumnFamilyOptions options = entry.getValue(); if (options == null) { LOG.debug("using default column family options for table: {}", name); tableConfigs.add(new TableConfig(name, columnFamilyOptions)); } else { tableConfigs.add(new TableConfig(name, options)); } } return tableConfigs; } /** * Get default column family options, but with column family write buffer * size limit overridden. */ private ManagedColumnFamilyOptions getCfOptions(long writeBufferSize) { ManagedColumnFamilyOptions cfOptions = Optional.ofNullable(defaultCfOptions) .orElseGet(defaultCfProfile::getColumnFamilyOptions); cfOptions.setWriteBufferSize(writeBufferSize); return cfOptions; } ```
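Side note on the "not default anymore" point: once `setWriteBufferSize()` has been called, the options object carries the override, which is why a name like `getCfOptions(writeBufferSize)` reads better to me. A tiny standalone sketch against the plain RocksDB Java API (assuming `ManagedColumnFamilyOptions` behaves like `org.rocksdb.ColumnFamilyOptions` here; the 1 MB value is just an example, not the DBProfile default):

```java
import org.rocksdb.ColumnFamilyOptions;

public final class WriteBufferSizeSketch {
  public static void main(String[] args) {
    // ColumnFamilyOptions loads the RocksDB native library in its static initializer.
    try (ColumnFamilyOptions cfOptions = new ColumnFamilyOptions()) {
      // Hypothetical override: 1 MB instead of whatever the profile default is.
      cfOptions.setWriteBufferSize(1L << 20);
      // After the override, these options are no longer the profile defaults.
      System.out.println("writeBufferSize = " + cfOptions.writeBufferSize());
    }
  }
}
```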
