hemantk-12 commented on code in PR #3824:
URL: https://github.com/apache/ozone/pull/3824#discussion_r1011049594
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -608,52 +792,48 @@ public Comparator<CompactionNode> reversed() {
}
}
-
- public void dumpCompactioNodeTable() {
- List<CompactionNode> nodeList =
- compactionNodeTable.values().stream().collect(Collectors.toList());
- Collections.sort(nodeList, new NodeComparator());
- for (CompactionNode n : nodeList ) {
- LOG.warn("File : " + n.fileName + " :: Total keys : "
- + n.totalNumberOfKeys);
- LOG.warn("File : " + n.fileName + " :: Cumulative keys : " +
+ @VisibleForTesting
+ public void dumpCompactionNodeTable() {
+ List<CompactionNode> nodeList = compactionNodeTable.values().stream()
+ .sorted(new NodeComparator()).collect(Collectors.toList());
+ for (CompactionNode n : nodeList) {
+ LOG.info("File '{}' total keys: {}", n.fileName, n.totalNumberOfKeys);
+ LOG.info("File '{}' cumulative keys: {}", n.fileName,
n.cumulativeKeysReverseTraversal);
}
}
+ @VisibleForTesting
@SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
- public synchronized void printMutableGraphFromAGivenNode(
- String fileName, int level, MutableGraph<CompactionNode> mutableGraph) {
+ public synchronized void printMutableGraphFromAGivenNode(String fileName,
+ int level, MutableGraph<CompactionNode> mutableGraph) {
CompactionNode infileNode =
compactionNodeTable.get(Paths.get(fileName).getFileName().toString());
if (infileNode == null) {
return;
}
- System.out.print("\nCompaction Level : " + level + " Expandin File:" +
- fileName + ":\n");
+ LOG.info("\nCompaction Level: " + level + " Expanding File: " + fileName);
Set<CompactionNode> nextLevel = new HashSet<>();
nextLevel.add(infileNode);
- Set<CompactionNode> currentLevel = new HashSet<>();
- currentLevel.addAll(nextLevel);
+ Set<CompactionNode> currentLevel = new HashSet<>(nextLevel);
int i = 1;
while (currentLevel.size() != 0) {
- LOG.warn("DAG Level :" + i++);
+ LOG.info("DAG Level: " + i++);
+ StringBuilder sb = new StringBuilder();
for (CompactionNode current : currentLevel) {
Set<CompactionNode> successors = mutableGraph.successors(current);
for (CompactionNode oneSucc : successors) {
- System.out.print(oneSucc.fileName + " ");
+ sb.append(oneSucc.fileName).append(" ");
nextLevel.add(oneSucc);
}
}
- currentLevel = new HashSet<>();
- currentLevel.addAll(nextLevel);
+ LOG.info(sb.toString());
+ currentLevel = new HashSet<>(nextLevel);
nextLevel = new HashSet<>();
- LOG.warn("");
}
}
- public synchronized void printMutableGraph(
- String srcSnapId, String destSnapId,
+ synchronized void printMutableGraph(String srcSnapId, String destSnapId,
MutableGraph<CompactionNode> mutableGraph) {
LOG.warn("Printing the Graph");
Review Comment:
Is it supposed to be `WARN`?
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -471,134 +558,231 @@ public HashSet<String> readRocksDBLiveFiles(String dbPathArg) {
return liveFiles;
}
- // Given the src and destination Snapshots, it prints a Diff list.
- private synchronized void printSnapdiffSSTFiles(
- Snapshot src, Snapshot dest) throws RocksDBException {
- LOG.warn("Src Snapshot files :" + src.dbPath);
+ /**
+ * Process each line of compaction log text file input and populate the DAG.
+ */
+ private synchronized void processCompactionLogLine(String line) {
+
+ LOG.debug("Processing line: {}", line);
+
+ if (line.startsWith("#")) {
+ // Skip comments
+ LOG.debug("Comment line, skipped");
+ } else if (line.startsWith(COMPACTION_LOG_SEQNUM_LINE_PREFIX)) {
+ // Read sequence number
+ LOG.debug("Reading sequence number as snapshot generation");
+ final String seqNumStr =
+ line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length()).trim();
+ // This would the snapshot generation for the nodes to come
+ reconstructionSnapshotGeneration = Long.parseLong(seqNumStr);
+ } else if (line.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
+ // Read compaction log entry
+
+ // Trim the beginning
+ line = line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length());
+ final String[] io = line.split(":");
+ if (io.length != 2) {
+ LOG.error("Invalid line in compaction log: {}", line);
+ return;
+ }
+ final String[] inputFiles = io[0].split(",");
+ final String[] outputFiles = io[1].split(",");
+ populateCompactionDAG(Arrays.asList(inputFiles),
+ Arrays.asList(outputFiles), reconstructionSnapshotGeneration);
+ } else {
+ LOG.error("Invalid line in compaction log: {}", line);
+ }
+ }
+
+ /**
+ * Helper to read compaction log to the internal DAG.
+ */
+ private void readCompactionLogToDAG(String currCompactionLogPath) {
+ LOG.debug("Loading compaction log: {}", currCompactionLogPath);
+ try (Stream<String> logLineStream =
+        Files.lines(Paths.get(currCompactionLogPath), StandardCharsets.UTF_8)) {
+ logLineStream.forEach(this::processCompactionLogLine);
+ } catch (IOException ioEx) {
+ throw new RuntimeException(ioEx);
+ }
+ }
+
+ /**
+ * Load existing compaction log files to the in-memory DAG.
+ * This only needs to be done once during OM startup.
+ */
+ public synchronized void loadAllCompactionLogs() {
+ if (compactionLogDir == null) {
+ throw new RuntimeException("Compaction log directory must be set first");
+ }
+ reconstructionSnapshotGeneration = 0L;
+ try {
+ try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
+ .filter(e -> e.toString().toLowerCase()
+ .endsWith(COMPACTION_LOG_FILENAME_SUFFIX))
+ .sorted()) {
+ for (Path logPath : pathStream.collect(Collectors.toList())) {
+ readCompactionLogToDAG(logPath.toString());
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Error listing compaction log dir " +
+ compactionLogDir, e);
+ }
+ }
+
+ /**
+ * Snapshot information node class in the DAG.
+ */
+ static class Snapshot {
+ private final String dbPath;
+ private final String snapshotID;
+ private final long snapshotGeneration;
+
+ Snapshot(String db, String id, long gen) {
+ dbPath = db;
+ snapshotID = id;
+ snapshotGeneration = gen;
+ }
+
+ public String getDbPath() {
+ return dbPath;
+ }
+
+ public String getSnapshotID() {
+ return snapshotID;
+ }
+
+ public long getSnapshotGeneration() {
+ return snapshotGeneration;
+ }
+ }
+
+ /**
+   * Get a list of SST files that differs between src and destination snapshots.
+ * <p>
+ * Expected input: src is a snapshot taken AFTER the dest.
+ *
+ * @param src source snapshot
+ * @param dest destination snapshot
+ */
+  public synchronized List<String> getSSTDiffList(Snapshot src, Snapshot dest) {
+
+ LOG.debug("src '{}' -> dest '{}'", src.dbPath, dest.dbPath);
HashSet<String> srcSnapFiles = readRocksDBLiveFiles(src.dbPath);
- LOG.warn("dest Snapshot files :" + dest.dbPath);
HashSet<String> destSnapFiles = readRocksDBLiveFiles(dest.dbPath);
HashSet<String> fwdDAGSameFiles = new HashSet<>();
HashSet<String> fwdDAGDifferentFiles = new HashSet<>();
- LOG.warn("Doing forward diff between source and destination " +
- "Snapshots:" + src.dbPath + ", " + dest.dbPath);
- realPrintSnapdiffSSTFiles(src, dest, srcSnapFiles, destSnapFiles,
- compactionDAGFwd,
- fwdDAGSameFiles,
- fwdDAGDifferentFiles);
+ LOG.debug("Doing forward diff between src and dest snapshots: " +
+ src.dbPath + " to " + dest.dbPath);
+ internalGetSSTDiffList(src, dest, srcSnapFiles, destSnapFiles,
+ compactionDAGFwd, fwdDAGSameFiles, fwdDAGDifferentFiles);
- LOG.warn("Overall Summary \n" +
- "Doing Overall diff between source and destination Snapshots:" +
- src.dbPath + ", " + dest.dbPath);
- System.out.print("fwd DAG Same files :");
- for (String file : fwdDAGSameFiles) {
- System.out.print(file + ", ");
- }
- LOG.warn("");
- System.out.print("\nFwd DAG Different files :");
- for (String file : fwdDAGDifferentFiles) {
- CompactionNode n = compactionNodeTable.get(file);
- System.out.print(file + ", ");
+ List<String> res = new ArrayList<>();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Result of diff from src '" + src.dbPath + "' to dest '" +
+ dest.dbPath + "':");
+ StringBuilder logSB = new StringBuilder();
+
+ logSB.append("Fwd DAG same SST files: ");
+ for (String file : fwdDAGSameFiles) {
+ logSB.append(file).append(" ");
+ }
+ LOG.debug(logSB.toString());
+
+ logSB.setLength(0);
+ logSB.append("Fwd DAG different SST files: ");
+ for (String file : fwdDAGDifferentFiles) {
+ logSB.append(file).append(" ");
+ res.add(file);
+ }
+ LOG.debug(logSB.toString());
+
+ } else {
+ res.addAll(fwdDAGDifferentFiles);
}
- LOG.warn("");
+
+ return res;
}
+ /**
+ * Core getSSTDiffList logic.
+ */
@SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
- public synchronized void realPrintSnapdiffSSTFiles(
- Snapshot src, Snapshot dest,
- HashSet<String> srcSnapFiles,
- HashSet<String> destSnapFiles,
+ private void internalGetSSTDiffList(Snapshot src, Snapshot dest,
+ HashSet<String> srcSnapFiles, HashSet<String> destSnapFiles,
MutableGraph<CompactionNode> mutableGraph,
HashSet<String> sameFiles, HashSet<String> differentFiles) {
-
for (String fileName : srcSnapFiles) {
if (destSnapFiles.contains(fileName)) {
- LOG.warn("SrcSnapshot : " + src.dbPath + " and Dest " +
- "Snapshot" + dest.dbPath + " Contain Same file " + fileName);
+ LOG.debug("Source '{}' and destination '{}' share the same SST '{}'",
+ src.dbPath, dest.dbPath, fileName);
sameFiles.add(fileName);
continue;
}
CompactionNode infileNode =
compactionNodeTable.get(Paths.get(fileName).getFileName().toString());
if (infileNode == null) {
- LOG.warn("SrcSnapshot : " + src.dbPath + "File " + fileName + "was " +
- "never compacted");
+ LOG.debug("Src " + src.dbPath + " File " + fileName +
+ " was never compacted");
differentFiles.add(fileName);
continue;
}
- System.out.print(" Expandin File:" + fileName + ":\n");
- Set<CompactionNode> nextLevel = new HashSet<>();
- nextLevel.add(infileNode);
+ LOG.debug("Expanding SST file: " + fileName);
Set<CompactionNode> currentLevel = new HashSet<>();
- currentLevel.addAll(nextLevel);
- nextLevel = new HashSet<>();
+ currentLevel.add(infileNode);
+ Set<CompactionNode> nextLevel = new HashSet<>();
int i = 1;
while (currentLevel.size() != 0) {
- LOG.warn("DAG Level :" + i++);
+ LOG.debug("DAG Level: " + i++);
for (CompactionNode current : currentLevel) {
- LOG.warn("acknowledging file " + current.fileName);
+ LOG.debug("Acknowledging file " + current.fileName);
if (current.snapshotGeneration <= dest.snapshotGeneration) {
- LOG.warn("Reached dest generation count. SrcSnapshot : " +
- src.dbPath + " and Dest " + "Snapshot" + dest.dbPath +
- " Contain Diffrent file " + current.fileName);
+ LOG.debug("Reached dest generation count. Src: " +
+ src.dbPath + " and Dest: " + dest.dbPath +
+ " have different file: " + current.fileName);
differentFiles.add(current.fileName);
continue;
}
Set<CompactionNode> successors = mutableGraph.successors(current);
- if (successors == null || successors.size() == 0) {
- LOG.warn("No further compaction for the file" +
- ".SrcSnapshot : " + src.dbPath + " and Dest " +
- "Snapshot" + dest.dbPath + " Contain Diffrent file " +
- current.fileName);
+ if (successors.size() == 0) {
Review Comment:
1. I would prefer to use `isEmpty()`. Is `successors` nullable? If yes, you can
use `CollectionUtils.isEmpty()`.
2. You can flatten it by using `continue`:
```
if (successors.isEmpty()) {
...
continue;
}
// else not needed.
```
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -664,21 +844,21 @@ public synchronized void printMutableGraph(
topLevelNodes.add(n);
}
}
- Iterator iter = topLevelNodes.iterator();
+ Iterator<CompactionNode> iter = topLevelNodes.iterator();
while (iter.hasNext()) {
Review Comment:
It would be much easier to understand if we used a Queue instead of a
set/iterator that gets reset in multiple places:
```
synchronized void printMutableGraph(String srcSnapId, String destSnapId,
MutableGraph<CompactionNode> mutableGraph) {
LOG.warn("Printing the Graph");
Queue<CompactionNode> topLevelNodes = new LinkedList<>();
Set<CompactionNode> allNodes = new HashSet<>();
for (CompactionNode n : mutableGraph.nodes()) {
if (srcSnapId == null ||
n.snapshotId.compareToIgnoreCase(srcSnapId) == 0) {
topLevelNodes.add(n);
}
}
while (!topLevelNodes.isEmpty()) {
CompactionNode n = topLevelNodes.poll();
Set<CompactionNode> succ = mutableGraph.successors(n);
LOG.debug("Parent Node: " + n.fileName);
if (succ.size() == 0) {
LOG.debug("No child node");
allNodes.add(n);
continue;
}
for (CompactionNode oneSucc : succ) {
LOG.debug("Children Node: " + oneSucc.fileName);
if (srcSnapId == null ||
oneSucc.snapshotId.compareToIgnoreCase(destSnapId) == 0) {
allNodes.add(oneSucc);
continue;
}
topLevelNodes.add(oneSucc);
}
}
LOG.debug("src snap: " + srcSnapId);
LOG.debug("dest snap: " + destSnapId);
for (CompactionNode n : allNodes) {
LOG.debug("Files are: " + n.fileName);
}
}
```
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -471,134 +558,231 @@ public HashSet<String> readRocksDBLiveFiles(String dbPathArg) {
return liveFiles;
}
- // Given the src and destination Snapshots, it prints a Diff list.
- private synchronized void printSnapdiffSSTFiles(
- Snapshot src, Snapshot dest) throws RocksDBException {
- LOG.warn("Src Snapshot files :" + src.dbPath);
+ /**
+ * Process each line of compaction log text file input and populate the DAG.
+ */
+ private synchronized void processCompactionLogLine(String line) {
+
+ LOG.debug("Processing line: {}", line);
+
+ if (line.startsWith("#")) {
+ // Skip comments
+ LOG.debug("Comment line, skipped");
+ } else if (line.startsWith(COMPACTION_LOG_SEQNUM_LINE_PREFIX)) {
+ // Read sequence number
+ LOG.debug("Reading sequence number as snapshot generation");
+ final String seqNumStr =
+ line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length()).trim();
+ // This would the snapshot generation for the nodes to come
+ reconstructionSnapshotGeneration = Long.parseLong(seqNumStr);
+ } else if (line.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
+ // Read compaction log entry
+
+ // Trim the beginning
+ line = line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length());
+ final String[] io = line.split(":");
+ if (io.length != 2) {
+ LOG.error("Invalid line in compaction log: {}", line);
+ return;
+ }
+ final String[] inputFiles = io[0].split(",");
+ final String[] outputFiles = io[1].split(",");
+ populateCompactionDAG(Arrays.asList(inputFiles),
+ Arrays.asList(outputFiles), reconstructionSnapshotGeneration);
+ } else {
+ LOG.error("Invalid line in compaction log: {}", line);
+ }
+ }
+
+ /**
+ * Helper to read compaction log to the internal DAG.
+ */
+ private void readCompactionLogToDAG(String currCompactionLogPath) {
+ LOG.debug("Loading compaction log: {}", currCompactionLogPath);
+ try (Stream<String> logLineStream =
+        Files.lines(Paths.get(currCompactionLogPath), StandardCharsets.UTF_8)) {
+ logLineStream.forEach(this::processCompactionLogLine);
+ } catch (IOException ioEx) {
+ throw new RuntimeException(ioEx);
+ }
+ }
+
+ /**
+ * Load existing compaction log files to the in-memory DAG.
+ * This only needs to be done once during OM startup.
+ */
+ public synchronized void loadAllCompactionLogs() {
+ if (compactionLogDir == null) {
+ throw new RuntimeException("Compaction log directory must be set first");
+ }
+ reconstructionSnapshotGeneration = 0L;
+ try {
+ try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
+ .filter(e -> e.toString().toLowerCase()
+ .endsWith(COMPACTION_LOG_FILENAME_SUFFIX))
+ .sorted()) {
+ for (Path logPath : pathStream.collect(Collectors.toList())) {
+ readCompactionLogToDAG(logPath.toString());
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Error listing compaction log dir " +
+ compactionLogDir, e);
+ }
+ }
+
+ /**
+ * Snapshot information node class in the DAG.
+ */
+ static class Snapshot {
+ private final String dbPath;
+ private final String snapshotID;
+ private final long snapshotGeneration;
+
+ Snapshot(String db, String id, long gen) {
+ dbPath = db;
+ snapshotID = id;
+ snapshotGeneration = gen;
+ }
+
+ public String getDbPath() {
+ return dbPath;
+ }
+
+ public String getSnapshotID() {
+ return snapshotID;
+ }
+
+ public long getSnapshotGeneration() {
+ return snapshotGeneration;
+ }
+ }
+
+ /**
+   * Get a list of SST files that differs between src and destination snapshots.
+ * <p>
+ * Expected input: src is a snapshot taken AFTER the dest.
+ *
+ * @param src source snapshot
+ * @param dest destination snapshot
+ */
+  public synchronized List<String> getSSTDiffList(Snapshot src, Snapshot dest) {
+
+ LOG.debug("src '{}' -> dest '{}'", src.dbPath, dest.dbPath);
HashSet<String> srcSnapFiles = readRocksDBLiveFiles(src.dbPath);
- LOG.warn("dest Snapshot files :" + dest.dbPath);
HashSet<String> destSnapFiles = readRocksDBLiveFiles(dest.dbPath);
HashSet<String> fwdDAGSameFiles = new HashSet<>();
HashSet<String> fwdDAGDifferentFiles = new HashSet<>();
- LOG.warn("Doing forward diff between source and destination " +
- "Snapshots:" + src.dbPath + ", " + dest.dbPath);
- realPrintSnapdiffSSTFiles(src, dest, srcSnapFiles, destSnapFiles,
- compactionDAGFwd,
- fwdDAGSameFiles,
- fwdDAGDifferentFiles);
+ LOG.debug("Doing forward diff between src and dest snapshots: " +
+ src.dbPath + " to " + dest.dbPath);
+ internalGetSSTDiffList(src, dest, srcSnapFiles, destSnapFiles,
+ compactionDAGFwd, fwdDAGSameFiles, fwdDAGDifferentFiles);
- LOG.warn("Overall Summary \n" +
- "Doing Overall diff between source and destination Snapshots:" +
- src.dbPath + ", " + dest.dbPath);
- System.out.print("fwd DAG Same files :");
- for (String file : fwdDAGSameFiles) {
- System.out.print(file + ", ");
- }
- LOG.warn("");
- System.out.print("\nFwd DAG Different files :");
- for (String file : fwdDAGDifferentFiles) {
- CompactionNode n = compactionNodeTable.get(file);
- System.out.print(file + ", ");
+ List<String> res = new ArrayList<>();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Result of diff from src '" + src.dbPath + "' to dest '" +
+ dest.dbPath + "':");
+ StringBuilder logSB = new StringBuilder();
+
+ logSB.append("Fwd DAG same SST files: ");
+ for (String file : fwdDAGSameFiles) {
+ logSB.append(file).append(" ");
+ }
+ LOG.debug(logSB.toString());
+
+ logSB.setLength(0);
+ logSB.append("Fwd DAG different SST files: ");
+ for (String file : fwdDAGDifferentFiles) {
+ logSB.append(file).append(" ");
+ res.add(file);
+ }
+ LOG.debug(logSB.toString());
+
+ } else {
+ res.addAll(fwdDAGDifferentFiles);
}
- LOG.warn("");
+
+ return res;
}
+ /**
+ * Core getSSTDiffList logic.
+ */
@SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
- public synchronized void realPrintSnapdiffSSTFiles(
- Snapshot src, Snapshot dest,
- HashSet<String> srcSnapFiles,
- HashSet<String> destSnapFiles,
+ private void internalGetSSTDiffList(Snapshot src, Snapshot dest,
+ HashSet<String> srcSnapFiles, HashSet<String> destSnapFiles,
MutableGraph<CompactionNode> mutableGraph,
HashSet<String> sameFiles, HashSet<String> differentFiles) {
-
for (String fileName : srcSnapFiles) {
if (destSnapFiles.contains(fileName)) {
- LOG.warn("SrcSnapshot : " + src.dbPath + " and Dest " +
- "Snapshot" + dest.dbPath + " Contain Same file " + fileName);
+ LOG.debug("Source '{}' and destination '{}' share the same SST '{}'",
+ src.dbPath, dest.dbPath, fileName);
sameFiles.add(fileName);
continue;
}
CompactionNode infileNode =
compactionNodeTable.get(Paths.get(fileName).getFileName().toString());
if (infileNode == null) {
- LOG.warn("SrcSnapshot : " + src.dbPath + "File " + fileName + "was " +
- "never compacted");
+ LOG.debug("Src " + src.dbPath + " File " + fileName +
+ " was never compacted");
differentFiles.add(fileName);
continue;
}
- System.out.print(" Expandin File:" + fileName + ":\n");
- Set<CompactionNode> nextLevel = new HashSet<>();
- nextLevel.add(infileNode);
+ LOG.debug("Expanding SST file: " + fileName);
Set<CompactionNode> currentLevel = new HashSet<>();
- currentLevel.addAll(nextLevel);
- nextLevel = new HashSet<>();
+ currentLevel.add(infileNode);
+ Set<CompactionNode> nextLevel = new HashSet<>();
int i = 1;
while (currentLevel.size() != 0) {
Review Comment:
nit: `isEmpty()` is better from a readability perspective:
```
while (!currentLevel.isEmpty()) {
...
}
```
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -688,130 +868,139 @@ public synchronized void printMutableGraph(
iter.remove();
iter = topLevelNodes.iterator();
}
- LOG.warn("src snap:" + srcSnapId);
- LOG.warn("dest snap:" + destSnapId);
+ LOG.debug("src snap: " + srcSnapId);
+ LOG.debug("dest snap: " + destSnapId);
for (CompactionNode n : allNodes) {
Review Comment:
I'd suggest putting the for loop inside a `LOG.isDebugEnabled()` check:
```
if (LOG.isDebugEnabled()) {
for (CompactionNode n : allNodes) {
LOG.debug("Files are: " + n.fileName);
}
}
```
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -688,130 +868,139 @@ public synchronized void printMutableGraph(
iter.remove();
iter = topLevelNodes.iterator();
}
- LOG.warn("src snap:" + srcSnapId);
- LOG.warn("dest snap:" + destSnapId);
+ LOG.debug("src snap: " + srcSnapId);
+ LOG.debug("dest snap: " + destSnapId);
for (CompactionNode n : allNodes) {
- LOG.warn("Files are :" + n.fileName);
+ LOG.debug("Files are: " + n.fileName);
}
}
+ public MutableGraph<CompactionNode> getCompactionFwdDAG() {
+ return compactionDAGFwd;
+ }
- public void createSnapshot(RocksDB rocksDB) throws InterruptedException {
-
- LOG.warn("Current time is::" + System.currentTimeMillis());
- long t1 = System.currentTimeMillis();
-
- cpPath = cpPath + lastSnapshotCounter;
- createCheckPoint(rocksDbPath, cpPath, rocksDB);
- allSnapshots[lastSnapshotCounter] = new Snapshot(cpPath,
- lastSnapshotPrefix, lastSnapshotCounter);
-
- long t2 = System.currentTimeMillis();
- LOG.warn("Current time is::" + t2);
-
- LOG.warn("millisecond difference is ::" + (t2 - t1));
- Thread.sleep(100);
- ++lastSnapshotCounter;
- lastSnapshotPrefix = "sid_" + lastSnapshotCounter;
- LOG.warn("done :: 1");
+ public MutableGraph<CompactionNode> getCompactionReverseDAG() {
+ return compactionDAGReverse;
}
+ /**
+ * Populate the compaction DAG with input and output SST files lists.
+ */
+ @SuppressFBWarnings({"AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"})
+ private void populateCompactionDAG(List<String> inputFiles,
+ List<String> outputFiles, long seqNum) {
- public void printAllSnapshots() throws InterruptedException {
- for (Snapshot snap : allSnapshots) {
- if (snap == null) {
- break;
- }
- LOG.warn("Snapshot id" + snap.snapshotID);
- LOG.warn("Snapshot path" + snap.dbPath);
- LOG.warn("Snapshot Generation" + snap.snapshotGeneration);
- LOG.warn("");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Input {} -> Output {}", inputFiles, outputFiles);
}
- }
-  public void diffAllSnapshots() throws InterruptedException, RocksDBException {
- for (Snapshot snap : allSnapshots) {
- if (snap == null) {
- break;
+ for (String outfile : outputFiles) {
+ CompactionNode outfileNode = compactionNodeTable.get(outfile);
+ if (outfileNode == null) {
+ long numKeys = 0L;
+ try {
+ numKeys = getSSTFileSummary(outfile);
+ } catch (Exception e) {
+ LOG.warn("Exception in getSSTFileSummary: {}", e.getMessage());
+ }
+ outfileNode = new CompactionNode(outfile, null, numKeys, seqNum);
+ compactionDAGFwd.addNode(outfileNode);
+ compactionDAGReverse.addNode(outfileNode);
+ compactionNodeTable.put(outfile, outfileNode);
}
- printSnapdiffSSTFiles(allSnapshots[lastSnapshotCounter - 1], snap);
- }
- }
- public MutableGraph<CompactionNode> getCompactionFwdDAG() {
- return compactionDAGFwd;
- }
+ for (String infile : inputFiles) {
+ CompactionNode infileNode = compactionNodeTable.get(infile);
+ if (infileNode == null) {
+ long numKeys = 0L;
+ try {
+ numKeys = getSSTFileSummary(infile);
+ } catch (Exception e) {
+ LOG.warn("Exception in getSSTFileSummary: {}", e.getMessage());
+ }
+ infileNode = new CompactionNode(infile, null, numKeys, seqNum);
+ compactionDAGFwd.addNode(infileNode);
+ compactionDAGReverse.addNode(infileNode);
+ compactionNodeTable.put(infile, infileNode);
+ }
+ // Draw the edges
+ if (outfileNode.fileName.compareToIgnoreCase(
+ infileNode.fileName) != 0) {
+ compactionDAGFwd.putEdge(outfileNode, infileNode);
+ compactionDAGReverse.putEdge(infileNode, outfileNode);
+ }
+ }
+ }
- public MutableGraph<CompactionNode> getCompactionReverseDAG() {
- return compactionDAGFwd;
}
+ @VisibleForTesting
public synchronized void traverseGraph(
MutableGraph<CompactionNode> reverseMutableGraph,
MutableGraph<CompactionNode> fwdMutableGraph) {
- List<CompactionNode> nodeList =
- compactionNodeTable.values().stream().collect(Collectors.toList());
- Collections.sort(nodeList, new NodeComparator());
+ List<CompactionNode> nodeList = compactionNodeTable.values().stream()
+ .sorted(new NodeComparator()).collect(Collectors.toList());
- for (CompactionNode infileNode : nodeList ) {
- // fist go through fwdGraph to find nodes that don't have succesors.
+ for (CompactionNode infileNode : nodeList) {
+ // fist go through fwdGraph to find nodes that don't have successors.
// These nodes will be the top level nodes in reverse graph
Set<CompactionNode> successors = fwdMutableGraph.successors(infileNode);
- if (successors == null || successors.size() == 0) {
- LOG.warn("traverseGraph : No successors. cumulative " +
- "keys : " + infileNode.cumulativeKeysReverseTraversal + "::total "
+
- "keys ::" + infileNode.totalNumberOfKeys);
+ if (successors.size() == 0) {
+ LOG.debug("No successors. Cumulative keys: {}, total keys: {}",
+ infileNode.cumulativeKeysReverseTraversal,
+ infileNode.totalNumberOfKeys);
infileNode.cumulativeKeysReverseTraversal =
infileNode.totalNumberOfKeys;
}
}
HashSet<CompactionNode> visited = new HashSet<>();
- for (CompactionNode infileNode : nodeList ) {
+ for (CompactionNode infileNode : nodeList) {
if (visited.contains(infileNode)) {
continue;
}
visited.add(infileNode);
- System.out.print("traverseGraph: Visiting node " + infileNode.fileName +
- ":\n");
+ LOG.debug("Visiting node '{}'", infileNode.fileName);
Set<CompactionNode> nextLevel = new HashSet<>();
nextLevel.add(infileNode);
- Set<CompactionNode> currentLevel = new HashSet<>();
- currentLevel.addAll(nextLevel);
+ Set<CompactionNode> currentLevel = new HashSet<>(nextLevel);
nextLevel = new HashSet<>();
int i = 1;
while (currentLevel.size() != 0) {
- LOG.warn("traverseGraph : DAG Level :" + i++);
+ LOG.debug("DAG Level {}", i++);
for (CompactionNode current : currentLevel) {
- LOG.warn("traverseGraph : expanding node " + current.fileName);
+ LOG.debug("Expanding node {}", current.fileName);
Set<CompactionNode> successors =
reverseMutableGraph.successors(current);
- if (successors == null || successors.size() == 0) {
- LOG.warn("traverseGraph : No successors. cumulative " +
- "keys : " + current.cumulativeKeysReverseTraversal);
+ if (successors.size() == 0) {
+ LOG.debug("No successors. Cumulative keys: {}",
+ current.cumulativeKeysReverseTraversal);
} else {
for (CompactionNode oneSucc : successors) {
- LOG.warn("traverseGraph : Adding to the next level : " +
- oneSucc.fileName);
- LOG.warn("traverseGraph : " + oneSucc.fileName + "cum" + " keys"
- + oneSucc.cumulativeKeysReverseTraversal + "parent" + " " +
- current.fileName + " total " + current.totalNumberOfKeys);
+ LOG.debug("Adding to the next level: {}", oneSucc.fileName);
+ LOG.debug("'{}' cumulative keys: {}. parent '{}' total keys: {}",
+ oneSucc.fileName, oneSucc.cumulativeKeysReverseTraversal,
+ current.fileName, current.totalNumberOfKeys);
oneSucc.cumulativeKeysReverseTraversal +=
current.cumulativeKeysReverseTraversal;
nextLevel.add(oneSucc);
}
}
}
- currentLevel = new HashSet<>();
- currentLevel.addAll(nextLevel);
+ currentLevel = new HashSet<>(nextLevel);
nextLevel = new HashSet<>();
- LOG.warn("");
}
}
}
+ @VisibleForTesting
public boolean debugEnabled(Integer level) {
return DEBUG_LEVEL.contains(level);
}
+
+ @VisibleForTesting
+ public static Logger getLog() {
+ return LOG;
+ }
Review Comment:
Why do we need these functions?
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -471,134 +611,287 @@ public HashSet<String> readRocksDBLiveFiles(String dbPathArg) {
return liveFiles;
}
- // Given the src and destination Snapshots, it prints a Diff list.
- private synchronized void printSnapdiffSSTFiles(
- Snapshot src, Snapshot dest) throws RocksDBException {
- LOG.warn("Src Snapshot files :" + src.dbPath);
+ private long reconstructionSnapshotGeneration;
+
+ /**
+ * Process each line of compaction log text file input and populate the DAG.
+ */
+ private synchronized void processCompactionLogLine(String line) {
+
+ LOG.debug("Processing line: {}", line);
+
+ if (line.startsWith("#")) {
+ // Skip comments
+ LOG.debug("Comment line, skipped");
+ } else if (line.startsWith(COMPACTION_LOG_SEQNUM_LINE_PREFIX)) {
+ // Read sequence number
+ LOG.debug("Reading sequence number as snapshot generation");
+ final String seqNumStr =
+ line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length()).trim();
+ // This would the snapshot generation for the nodes to come
+ reconstructionSnapshotGeneration = Long.parseLong(seqNumStr);
+ } else if (line.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
+ // Read compaction log entry
+
+ // Trim the beginning
+ line = line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length());
+ final String[] io = line.split(":");
+ if (io.length != 2) {
+ LOG.error("Invalid line in compaction log: {}", line);
+ return;
+ }
+ final String[] inputFiles = io[0].split(",");
+ final String[] outputFiles = io[1].split(",");
+ populateCompactionDAG(Arrays.asList(inputFiles),
+ Arrays.asList(outputFiles), reconstructionSnapshotGeneration);
+ } else {
+ LOG.error("Invalid line in compaction log: {}", line);
+ }
+ }
+
+ /**
+ * Helper to read compaction log to the internal DAG.
+ */
+ private void readCompactionLogToDAG(String currCompactionLogPath) {
+ LOG.debug("Loading compaction log: {}", currCompactionLogPath);
+ try (Stream<String> logLineStream =
+        Files.lines(Paths.get(currCompactionLogPath), StandardCharsets.UTF_8)) {
+ logLineStream.forEach(this::processCompactionLogLine);
+ } catch (IOException ioEx) {
+ throw new RuntimeException(ioEx);
+ }
+ }
+
+ /**
+ * Returns a set of SST nodes that doesn't exist in the in-memory DAG.
+ */
+ private Set<String> getNonExistentSSTSet(Set<String> sstSet) {
+
+ // Make a copy of sstSet
+ HashSet<String> loadSet = new HashSet<>(sstSet);
+
+ // Check if all the nodes in the provided SST set is already loaded in DAG
+ for (String sstFile : sstSet) {
+ if (compactionNodeTable.containsKey(sstFile)) {
+ loadSet.remove(sstFile);
+ }
+ }
+
+ return loadSet;
+ }
+
+ /**
+ * Returns true only when all nodes in sstSet exists in DAG.
+ */
+ private boolean isSSTSetLoaded(HashSet<String> sstSet) {
+
+ return getNonExistentSSTSet(sstSet).size() == 0;
+ }
+
+ /**
+ * Read compaction log until all dest (and src) db checkpoint SST
+ * nodes show up in the graph, or when it reaches the end of the log.
+ */
+ private boolean loadCompactionDAGBySSTSet(HashSet<String> sstSet) {
+
+ // Get a set of SSTs that doesn't exist in the current in-memory DAG
+ Set<String> loadSet = getNonExistentSSTSet(sstSet);
+
+ if (loadSet.size() == 0) {
+ // All expected nodes in the sstSet are already there,
+ // no need to read/load any compaction log from disk.
+ return true;
+ }
+
+ // Otherwise, load compaction logs in order until all nodes are present in
+ // the DAG.
+ try {
+ try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
+ .filter(e -> e.toString()
+ .toLowerCase().endsWith(COMPACTION_LOG_FILENAME_SUFFIX))
+ .sorted()) {
+ for (Path logPath : pathStream.collect(Collectors.toList())) {
+
+          // TODO: Potential optimization: stop reading as soon as all nodes are
+ // there. Currently it loads an entire file at a time.
+ readCompactionLogToDAG(logPath.toString());
+
+ for (Iterator<String> it = loadSet.iterator(); it.hasNext();) {
+ String sstFile = it.next();
+ if (compactionNodeTable.containsKey(sstFile)) {
+ LOG.debug("Found SST node: {}", sstFile);
+ it.remove();
+ }
+ }
+
+ if (loadSet.size() == 0) {
+ break;
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Error listing compaction log dir " +
+ compactionLogDir, e);
+ }
+
+ // Just in case there are still nodes to be expected not loaded.
+ if (loadSet.size() > 0) {
+ LOG.warn("The following nodes are missing from the compaction log: {}. "
+ + "Possibly because those a newly flushed SSTs that haven't gone "
+ + "though any compaction yet", loadSet);
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Load existing compaction log files to the in-memory DAG.
+ * This only needs to be done once during OM startup.
+ */
+ public synchronized void loadAllCompactionLogs() {
+ if (compactionLogDir == null) {
+ throw new RuntimeException("Compaction log directory must be set first");
+ }
+ reconstructionSnapshotGeneration = 0L;
+ try {
+ try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
+ .filter(e -> e.toString().toLowerCase().endsWith(".log"))
+ .sorted()) {
+ for (Path logPath : pathStream.collect(Collectors.toList())) {
+ readCompactionLogToDAG(logPath.toString());
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Error listing compaction log dir " +
+ compactionLogDir, e);
+ }
+ }
+
+ /**
+   * Get a list of SST files that differs between src and destination snapshots.
+ * <p>
+ * Expected input: src is a snapshot taken AFTER the dest.
+ *
+ * @param src source snapshot
+ * @param dest destination snapshot
+ */
+  public synchronized List<String> getSSTDiffList(Snapshot src, Snapshot dest) {
+
+ LOG.debug("src '{}' -> dest '{}'", src.dbPath, dest.dbPath);
HashSet<String> srcSnapFiles = readRocksDBLiveFiles(src.dbPath);
- LOG.warn("dest Snapshot files :" + dest.dbPath);
HashSet<String> destSnapFiles = readRocksDBLiveFiles(dest.dbPath);
HashSet<String> fwdDAGSameFiles = new HashSet<>();
HashSet<String> fwdDAGDifferentFiles = new HashSet<>();
- LOG.warn("Doing forward diff between source and destination " +
- "Snapshots:" + src.dbPath + ", " + dest.dbPath);
- realPrintSnapdiffSSTFiles(src, dest, srcSnapFiles, destSnapFiles,
- compactionDAGFwd,
- fwdDAGSameFiles,
- fwdDAGDifferentFiles);
+ LOG.debug("Doing forward diff between src and dest snapshots: " +
+ src.dbPath + " to " + dest.dbPath);
+ internalGetSSTDiffList(src, dest, srcSnapFiles, destSnapFiles,
+ compactionDAGFwd, fwdDAGSameFiles, fwdDAGDifferentFiles);
- LOG.warn("Overall Summary \n" +
- "Doing Overall diff between source and destination Snapshots:" +
- src.dbPath + ", " + dest.dbPath);
- System.out.print("fwd DAG Same files :");
- for (String file : fwdDAGSameFiles) {
- System.out.print(file + ", ");
- }
- LOG.warn("");
- System.out.print("\nFwd DAG Different files :");
- for (String file : fwdDAGDifferentFiles) {
- CompactionNode n = compactionNodeTable.get(file);
- System.out.print(file + ", ");
+ List<String> res = new ArrayList<>();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Result of diff from src '" + src.dbPath + "' to dest '" +
+ dest.dbPath + "':");
+ StringBuilder logSB = new StringBuilder();
+
+ logSB.append("Fwd DAG same SST files: ");
+ for (String file : fwdDAGSameFiles) {
+ logSB.append(file).append(" ");
+ }
+ LOG.debug(logSB.toString());
+
+ logSB.setLength(0);
+ logSB.append("Fwd DAG different SST files: ");
+ for (String file : fwdDAGDifferentFiles) {
+ logSB.append(file).append(" ");
+ res.add(file);
+ }
+ LOG.debug(logSB.toString());
+
+ } else {
+ res.addAll(fwdDAGDifferentFiles);
}
- LOG.warn("");
+
+ return res;
}
@SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
- public synchronized void realPrintSnapdiffSSTFiles(
+ public synchronized void internalGetSSTDiffList(
Snapshot src, Snapshot dest,
HashSet<String> srcSnapFiles,
HashSet<String> destSnapFiles,
MutableGraph<CompactionNode> mutableGraph,
HashSet<String> sameFiles, HashSet<String> differentFiles) {
-
for (String fileName : srcSnapFiles) {
if (destSnapFiles.contains(fileName)) {
- LOG.warn("SrcSnapshot : " + src.dbPath + " and Dest " +
- "Snapshot" + dest.dbPath + " Contain Same file " + fileName);
+ LOG.debug("Source '{}' and destination '{}' share the same SST '{}'",
+ src.dbPath, dest.dbPath, fileName);
sameFiles.add(fileName);
continue;
}
CompactionNode infileNode =
compactionNodeTable.get(Paths.get(fileName).getFileName().toString());
if (infileNode == null) {
- LOG.warn("SrcSnapshot : " + src.dbPath + "File " + fileName + "was " +
- "never compacted");
+ LOG.debug("Src " + src.dbPath + " File " + fileName +
+ " was never compacted");
differentFiles.add(fileName);
continue;
}
- System.out.print(" Expandin File:" + fileName + ":\n");
- Set<CompactionNode> nextLevel = new HashSet<>();
- nextLevel.add(infileNode);
+ LOG.debug("Expanding SST file: " + fileName);
Set<CompactionNode> currentLevel = new HashSet<>();
- currentLevel.addAll(nextLevel);
- nextLevel = new HashSet<>();
+ currentLevel.add(infileNode);
+ Set<CompactionNode> nextLevel = new HashSet<>();
Review Comment:
If you move the `nextLevel` declaration and initialization inside the while
loop (line #850), you won't have to reset it at line #887.
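Roughly, a sketch of the same traversal using the names from the current diff:
```
Set<CompactionNode> currentLevel = new HashSet<>();
currentLevel.add(infileNode);
int i = 1;
while (!currentLevel.isEmpty()) {
  // Declared fresh on every iteration, so no reset is needed at the
  // bottom of the loop body.
  Set<CompactionNode> nextLevel = new HashSet<>();
  LOG.debug("DAG Level: " + i++);
  for (CompactionNode current : currentLevel) {
    ... // expand current; add its successors to nextLevel
  }
  currentLevel = nextLevel;
}
```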
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -608,52 +792,48 @@ public Comparator<CompactionNode> reversed() {
}
}
-
- public void dumpCompactioNodeTable() {
- List<CompactionNode> nodeList =
- compactionNodeTable.values().stream().collect(Collectors.toList());
- Collections.sort(nodeList, new NodeComparator());
- for (CompactionNode n : nodeList ) {
- LOG.warn("File : " + n.fileName + " :: Total keys : "
- + n.totalNumberOfKeys);
- LOG.warn("File : " + n.fileName + " :: Cumulative keys : " +
+ @VisibleForTesting
+ public void dumpCompactionNodeTable() {
+ List<CompactionNode> nodeList = compactionNodeTable.values().stream()
+ .sorted(new NodeComparator()).collect(Collectors.toList());
+ for (CompactionNode n : nodeList) {
+ LOG.info("File '{}' total keys: {}", n.fileName, n.totalNumberOfKeys);
+ LOG.info("File '{}' cumulative keys: {}", n.fileName,
n.cumulativeKeysReverseTraversal);
}
}
+ @VisibleForTesting
@SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
- public synchronized void printMutableGraphFromAGivenNode(
- String fileName, int level, MutableGraph<CompactionNode> mutableGraph) {
+ public synchronized void printMutableGraphFromAGivenNode(String fileName,
+ int level, MutableGraph<CompactionNode> mutableGraph) {
CompactionNode infileNode =
compactionNodeTable.get(Paths.get(fileName).getFileName().toString());
if (infileNode == null) {
return;
}
- System.out.print("\nCompaction Level : " + level + " Expandin File:" +
- fileName + ":\n");
+ LOG.info("\nCompaction Level: " + level + " Expanding File: " + fileName);
Set<CompactionNode> nextLevel = new HashSet<>();
nextLevel.add(infileNode);
- Set<CompactionNode> currentLevel = new HashSet<>();
- currentLevel.addAll(nextLevel);
+ Set<CompactionNode> currentLevel = new HashSet<>(nextLevel);
Review Comment:
Same as the previous comment.
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -471,134 +558,231 @@ public HashSet<String> readRocksDBLiveFiles(String dbPathArg) {
return liveFiles;
}
- // Given the src and destination Snapshots, it prints a Diff list.
- private synchronized void printSnapdiffSSTFiles(
- Snapshot src, Snapshot dest) throws RocksDBException {
- LOG.warn("Src Snapshot files :" + src.dbPath);
+ /**
+ * Process each line of compaction log text file input and populate the DAG.
+ */
+ private synchronized void processCompactionLogLine(String line) {
+
+ LOG.debug("Processing line: {}", line);
+
+ if (line.startsWith("#")) {
+ // Skip comments
+ LOG.debug("Comment line, skipped");
+ } else if (line.startsWith(COMPACTION_LOG_SEQNUM_LINE_PREFIX)) {
+ // Read sequence number
+ LOG.debug("Reading sequence number as snapshot generation");
+ final String seqNumStr =
+ line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length()).trim();
+ // This would the snapshot generation for the nodes to come
+ reconstructionSnapshotGeneration = Long.parseLong(seqNumStr);
+ } else if (line.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
+ // Read compaction log entry
+
+ // Trim the beginning
+ line = line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length());
+ final String[] io = line.split(":");
+ if (io.length != 2) {
+ LOG.error("Invalid line in compaction log: {}", line);
+ return;
+ }
+ final String[] inputFiles = io[0].split(",");
+ final String[] outputFiles = io[1].split(",");
+ populateCompactionDAG(Arrays.asList(inputFiles),
+ Arrays.asList(outputFiles), reconstructionSnapshotGeneration);
+ } else {
+ LOG.error("Invalid line in compaction log: {}", line);
+ }
+ }
+
+ /**
+ * Helper to read compaction log to the internal DAG.
+ */
+ private void readCompactionLogToDAG(String currCompactionLogPath) {
+ LOG.debug("Loading compaction log: {}", currCompactionLogPath);
+ try (Stream<String> logLineStream =
+        Files.lines(Paths.get(currCompactionLogPath), StandardCharsets.UTF_8)) {
+ logLineStream.forEach(this::processCompactionLogLine);
+ } catch (IOException ioEx) {
+ throw new RuntimeException(ioEx);
+ }
+ }
+
+ /**
+ * Load existing compaction log files to the in-memory DAG.
+ * This only needs to be done once during OM startup.
+ */
+ public synchronized void loadAllCompactionLogs() {
+ if (compactionLogDir == null) {
+ throw new RuntimeException("Compaction log directory must be set first");
+ }
+ reconstructionSnapshotGeneration = 0L;
+ try {
+ try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
+ .filter(e -> e.toString().toLowerCase()
+ .endsWith(COMPACTION_LOG_FILENAME_SUFFIX))
+ .sorted()) {
+ for (Path logPath : pathStream.collect(Collectors.toList())) {
+ readCompactionLogToDAG(logPath.toString());
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Error listing compaction log dir " +
+ compactionLogDir, e);
+ }
+ }
+
+ /**
+ * Snapshot information node class in the DAG.
+ */
+ static class Snapshot {
+ private final String dbPath;
+ private final String snapshotID;
+ private final long snapshotGeneration;
+
+ Snapshot(String db, String id, long gen) {
+ dbPath = db;
+ snapshotID = id;
+ snapshotGeneration = gen;
+ }
+
+ public String getDbPath() {
+ return dbPath;
+ }
+
+ public String getSnapshotID() {
+ return snapshotID;
+ }
+
+ public long getSnapshotGeneration() {
+ return snapshotGeneration;
+ }
+ }
+
+ /**
+   * Get a list of SST files that differs between src and destination snapshots.
+ * <p>
+ * Expected input: src is a snapshot taken AFTER the dest.
+ *
+ * @param src source snapshot
+ * @param dest destination snapshot
+ */
+  public synchronized List<String> getSSTDiffList(Snapshot src, Snapshot dest) {
+
+ LOG.debug("src '{}' -> dest '{}'", src.dbPath, dest.dbPath);
HashSet<String> srcSnapFiles = readRocksDBLiveFiles(src.dbPath);
- LOG.warn("dest Snapshot files :" + dest.dbPath);
HashSet<String> destSnapFiles = readRocksDBLiveFiles(dest.dbPath);
HashSet<String> fwdDAGSameFiles = new HashSet<>();
HashSet<String> fwdDAGDifferentFiles = new HashSet<>();
- LOG.warn("Doing forward diff between source and destination " +
- "Snapshots:" + src.dbPath + ", " + dest.dbPath);
- realPrintSnapdiffSSTFiles(src, dest, srcSnapFiles, destSnapFiles,
- compactionDAGFwd,
- fwdDAGSameFiles,
- fwdDAGDifferentFiles);
+ LOG.debug("Doing forward diff between src and dest snapshots: " +
+ src.dbPath + " to " + dest.dbPath);
+ internalGetSSTDiffList(src, dest, srcSnapFiles, destSnapFiles,
+ compactionDAGFwd, fwdDAGSameFiles, fwdDAGDifferentFiles);
- LOG.warn("Overall Summary \n" +
- "Doing Overall diff between source and destination Snapshots:" +
- src.dbPath + ", " + dest.dbPath);
- System.out.print("fwd DAG Same files :");
- for (String file : fwdDAGSameFiles) {
- System.out.print(file + ", ");
- }
- LOG.warn("");
- System.out.print("\nFwd DAG Different files :");
- for (String file : fwdDAGDifferentFiles) {
- CompactionNode n = compactionNodeTable.get(file);
- System.out.print(file + ", ");
+ List<String> res = new ArrayList<>();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Result of diff from src '" + src.dbPath + "' to dest '" +
+ dest.dbPath + "':");
+ StringBuilder logSB = new StringBuilder();
+
+ logSB.append("Fwd DAG same SST files: ");
+ for (String file : fwdDAGSameFiles) {
+ logSB.append(file).append(" ");
+ }
+ LOG.debug(logSB.toString());
+
+ logSB.setLength(0);
+ logSB.append("Fwd DAG different SST files: ");
+ for (String file : fwdDAGDifferentFiles) {
+ logSB.append(file).append(" ");
+ res.add(file);
+ }
+ LOG.debug(logSB.toString());
+
+ } else {
+ res.addAll(fwdDAGDifferentFiles);
}
- LOG.warn("");
+
+ return res;
}
+ /**
+ * Core getSSTDiffList logic.
+ */
@SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
- public synchronized void realPrintSnapdiffSSTFiles(
- Snapshot src, Snapshot dest,
- HashSet<String> srcSnapFiles,
- HashSet<String> destSnapFiles,
+ private void internalGetSSTDiffList(Snapshot src, Snapshot dest,
+ HashSet<String> srcSnapFiles, HashSet<String> destSnapFiles,
MutableGraph<CompactionNode> mutableGraph,
HashSet<String> sameFiles, HashSet<String> differentFiles) {
-
for (String fileName : srcSnapFiles) {
if (destSnapFiles.contains(fileName)) {
- LOG.warn("SrcSnapshot : " + src.dbPath + " and Dest " +
- "Snapshot" + dest.dbPath + " Contain Same file " + fileName);
+ LOG.debug("Source '{}' and destination '{}' share the same SST '{}'",
+ src.dbPath, dest.dbPath, fileName);
sameFiles.add(fileName);
continue;
}
CompactionNode infileNode =
compactionNodeTable.get(Paths.get(fileName).getFileName().toString());
if (infileNode == null) {
- LOG.warn("SrcSnapshot : " + src.dbPath + "File " + fileName + "was " +
- "never compacted");
+ LOG.debug("Src " + src.dbPath + " File " + fileName +
+ " was never compacted");
differentFiles.add(fileName);
continue;
}
- System.out.print(" Expandin File:" + fileName + ":\n");
- Set<CompactionNode> nextLevel = new HashSet<>();
- nextLevel.add(infileNode);
+ LOG.debug("Expanding SST file: " + fileName);
Set<CompactionNode> currentLevel = new HashSet<>();
- currentLevel.addAll(nextLevel);
- nextLevel = new HashSet<>();
+ currentLevel.add(infileNode);
+ Set<CompactionNode> nextLevel = new HashSet<>();
int i = 1;
while (currentLevel.size() != 0) {
- LOG.warn("DAG Level :" + i++);
+ LOG.debug("DAG Level: " + i++);
for (CompactionNode current : currentLevel) {
- LOG.warn("acknowledging file " + current.fileName);
+ LOG.debug("Acknowledging file " + current.fileName);
if (current.snapshotGeneration <= dest.snapshotGeneration) {
- LOG.warn("Reached dest generation count. SrcSnapshot : " +
- src.dbPath + " and Dest " + "Snapshot" + dest.dbPath +
- " Contain Diffrent file " + current.fileName);
+ LOG.debug("Reached dest generation count. Src: " +
+ src.dbPath + " and Dest: " + dest.dbPath +
+ " have different file: " + current.fileName);
differentFiles.add(current.fileName);
continue;
}
Set<CompactionNode> successors = mutableGraph.successors(current);
- if (successors == null || successors.size() == 0) {
- LOG.warn("No further compaction for the file" +
- ".SrcSnapshot : " + src.dbPath + " and Dest " +
- "Snapshot" + dest.dbPath + " Contain Diffrent file " +
- current.fileName);
+ if (successors.size() == 0) {
+ LOG.debug("No further compaction happened for the current file. " +
+ "src: " + src.dbPath + " and dest: " + dest.dbPath +
+ " have different file: " + current.fileName);
differentFiles.add(current.fileName);
} else {
for (CompactionNode oneSucc : successors) {
if (sameFiles.contains(oneSucc.fileName) ||
differentFiles.contains(oneSucc.fileName)) {
- LOG.warn("Skipping file :" + oneSucc.fileName);
+ LOG.debug("Skipping known same file: " + oneSucc.fileName);
continue;
}
if (destSnapFiles.contains(oneSucc.fileName)) {
- LOG.warn("SrcSnapshot : " + src.dbPath + " and Dest " +
- "Snapshot" + dest.dbPath + " Contain Same file " +
- oneSucc.fileName);
+ LOG.debug("src: " + src.dbPath + " and dest: " + dest.dbPath +
+ " have the same file: " + oneSucc.fileName);
sameFiles.add(oneSucc.fileName);
continue;
} else {
Review Comment:
`continue` here doesn't make any difference. Maybe you want to do something
like:
```
if (sameFiles.contains(oneSucc.fileName) ||
differentFiles.contains(oneSucc.fileName)) {
LOG.debug("Skipping known same file: " + oneSucc.fileName);
continue;
}
if (destSnapFiles.contains(oneSucc.fileName)) {
LOG.debug("src: " + src.dbPath + " and dest: " + dest.dbPath +
" have the same file: " + oneSucc.fileName);
sameFiles.add(oneSucc.fileName);
continue;
}
LOG.debug("src " + src.dbPath + " and dest " + dest.dbPath +
" have a different SST: " + oneSucc.fileName);
nextLevel.add(oneSucc);
```
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -471,134 +558,231 @@ public HashSet<String> readRocksDBLiveFiles(String dbPathArg) {
return liveFiles;
}
- // Given the src and destination Snapshots, it prints a Diff list.
- private synchronized void printSnapdiffSSTFiles(
- Snapshot src, Snapshot dest) throws RocksDBException {
- LOG.warn("Src Snapshot files :" + src.dbPath);
+ /**
+ * Process each line of compaction log text file input and populate the DAG.
+ */
+ private synchronized void processCompactionLogLine(String line) {
+
+ LOG.debug("Processing line: {}", line);
+
+ if (line.startsWith("#")) {
+ // Skip comments
+ LOG.debug("Comment line, skipped");
+ } else if (line.startsWith(COMPACTION_LOG_SEQNUM_LINE_PREFIX)) {
+ // Read sequence number
+ LOG.debug("Reading sequence number as snapshot generation");
+ final String seqNumStr =
+ line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length()).trim();
+ // This would the snapshot generation for the nodes to come
+ reconstructionSnapshotGeneration = Long.parseLong(seqNumStr);
+ } else if (line.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
+ // Read compaction log entry
+
+ // Trim the beginning
+ line = line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length());
+ final String[] io = line.split(":");
+ if (io.length != 2) {
+ LOG.error("Invalid line in compaction log: {}", line);
+ return;
+ }
+ final String[] inputFiles = io[0].split(",");
+ final String[] outputFiles = io[1].split(",");
+ populateCompactionDAG(Arrays.asList(inputFiles),
+ Arrays.asList(outputFiles), reconstructionSnapshotGeneration);
+ } else {
+ LOG.error("Invalid line in compaction log: {}", line);
+ }
+ }
+
+ /**
+ * Helper to read compaction log to the internal DAG.
+ */
+ private void readCompactionLogToDAG(String currCompactionLogPath) {
+ LOG.debug("Loading compaction log: {}", currCompactionLogPath);
+ try (Stream<String> logLineStream =
+        Files.lines(Paths.get(currCompactionLogPath), StandardCharsets.UTF_8)) {
+ logLineStream.forEach(this::processCompactionLogLine);
+ } catch (IOException ioEx) {
+ throw new RuntimeException(ioEx);
+ }
+ }
+
+ /**
+ * Load existing compaction log files to the in-memory DAG.
+ * This only needs to be done once during OM startup.
+ */
+ public synchronized void loadAllCompactionLogs() {
+ if (compactionLogDir == null) {
+ throw new RuntimeException("Compaction log directory must be set first");
+ }
+ reconstructionSnapshotGeneration = 0L;
+ try {
+ try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
+ .filter(e -> e.toString().toLowerCase()
+ .endsWith(COMPACTION_LOG_FILENAME_SUFFIX))
+ .sorted()) {
+ for (Path logPath : pathStream.collect(Collectors.toList())) {
+ readCompactionLogToDAG(logPath.toString());
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Error listing compaction log dir " +
+ compactionLogDir, e);
+ }
+ }
+
+ /**
+ * Snapshot information node class in the DAG.
+ */
+ static class Snapshot {
+ private final String dbPath;
+ private final String snapshotID;
+ private final long snapshotGeneration;
+
+ Snapshot(String db, String id, long gen) {
+ dbPath = db;
+ snapshotID = id;
+ snapshotGeneration = gen;
+ }
+
+ public String getDbPath() {
+ return dbPath;
+ }
+
+ public String getSnapshotID() {
+ return snapshotID;
+ }
+
+ public long getSnapshotGeneration() {
+ return snapshotGeneration;
+ }
+ }
+
+ /**
+   * Get a list of SST files that differs between src and destination snapshots.
+ * <p>
+ * Expected input: src is a snapshot taken AFTER the dest.
+ *
+ * @param src source snapshot
+ * @param dest destination snapshot
+ */
+  public synchronized List<String> getSSTDiffList(Snapshot src, Snapshot dest) {
+
+ LOG.debug("src '{}' -> dest '{}'", src.dbPath, dest.dbPath);
HashSet<String> srcSnapFiles = readRocksDBLiveFiles(src.dbPath);
- LOG.warn("dest Snapshot files :" + dest.dbPath);
HashSet<String> destSnapFiles = readRocksDBLiveFiles(dest.dbPath);
HashSet<String> fwdDAGSameFiles = new HashSet<>();
HashSet<String> fwdDAGDifferentFiles = new HashSet<>();
- LOG.warn("Doing forward diff between source and destination " +
- "Snapshots:" + src.dbPath + ", " + dest.dbPath);
- realPrintSnapdiffSSTFiles(src, dest, srcSnapFiles, destSnapFiles,
- compactionDAGFwd,
- fwdDAGSameFiles,
- fwdDAGDifferentFiles);
+ LOG.debug("Doing forward diff between src and dest snapshots: " +
+ src.dbPath + " to " + dest.dbPath);
+ internalGetSSTDiffList(src, dest, srcSnapFiles, destSnapFiles,
+ compactionDAGFwd, fwdDAGSameFiles, fwdDAGDifferentFiles);
- LOG.warn("Overall Summary \n" +
- "Doing Overall diff between source and destination Snapshots:" +
- src.dbPath + ", " + dest.dbPath);
- System.out.print("fwd DAG Same files :");
- for (String file : fwdDAGSameFiles) {
- System.out.print(file + ", ");
- }
- LOG.warn("");
- System.out.print("\nFwd DAG Different files :");
- for (String file : fwdDAGDifferentFiles) {
- CompactionNode n = compactionNodeTable.get(file);
- System.out.print(file + ", ");
+ List<String> res = new ArrayList<>();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Result of diff from src '" + src.dbPath + "' to dest '" +
+ dest.dbPath + "':");
+ StringBuilder logSB = new StringBuilder();
+
+ logSB.append("Fwd DAG same SST files: ");
+ for (String file : fwdDAGSameFiles) {
+ logSB.append(file).append(" ");
+ }
+ LOG.debug(logSB.toString());
+
+ logSB.setLength(0);
+ logSB.append("Fwd DAG different SST files: ");
+ for (String file : fwdDAGDifferentFiles) {
+ logSB.append(file).append(" ");
+ res.add(file);
+ }
+ LOG.debug(logSB.toString());
+
+ } else {
+ res.addAll(fwdDAGDifferentFiles);
}
- LOG.warn("");
+
+ return res;
}
+ /**
+ * Core getSSTDiffList logic.
+ */
@SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
- public synchronized void realPrintSnapdiffSSTFiles(
- Snapshot src, Snapshot dest,
- HashSet<String> srcSnapFiles,
- HashSet<String> destSnapFiles,
+ private void internalGetSSTDiffList(Snapshot src, Snapshot dest,
+ HashSet<String> srcSnapFiles, HashSet<String> destSnapFiles,
MutableGraph<CompactionNode> mutableGraph,
HashSet<String> sameFiles, HashSet<String> differentFiles) {
-
for (String fileName : srcSnapFiles) {
if (destSnapFiles.contains(fileName)) {
- LOG.warn("SrcSnapshot : " + src.dbPath + " and Dest " +
- "Snapshot" + dest.dbPath + " Contain Same file " + fileName);
+ LOG.debug("Source '{}' and destination '{}' share the same SST '{}'",
+ src.dbPath, dest.dbPath, fileName);
sameFiles.add(fileName);
continue;
}
CompactionNode infileNode =
compactionNodeTable.get(Paths.get(fileName).getFileName().toString());
if (infileNode == null) {
- LOG.warn("SrcSnapshot : " + src.dbPath + "File " + fileName + "was " +
- "never compacted");
+ LOG.debug("Src " + src.dbPath + " File " + fileName +
+ " was never compacted");
differentFiles.add(fileName);
continue;
}
- System.out.print(" Expandin File:" + fileName + ":\n");
- Set<CompactionNode> nextLevel = new HashSet<>();
- nextLevel.add(infileNode);
+ LOG.debug("Expanding SST file: " + fileName);
Set<CompactionNode> currentLevel = new HashSet<>();
- currentLevel.addAll(nextLevel);
- nextLevel = new HashSet<>();
+ currentLevel.add(infileNode);
+ Set<CompactionNode> nextLevel = new HashSet<>();
int i = 1;
while (currentLevel.size() != 0) {
- LOG.warn("DAG Level :" + i++);
+ LOG.debug("DAG Level: " + i++);
for (CompactionNode current : currentLevel) {
- LOG.warn("acknowledging file " + current.fileName);
+ LOG.debug("Acknowledging file " + current.fileName);
if (current.snapshotGeneration <= dest.snapshotGeneration) {
- LOG.warn("Reached dest generation count. SrcSnapshot : " +
- src.dbPath + " and Dest " + "Snapshot" + dest.dbPath +
- " Contain Diffrent file " + current.fileName);
+ LOG.debug("Reached dest generation count. Src: " +
+ src.dbPath + " and Dest: " + dest.dbPath +
+ " have different file: " + current.fileName);
differentFiles.add(current.fileName);
continue;
}
Set<CompactionNode> successors = mutableGraph.successors(current);
- if (successors == null || successors.size() == 0) {
- LOG.warn("No further compaction for the file" +
- ".SrcSnapshot : " + src.dbPath + " and Dest " +
- "Snapshot" + dest.dbPath + " Contain Diffrent file " +
- current.fileName);
+ if (successors.size() == 0) {
+ LOG.debug("No further compaction happened for the current file. " +
+ "src: " + src.dbPath + " and dest: " + dest.dbPath +
+ " have different file: " + current.fileName);
differentFiles.add(current.fileName);
} else {
for (CompactionNode oneSucc : successors) {
if (sameFiles.contains(oneSucc.fileName) ||
differentFiles.contains(oneSucc.fileName)) {
- LOG.warn("Skipping file :" + oneSucc.fileName);
+ LOG.debug("Skipping known same file: " + oneSucc.fileName);
continue;
}
if (destSnapFiles.contains(oneSucc.fileName)) {
- LOG.warn("SrcSnapshot : " + src.dbPath + " and Dest " +
- "Snapshot" + dest.dbPath + " Contain Same file " +
- oneSucc.fileName);
+ LOG.debug("src: " + src.dbPath + " and dest: " + dest.dbPath +
+ " have the same file: " + oneSucc.fileName);
sameFiles.add(oneSucc.fileName);
continue;
} else {
- LOG.warn("SrcSnapshot : " + src.dbPath + " and Dest " +
- "Snapshot" + dest.dbPath + " Contain Diffrent file " +
- oneSucc.fileName);
+ LOG.debug("src " + src.dbPath + " and dest " + dest.dbPath +
+ " have a different SST: " + oneSucc.fileName);
nextLevel.add(oneSucc);
}
}
}
}
- currentLevel = new HashSet<>();
- currentLevel.addAll(nextLevel);
+ currentLevel = new HashSet<>(nextLevel);
Review Comment:
I don't think you need to initialize a new HashSet here, since `nextLevel` is
reset right afterwards anyway. The copy seems unnecessary.
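For example, just swapping the references would do (a sketch):
```
// Hand nextLevel's contents to currentLevel directly; no defensive copy
// is required because nextLevel is re-created on the next line.
currentLevel = nextLevel;
nextLevel = new HashSet<>();
```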
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -688,130 +868,139 @@ public synchronized void printMutableGraph(
iter.remove();
iter = topLevelNodes.iterator();
}
- LOG.warn("src snap:" + srcSnapId);
- LOG.warn("dest snap:" + destSnapId);
+ LOG.debug("src snap: " + srcSnapId);
+ LOG.debug("dest snap: " + destSnapId);
for (CompactionNode n : allNodes) {
- LOG.warn("Files are :" + n.fileName);
+ LOG.debug("Files are: " + n.fileName);
}
}
+ public MutableGraph<CompactionNode> getCompactionFwdDAG() {
+ return compactionDAGFwd;
+ }
- public void createSnapshot(RocksDB rocksDB) throws InterruptedException {
-
- LOG.warn("Current time is::" + System.currentTimeMillis());
- long t1 = System.currentTimeMillis();
-
- cpPath = cpPath + lastSnapshotCounter;
- createCheckPoint(rocksDbPath, cpPath, rocksDB);
- allSnapshots[lastSnapshotCounter] = new Snapshot(cpPath,
- lastSnapshotPrefix, lastSnapshotCounter);
-
- long t2 = System.currentTimeMillis();
- LOG.warn("Current time is::" + t2);
-
- LOG.warn("millisecond difference is ::" + (t2 - t1));
- Thread.sleep(100);
- ++lastSnapshotCounter;
- lastSnapshotPrefix = "sid_" + lastSnapshotCounter;
- LOG.warn("done :: 1");
+ public MutableGraph<CompactionNode> getCompactionReverseDAG() {
+ return compactionDAGReverse;
}
+ /**
+ * Populate the compaction DAG with input and output SST files lists.
+ */
+ @SuppressFBWarnings({"AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"})
+ private void populateCompactionDAG(List<String> inputFiles,
+ List<String> outputFiles, long seqNum) {
- public void printAllSnapshots() throws InterruptedException {
- for (Snapshot snap : allSnapshots) {
- if (snap == null) {
- break;
- }
- LOG.warn("Snapshot id" + snap.snapshotID);
- LOG.warn("Snapshot path" + snap.dbPath);
- LOG.warn("Snapshot Generation" + snap.snapshotGeneration);
- LOG.warn("");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Input {} -> Output {}", inputFiles, outputFiles);
}
- }
-  public void diffAllSnapshots() throws InterruptedException, RocksDBException {
- for (Snapshot snap : allSnapshots) {
- if (snap == null) {
- break;
+ for (String outfile : outputFiles) {
+ CompactionNode outfileNode = compactionNodeTable.get(outfile);
+ if (outfileNode == null) {
+ long numKeys = 0L;
+ try {
+ numKeys = getSSTFileSummary(outfile);
+ } catch (Exception e) {
+ LOG.warn("Exception in getSSTFileSummary: {}", e.getMessage());
+ }
+ outfileNode = new CompactionNode(outfile, null, numKeys, seqNum);
+ compactionDAGFwd.addNode(outfileNode);
+ compactionDAGReverse.addNode(outfileNode);
+ compactionNodeTable.put(outfile, outfileNode);
}
- printSnapdiffSSTFiles(allSnapshots[lastSnapshotCounter - 1], snap);
- }
- }
- public MutableGraph<CompactionNode> getCompactionFwdDAG() {
- return compactionDAGFwd;
- }
+ for (String infile : inputFiles) {
+ CompactionNode infileNode = compactionNodeTable.get(infile);
Review Comment:
The code at lines #898-#910 and #913-#925 is the same. You can extract the
common code into a helper function:
```
public CompactionNode getCompactionNode(String file, long seqNum) {
  CompactionNode fileNode = compactionNodeTable.get(file);
  if (fileNode == null) {
    long numKeys = 0L;
    try {
      numKeys = getSSTFileSummary(file);
    } catch (Exception e) {
      LOG.warn("Exception in getSSTFileSummary: {}", e.getMessage());
    }
    fileNode = new CompactionNode(file, null, numKeys, seqNum);
    compactionDAGFwd.addNode(fileNode);
    compactionDAGReverse.addNode(fileNode);
    compactionNodeTable.put(file, fileNode);
  }
  return fileNode;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]