Apache9 commented on code in PR #4338:
URL: https://github.com/apache/hbase/pull/4338#discussion_r848001281


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java:
##########
@@ -93,13 +94,11 @@
   protected static final String MINOR_COMPACTION_DROP_CACHE =
       "hbase.regionserver.minorcompaction.pagecache.drop";
 
-  private final boolean dropCacheMajor;
-  private final boolean dropCacheMinor;
+  protected final boolean dropCacheMajor;
+  protected final boolean dropCacheMinor;
 
-  // In compaction process only a single thread will access and write to this field, and
-  // getCompactionTargets is the only place we will access it other than the compaction thread, so
-  // make it volatile.
-  protected volatile T writer = null;
+  protected final Set<T> writers = new HashSet<>();

Review Comment:
   writers = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
   
   I think IdentityHashMap is safer, as we do not know whether the implementation class for CellSink will implement equals or not. And also use synchronizedSet so we do not need to manually synchronize everywhere; the only place we need to manually synchronize is when we want to iterate over it.
   
   And the same for the CompactionProgress field.
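   For reference, a minimal sketch of the suggested pattern. The WriterTracker class and its method names are hypothetical stand-ins for the Compactor code, with T standing in for CellSink:

       import java.util.ArrayList;
       import java.util.Collections;
       import java.util.IdentityHashMap;
       import java.util.List;
       import java.util.Set;

       // Hypothetical holder class standing in for Compactor.
       class WriterTracker<T> {

         // Identity semantics: membership is by reference, so correctness does
         // not depend on whether the concrete CellSink class overrides
         // equals/hashCode. synchronizedSet makes each single call
         // (add/remove/contains) thread-safe on its own.
         private final Set<T> writers =
             Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));

         void register(T writer) {
           writers.add(writer); // no explicit lock needed for a single operation
         }

         void unregister(T writer) {
           writers.remove(writer);
         }

         // Iteration is the one place that still needs manual synchronization:
         // the synchronizedSet wrapper guards individual calls, not traversals.
         List<T> snapshot() {
           synchronized (writers) {
             return new ArrayList<>(writers);
           }
         }
       }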



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java:
##########
@@ -1157,11 +1154,6 @@ protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
     }
     replaceStoreFiles(filesToCompact, sfs, true);
 
-    // This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
-    // CleanerChore know that compaction is done and the file can be cleaned up if compaction
-    // have failed.
-    storeEngine.resetCompactionWriter();

Review Comment:
   This is not needed any more?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java:
##########
@@ -381,34 +377,46 @@ protected final List<Path> compact(final CompactionRequestImpl request,
       } else {
         Closeables.close(scanner, true);
       }
-      if (!finished && writer != null) {
-        abortWriter();
+      if (progress != null) {
+        synchronized (this.progress) {
+          this.progress.remove(progress);
+        }
+      }
+      if (writer != null) {

Review Comment:
   We do not need to return here?
   
   Strange...



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java:
##########
@@ -550,22 +558,35 @@ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
         dropDeletesFromRow, dropDeletesToRow);
   }
 
-  public List<Path> getCompactionTargets() {
-    T writer = this.writer;
-    if (writer == null) {
-      return Collections.emptyList();
-    }
-    if (writer instanceof StoreFileWriter) {
-      return Arrays.asList(((StoreFileWriter) writer).getPath());
+  public CompactionProgress getProgress() {
+    long currentCompactedKvs = 0;
+    long totalCompactingKVs = 0;
+    long totalCompactedSize = 0;
+    synchronized (progress) {
+      for (CompactionProgress p: progress) {
+        currentCompactedKvs += p.getCurrentCompactedKvs();
+        totalCompactingKVs += p.getTotalCompactingKVs();
+        totalCompactedSize += p.getTotalCompactedSize();
+      }
     }
-    return ((AbstractMultiFileWriter) writer).writers().stream().map(sfw -> sfw.getPath())
-      .collect(Collectors.toList());
+    return new CompactionProgress(totalCompactingKVs)
+      .setCurrentCompactedKvs(currentCompactedKvs)
+      .setTotalCompactedSize(totalCompactedSize);
   }
 
-  /**
-   * Reset the Writer when the new storefiles were successfully added
-   */
-  public void resetWriter(){
-    writer = null;
+  public List<Path> getCompactionTargets() {
+    // Build a list of all the compaction targets for all active writers
+    List<Path> targets = new ArrayList<>();
+    synchronized (writers) {
+      for (T writer: writers) {
+        if (writer instanceof StoreFileWriter) {
+          targets.add(((StoreFileWriter) writer).getPath());
+        }
+        ((AbstractMultiFileWriter) writer).writers().stream()

Review Comment:
   else
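   
   In other words, the AbstractMultiFileWriter cast should sit in an else branch so a plain StoreFileWriter is never cast to a multi-file writer. A minimal sketch of the corrected loop, assuming (as the diff does) that every writer is one of those two types:

       // Sketch of getCompactionTargets with the suggested else branch: each
       // writer is handled as either a single StoreFileWriter or a multi-file
       // writer, never both.
       public List<Path> getCompactionTargets() {
         List<Path> targets = new ArrayList<>();
         synchronized (writers) {
           for (T writer : writers) {
             if (writer instanceof StoreFileWriter) {
               // Single-file writer: one output path.
               targets.add(((StoreFileWriter) writer).getPath());
             } else {
               // Multi-file writer: collect the path of every underlying writer.
               ((AbstractMultiFileWriter) writer).writers().stream()
                 .map(sfw -> sfw.getPath())
                 .forEach(targets::add);
             }
           }
         }
         return targets;
       }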


