Apache9 commented on a change in pull request #3425:
URL: https://github.com/apache/hbase/pull/3425#discussion_r662375310
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
##########
@@ -19,41 +18,441 @@
package org.apache.hadoop.hbase.compactionserver;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.CompactThreadControl;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Pair;
+
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest.Builder;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
+
+/**
+ * CompactionThreadManager reuses {@link HStore#selectCompaction},
+ * {@link HStore#throttleCompaction}, {@link CompactionContext#compact} and
+ * {@link CompactThreadControl}, which are the core logic of compaction.
+ */
@InterfaceAudience.Private
-public class CompactionThreadManager {
+public class CompactionThreadManager implements ThroughputControllerService {
private static Logger LOG = LoggerFactory.getLogger(CompactionThreadManager.class);
+ // Configuration key for the large compaction threads.
+ private final static String LARGE_COMPACTION_THREADS =
+ "hbase.compaction.server.thread.compaction.large";
+ private final static int LARGE_COMPACTION_THREADS_DEFAULT = 10;
+ // Configuration key for the small compaction threads.
+ private final static String SMALL_COMPACTION_THREADS =
+ "hbase.compaction.server.thread.compaction.small";
+ private final static int SMALL_COMPACTION_THREADS_DEFAULT = 50;
private final Configuration conf;
- private final ConcurrentMap<ServerName, AsyncRegionServerAdmin> rsAdmins =
- new ConcurrentHashMap<>();
private final HCompactionServer server;
+ private HFileSystem fs;
+ private Path rootDir;
+ private FSTableDescriptors tableDescriptors;
+ private CompactThreadControl compactThreadControl;
+ private ConcurrentHashMap<String, CompactionTask> runningCompactionTasks =
+ new ConcurrentHashMap<>();
+ private static CompactionServerStorage storage = new CompactionServerStorage();
- public CompactionThreadManager(final Configuration conf, HCompactionServer server) {
+ CompactionThreadManager(final Configuration conf, HCompactionServer server) {
this.conf = conf;
this.server = server;
+ try {
+ this.fs = new HFileSystem(this.conf, true);
+ this.rootDir = CommonFSUtils.getRootDir(this.conf);
+ this.tableDescriptors = new FSTableDescriptors(conf);
+ int largeThreads =
+ Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
+ int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
+ compactThreadControl = new CompactThreadControl(this, largeThreads, smallThreads,
+ COMPACTION_TASK_COMPARATOR, REJECTION);
+ } catch (Throwable t) {
+ LOG.error("Failed construction CompactionThreadManager", t);
+ }
}
- private AsyncRegionServerAdmin getRsAdmin(final ServerName sn) throws IOException {
- AsyncRegionServerAdmin admin = this.rsAdmins.get(sn);
- if (admin == null) {
- LOG.debug("New RS admin connection to {}", sn);
- admin = this.server.getAsyncClusterConnection().getRegionServerAdmin(sn);
- this.rsAdmins.put(sn, admin);
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ @Override
+ public ChoreService getChoreService() {
+ return server.getChoreService();
+ }
+
+ @Override
+ public double getCompactionPressure() {
+ double max = 0;
+ for (CompactionTask task : getRunningCompactionTasks().values()) {
+ double normCount = task.getStore().getCompactionPressure();
+ if (normCount > max) {
+ max = normCount;
+ }
+ }
+ return max;
+ }
+
+ @Override
+ public double getFlushPressure() {
+ return 0;
+ }
+
+ public void requestCompaction(CompactionTask compactionTask) {
+ try {
+ selectFileAndExecuteTask(compactionTask);
+ } catch (Throwable e) {
+ LOG.error("Failed requestCompaction {}", compactionTask, e);
+ }
+ }
+
+ private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOException {
+ ServerName rsServerName = compactionTask.getRsServerName();
+ RegionInfo regionInfo = compactionTask.getRegionInfo();
+ ColumnFamilyDescriptor cfd = compactionTask.getCfd();
+ String logStr = compactionTask.toString();
+ MonitoredTask status =
+ TaskMonitor.get().createStatus("Compacting region: " + regionInfo.getRegionNameAsString()
+ + ", family: " + cfd.getNameAsString() + " from RS: " + rsServerName);
+ status.enableStatusJournal(false);
+ // 1. select compaction and check compaction context is present
+ LOG.info("Start select compaction {}", compactionTask);
+ status.setStatus("Start select compaction");
+ Pair<HStore, Optional<CompactionContext>> pair = selectCompaction(regionInfo, cfd,
+ compactionTask.isRequestMajor(), compactionTask.getPriority(), status, logStr);
+ HStore store = pair.getFirst();
+ Optional<CompactionContext> compaction = pair.getSecond();
+ if (!compaction.isPresent()) {
+ store.close();
+ LOG.info("Compaction context is empty: {}", compactionTask);
+ status.abort("Compaction context is empty and return");
+ return;
+ }
+ CompactionContext compactionContext = compaction.get();
+ // 2. update storage
+ Pair<Boolean, List<String>> updateStoreResult =
+ updateStorageAfterSelectCompaction(regionInfo, cfd, compactionContext, status, logStr);
+ if (!updateStoreResult.getFirst()) {
+ store.close();
+ return;
+ }
+ List<String> selectedFileNames = updateStoreResult.getSecond();
+ compactionTask.setHStore(store);
+ compactionTask.setCompactionContext(compactionContext);
+ compactionTask.setSelectedFileNames(selectedFileNames);
+ compactionTask.setMonitoredTask(status);
+ compactionTask.setPriority(compactionContext.getRequest().getPriority());
+ // 3. execute a compaction task
+ ThreadPoolExecutor pool;
+ pool = store.throttleCompaction(compactionContext.getRequest().getSize())
+ ? compactThreadControl.getLongCompactions()
+ : compactThreadControl.getShortCompactions();
+ pool.submit(new CompactionTaskRunner(compactionTask));
+ }
+
+ /**
+ * Open the store and select a compaction context
+ * @return the store and the compaction context
+ */
+ Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo regionInfo,
+ ColumnFamilyDescriptor cfd, boolean major, int priority, MonitoredTask status, String logStr)
+ throws IOException {
+ status.setStatus("Open store");
+ tableDescriptors.get(regionInfo.getTable());
+ HStore store = getStore(conf, fs, rootDir, tableDescriptors.get(regionInfo.getTable()),
+ regionInfo, cfd.getNameAsString());
+
+ // CompactedHFilesDischarger only runs on the regionserver, so the compactionserver has no
+ // opportunity to clean up compacted files at that time; we clean them up here instead
+ storage.cleanupCompactedFiles(regionInfo, cfd,
+ store.getStorefiles().stream().map(sf -> sf.getPath().getName()).collect(Collectors.toSet()));
+ if (major) {
+ status.setStatus("Trigger major compaction");
+ store.triggerMajorCompaction();
+ }
+ // Get the current compacting and compacted files. NOTE: these are file names only and do not
+ // include paths.
+ status.setStatus("Get current compacting and compacted files from storage");
+ Set<String> excludeFiles = new HashSet<>();
+ Set<String> compactingFiles = storage.getSelectedStoreFiles(regionInfo, cfd);
+ synchronized (compactingFiles) {
Review comment:
It is a bit strange that we need to synchronize on a returned collection; not a very good design, I'd say...
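One way to avoid it, as a minimal sketch: keep the lock inside the storage class and expose atomic operations plus defensive copies, so callers never synchronize on an internally owned collection. The class and method names below are hypothetical, not the ones in this PR.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical tracker that owns its own synchronization instead of
    // handing callers a collection to lock on.
    class SelectedFilesTracker {
      private final Map<String, Set<String>> selectedFiles = new HashMap<>();

      // Atomically marks the given files as selected; returns false if any of
      // them is already selected, so two tasks can never pick the same file.
      synchronized boolean trySelect(String storeKey, Set<String> files) {
        Set<String> current = selectedFiles.computeIfAbsent(storeKey, k -> new HashSet<>());
        if (!Collections.disjoint(current, files)) {
          return false;
        }
        current.addAll(files);
        return true;
      }

      // Returns a defensive copy; callers can iterate without holding any lock.
      synchronized Set<String> getSelectedSnapshot(String storeKey) {
        Set<String> current = selectedFiles.get(storeKey);
        return current == null ? Collections.emptySet() : new HashSet<>(current);
      }

      // Called once the compaction finishes or is aborted.
      synchronized void deselect(String storeKey, Set<String> files) {
        Set<String> current = selectedFiles.get(storeKey);
        if (current != null) {
          current.removeAll(files);
        }
      }
    }

That keeps the check-then-add step atomic inside the storage class, which is what the external synchronized block is trying to achieve from the outside.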
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
##########
@@ -19,41 +18,483 @@
package org.apache.hadoop.hbase.compactionserver;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.StealJobQueue;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest.Builder;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
@InterfaceAudience.Private
-public class CompactionThreadManager {
+public class CompactionThreadManager implements ThroughputControllerService {
private static Logger LOG = LoggerFactory.getLogger(CompactionThreadManager.class);
+ // Configuration key for the large compaction threads.
+ private final static String LARGE_COMPACTION_THREADS =
+ "hbase.compaction.server.thread.compaction.large";
+ private final static int LARGE_COMPACTION_THREADS_DEFAULT = 10;
+ // Configuration key for the small compaction threads.
+ private final static String SMALL_COMPACTION_THREADS =
+ "hbase.compaction.server.thread.compaction.small";
+ private final static int SMALL_COMPACTION_THREADS_DEFAULT = 50;
private final Configuration conf;
private final ConcurrentMap<ServerName, AsyncRegionServerAdmin> rsAdmins =
new ConcurrentHashMap<>();
private final HCompactionServer server;
+ private HFileSystem fs;
+ private Path rootDir;
+ private FSTableDescriptors tableDescriptors;
+ // compaction pools
+ private volatile ThreadPoolExecutor longCompactions;
+ private volatile ThreadPoolExecutor shortCompactions;
+ private ConcurrentHashMap<String, CompactionTask> runningCompactionTasks =
+ new ConcurrentHashMap<>();
+ private PressureAwareCompactionThroughputController throughputController;
+ private CompactionServerStorage storage = new CompactionServerStorage();
- public CompactionThreadManager(final Configuration conf, HCompactionServer server) {
+ CompactionThreadManager(final Configuration conf, HCompactionServer server) {
this.conf = conf;
this.server = server;
+ try {
+ this.fs = new HFileSystem(this.conf, true);
+ this.rootDir = CommonFSUtils.getRootDir(this.conf);
+ this.tableDescriptors = new FSTableDescriptors(conf);
+ // start compaction resources
+ this.throughputController = new PressureAwareCompactionThroughputController();
+ this.throughputController.setConf(conf);
+ this.throughputController.setup(this);
+ startCompactionPool();
+ } catch (Throwable t) {
+ LOG.error("Failed construction CompactionThreadManager", t);
+ }
+ }
+
+ private void startCompactionPool() {
+ final String n = Thread.currentThread().getName();
+ // thread pools used to execute short and long compactions
+ int largeThreads =
+ Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
+ int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
+ StealJobQueue<Runnable> stealJobQueue =
+ new StealJobQueue<>(largeThreads, smallThreads, COMPACTION_TASK_COMPARATOR);
+ this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
+ stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d")
+ .setDaemon(true).build());
+ this.longCompactions.setRejectedExecutionHandler(new Rejection());
+ this.longCompactions.prestartAllCoreThreads();
+ this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
+ stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder()
+ .setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
+ this.shortCompactions.setRejectedExecutionHandler(new Rejection());
+ }
+
+ @Override
+ public ChoreService getChoreService() {
+ return server.getChoreService();
+ }
+
+ @Override
+ public double getCompactionPressure() {
+ double max = 0;
+ for (CompactionTask task : getRunningCompactionTasks().values()) {
+ double normCount = task.getStore().getCompactionPressure();
+ if (normCount > max) {
+ max = normCount;
+ }
+ }
+ return max;
+ }
+
+ @Override
+ public double getFlushPressure() {
+ return 0;
+ }
+
+ public void requestCompaction(CompactionTask compactionTask) {
+ try {
+ selectFileAndExecuteTask(compactionTask);
+ } catch (Throwable e) {
+ LOG.error("Failed requestCompaction {}", compactionTask, e);
+ }
+ }
+
+ private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOException {
+ ServerName rsServerName = compactionTask.getRsServerName();
+ RegionInfo regionInfo = compactionTask.getRegionInfo();
+ ColumnFamilyDescriptor cfd = compactionTask.getCfd();
+ String logStr = compactionTask.toString();
+ MonitoredTask status =
+ TaskMonitor.get().createStatus("Compacting region: " + regionInfo.getRegionNameAsString()
+ + ", family: " + cfd.getNameAsString() + " from RS: " + rsServerName);
+ status.enableStatusJournal(false);
+ // 1. select compaction and check compaction context is present
+ LOG.info("Start select compaction {}", compactionTask);
+ status.setStatus("Start select compaction");
+ Pair<HStore, Optional<CompactionContext>> pair = selectCompaction(regionInfo, cfd,
+ compactionTask.isRequestMajor(), compactionTask.getPriority(), status, logStr);
+ HStore store = pair.getFirst();
+ Optional<CompactionContext> compaction = pair.getSecond();
+ if (!compaction.isPresent()) {
+ store.close();
+ LOG.info("Compaction context is empty: {}", compactionTask);
+ status.abort("Compaction context is empty and return");
+ return;
+ }
+ CompactionContext compactionContext = compaction.get();
+ // 2. update storage
+ Pair<Boolean, List<String>> updateStoreResult =
+ updateStorageAfterSelectCompaction(regionInfo, cfd, compactionContext, status, logStr);
+ if (!updateStoreResult.getFirst()) {
+ store.close();
+ return;
+ }
+ List<String> selectedFileNames = updateStoreResult.getSecond();
+ compactionTask.setHStore(store);
+ compactionTask.setCompactionContext(compactionContext);
+ compactionTask.setSelectedFileNames(selectedFileNames);
+ compactionTask.setMonitoredTask(status);
+ // 3. execute a compaction task
+ ThreadPoolExecutor pool;
+ pool = store.throttleCompaction(compactionContext.getRequest().getSize()) ? longCompactions
+ : shortCompactions;
+ pool.submit(new CompactionTaskRunner(compactionTask));
+ }
+
+ /**
+ * Open the store and select a compaction context
+ * @return the store and the compaction context
+ */
+ private Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo regionInfo,
+ ColumnFamilyDescriptor cfd, boolean major, int priority, MonitoredTask status, String logStr)
+ throws IOException {
+ status.setStatus("Open store");
+ tableDescriptors.get(regionInfo.getTable());
+ HStore store = getStore(conf, fs, rootDir, tableDescriptors.get(regionInfo.getTable()),
+ regionInfo, cfd.getNameAsString());
+ storage.cleanupCompactedFiles(regionInfo, cfd,
Review comment:
So I suppose this should be done by the region server, not by the compaction server?
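For context, on the region server this cleanup is handled by the CompactedHFilesDischarger chore. A minimal sketch of that shape follows; it is simplified and reconstructed from memory of the HBase 2.x regionserver code, not a verbatim copy:

    import java.util.List;
    import org.apache.hadoop.hbase.ScheduledChore;
    import org.apache.hadoop.hbase.Stoppable;
    import org.apache.hadoop.hbase.regionserver.HRegion;
    import org.apache.hadoop.hbase.regionserver.HStore;

    // Simplified sketch of a regionserver-side discharger chore: walk the
    // online regions and let each store archive the files that its
    // compactions have already replaced.
    class CompactedFilesDischargerSketch extends ScheduledChore {
      private final List<HRegion> onlineRegions;

      CompactedFilesDischargerSketch(int periodMillis, Stoppable stopper,
          List<HRegion> onlineRegions) {
        super("CompactedFilesDischargerSketch", stopper, periodMillis);
        this.onlineRegions = onlineRegions;
      }

      @Override
      protected void chore() {
        for (HRegion region : onlineRegions) {
          for (HStore store : region.getStores()) {
            try {
              // Moves already-compacted store files out of the store and
              // archives them.
              store.closeAndArchiveCompactedFiles();
            } catch (Exception e) {
              // Skip this store and keep discharging the rest.
            }
          }
        }
      }
    }

If the compaction server cleaned these files up as well, the two servers could race over the same store files, so keeping the discharge on the region server side seems consistent with the rest of the design.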