hemantk-12 commented on code in PR #3824:
URL: https://github.com/apache/ozone/pull/3824#discussion_r1011029835


##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -471,134 +611,287 @@ public HashSet<String> readRocksDBLiveFiles(String 
dbPathArg) {
     return liveFiles;
   }
 
-  // Given the src and destination Snapshots, it prints a Diff list.
-  private synchronized void printSnapdiffSSTFiles(
-      Snapshot src, Snapshot dest) throws RocksDBException {
-    LOG.warn("Src Snapshot files :" + src.dbPath);
+  private long reconstructionSnapshotGeneration;
+
+  /**
+   * Process each line of compaction log text file input and populate the DAG.
+   */
+  private synchronized void processCompactionLogLine(String line) {
+
+    LOG.debug("Processing line: {}", line);
+
+    if (line.startsWith("#")) {
+      // Skip comments
+      LOG.debug("Comment line, skipped");
+    } else if (line.startsWith(COMPACTION_LOG_SEQNUM_LINE_PREFIX)) {
+      // Read sequence number
+      LOG.debug("Reading sequence number as snapshot generation");
+      final String seqNumStr =
+          line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length()).trim();
+      // This would the snapshot generation for the nodes to come
+      reconstructionSnapshotGeneration = Long.parseLong(seqNumStr);
+    } else if (line.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
+      // Read compaction log entry
+
+      // Trim the beginning
+      line = line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length());
+      final String[] io = line.split(":");
+      if (io.length != 2) {
+        LOG.error("Invalid line in compaction log: {}", line);
+        return;
+      }
+      final String[] inputFiles = io[0].split(",");
+      final String[] outputFiles = io[1].split(",");
+      populateCompactionDAG(Arrays.asList(inputFiles),
+          Arrays.asList(outputFiles), reconstructionSnapshotGeneration);
+    } else {
+      LOG.error("Invalid line in compaction log: {}", line);
+    }
+  }
+
+  /**
+   * Helper to read compaction log to the internal DAG.
+   */
+  private void readCompactionLogToDAG(String currCompactionLogPath) {
+    LOG.debug("Loading compaction log: {}", currCompactionLogPath);
+    try (Stream<String> logLineStream =
+        Files.lines(Paths.get(currCompactionLogPath), StandardCharsets.UTF_8)) 
{
+      logLineStream.forEach(this::processCompactionLogLine);
+    } catch (IOException ioEx) {
+      throw new RuntimeException(ioEx);
+    }
+  }
+
+  /**
+   * Returns a set of SST nodes that doesn't exist in the in-memory DAG.
+   */
+  private Set<String> getNonExistentSSTSet(Set<String> sstSet) {
+
+    // Make a copy of sstSet
+    HashSet<String> loadSet = new HashSet<>(sstSet);
+
+    // Check if all the nodes in the provided SST set is already loaded in DAG
+    for (String sstFile : sstSet) {
+      if (compactionNodeTable.containsKey(sstFile)) {
+        loadSet.remove(sstFile);
+      }
+    }
+
+    return loadSet;
+  }
+
+  /**
+   * Returns true only when all nodes in sstSet exists in DAG.
+   */
+  private boolean isSSTSetLoaded(HashSet<String> sstSet) {
+
+    return getNonExistentSSTSet(sstSet).size() == 0;
+  }
+
+  /**
+   * Read compaction log until all dest (and src) db checkpoint SST
+   * nodes show up in the graph, or when it reaches the end of the log.
+   */
+  private boolean loadCompactionDAGBySSTSet(HashSet<String> sstSet) {
+
+    // Get a set of SSTs that doesn't exist in the current in-memory DAG
+    Set<String> loadSet = getNonExistentSSTSet(sstSet);
+
+    if (loadSet.size() == 0) {
+      // All expected nodes in the sstSet are already there,
+      //  no need to read/load any compaction log from disk.
+      return true;
+    }
+
+    // Otherwise, load compaction logs in order until all nodes are present in
+    //  the DAG.
+    try {
+      try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
+          .filter(e -> e.toString()
+              .toLowerCase().endsWith(COMPACTION_LOG_FILENAME_SUFFIX))
+          .sorted()) {
+        for (Path logPath : pathStream.collect(Collectors.toList())) {
+
+          // TODO: Potential optimization: stop reading as soon as all nodes 
are
+          //  there. Currently it loads an entire file at a time.
+          readCompactionLogToDAG(logPath.toString());
+
+          for (Iterator<String> it = loadSet.iterator(); it.hasNext();) {
+            String sstFile = it.next();
+            if (compactionNodeTable.containsKey(sstFile)) {
+              LOG.debug("Found SST node: {}", sstFile);
+              it.remove();
+            }
+          }
+
+          if (loadSet.size() == 0) {
+            break;
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Error listing compaction log dir " +
+          compactionLogDir, e);
+    }
+
+    // Just in case there are still nodes to be expected not loaded.
+    if (loadSet.size() > 0) {
+      LOG.warn("The following nodes are missing from the compaction log: {}. "
+          + "Possibly because those a newly flushed SSTs that haven't gone "
+          + "though any compaction yet", loadSet);
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Load existing compaction log files to the in-memory DAG.
+   * This only needs to be done once during OM startup.
+   */
+  public synchronized void loadAllCompactionLogs() {
+    if (compactionLogDir == null) {
+      throw new RuntimeException("Compaction log directory must be set first");
+    }
+    reconstructionSnapshotGeneration = 0L;
+    try {
+      try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
+          .filter(e -> e.toString().toLowerCase().endsWith(".log"))
+          .sorted()) {
+        for (Path logPath : pathStream.collect(Collectors.toList())) {
+          readCompactionLogToDAG(logPath.toString());
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Error listing compaction log dir " +
+          compactionLogDir, e);
+    }
+  }
+
+  /**
+   * Get a list of SST files that differs between src and destination 
snapshots.
+   * <p>
+   * Expected input: src is a snapshot taken AFTER the dest.
+   *
+   * @param src source snapshot
+   * @param dest destination snapshot
+   */
+  public synchronized List<String> getSSTDiffList(Snapshot src, Snapshot dest) 
{
+
+    LOG.debug("src '{}' -> dest '{}'", src.dbPath, dest.dbPath);
     HashSet<String> srcSnapFiles = readRocksDBLiveFiles(src.dbPath);
-    LOG.warn("dest Snapshot files :" + dest.dbPath);
     HashSet<String> destSnapFiles = readRocksDBLiveFiles(dest.dbPath);
 
     HashSet<String> fwdDAGSameFiles = new HashSet<>();
     HashSet<String> fwdDAGDifferentFiles = new HashSet<>();
 
-    LOG.warn("Doing forward diff between source and destination " +
-        "Snapshots:" + src.dbPath + ", " + dest.dbPath);
-    realPrintSnapdiffSSTFiles(src, dest, srcSnapFiles, destSnapFiles,
-        compactionDAGFwd,
-        fwdDAGSameFiles,
-        fwdDAGDifferentFiles);
+    LOG.debug("Doing forward diff between src and dest snapshots: " +
+        src.dbPath + " to " + dest.dbPath);
+    internalGetSSTDiffList(src, dest, srcSnapFiles, destSnapFiles,
+        compactionDAGFwd, fwdDAGSameFiles, fwdDAGDifferentFiles);
 
-    LOG.warn("Overall Summary \n" +
-            "Doing Overall diff between source and destination Snapshots:" +
-        src.dbPath + ", " + dest.dbPath);
-    System.out.print("fwd DAG Same files :");
-    for (String file : fwdDAGSameFiles) {
-      System.out.print(file + ", ");
-    }
-    LOG.warn("");
-    System.out.print("\nFwd DAG Different files :");
-    for (String file : fwdDAGDifferentFiles) {
-      CompactionNode n = compactionNodeTable.get(file);
-      System.out.print(file + ", ");
+    List<String> res = new ArrayList<>();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Result of diff from src '" + src.dbPath + "' to dest '" +
+          dest.dbPath + "':");
+      StringBuilder logSB = new StringBuilder();
+
+      logSB.append("Fwd DAG same SST files:      ");
+      for (String file : fwdDAGSameFiles) {
+        logSB.append(file).append(" ");
+      }
+      LOG.debug(logSB.toString());
+
+      logSB.setLength(0);
+      logSB.append("Fwd DAG different SST files: ");
+      for (String file : fwdDAGDifferentFiles) {
+        logSB.append(file).append(" ");
+        res.add(file);
+      }
+      LOG.debug(logSB.toString());
+
+    } else {
+      res.addAll(fwdDAGDifferentFiles);
     }
-    LOG.warn("");
+
+    return res;
   }
 
   @SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
-  public synchronized void realPrintSnapdiffSSTFiles(
+  public synchronized void internalGetSSTDiffList(
       Snapshot src, Snapshot dest,
       HashSet<String> srcSnapFiles,
       HashSet<String> destSnapFiles,
       MutableGraph<CompactionNode> mutableGraph,
       HashSet<String> sameFiles, HashSet<String> differentFiles) {
 
-
     for (String fileName : srcSnapFiles) {
       if (destSnapFiles.contains(fileName)) {
-        LOG.warn("SrcSnapshot : " + src.dbPath + " and Dest " +
-            "Snapshot" + dest.dbPath + " Contain Same file " + fileName);
+        LOG.debug("Source '{}' and destination '{}' share the same SST '{}'",
+            src.dbPath, dest.dbPath, fileName);
         sameFiles.add(fileName);
         continue;
       }
       CompactionNode infileNode =
           
compactionNodeTable.get(Paths.get(fileName).getFileName().toString());
       if (infileNode == null) {
-        LOG.warn("SrcSnapshot : " + src.dbPath + "File " + fileName + "was " +
-            "never compacted");
+        LOG.debug("Src " + src.dbPath + " File " + fileName +
+            " was never compacted");
         differentFiles.add(fileName);
         continue;
       }
-      System.out.print(" Expandin File:" + fileName + ":\n");
-      Set<CompactionNode> nextLevel = new HashSet<>();
-      nextLevel.add(infileNode);
+      LOG.debug("Expanding SST file: " + fileName);
       Set<CompactionNode> currentLevel = new HashSet<>();
-      currentLevel.addAll(nextLevel);
-      nextLevel = new HashSet<>();
+      currentLevel.add(infileNode);
+      Set<CompactionNode> nextLevel = new HashSet<>();

Review Comment:
   If you move the `nextLevel` declaration and initialization inside the while loop 
(line # 850), you won't have to reset it at line # 887.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to