nyl3532016 commented on a change in pull request #3425:
URL: https://github.com/apache/hbase/pull/3425#discussion_r662421646
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
##########
@@ -19,41 +18,441 @@
package org.apache.hadoop.hbase.compactionserver;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.CompactThreadControl;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Pair;
+
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest.Builder;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
+
+/**
+ * CompactionThreadManager reuses {@link HStore#selectCompaction}, {@link HStore#throttleCompaction},
+ * {@link CompactionContext#compact}, and {@link CompactThreadControl}, which are the core logic of
+ * compaction.
+ */
@InterfaceAudience.Private
-public class CompactionThreadManager {
+public class CompactionThreadManager implements ThroughputControllerService {
private static Logger LOG = LoggerFactory.getLogger(CompactionThreadManager.class);
+ // Configuration key for the large compaction threads.
+ private final static String LARGE_COMPACTION_THREADS =
+ "hbase.compaction.server.thread.compaction.large";
+ private final static int LARGE_COMPACTION_THREADS_DEFAULT = 10;
+ // Configuration key for the small compaction threads.
+ private final static String SMALL_COMPACTION_THREADS =
+ "hbase.compaction.server.thread.compaction.small";
+ private final static int SMALL_COMPACTION_THREADS_DEFAULT = 50;
private final Configuration conf;
- private final ConcurrentMap<ServerName, AsyncRegionServerAdmin> rsAdmins =
- new ConcurrentHashMap<>();
private final HCompactionServer server;
+ private HFileSystem fs;
+ private Path rootDir;
+ private FSTableDescriptors tableDescriptors;
+ private CompactThreadControl compactThreadControl;
+ private ConcurrentHashMap<String, CompactionTask> runningCompactionTasks =
+ new ConcurrentHashMap<>();
+ private static CompactionServerStorage storage = new CompactionServerStorage();
- public CompactionThreadManager(final Configuration conf, HCompactionServer server) {
+ CompactionThreadManager(final Configuration conf, HCompactionServer server) {
this.conf = conf;
this.server = server;
+ try {
+ this.fs = new HFileSystem(this.conf, true);
+ this.rootDir = CommonFSUtils.getRootDir(this.conf);
+ this.tableDescriptors = new FSTableDescriptors(conf);
+ int largeThreads =
+ Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
+ int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
+ compactThreadControl = new CompactThreadControl(this, largeThreads, smallThreads,
+ COMPACTION_TASK_COMPARATOR, REJECTION);
+ } catch (Throwable t) {
+ LOG.error("Failed construction CompactionThreadManager", t);
+ }
}
- private AsyncRegionServerAdmin getRsAdmin(final ServerName sn) throws IOException {
- AsyncRegionServerAdmin admin = this.rsAdmins.get(sn);
- if (admin == null) {
- LOG.debug("New RS admin connection to {}", sn);
- admin = this.server.getAsyncClusterConnection().getRegionServerAdmin(sn);
- this.rsAdmins.put(sn, admin);
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ @Override
+ public ChoreService getChoreService() {
+ return server.getChoreService();
+ }
+
+ @Override
+ public double getCompactionPressure() {
+ double max = 0;
+ for (CompactionTask task : getRunningCompactionTasks().values()) {
+ double normCount = task.getStore().getCompactionPressure();
+ if (normCount > max) {
+ max = normCount;
+ }
+ }
+ return max;
+ }
+
+ @Override
+ public double getFlushPressure() {
+ return 0;
+ }
+
+ public void requestCompaction(CompactionTask compactionTask) {
+ try {
+ selectFileAndExecuteTask(compactionTask);
+ } catch (Throwable e) {
+ LOG.error("Failed requestCompaction {}", compactionTask, e);
+ }
+ }
+
+ private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOException {
+ ServerName rsServerName = compactionTask.getRsServerName();
+ RegionInfo regionInfo = compactionTask.getRegionInfo();
+ ColumnFamilyDescriptor cfd = compactionTask.getCfd();
+ String logStr = compactionTask.toString();
+ MonitoredTask status =
+ TaskMonitor.get().createStatus("Compacting region: " + regionInfo.getRegionNameAsString()
+ + ", family: " + cfd.getNameAsString() + " from RS: " + rsServerName);
+ status.enableStatusJournal(false);
+ // 1. select compaction and check compaction context is present
+ LOG.info("Start select compaction {}", compactionTask);
+ status.setStatus("Start select compaction");
+ Pair<HStore, Optional<CompactionContext>> pair = selectCompaction(regionInfo, cfd,
+ compactionTask.isRequestMajor(), compactionTask.getPriority(), status, logStr);
+ HStore store = pair.getFirst();
+ Optional<CompactionContext> compaction = pair.getSecond();
+ if (!compaction.isPresent()) {
+ store.close();
+ LOG.info("Compaction context is empty: {}", compactionTask);
+ status.abort("Compaction context is empty; returning");
+ }
+ CompactionContext compactionContext = compaction.get();
+ // 2. update storage
+ Pair<Boolean, List<String>> updateStoreResult =
+ updateStorageAfterSelectCompaction(regionInfo, cfd, compactionContext, status, logStr);
+ if (!updateStoreResult.getFirst()) {
+ store.close();
+ return;
+ }
+ List<String> selectedFileNames = updateStoreResult.getSecond();
+ compactionTask.setHStore(store);
+ compactionTask.setCompactionContext(compactionContext);
+ compactionTask.setSelectedFileNames(selectedFileNames);
+ compactionTask.setMonitoredTask(status);
+ compactionTask.setPriority(compactionContext.getRequest().getPriority());
+ // 3. execute a compaction task
+ ThreadPoolExecutor pool = store.throttleCompaction(compactionContext.getRequest().getSize())
+ ? compactThreadControl.getLongCompactions()
+ : compactThreadControl.getShortCompactions();
+ pool.submit(new CompactionTaskRunner(compactionTask));
+ }
+
+ /**
+ * Open the store and select the compaction context.
+ * @return the store and the compaction context
+ */
+ Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo regionInfo,
+ ColumnFamilyDescriptor cfd, boolean major, int priority, MonitoredTask status, String logStr)
+ throws IOException {
+ status.setStatus("Open store");
+ tableDescriptors.get(regionInfo.getTable());
+ HStore store = getStore(conf, fs, rootDir, tableDescriptors.get(regionInfo.getTable()),
+ regionInfo, cfd.getNameAsString());
+
+ // CompactedHFilesDischarger only runs on the regionserver, so the compactionserver does not
+ // have the opportunity to clean compacted files at that time; we clean compacted files here.
+ storage.cleanupCompactedFiles(regionInfo, cfd,
+ store.getStorefiles().stream().map(sf -> sf.getPath().getName()).collect(Collectors.toSet()));
+ if (major) {
+ status.setStatus("Trigger major compaction");
+ store.triggerMajorCompaction();
+ }
+ // Get the current compacting and compacted files. NOTE: these are file names only and do
+ // not include paths.
+ status.setStatus("Get current compacting and compacted files from storage");
+ Set<String> excludeFiles = new HashSet<>();
+ Set<String> compactingFiles = storage.getSelectedStoreFiles(regionInfo, cfd);
+ synchronized (compactingFiles) {
Review comment:
Yes, I will modify it.
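For reference, roughly the shape I have in mind (a sketch only, not the final patch; it assumes getSelectedStoreFiles returns the live shared Set<String> and reuses the excludeFiles, regionInfo, and cfd variables already declared in this method):

    // Sketch: snapshot the shared selected-file set while holding its lock,
    // then merge the copy into excludeFiles, so the checks that follow cannot
    // race with a concurrent selection mutating the set.
    Set<String> compactingFiles = storage.getSelectedStoreFiles(regionInfo, cfd);
    Set<String> compactingSnapshot;
    synchronized (compactingFiles) {
      compactingSnapshot = new HashSet<>(compactingFiles);
    }
    excludeFiles.addAll(compactingSnapshot);

This keeps the critical section down to a single copy and avoids doing any store-file work under the lock.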
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]