hemantk-12 commented on code in PR #3824:
URL: https://github.com/apache/ozone/pull/3824#discussion_r1012198808


##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -215,253 +305,265 @@ public static void addDebugLevel(Integer level) {
     DEBUG_LEVEL.add(level);
   }
 
-  // Flushes the WAL and Creates a RocksDB checkpoint
-  @SuppressWarnings({"NM_METHOD_NAMING_CONVENTION"})
-  public void createCheckPoint(String dbPathArg, String cpPathArg,
-                               RocksDB rocksDB) {
-    LOG.warn("Creating Checkpoint for RocksDB instance : " +
-        dbPathArg + "in a CheckPoint Location" + cpPathArg);
-    try {
-      rocksDB.flush(new FlushOptions());
-      Checkpoint cp = Checkpoint.create(rocksDB);
-      cp.createCheckpoint(cpPathArg);
-    } catch (RocksDBException e) {
-      throw new RuntimeException(e.getMessage());
+  /**
+   * Append (then flush) to the current compaction log file path.
+   * Note: This does NOT automatically append newline to the log.
+   */
+  private synchronized void appendToCurrentCompactionLog(String content) {
+    if (currentCompactionLogPath == null) {
+      LOG.error("Unable to append compaction log. "
+          + "Compaction log path is not set. "
+          + "Please check initialization.");
+      throw new RuntimeException("Compaction log path not set");
+    }
+    try (BufferedWriter bw = Files.newBufferedWriter(
+        Paths.get(currentCompactionLogPath),
+        StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
+      bw.write(content);
+      bw.flush();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to append compaction log to " +
+          currentCompactionLogPath, e);
     }
   }
 
-  public void setRocksDBForCompactionTracking(DBOptions rocksOptions)
-      throws RocksDBException {
-    setRocksDBForCompactionTracking(rocksOptions,
-        new ArrayList<AbstractEventListener>());
-  }
-
-  public void setRocksDBForCompactionTracking(
-      DBOptions rocksOptions, List<AbstractEventListener> list) {
-    final AbstractEventListener onCompactionCompletedListener =
-        new AbstractEventListener() {
-          @Override
-          @SuppressFBWarnings({
-              "AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
-              "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
-          public void onCompactionCompleted(
-              final RocksDB db, final CompactionJobInfo compactionJobInfo) {
-            synchronized (db) {
-              LOG.warn(compactionJobInfo.compactionReason().toString());
-              LOG.warn("List of input files:");
-              for (String file : compactionJobInfo.inputFiles()) {
-                LOG.warn(file);
-                String saveLinkFileName =
-                    saveCompactedFilePath + new File(file).getName();
-                Path link = Paths.get(saveLinkFileName);
-                Path srcFile = Paths.get(file);
-                try {
-                  Files.createLink(link, srcFile);
-                } catch (IOException e) {
-                  LOG.warn("Exception in creating hardlink");
-                  e.printStackTrace();
-                }
-              }
-              LOG.warn("List of output files:");
-              for (String file : compactionJobInfo.outputFiles()) {
-                LOG.warn(file + ",");
-              }
-              // Let us also build the graph
-              for (String outFilePath : compactionJobInfo.outputFiles()) {
-                String outfile =
-                    Paths.get(outFilePath).getFileName().toString();
-                CompactionNode outfileNode = compactionNodeTable.get(outfile);
-                if (outfileNode == null) {
-                  long numKeys = 0;
-                  try {
-                    numKeys = getSSTFileSummary(outfile);
-                  } catch (Exception e) {
-                    LOG.warn(e.getMessage());
-                  }
-                  outfileNode = new CompactionNode(outfile,
-                      lastSnapshotPrefix, numKeys,
-                      currentCompactionGen);
-                  compactionDAGFwd.addNode(outfileNode);
-                  compactionDAGReverse.addNode(outfileNode);
-                  compactionNodeTable.put(outfile, outfileNode);
-                }
-                for (String inFilePath : compactionJobInfo.inputFiles()) {
-                  String infile =
-                      Paths.get(inFilePath).getFileName().toString();
-                  CompactionNode infileNode = compactionNodeTable.get(infile);
-                  if (infileNode == null) {
-                    long numKeys = 0;
-                    try {
-                      numKeys = getSSTFileSummary(infile);
-                    } catch (Exception e) {
-                      LOG.warn(e.getMessage());
-                    }
-                    infileNode = new CompactionNode(infile,
-                        lastSnapshotPrefix,
-                        numKeys, UNKNOWN_COMPACTION_GEN);
-                    compactionDAGFwd.addNode(infileNode);
-                    compactionDAGReverse.addNode(infileNode);
-                    compactionNodeTable.put(infile, infileNode);
-                  }
-                  if (outfileNode.fileName.compareToIgnoreCase(
-                      infileNode.fileName) != 0) {
-                    compactionDAGFwd.putEdge(outfileNode, infileNode);
-                    compactionDAGReverse.putEdge(infileNode, outfileNode);
-                  }
-                }
-              }
-              if (debugEnabled(DEBUG_DAG_BUILD_UP)) {
-                printMutableGraph(null, null, compactionDAGFwd);
-              }
-            }
-          }
-        };
+  /**
+   * Append a sequence number to the compaction log (roughly) when an Ozone
+   * snapshot (RDB checkpoint) is taken.
+   * @param sequenceNum RDB sequence number
+   */
+  public void appendSequenceNumberToCompactionLog(long sequenceNum) {
+    final String line = COMPACTION_LOG_SEQNUM_LINE_PREFIX + sequenceNum + "\n";
+    appendToCurrentCompactionLog(line);
+  }
 
-    list.add(onCompactionCompletedListener);
+  /**
+   * Takes {@link org.rocksdb.Options}.
+   */
+  public void setRocksDBForCompactionTracking(Options rocksOptions,
+      List<AbstractEventListener> list) {
+    list.add(newCompactionBeginListener());
+    list.add(newCompactionCompletedListener());
     rocksOptions.setListeners(list);
   }
 
+  public void setRocksDBForCompactionTracking(Options rocksOptions) {
+    setRocksDBForCompactionTracking(rocksOptions, new ArrayList<>());
+  }
 
+  /**
+   * Takes {@link org.rocksdb.DBOptions}.
+   */
+  public void setRocksDBForCompactionTracking(DBOptions rocksOptions,
+      List<AbstractEventListener> list) {
+    list.add(newCompactionBeginListener());
+    list.add(newCompactionCompletedListener());
+    rocksOptions.setListeners(list);
+  }
 
-  public void setRocksDBForCompactionTracking(Options rocksOptions)
+  public void setRocksDBForCompactionTracking(DBOptions rocksOptions)
       throws RocksDBException {
-    setRocksDBForCompactionTracking(rocksOptions,
-        new ArrayList<AbstractEventListener>());
-  }
-
-  public void setRocksDBForCompactionTracking(
-      Options rocksOptions, List<AbstractEventListener> list) {
-    final AbstractEventListener onCompactionCompletedListener =
-        new AbstractEventListener() {
-          @Override
-          @SuppressFBWarnings({
-              "AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
-              "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
-          public void onCompactionCompleted(
-              final RocksDB db,final CompactionJobInfo compactionJobInfo) {
-            synchronized (db) {
-              LOG.warn(compactionJobInfo.compactionReason().toString());
-              LOG.warn("List of input files:");
-              for (String file : compactionJobInfo.inputFiles()) {
-                LOG.warn(file);
-                String saveLinkFileName =
-                    saveCompactedFilePath + new File(file).getName();
-                Path link = Paths.get(saveLinkFileName);
-                Path srcFile = Paths.get(file);
-                try {
-                  Files.createLink(link, srcFile);
-                } catch (IOException e) {
-                  LOG.warn("Exception in creating hardlink");
-                  e.printStackTrace();
-                }
-              }
-              LOG.warn("List of output files:");
-              for (String file : compactionJobInfo.outputFiles()) {
-                LOG.warn(file);
-              }
-              // Let us also build the graph
-              for (String outFilePath : compactionJobInfo.outputFiles()) {
-                String outfile =
-                    Paths.get(outFilePath).getFileName().toString();
-                CompactionNode outfileNode = compactionNodeTable.get(outfile);
-                if (outfileNode == null) {
-                  long numKeys = 0;
-                  try {
-                    numKeys = getSSTFileSummary(outfile);
-                  } catch (Exception e) {
-                    LOG.warn(e.getMessage());
-                  }
-                  outfileNode = new CompactionNode(outfile,
-                      lastSnapshotPrefix,
-                      numKeys, currentCompactionGen);
-                  compactionDAGFwd.addNode(outfileNode);
-                  compactionDAGReverse.addNode(outfileNode);
-                  compactionNodeTable.put(outfile, outfileNode);
-                }
-                for (String inFilePath : compactionJobInfo.inputFiles()) {
-                  String infile =
-                      Paths.get(inFilePath).getFileName().toString();
-                  CompactionNode infileNode = compactionNodeTable.get(infile);
-                  if (infileNode == null) {
-                    long numKeys = 0;
-                    try {
-                      numKeys = getSSTFileSummary(infile);
-                    } catch (Exception e) {
-                      LOG.warn(e.getMessage());
-                    }
-                    infileNode = new CompactionNode(infile,
-                        lastSnapshotPrefix, numKeys,
-                        UNKNOWN_COMPACTION_GEN);
-                    compactionDAGFwd.addNode(infileNode);
-                    compactionDAGReverse.addNode(infileNode);
-                    compactionNodeTable.put(infile, infileNode);
-                  }
-                  if (outfileNode.fileName.compareToIgnoreCase(
-                      infileNode.fileName) != 0) {
-                    compactionDAGFwd.putEdge(outfileNode, infileNode);
-                    compactionDAGReverse.putEdge(infileNode, outfileNode);
-                  }
-                }
-              }
-              if (debugEnabled(DEBUG_DAG_BUILD_UP)) {
-                printMutableGraph(null, null,
-                    compactionDAGFwd);
-              }
+    setRocksDBForCompactionTracking(rocksOptions, new ArrayList<>());
+  }
+
+  private AbstractEventListener newCompactionBeginListener() {
+    return new AbstractEventListener() {
+      @Override
+      public void onCompactionBegin(RocksDB db,
+          CompactionJobInfo compactionJobInfo) {
+
+        synchronized (compactionListenerWriteLock) {
+
+          if (compactionJobInfo.inputFiles().size() == 0) {
+            LOG.error("Compaction input files list is empty");
+            return;
+          }
+
+          // Create hardlink backups for the SST files that are going
+          // to be deleted after this RDB compaction.
+          for (String file : compactionJobInfo.inputFiles()) {
+            LOG.debug("Creating hard link for '{}'", file);
+            String saveLinkFileName =
+                sstBackupDir + new File(file).getName();
+            Path link = Paths.get(saveLinkFileName);
+            Path srcFile = Paths.get(file);
+            try {
+              Files.createLink(link, srcFile);
+            } catch (IOException e) {
+              LOG.error("Exception in creating hard link for {}", file);
+              throw new RuntimeException("Failed to create hard link", e);
             }
           }
-        };
 
-    list.add(onCompactionCompletedListener);
-    rocksOptions.setListeners(list);
+        }
+      }
+    };
   }
 
-  public RocksDB getRocksDBInstanceWithCompactionTracking(String dbPath)
-      throws RocksDBException {
-    final Options opt = new Options().setCreateIfMissing(true)
-        .setCompressionType(CompressionType.NO_COMPRESSION);
-    opt.setMaxBytesForLevelMultiplier(2);
-    setRocksDBForCompactionTracking(opt);
-    return RocksDB.open(opt, dbPath);
+  /**
+   * Helper function to append the list of SST files to a StringBuilder
+   * for a compaction log entry. Does not append a new line.
+   */
+  private static void appendCompactionLogStringBuilder(List<String> files,
+      StringBuilder sb) {
+
+    Iterator<String> it = files.iterator();
+    while (it.hasNext()) {

Review Comment:
   nit: You can use `String.join(",", files);` or `files.stream().collect(Collectors.joining(","));` instead of the explicit iterator.
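
   For example (just a sketch, assuming the rest of the truncated loop only joins the file names with commas), the whole helper could shrink to:

   ```java
   private static void appendCompactionLogStringBuilder(List<String> files,
       StringBuilder sb) {
     // Join the SST file names with commas; no trailing comma, no newline.
     sb.append(String.join(",", files));
   }
   ```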


