nyl3532016 commented on a change in pull request #3425:
URL: https://github.com/apache/hbase/pull/3425#discussion_r663617140



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
##########
@@ -19,41 +18,446 @@
 package org.apache.hadoop.hbase.compactionserver;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.CompactThreadControl;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Pair;
+
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest.Builder;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
+
+/**
+ * CompactionThreadManager reuses {@link HStore#selectCompaction},
+ * {@link HStore#throttleCompaction}, {@link CompactionContext#compact} and
+ * {@link CompactThreadControl}, which are the core logic of compaction.
+ */
 @InterfaceAudience.Private
-public class CompactionThreadManager {
+public class CompactionThreadManager implements ThroughputControllerService {
   private static Logger LOG = LoggerFactory.getLogger(CompactionThreadManager.class);
+  // Configuration key for the large compaction threads.
+  private final static String LARGE_COMPACTION_THREADS =
+      "hbase.compaction.server.thread.compaction.large";
+  private final static int LARGE_COMPACTION_THREADS_DEFAULT = 10;
+  // Configuration key for the small compaction threads.
+  private final static String SMALL_COMPACTION_THREADS =
+      "hbase.compaction.server.thread.compaction.small";
+  private final static int SMALL_COMPACTION_THREADS_DEFAULT = 50;
 
   private final Configuration conf;
-  private final ConcurrentMap<ServerName, AsyncRegionServerAdmin> rsAdmins =
-      new ConcurrentHashMap<>();
   private final HCompactionServer server;
+  private HFileSystem fs;
+  private Path rootDir;
+  private FSTableDescriptors tableDescriptors;
+  private CompactThreadControl compactThreadControl;
+  private ConcurrentHashMap<String, CompactionTask> runningCompactionTasks =
+      new ConcurrentHashMap<>();
+  private static CompactionFilesCache compactionFilesCache = new CompactionFilesCache();
 
-  public CompactionThreadManager(final Configuration conf, HCompactionServer server) {
+  CompactionThreadManager(final Configuration conf, HCompactionServer server) {
     this.conf = conf;
     this.server = server;
+    try {
+      this.fs = new HFileSystem(this.conf, true);
+      this.rootDir = CommonFSUtils.getRootDir(this.conf);
+      this.tableDescriptors = new FSTableDescriptors(conf);
+      int largeThreads =
+          Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
+      int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
+      compactThreadControl = new CompactThreadControl(this, largeThreads, smallThreads,
+          COMPACTION_TASK_COMPARATOR, REJECTION);
+    } catch (Throwable t) {
+      LOG.error("Failed constructing CompactionThreadManager", t);
+    }
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public ChoreService getChoreService() {
+    return server.getChoreService();
+  }
+
+  @Override
+  public double getCompactionPressure() {
+    double max = 0;
+    for (CompactionTask task : getRunningCompactionTasks().values()) {
+      double normCount = task.getStore().getCompactionPressure();
+      if (normCount > max) {
+        max = normCount;
+      }
+    }
+    return max;
+  }
+
+  @Override
+  public double getFlushPressure() {
+    return 0;
+  }
+
+  public void requestCompaction(CompactionTask compactionTask) {
+    try {
+      selectFileAndExecuteTask(compactionTask);
+    } catch (Throwable e) {
+      LOG.error("Failed requestCompaction {}", compactionTask, e);
+    }
+  }
+
+  private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOException {
+    ServerName rsServerName = compactionTask.getRsServerName();
+    RegionInfo regionInfo = compactionTask.getRegionInfo();
+    ColumnFamilyDescriptor cfd = compactionTask.getCfd();
+    String logStr = compactionTask.toString();
+    MonitoredTask status =
+        TaskMonitor.get().createStatus("Compacting region: " + regionInfo.getRegionNameAsString()
+            + ", family: " + cfd.getNameAsString() + " from RS: " + rsServerName);
+    status.enableStatusJournal(false);
+    // 1. select compaction and check compaction context is present
+    LOG.info("Start select compaction {}", compactionTask);
+    status.setStatus("Start select compaction");
+    HStore store;
+    CompactionContext compactionContext;
+    Pair<Boolean, List<String>> updateSelectedFilesCacheResult;
+    // The synchronized blocks ensure that the files in the store, the selected files in the
+    // cache, and the compacted files in the cache have a consistent state; we need this
+    // condition to guarantee a correct selection
+    synchronized (compactionFilesCache.getCompactedStoreFilesAsLock(regionInfo, cfd)) {
+      synchronized (compactionFilesCache.getSelectedStoreFilesAsLock(regionInfo, cfd)) {
+        Pair<HStore, Optional<CompactionContext>> pair = selectCompaction(regionInfo, cfd,
+          compactionTask.isRequestMajor(), compactionTask.getPriority(), status, logStr);
+        store = pair.getFirst();
+        Optional<CompactionContext> compaction = pair.getSecond();
+        if (!compaction.isPresent()) {
+          store.close();
+          LOG.info("Compaction context is empty: {}", compactionTask);
+          status.abort("Compaction context is empty and return");
+          return;
+        }
+        compactionContext = compaction.get();
+        // 2. update compactionFilesCache
+        updateSelectedFilesCacheResult =
+            updateStorageAfterSelectCompaction(regionInfo, cfd, compactionContext, status, logStr);
+      } // end of synchronized selected files
+    } // end of synchronized compacted files
+    if (!updateSelectedFilesCacheResult.getFirst()) {
+      store.close();
+      return;
+    }
+    List<String> selectedFileNames = updateSelectedFilesCacheResult.getSecond();
+    compactionTask.setHStore(store);
+    compactionTask.setCompactionContext(compactionContext);
+    compactionTask.setSelectedFileNames(selectedFileNames);
+    compactionTask.setMonitoredTask(status);
+    compactionTask.setPriority(compactionContext.getRequest().getPriority());
+    // 3. execute a compaction task
+    ThreadPoolExecutor pool = store.throttleCompaction(compactionContext.getRequest().getSize())
+        ? compactThreadControl.getLongCompactions()
+        : compactThreadControl.getShortCompactions();
+    pool.submit(new CompactionTaskRunner(compactionTask));
+  }
+
+  /**
+   * Open the store and select the compaction context
+   * @return the store and the compaction context
+   */
+  Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo regionInfo,
+      ColumnFamilyDescriptor cfd, boolean major, int priority, MonitoredTask status, String logStr)
+      throws IOException {
+    status.setStatus("Open store");
+    HStore store = getStore(conf, fs, rootDir, tableDescriptors.get(regionInfo.getTable()),
+      regionInfo, cfd.getNameAsString());
+
+    // CompactedHFilesDischarger only runs on the regionserver, so the compactionserver has no
+    // opportunity to clean up compacted files at that time; we clean the compacted files here
+    compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
+      store.getStorefiles().stream().map(sf -> sf.getPath().getName()).collect(Collectors.toSet()));
+    if (major) {
+      status.setStatus("Trigger major compaction");
+      store.triggerMajorCompaction();
+    }
+    // get current compacting and compacted files; NOTE: these are file names only and don't
+    // include paths.
+    status.setStatus("Get current compacting and compacted files from compactionFilesCache");
+    Set<String> compactingFiles = compactionFilesCache.getSelectedStoreFiles(regionInfo, cfd);
+    Set<String> compactedFiles = compactionFilesCache.getCompactedStoreFiles(regionInfo, cfd);
+    Set<String> excludeFiles = new HashSet<>(compactingFiles);
+    excludeFiles.addAll(compactedFiles);
+    // Convert file names to store files
+    status.setStatus("Convert current compacting and compacted files to store files");
+    List<HStoreFile> excludeStoreFiles = getExcludedStoreFiles(store, excludeFiles);
+    LOG.info(
+      "Start select store: {}, excludeFileNames: {}, excluded: {}, compacting: {}, compacted: {}",
+      logStr, excludeFiles.size(), excludeStoreFiles.size(), compactingFiles.size(),
+      compactedFiles.size());
+    status.setStatus("Select store files for compaction, major: " + major);
+    Optional<CompactionContext> compaction = store.selectCompaction(priority,
+      CompactionLifeCycleTracker.DUMMY, null, excludeStoreFiles);
+    LOG.info("After select store: {}, compaction context present: {}", logStr,
+      compaction.isPresent());
+    return new Pair<>(store, compaction);
+  }
+
+  /**
+   * Mark the files in the compaction context as selected in compactionFilesCache
+   * @return true on success, false if the files are already selected in compactionFilesCache
+   */
+  private Pair<Boolean, List<String>> updateStorageAfterSelectCompaction(RegionInfo regionInfo,
+      ColumnFamilyDescriptor cfd, CompactionContext compactionContext, MonitoredTask status,
+      String logStr) {
+    LOG.info("Start update compactionFilesCache after select compaction: {}", logStr);
+    // save selected files to compactionFilesCache
+    List<String> selectedFilesNames = new ArrayList<>();
+    for (HStoreFile selectFile : compactionContext.getRequest().getFiles()) {
+      selectedFilesNames.add(selectFile.getFileInfo().getPath().getName());
+    }
+    if (compactionFilesCache.addSelectedFiles(regionInfo, cfd, selectedFilesNames)) {
+      LOG.info("Update compactionFilesCache after select compaction success: {}", logStr);
+      status.setStatus("Update compactionFilesCache after select compaction success");
+      return new Pair<>(Boolean.TRUE, selectedFilesNames);
+    } else {
+      // should not happen
+      LOG.info("Selected files are already in the store, returning: {}", logStr);
+      status.abort("Selected files are already in compactionFilesCache");
+      return new Pair<>(Boolean.FALSE, Collections.emptyList());
+    }
+  }
+
+  /**
+   * Execute compaction in the process of compaction server
+   */
+  private void doCompaction(CompactionTask compactionTask) throws IOException {
+    RegionInfo regionInfo = compactionTask.getRegionInfo();
+    ColumnFamilyDescriptor cfd = compactionTask.getCfd();
+    HStore store = compactionTask.getStore();
+    CompactionContext compactionContext = compactionTask.getCompactionContext();
+    List<String> selectedFileNames = compactionTask.getSelectedFileNames();
+    MonitoredTask status = compactionTask.getStatus();
+    try {
+      LOG.info("Start compact store: {}, cf: {}, compaction context: {}", store, cfd,
+        compactionContext);
+      List<Path> newFiles =
+          compactionContext.compact(compactThreadControl.getCompactionThroughputController(), null);
+      LOG.info("Finish compact store: {}, cf: {}, new files: {}", store, cfd, newFiles);
+      List<String> newFileNames = new ArrayList<>();
+      for (Path newFile : newFiles) {
+        newFileNames.add(newFile.getName());
+      }
+      reportCompactionCompleted(compactionTask, newFileNames, status);
+    } finally {
+      status.setStatus("Remove selected files");
+      LOG.info("Remove selected files: {}", compactionTask);
+      compactionFilesCache.removeSelectedFiles(regionInfo, cfd, selectedFileNames);
+    }
   }
 
-  private AsyncRegionServerAdmin getRsAdmin(final ServerName sn) throws IOException {
-    AsyncRegionServerAdmin admin = this.rsAdmins.get(sn);
-    if (admin == null) {
-      LOG.debug("New RS admin connection to {}", sn);
-      admin = this.server.getAsyncClusterConnection().getRegionServerAdmin(sn);
-      this.rsAdmins.put(sn, admin);
+  /**
+   * Report compaction completed to the RS
+   * @return true if the report to the RS succeeded, otherwise false
+   */
+  private boolean reportCompactionCompleted(CompactionTask task, List<String> newFiles,
+      MonitoredTask status) throws IOException {
+    ServerName rsServerName = task.getRsServerName();
+    RegionInfo regionInfo = task.getRegionInfo();
+    ColumnFamilyDescriptor cfd = task.getCfd();
+    List<String> selectedFileNames = task.getSelectedFileNames();
+    boolean newForceMajor = task.getStore().getForceMajor();
+    Builder builder =
+        CompleteCompactionRequest.newBuilder().setRegionInfo(ProtobufUtil.toRegionInfo(regionInfo))
+            .setFamily(ProtobufUtil.toColumnFamilySchema(cfd)).setNewForceMajor(newForceMajor);
+    // use file names only, do not include paths, because otherwise the protobuf message is too big
+    for (String selectFile : selectedFileNames) {
+      builder.addSelectedFiles(selectFile);
+    }
+    for (String newFile : newFiles) {
+      builder.addNewFiles(newFile);
+    }
+    CompleteCompactionRequest completeCompactionRequest = builder.build();
+    AsyncRegionServerAdmin rsAdmin = getRsAdmin(rsServerName);
+    try {
+      status
+          .setStatus("Report complete compaction to RS: " + rsServerName + ", selected file size: "
+              + selectedFileNames.size() + ", new file size: " + newFiles.size());
+      LOG.info("Report complete compaction: {}, selectedFileSize: {}, newFileSize: {}", task,
+        completeCompactionRequest.getSelectedFilesList().size(),
+        completeCompactionRequest.getNewFilesList().size());
+      CompleteCompactionResponse completeCompactionResponse =
+          FutureUtils.get(rsAdmin.completeCompaction(completeCompactionRequest));
+      if (completeCompactionResponse.getSuccess()) {
+        status.markComplete("Report to RS succeeded and RS accepted");
+        // move selected files to compacted files
+        compactionFilesCache.addCompactedFiles(regionInfo, cfd, selectedFileNames);
+        LOG.info("Compaction manager complete compaction request succeeded. {}", task);
+      } else {
+        // TODO: maybe the region has moved; we need to get the latest regionserver name and retry
+        status.abort("Report to RS succeeded but RS denied");
+        LOG.warn("Compaction manager complete compaction request failed. {}", task);
+      }
+      return true;
+    } catch (IOException e) {
+      // TODO: the rpc call is broken, add a retry
+      status.abort("Report to RS failed");
+      LOG.error("Compaction manager complete compaction request error. {}", task, e);
+      return false;
     }
-    return admin;
   }
 
-  public void requestCompaction() {
+  private List<HStoreFile> getExcludedStoreFiles(HStore store, Set<String> excludeFileNames) {
+    Collection<HStoreFile> storefiles = store.getStorefiles();
+    List<HStoreFile> storeFiles = new ArrayList<>();
+    for (HStoreFile storefile : storefiles) {
+      String name = storefile.getPath().getName();
+      if (excludeFileNames.contains(name)) {
+        storeFiles.add(storefile);
+      }
+    }
+    return storeFiles;
+  }
+
+  private HStore getStore(final Configuration conf, final FileSystem fs, final Path rootDir,
+      final TableDescriptor htd, final RegionInfo hri, final String familyName) throws IOException {
+    HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs,
+        CommonFSUtils.getTableDir(rootDir, htd.getTableName()), hri);
+    HRegion region = new HRegion(regionFs, null, conf, htd, null);
+    HStore store = new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false);
+    OptionalLong maxSequenceId = store.getMaxSequenceId();
+    LOG.info("store max sequence id: {}", maxSequenceId.orElse(0));
+    region.getMVCC().advanceTo(maxSequenceId.orElse(0));
+    return store;
+  }
+
+  private AsyncRegionServerAdmin getRsAdmin(final ServerName sn) {
+    return server.getAsyncClusterConnection().getRegionServerAdmin(sn);
+  }
+
+  ConcurrentHashMap<String, CompactionTask> getRunningCompactionTasks() {
+    return runningCompactionTasks;
+  }
+
+  void waitForStop() {
+    compactThreadControl.waitForStop();
+  }
+
+  private void executeCompaction(CompactionTask compactionTask) {
+    try {
+      String taskName = compactionTask.getRsServerName() + "-"
+          + compactionTask.getRegionInfo().getRegionNameAsString() + "-"
+          + compactionTask.getCfd().getNameAsString() + "-" + System.currentTimeMillis();
+      compactionTask.setTaskName(taskName);
+      runningCompactionTasks.put(compactionTask.getTaskName(), compactionTask);
+      doCompaction(compactionTask);
+    } catch (Throwable e) {
+      LOG.error("Execute compaction task error: {}", compactionTask, e);

Review comment:
       Refer to the exception handling in RegionServer: we need to increase the failed
compaction count and check that the filesystem is OK.
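
       For illustration, a minimal sketch of what that handling could look like, loosely
modeled on the catch block of CompactSplit.CompactionRunner on the RegionServer side
(assuming java.util.concurrent.atomic.AtomicLong is imported and that HCompactionServer
exposes Abortable#abort like the other HBase servers). The compactionsFailed counter and
the checkFileSystem() helper are hypothetical stand-ins for whatever metric and
filesystem check the compaction server ends up exposing:

           private final AtomicLong compactionsFailed = new AtomicLong();

           private void executeCompaction(CompactionTask compactionTask) {
             try {
               doCompaction(compactionTask);
             } catch (Throwable e) {
               LOG.error("Execute compaction task error: {}", compactionTask, e);
               // Hypothetical counter: track failed compactions so the count can be
               // exported as a compaction server metric.
               compactionsFailed.incrementAndGet();
               // Hypothetical helper mirroring HRegionServer#checkFileSystem: verify the
               // filesystem is still available and abort the server if it is not.
               if (!checkFileSystem()) {
                 server.abort("Filesystem not available after failed compaction", e);
               }
             }
           }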



