This is an automated email from the ASF dual-hosted git repository.
niuyulin pushed a commit to branch HBASE-25714
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-25714 by this push:
new f19f92d HBASE-25991 Do compaction on compaction server (#3425)
f19f92d is described below
commit f19f92d297643357841f9c985c234a8af8e749ed
Author: niuyulin <[email protected]>
AuthorDate: Tue Jul 6 17:32:54 2021 +0800
HBASE-25991 Do compaction on compaction server (#3425)
Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Michael Stack <[email protected]>
---
.../org/apache/hadoop/hbase/AbstractServer.java | 28 ++
.../hbase/compactionserver/CSRpcServices.java | 31 +-
.../compactionserver/CompactionFilesCache.java | 182 +++++++++
.../hbase/compactionserver/CompactionTask.java | 176 +++++++++
.../compactionserver/CompactionThreadManager.java | 433 ++++++++++++++++++++-
.../hbase/compactionserver/HCompactionServer.java | 46 ++-
.../compaction/CompactionOffloadManager.java | 17 +-
.../hadoop/hbase/regionserver/CompactSplit.java | 192 ++++-----
.../hbase/regionserver/CompactThreadControl.java | 156 ++++++++
.../hbase/regionserver/HRegionFileSystem.java | 2 +-
.../hadoop/hbase/regionserver/HRegionServer.java | 38 +-
.../apache/hadoop/hbase/regionserver/HStore.java | 2 +-
.../hbase/regionserver/RegionServerServices.java | 29 +-
.../CompactionThroughputControllerFactory.java | 5 +-
.../throttle/NoLimitThroughputController.java | 3 +-
...ressureAwareCompactionThroughputController.java | 5 +-
.../PressureAwareFlushThroughputController.java | 6 +-
.../PressureAwareThroughputController.java | 11 +-
.../throttle/ThroughputController.java | 5 +-
...oller.java => ThroughputControllerService.java} | 37 +-
.../compactionserver/TestCompactionFilesCache.java | 155 ++++++++
.../compactionserver/TestCompactionServer.java | 154 +++++++-
22 files changed, 1451 insertions(+), 262 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java
index 208bd86..3e64c85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java
@@ -25,10 +25,12 @@ import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.HMaster;
@@ -37,6 +39,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
@@ -56,6 +59,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
public abstract class AbstractServer extends Thread implements Server {
  private static final Logger LOG = LoggerFactory.getLogger(AbstractServer.class);
protected Configuration conf;
+ protected volatile boolean dataFsOk;
+ protected HFileSystem dataFs;
// A sleeper that sleeps for msgInterval.
protected Sleeper sleeper;
protected int msgInterval;
@@ -170,6 +175,7 @@ public abstract class AbstractServer extends Thread implements Server {
super(processName); // thread name
this.startcode = System.currentTimeMillis();
this.conf = conf;
+ this.dataFsOk = true;
this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
this.userProvider = UserProvider.instantiate(conf);
this.shortOperationTimeout =
conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
@@ -400,6 +406,28 @@ public abstract class AbstractServer extends Thread implements Server {
}
}
+ /**
+   * Checks to see if the file system is still accessible. If not, sets abortRequested and
+   * stopRequested.
+   * @return false if the file system is not available
+ */
+ public boolean checkFileSystem() {
+ if (this.dataFsOk && this.dataFs != null) {
+ try {
+ FSUtils.checkFileSystemAvailable(this.dataFs);
+ } catch (IOException e) {
+ abort("File System not available", e);
+ this.dataFsOk = false;
+ }
+ }
+ return this.dataFsOk;
+ }
+
+ @Override
+ public FileSystem getFileSystem() {
+ return dataFs;
+ }
+
protected abstract AbstractRpcServices getRpcService();
protected abstract String getProcessName();
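
For context, the checkFileSystem() method added above follows a common guard pattern: a volatile ok-flag short-circuits further probes once the filesystem is known to be down, and the first failure aborts the server. A minimal standalone sketch of the same pattern (FsGuard and probe() are hypothetical names, not part of this patch):

    import java.io.IOException;

    abstract class FsGuard {
      private volatile boolean fsOk = true;

      /** Returns false once the filesystem has been found unavailable. */
      boolean checkFileSystem() {
        if (fsOk) {
          try {
            probe();              // stands in for FSUtils.checkFileSystemAvailable(fs)
          } catch (IOException e) {
            abort("File System not available", e);
            fsOk = false;         // never flips back; the server is aborting anyway
          }
        }
        return fsOk;
      }

      abstract void probe() throws IOException;

      abstract void abort(String why, Throwable cause);
    }
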
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
index 2a76d7a..25cfa1a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
@@ -19,11 +19,14 @@ package org.apache.hadoop.hbase.compactionserver;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AbstractRpcServices;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
import org.apache.yetus.audience.InterfaceAudience;
@@ -33,11 +36,13 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
@InterfaceAudience.Private
public class CSRpcServices extends AbstractRpcServices
@@ -46,8 +51,6 @@ public class CSRpcServices extends AbstractRpcServices
private final HCompactionServer compactionServer;
- // Request counter.
- final LongAdder requestCount = new LongAdder();
/** RPC scheduler to use for the compaction server. */
public static final String COMPACTION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
"hbase.compaction.server.rpc.scheduler.factory.class";
@@ -86,11 +89,25 @@ public class CSRpcServices extends AbstractRpcServices
*/
@Override
public CompactResponse requestCompaction(RpcController controller,
- CompactionProtos.CompactRequest request) {
- requestCount.increment();
+ CompactionProtos.CompactRequest request) throws ServiceException {
+ compactionServer.requestCount.increment();
+ ServerName rsServerName = ProtobufUtil.toServerName(request.getServer());
+ RegionInfo regionInfo = ProtobufUtil.toRegionInfo(request.getRegionInfo());
+    ColumnFamilyDescriptor cfd = ProtobufUtil.toColumnFamilyDescriptor(request.getFamily());
+    boolean major = request.getMajor();
+    int priority = request.getPriority();
+    List<HBaseProtos.ServerName> favoredNodes = Collections.singletonList(request.getServer());
     LOG.info("Received compaction request from {}", ProtobufUtil.toString(request));
- compactionServer.compactionThreadManager.requestCompaction();
- return CompactionProtos.CompactResponse.newBuilder().build();
+    CompactionTask compactionTask =
+      CompactionTask.newBuilder().setRsServerName(rsServerName).setRegionInfo(regionInfo)
+        .setColumnFamilyDescriptor(cfd).setRequestMajor(major).setPriority(priority)
+        .setFavoredNodes(favoredNodes).setSubmitTime(System.currentTimeMillis()).build();
+    try {
+      compactionServer.compactionThreadManager.requestCompaction(compactionTask);
+ return CompactionProtos.CompactResponse.newBuilder().build();
+ } catch (IOException ioe) {
+ throw new ServiceException(ioe);
+ }
}
}
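
The reworked handler above no longer swallows failures: a checked IOException from the thread manager is wrapped in a ServiceException so the RPC client sees the error. A self-contained sketch of that pattern, with ServiceException modeled locally (the real one comes from the shaded protobuf runtime):

    import java.io.IOException;

    class RpcHandlerSketch {
      static class ServiceException extends Exception {
        ServiceException(Throwable cause) { super(cause); }
      }

      Object requestCompaction(Object request) throws ServiceException {
        try {
          doWork(request);                 // e.g. submit the compaction task
          return new Object();             // stands in for CompactResponse
        } catch (IOException ioe) {
          throw new ServiceException(ioe); // client sees the failure, not a silent drop
        }
      }

      private void doWork(Object request) throws IOException {
        // placeholder for compactionThreadManager.requestCompaction(task)
      }
    }
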
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionFilesCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionFilesCache.java
new file mode 100644
index 0000000..5499cc8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionFilesCache.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.compactionserver;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
+
+/**
+ * Since we do not maintain a StoreFileManager on the compaction server (it cannot be refreshed
+ * on flush), we use this class as external storage to record compacting and compacted files.
+ * This storage is in memory and is used only by the compaction server (the region server does
+ * not use it). It never moves or deletes hfiles; the region server still does all file movement.
+ */
[email protected]
+class CompactionFilesCache {
+  private static Logger LOG = LoggerFactory.getLogger(CompactionFilesCache.class);
+  private final ConcurrentMap<String, ConcurrentMap<String, Set<String>>> selectedFiles =
+    new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, ConcurrentMap<String, Set<String>>> compactedFiles =
+    new ConcurrentHashMap<>();
+  /**
+   * Mark files as compacted. Called after the CS finishes a compaction and the RS accepts its
+   * results; the RS will delete these compacted files once no reader references them.
+   */
+ boolean addCompactedFiles(RegionInfo regionInfo, ColumnFamilyDescriptor cfd,
+ List<String> compactedFiles) {
+    Set<String> compactedFileSet = getCompactedStoreFilesInternal(regionInfo, cfd);
+ synchronized (compactedFileSet) {
+ compactedFileSet.addAll(compactedFiles);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Mark files as compacted, region: {}, cf, files: {}",
regionInfo,
+ cfd.getNameAsString(), compactedFileSet);
+ }
+ }
+ return true;
+ }
+
+  /**
+   * Mark files as selected. Called after the files are selected and before the compaction
+   * starts, so that a file cannot be selected twice by two compactions.
+   * @return True if none of these files was selected before, false if any of them is already
+   *         selected.
+   */
+ boolean addSelectedFiles(RegionInfo regionInfo, ColumnFamilyDescriptor cfd,
+ List<String> selectedFiles) {
+    Set<String> selectedFileSet = getSelectedStoreFilesInternal(regionInfo, cfd);
+ synchronized (selectedFileSet) {
+ for (String selectedFile : selectedFiles) {
+ if (selectedFileSet.contains(selectedFile)) {
+ return false;
+ }
+ }
+ selectedFileSet.addAll(selectedFiles);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Mark files are selected, region: {}, cf: {}, files: {}",
+ regionInfo.getEncodedName(), cfd.getNameAsString(), selectedFiles);
+ }
+ }
+ return true;
+ }
+
+ /**
+   * Get files which are compacted. Called before selecting files for a compaction. Thread-safe.
+ * @return The files which are compacted
+ */
+  Set<String> getCompactedStoreFiles(RegionInfo regionInfo, ColumnFamilyDescriptor cfd) {
+    Set<String> compactedFiles = getStoreFiles(this.compactedFiles, regionInfo, cfd);
+ synchronized (compactedFiles) {
+ return ImmutableSet.copyOf(compactedFiles);
+ }
+ }
+
+ /**
+   * Get files which are compacting. Called before selecting files for a compaction. Thread-safe.
+ * @return The files which are compacting
+ */
+  Set<String> getSelectedStoreFiles(RegionInfo regionInfo, ColumnFamilyDescriptor cfd) {
+    Set<String> selectedFiles = getStoreFiles(this.selectedFiles, regionInfo, cfd);
+ synchronized (selectedFiles) {
+ return ImmutableSet.copyOf(selectedFiles);
+ }
+ }
+
+ /**
+   * Get files which are compacted; we use the returned object as a lock.
+   * @return The compacted-files set, used as a lock object
+   */
+  Object getCompactedStoreFilesAsLock(RegionInfo regionInfo, ColumnFamilyDescriptor cfd) {
+ return getCompactedStoreFilesInternal(regionInfo, cfd);
+ }
+
+ /**
+   * Get files which are compacting; we use the returned object as a lock.
+   * @return The selected-files set, used as a lock object
+   */
+  Object getSelectedStoreFilesAsLock(RegionInfo regionInfo, ColumnFamilyDescriptor cfd) {
+ return getSelectedStoreFilesInternal(regionInfo, cfd);
+ }
+
+ /**
+   * Get files which are compacted. Called before selecting files for a compaction. Not thread-safe.
+ * @return The files which are compacted
+ */
+ private Set<String> getCompactedStoreFilesInternal(RegionInfo regionInfo,
+ ColumnFamilyDescriptor cfd) {
+ return getStoreFiles(this.compactedFiles, regionInfo, cfd);
+ }
+
+ /**
+   * Get files which are compacting. Called before selecting files for a compaction. Not thread-safe.
+ * @return The files which are compacting
+ */
+ private Set<String> getSelectedStoreFilesInternal(RegionInfo regionInfo,
+ ColumnFamilyDescriptor cfd) {
+ return getStoreFiles(this.selectedFiles, regionInfo, cfd);
+ }
+
+ private Set<String> getStoreFiles(
+      ConcurrentMap<String, ConcurrentMap<String, Set<String>>> fileMap, RegionInfo regionInfo,
+ ColumnFamilyDescriptor cfd) {
+ String encodedName = regionInfo.getEncodedName();
+ String family = cfd.getNameAsString();
+ Map<String, Set<String>> familyFilesMap =
+ fileMap.computeIfAbsent(encodedName, v -> new ConcurrentHashMap<>());
+ return familyFilesMap.computeIfAbsent(family, v -> new HashSet<>());
+ }
+
+ /**
+   * Remove files from the selected set. Called:
+   * 1. after the compaction fails;
+   * 2. after the compaction finishes but reporting to the RS fails;
+   * 3. after the compaction finishes and reporting to the RS succeeds (these files are then
+   *    marked as compacted).
+ */
+ void removeSelectedFiles(RegionInfo regionInfo, ColumnFamilyDescriptor cfd,
+ List<String> selectedFiles) {
+    Set<String> selectedFileSet = getSelectedStoreFilesInternal(regionInfo, cfd);
+ synchronized (selectedFileSet) {
+ selectedFileSet.removeAll(selectedFiles);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Remove files from selected, region: {}, cf: {}, files: {}",
+ regionInfo.getEncodedName(), cfd.getNameAsString(), selectedFiles);
+ }
+ }
+ }
+
+ /**
+ * Remove compacted files which are already deleted by RS
+ */
+ void cleanupCompactedFiles(RegionInfo regionInfo, ColumnFamilyDescriptor cfd,
+ Set<String> storeFileNames) {
+    Set<String> compactedFileSet = getCompactedStoreFilesInternal(regionInfo, cfd);
+ synchronized (compactedFileSet) {
+ compactedFileSet.retainAll(storeFileNames);
+ }
+ }
+}
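
The intended lifecycle of this cache is: mark files selected before compacting, always remove the selection afterwards, and promote files to compacted only when the RS accepted the result. A minimal model of that bookkeeping with plain sets (class and method names here are illustrative only):

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class FilesCacheSketch {
      private final Set<String> selected = new HashSet<>();
      private final Set<String> compacted = new HashSet<>();

      /** Returns false if any file already belongs to a running compaction. */
      synchronized boolean addSelectedFiles(List<String> files) {
        for (String f : files) {
          if (selected.contains(f)) {
            return false;          // already selected by another compaction
          }
        }
        selected.addAll(files);
        return true;
      }

      /** Always release the selection; only a reported success marks files compacted. */
      synchronized void finish(List<String> files, boolean reportedToRs) {
        selected.removeAll(files);
        if (reportedToRs) {
          compacted.addAll(files); // the RS will eventually delete these
        }
      }
    }
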
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionTask.java
new file mode 100644
index 0000000..05e350f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionTask.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.compactionserver;
+
+import java.util.List;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
+/**
+ * This class holds the {@link CompactionContext} related to a CompactionRequest.
+ */
[email protected]
+public final class CompactionTask {
+ private ServerName rsServerName;
+ private RegionInfo regionInfo;
+ private ColumnFamilyDescriptor cfd;
+ private CompactionContext compactionContext;
+ private HStore store;
+ private boolean requestMajor;
+ private int priority;
+ private List<String> selectedFileNames;
+ private MonitoredTask status;
+ private String taskName;
+ private List<HBaseProtos.ServerName> favoredNodes;
+ private long submitTime;
+
+ private CompactionTask() {
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private CompactionTask compactionTask = new CompactionTask();
+
+ Builder setRsServerName(ServerName rsServerName) {
+ compactionTask.rsServerName = rsServerName;
+ return this;
+ }
+
+ Builder setRegionInfo(RegionInfo regionInfo) {
+ compactionTask.regionInfo = regionInfo;
+ return this;
+ }
+
+ Builder setColumnFamilyDescriptor(ColumnFamilyDescriptor cfd) {
+ compactionTask.cfd = cfd;
+ return this;
+ }
+
+ Builder setRequestMajor(boolean requestMajor) {
+ compactionTask.requestMajor = requestMajor;
+ return this;
+ }
+
+ Builder setPriority(int priority) {
+ compactionTask.priority = priority;
+ return this;
+ }
+
+ Builder setFavoredNodes(List<HBaseProtos.ServerName> favoredNodes) {
+ compactionTask.favoredNodes = favoredNodes;
+ return this;
+ }
+
+ Builder setSubmitTime(long submitTime) {
+ compactionTask.submitTime = submitTime;
+ return this;
+ }
+
+ CompactionTask build() {
+ return compactionTask;
+ }
+ }
+
+ void setSelectedFileNames(List<String> selectedFileNames) {
+ this.selectedFileNames = selectedFileNames;
+ }
+
+ void setMonitoredTask(MonitoredTask status) {
+ this.status = status;
+ }
+
+ void setCompactionContext(CompactionContext compactionContext) {
+ this.compactionContext = compactionContext;
+ }
+
+ void setHStore(HStore store) {
+ this.store = store;
+ }
+
+ ServerName getRsServerName() {
+ return rsServerName;
+ }
+
+ RegionInfo getRegionInfo() {
+ return regionInfo;
+ }
+
+ ColumnFamilyDescriptor getCfd() {
+ return cfd;
+ }
+
+ CompactionContext getCompactionContext() {
+ return compactionContext;
+ }
+
+ HStore getStore() {
+ return store;
+ }
+
+ List<String> getSelectedFileNames() {
+ return selectedFileNames;
+ }
+
+ MonitoredTask getStatus() {
+ return status;
+ }
+
+ void setTaskName(String name) {
+ this.taskName = name;
+ }
+
+ String getTaskName() {
+ return taskName;
+ }
+
+ boolean isRequestMajor() {
+ return requestMajor;
+ }
+
+ int getPriority() {
+ return priority;
+ }
+
+ List<HBaseProtos.ServerName> getFavoredNodes() {
+ return favoredNodes;
+ }
+
+ long getSubmitTime() {
+ return submitTime;
+ }
+
+ void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder("RS: ").append(rsServerName).append(", region: ")
+      .append(regionInfo.getRegionNameAsString()).append(", CF: ").append(cfd.getNameAsString())
+ .append(", priority: ").append(priority).toString();
+ }
+}
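
CompactionTask is assembled through a fluent builder whose setters return the builder itself, so the RPC handler can chain the calls in one expression. A tiny self-contained sketch of the same idiom (Msg and its fields are hypothetical):

    final class Msg {
      private String who;
      private int priority;

      private Msg() {}                  // only the builder creates instances

      static Builder newBuilder() { return new Builder(); }

      static final class Builder {
        private final Msg msg = new Msg();
        Builder setWho(String w) { msg.who = w; return this; }        // chainable
        Builder setPriority(int p) { msg.priority = p; return this; } // chainable
        Msg build() { return msg; }
      }

      @Override public String toString() { return who + ":" + priority; }
    }

Usage then reads Msg.newBuilder().setWho("rs1").setPriority(1).build(), mirroring the CompactionTask.newBuilder() chain in CSRpcServices.
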
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
index e7a5068..2a04e9e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,41 +18,447 @@
package org.apache.hadoop.hbase.compactionserver;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.CompactThreadControl;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Pair;
+
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest.Builder;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
+
+/**
+ * CompactionThreadManager reuses {@link HStore#selectCompaction},
+ * {@link HStore#throttleCompaction}, {@link CompactionContext#compact} and
+ * {@link CompactThreadControl}, which are the core logic of compaction.
+ */
@InterfaceAudience.Private
-public class CompactionThreadManager {
+public class CompactionThreadManager implements ThroughputControllerService {
private static Logger LOG = LoggerFactory.getLogger(CompactionThreadManager.class);
+ // Configuration key for the large compaction threads.
+ private final static String LARGE_COMPACTION_THREADS =
+ "hbase.compaction.server.thread.compaction.large";
+ private final static int LARGE_COMPACTION_THREADS_DEFAULT = 10;
+ // Configuration key for the small compaction threads.
+ private final static String SMALL_COMPACTION_THREADS =
+ "hbase.compaction.server.thread.compaction.small";
+ private final static int SMALL_COMPACTION_THREADS_DEFAULT = 50;
private final Configuration conf;
- private final ConcurrentMap<ServerName, AsyncRegionServerAdmin> rsAdmins =
- new ConcurrentHashMap<>();
private final HCompactionServer server;
+ private Path rootDir;
+ private FSTableDescriptors tableDescriptors;
+ private CompactThreadControl compactThreadControl;
+ private ConcurrentHashMap<String, CompactionTask> runningCompactionTasks =
+ new ConcurrentHashMap<>();
+  private static CompactionFilesCache compactionFilesCache = new CompactionFilesCache();
-  public CompactionThreadManager(final Configuration conf, HCompactionServer server) {
+ CompactionThreadManager(final Configuration conf, HCompactionServer server) {
this.conf = conf;
this.server = server;
+ try {
+ this.rootDir = CommonFSUtils.getRootDir(this.conf);
+ this.tableDescriptors = new FSTableDescriptors(conf);
+ int largeThreads =
+        Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
+      int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
+      compactThreadControl = new CompactThreadControl(this, largeThreads, smallThreads,
+        COMPACTION_TASK_COMPARATOR, REJECTION);
+ } catch (Throwable t) {
+ LOG.error("Failed construction CompactionThreadManager", t);
+ }
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ @Override
+ public ChoreService getChoreService() {
+ return server.getChoreService();
+ }
+
+ @Override
+ public double getCompactionPressure() {
+ double max = 0;
+ for (CompactionTask task : getRunningCompactionTasks().values()) {
+ double normCount = task.getStore().getCompactionPressure();
+ if (normCount > max) {
+ max = normCount;
+ }
+ }
+ return max;
+ }
+
+ @Override
+ public double getFlushPressure() {
+ return 0;
+ }
+
+  public void requestCompaction(CompactionTask compactionTask) throws IOException {
+ try {
+ selectFileAndExecuteTask(compactionTask);
+ } catch (Throwable e) {
+ LOG.error("Failed requestCompaction {}", compactionTask, e);
+ server.checkFileSystem();
+      // count failures during file selection
+ server.requestFailedCount.increment();
+ throw new IOException(e);
+ }
+ }
+
+  private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOException {
+ ServerName rsServerName = compactionTask.getRsServerName();
+ RegionInfo regionInfo = compactionTask.getRegionInfo();
+ ColumnFamilyDescriptor cfd = compactionTask.getCfd();
+ String logStr = compactionTask.toString();
+ MonitoredTask status =
+      TaskMonitor.get().createStatus("Compacting region: " + regionInfo.getRegionNameAsString()
+        + ", family: " + cfd.getNameAsString() + " from RS: " + rsServerName);
+ status.enableStatusJournal(false);
+ // 1. select compaction and check compaction context is present
+ LOG.info("Start select compaction {}", compactionTask);
+ status.setStatus("Start select compaction");
+ HStore store;
+ CompactionContext compactionContext;
+ Pair<Boolean, List<String>> updateSelectedFilesCacheResult;
+    // The synchronized blocks ensure that the files in the store, the selected files in the
+    // cache, and the compacted files in the cache stay consistent with each other; we need this
+    // to guarantee a correct selection.
+    synchronized (compactionFilesCache.getCompactedStoreFilesAsLock(regionInfo, cfd)) {
+      synchronized (compactionFilesCache.getSelectedStoreFilesAsLock(regionInfo, cfd)) {
+        Pair<HStore, Optional<CompactionContext>> pair = selectCompaction(regionInfo, cfd,
+          compactionTask.isRequestMajor(), compactionTask.getPriority(), status, logStr);
+ store = pair.getFirst();
+ Optional<CompactionContext> compaction = pair.getSecond();
+ if (!compaction.isPresent()) {
+ store.close();
+ LOG.info("Compaction context is empty: {}", compactionTask);
+ status.abort("Compaction context is empty and return");
+ return;
+ }
+ compactionContext = compaction.get();
+ // 2. update compactionFilesCache
+        updateSelectedFilesCacheResult =
+          updateStorageAfterSelectCompaction(regionInfo, cfd, compactionContext, status, logStr);
+ } // end of synchronized selected files
+ } // end of synchronized compacted files
+ if (!updateSelectedFilesCacheResult.getFirst()) {
+ store.close();
+ return;
+ }
+    List<String> selectedFileNames = updateSelectedFilesCacheResult.getSecond();
+ compactionTask.setHStore(store);
+ compactionTask.setCompactionContext(compactionContext);
+ compactionTask.setSelectedFileNames(selectedFileNames);
+ compactionTask.setMonitoredTask(status);
+ compactionTask.setPriority(compactionContext.getRequest().getPriority());
+ // 3. execute a compaction task
+ ThreadPoolExecutor pool;
+ pool = store.throttleCompaction(compactionContext.getRequest().getSize())
+ ? compactThreadControl.getLongCompactions()
+ : compactThreadControl.getShortCompactions();
+ pool.submit(new CompactionTaskRunner(compactionTask));
+ }
+
+ /**
+ * Open store, and select compaction context
+ * @return Store and CompactionContext
+ */
+  Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo regionInfo,
+      ColumnFamilyDescriptor cfd, boolean major, int priority, MonitoredTask status,
+      String logStr) throws IOException {
+ status.setStatus("Open store");
+ tableDescriptors.get(regionInfo.getTable());
+ HStore store = getStore(conf, server.getFileSystem(), rootDir,
+      tableDescriptors.get(regionInfo.getTable()), regionInfo, cfd.getNameAsString());
+
+    // CompactedHFilesDischarger only runs on the regionserver, so the compactionserver has no
+    // opportunity to clean up compacted files at that time; we clean them up here instead.
+ compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
+      store.getStorefiles().stream().map(sf -> sf.getPath().getName()).collect(Collectors.toSet()));
+ if (major) {
+ status.setStatus("Trigger major compaction");
+ store.triggerMajorCompaction();
+ }
+    // Get the current compacting and compacted files. NOTE: these are file names only and do
+    // not include paths.
+    status.setStatus("Get current compacting and compacted files from compactionFilesCache");
+    Set<String> compactingFiles = compactionFilesCache.getSelectedStoreFiles(regionInfo, cfd);
+    Set<String> compactedFiles = compactionFilesCache.getCompactedStoreFiles(regionInfo, cfd);
+ Set<String> excludeFiles = new HashSet<>(compactingFiles);
+ excludeFiles.addAll(compactedFiles);
+ // Convert files names to store files
+ status.setStatus("Convert current compacting and compacted files to store
files");
+ List<HStoreFile> excludeStoreFiles = getExcludedStoreFiles(store,
excludeFiles);
+ LOG.info(
+ "Start select store: {}, excludeFileNames: {}, excluded: {}, compacting:
{}, compacted: {}",
+ logStr, excludeFiles.size(), excludeStoreFiles.size(),
compactingFiles.size(),
+ compactedFiles.size());
+ status.setStatus("Select store files to compaction, major: " + major);
+ Optional<CompactionContext> compaction = store.selectCompaction(priority,
+ CompactionLifeCycleTracker.DUMMY, null, excludeStoreFiles);
+ LOG.info("After select store: {}, if compaction context is present: {}",
logStr,
+ compaction.isPresent());
+ return new Pair<>(store, compaction);
+ }
+
+ /**
+ * Mark files in compaction context as selected in compactionFilesCache
+   * @return True on success, false if the files are already marked as selected in
+   *         compactionFilesCache
+   */
+  private Pair<Boolean, List<String>> updateStorageAfterSelectCompaction(RegionInfo regionInfo,
+      ColumnFamilyDescriptor cfd, CompactionContext compactionContext, MonitoredTask status,
+      String logStr) {
+ LOG.info("Start update compactionFilesCache after select compaction: {}",
logStr);
+ // save selected files to compactionFilesCache
+ List<String> selectedFilesNames = new ArrayList<>();
+ for (HStoreFile selectFile : compactionContext.getRequest().getFiles()) {
+ selectedFilesNames.add(selectFile.getFileInfo().getPath().getName());
+ }
+    if (compactionFilesCache.addSelectedFiles(regionInfo, cfd, selectedFilesNames)) {
+      LOG.info("Update compactionFilesCache after select compaction success: {}", logStr);
+      status.setStatus("Update compactionFilesCache after select compaction success");
+ return new Pair<>(Boolean.TRUE, selectedFilesNames);
+ } else {
+      // should not happen
+      LOG.info("Selected files are already in compactionFilesCache, returning: {}", logStr);
+      status.abort("Selected files are already in compactionFilesCache and return");
+      return new Pair<>(Boolean.FALSE, Collections.emptyList());
+ }
+ }
+
+ /**
+   * Execute the compaction in the compaction server process
+ */
+ private void doCompaction(CompactionTask compactionTask) throws IOException {
+ RegionInfo regionInfo = compactionTask.getRegionInfo();
+ ColumnFamilyDescriptor cfd = compactionTask.getCfd();
+ HStore store = compactionTask.getStore();
+    CompactionContext compactionContext = compactionTask.getCompactionContext();
+ List<String> selectedFileNames = compactionTask.getSelectedFileNames();
+ MonitoredTask status = compactionTask.getStatus();
+ try {
+ LOG.info("Start compact store: {}, cf: {}, compaction context: {}",
store, cfd,
+ compactionContext);
+ List<Path> newFiles =
+        compactionContext.compact(compactThreadControl.getCompactionThroughputController(), null);
+ LOG.info("Finish compact store: {}, cf: {}, new files: {}", store, cfd,
newFiles);
+ List<String> newFileNames = new ArrayList<>();
+ for (Path newFile : newFiles) {
+ newFileNames.add(newFile.getName());
+ }
+ reportCompactionCompleted(compactionTask, newFileNames, status);
+ } finally {
+ status.setStatus("Remove selected files");
+ LOG.info("Remove selected files: {}", compactionTask);
+      compactionFilesCache.removeSelectedFiles(regionInfo, cfd, selectedFileNames);
+ }
}
-  private AsyncRegionServerAdmin getRsAdmin(final ServerName sn) throws IOException {
- AsyncRegionServerAdmin admin = this.rsAdmins.get(sn);
- if (admin == null) {
- LOG.debug("New RS admin connection to {}", sn);
- admin = this.server.getAsyncClusterConnection().getRegionServerAdmin(sn);
- this.rsAdmins.put(sn, admin);
+ /**
+ * Report compaction completed to RS
+ */
+  private void reportCompactionCompleted(CompactionTask task, List<String> newFiles,
+      MonitoredTask status) {
+ ServerName rsServerName = task.getRsServerName();
+ RegionInfo regionInfo = task.getRegionInfo();
+ ColumnFamilyDescriptor cfd = task.getCfd();
+ List<String> selectedFileNames = task.getSelectedFileNames();
+ boolean newForceMajor = task.getStore().getForceMajor();
+ Builder builder =
+      CompleteCompactionRequest.newBuilder().setRegionInfo(ProtobufUtil.toRegionInfo(regionInfo))
+        .setFamily(ProtobufUtil.toColumnFamilySchema(cfd)).setNewForceMajor(newForceMajor);
+    // Use file names only, not full paths, to keep the protobuf message small.
+ for (String selectFile : selectedFileNames) {
+ builder.addSelectedFiles(selectFile);
+ }
+ for (String newFile : newFiles) {
+ builder.addNewFiles(newFile);
+ }
+ CompleteCompactionRequest completeCompactionRequest = builder.build();
+ AsyncRegionServerAdmin rsAdmin = getRsAdmin(rsServerName);
+ try {
+      status.setStatus("Report complete compaction to RS: " + rsServerName
+        + ", selected file size: " + selectedFileNames.size() + ", new file size: "
+        + newFiles.size());
+ LOG.info("Report complete compaction: {}, selectedFileSize: {},
newFileSize: {}", task,
+ completeCompactionRequest.getSelectedFilesList().size(),
+ completeCompactionRequest.getNewFilesList().size());
+ CompleteCompactionResponse completeCompactionResponse =
+        FutureUtils.get(rsAdmin.completeCompaction(completeCompactionRequest));
+ if (completeCompactionResponse.getSuccess()) {
+ status.markComplete("Report to RS succeeded and RS accepted");
+ // move selected files to compacted files
+        compactionFilesCache.addCompactedFiles(regionInfo, cfd, selectedFileNames);
+        LOG.info("Compaction manager request complete compaction success. {}", task);
+ } else {
+        // TODO: maybe the region has moved; get the latest regionserver name and retry
+ status.abort("Report to RS succeeded but RS denied");
+ LOG.warn("Compaction manager request complete compaction fail. {}",
task);
+ }
+ } catch (IOException e) {
+      // TODO: the RPC call failed; add a retry
+ status.abort("Report to RS failed");
+ LOG.error("Compaction manager request complete compaction error. {}",
task, e);
}
- return admin;
}
- public void requestCompaction() {
+  private List<HStoreFile> getExcludedStoreFiles(HStore store, Set<String> excludeFileNames) {
+ Collection<HStoreFile> storefiles = store.getStorefiles();
+ List<HStoreFile> storeFiles = new ArrayList<>();
+ for (HStoreFile storefile : storefiles) {
+ String name = storefile.getPath().getName();
+ if (excludeFileNames.contains(name)) {
+ storeFiles.add(storefile);
+ }
+ }
+ return storeFiles;
+ }
+
+  private HStore getStore(final Configuration conf, final FileSystem fs, final Path rootDir,
+      final TableDescriptor htd, final RegionInfo hri, final String familyName)
+      throws IOException {
+ HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs,
+ CommonFSUtils.getTableDir(rootDir, htd.getTableName()), hri);
+ HRegion region = new HRegion(regionFs, null, conf, htd, null);
+    HStore store = new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false);
+ OptionalLong maxSequenceId = store.getMaxSequenceId();
+ LOG.info("store max sequence id: {}", maxSequenceId.orElse(0));
+ region.getMVCC().advanceTo(maxSequenceId.orElse(0));
+ return store;
+ }
+
+ private AsyncRegionServerAdmin getRsAdmin(final ServerName sn) {
+ return server.getAsyncClusterConnection().getRegionServerAdmin(sn);
+ }
+
+ ConcurrentHashMap<String, CompactionTask> getRunningCompactionTasks() {
+ return runningCompactionTasks;
+ }
+
+ void waitForStop() {
+ compactThreadControl.waitForStop();
+ }
+
+ private void executeCompaction(CompactionTask compactionTask) {
+ try {
+ String taskName = compactionTask.getRsServerName() + "-"
+ + compactionTask.getRegionInfo().getRegionNameAsString() + "-"
+        + compactionTask.getCfd().getNameAsString() + "-" + System.currentTimeMillis();
+ compactionTask.setTaskName(taskName);
+ runningCompactionTasks.put(compactionTask.getTaskName(), compactionTask);
+ doCompaction(compactionTask);
+ } catch (Throwable e) {
+ LOG.error("Execute compaction task error: {}", compactionTask, e);
+ server.checkFileSystem();
+      // count failures during compaction
+ server.requestFailedCount.increment();
+ } finally {
+ runningCompactionTasks.remove(compactionTask.getTaskName());
+ if (compactionTask.getStore() != null) {
+ try {
+ compactionTask.getStore().close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close store: {}", compactionTask, e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Cleanup class to use when rejecting a compaction request from the queue.
+ */
+  private static BiConsumer<Runnable, ThreadPoolExecutor> REJECTION = (runnable, pool) -> {
+    if (runnable instanceof CompactionTaskRunner) {
+      CompactionTaskRunner runner = (CompactionTaskRunner) runnable;
+      LOG.info("Compaction Rejected: " + runner);
+      CompactionTask task = runner.getCompactionTask();
+      if (task != null) {
+        compactionFilesCache.removeSelectedFiles(task.getRegionInfo(), task.getCfd(),
+          task.getSelectedFileNames());
+      }
+    }
+  };
+
+ protected class CompactionTaskRunner implements Runnable {
+ private CompactionTask compactionTask;
+
+ CompactionTaskRunner(CompactionTask compactionTask) {
+ this.compactionTask = compactionTask;
+ }
+
+ @Override
+ public void run() {
+ executeCompaction(compactionTask);
+ }
+
+ CompactionTask getCompactionTask() {
+ return compactionTask;
+ }
}
+ private static final Comparator<Runnable> COMPACTION_TASK_COMPARATOR =
+ (Runnable r1, Runnable r2) -> {
+ // CompactionRunner first
+ if (r1 instanceof CompactionTaskRunner) {
+ if (!(r2 instanceof CompactionTaskRunner)) {
+ return -1;
+ }
+ } else {
+ if (r2 instanceof CompactionTaskRunner) {
+ return 1;
+ } else {
+ // break the tie based on hash code
+ return System.identityHashCode(r1) - System.identityHashCode(r2);
+ }
+ }
+ CompactionTask o1 = ((CompactionTaskRunner) r1).getCompactionTask();
+ CompactionTask o2 = ((CompactionTaskRunner) r2).getCompactionTask();
+ int cmp = Integer.compare(o1.getPriority(), o2.getPriority());
+ if (cmp == 0) {
+ return Long.compare(o1.getSubmitTime(), o2.getSubmitTime());
+ }
+ return cmp;
+ };
}
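
COMPACTION_TASK_COMPARATOR above orders queued runners by priority and breaks ties by submit time, so older requests of equal priority run first. A runnable demonstration of that ordering with a plain PriorityQueue (Task is a stand-in type, not the patch's class):

    import java.util.Comparator;
    import java.util.PriorityQueue;

    public class TaskOrderSketch {
      static final class Task {
        final String name; final int priority; final long submitTime;
        Task(String name, int priority, long submitTime) {
          this.name = name; this.priority = priority; this.submitTime = submitTime;
        }
      }

      public static void main(String[] args) {
        // Same ordering idea: lower priority value first, then earlier submit time.
        Comparator<Task> cmp =
            Comparator.<Task>comparingInt(t -> t.priority).thenComparingLong(t -> t.submitTime);
        PriorityQueue<Task> q = new PriorityQueue<>(cmp);
        q.add(new Task("b", 10, 2));
        q.add(new Task("a", 1, 3));
        q.add(new Task("c", 10, 1));
        while (!q.isEmpty()) {
          System.out.println(q.poll().name); // prints a, c, b
        }
      }
    }
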
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
index cd09480..f78e58a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.compactionserver;
import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AbstractServer;
@@ -27,7 +29,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.UserProvider;
@@ -55,6 +59,11 @@ public class HCompactionServer extends AbstractServer {
/** compaction server process name */
public static final String COMPACTIONSERVER = "compactionserver";
private static final Logger LOG = LoggerFactory.getLogger(HCompactionServer.class);
+ // Request counter.
+ final LongAdder requestCount = new LongAdder();
+ final LongAdder requestFailedCount = new LongAdder();
+ // ChoreService used to schedule tasks that we want to run periodically
+ private ChoreService choreService;
@Override
protected String getProcessName() {
@@ -68,14 +77,13 @@ public class HCompactionServer extends AbstractServer {
@Override
public ChoreService getChoreService() {
- return null;
+ return choreService;
}
protected final CSRpcServices rpcServices;
// Stub to do compaction server status calls against the master.
private volatile CompactionServerStatusService.BlockingInterface cssStub;
-
CompactionThreadManager compactionThreadManager;
/**
* Get the current master from ZooKeeper and open the RPC connection to it.
To get a fresh
@@ -120,12 +128,14 @@ public class HCompactionServer extends AbstractServer {
// login the server principal (if using secure Hadoop)
login(userProvider, this.rpcServices.getIsa().getHostName());
Superusers.initialize(conf);
+ this.dataFs = new HFileSystem(this.conf, true);
+ this.choreService = new ChoreService(getName(), true);
this.compactionThreadManager = new CompactionThreadManager(conf, this);
this.rpcServices.start();
}
@Override
- protected CSRpcServices getRpcService(){
+ protected CSRpcServices getRpcService() {
return rpcServices;
}
@@ -138,9 +148,20 @@ public class HCompactionServer extends AbstractServer {
long reportEndTime) {
ClusterStatusProtos.CompactionServerLoad.Builder serverLoad =
ClusterStatusProtos.CompactionServerLoad.newBuilder();
- serverLoad.setCompactedCells(0);
- serverLoad.setCompactingCells(0);
- serverLoad.setTotalNumberOfRequests(rpcServices.requestCount.sum());
+    Collection<CompactionTask> tasks =
+      compactionThreadManager.getRunningCompactionTasks().values();
+ long compactingCells = 0;
+ long compactedCells = 0;
+ for (CompactionTask compactionTask : tasks) {
+ serverLoad.addCompactionTasks(compactionTask.getTaskName());
+      CompactionProgress progress = compactionTask.getStore().getCompactionProgress();
+ if (progress != null) {
+ compactedCells += progress.getCurrentCompactedKvs();
+ compactingCells += progress.getTotalCompactingKVs();
+ }
+ }
+ serverLoad.setCompactedCells(compactedCells);
+ serverLoad.setCompactingCells(compactingCells);
+ serverLoad.setTotalNumberOfRequests(requestCount.sum());
serverLoad.setReportStartTime(reportStartTime);
serverLoad.setReportEndTime(reportEndTime);
return serverLoad.build();
@@ -209,7 +230,7 @@ public class HCompactionServer extends AbstractServer {
// We registered with the Master. Go into run mode.
long lastMsg = System.currentTimeMillis();
// The main run loop.
- while (!isStopped()) {
+ while (!isStopped() && this.dataFsOk) {
long now = System.currentTimeMillis();
if ((now - lastMsg) >= msgInterval) {
if (tryCompactionServerReport(lastMsg, now) && !online.get()) {
@@ -231,6 +252,16 @@ public class HCompactionServer extends AbstractServer {
abort(prefix + t.getMessage(), t);
}
}
+ stopChores();
+ if (this.compactionThreadManager != null) {
+ this.compactionThreadManager.waitForStop();
+ }
+ }
+
+ private void stopChores() {
+ if (this.choreService != null) {
+ choreService.shutdown();
+ }
}
@Override
@@ -244,7 +275,6 @@ public class HCompactionServer extends AbstractServer {
stop(msg);
}
-
@Override
public void stop(final String msg) {
if (!this.stopped) {
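
tryCompactionServerReport above now derives the server load by summing each running task's CompactionProgress into the compactingCells/compactedCells counters. A small sketch of that aggregation, with Progress standing in for CompactionProgress:

    import java.util.List;

    class LoadSketch {
      static final class Progress {
        final long currentCompactedKvs;
        final long totalCompactingKvs;
        Progress(long compacted, long total) {
          this.currentCompactedKvs = compacted;
          this.totalCompactingKvs = total;
        }
      }

      /** Sums per-task progress into the two server-wide counters reported to the master. */
      static long[] aggregate(List<Progress> running) {
        long compacted = 0, compacting = 0;
        for (Progress p : running) {
          if (p == null) {
            continue;              // a task may not have started compacting yet
          }
          compacted += p.currentCompactedKvs;
          compacting += p.totalCompactingKvs;
        }
        return new long[] { compacted, compacting };
      }
    }
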
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java
index b0e80d1..a88fc73 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java
@@ -22,8 +22,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.CompactionServerMetrics;
@@ -59,8 +57,6 @@ public class CompactionOffloadManager {
private CompactionOffloadSwitchStorage compactionOffloadSwitchStorage;
private static final Logger LOG = LoggerFactory.getLogger(CompactionOffloadManager.class.getName());
-  private final ConcurrentMap<ServerName, AsyncCompactionServerService> csStubs =
-    new ConcurrentHashMap<>();
public CompactionOffloadManager(final MasterServices master) {
this.masterServices = master;
@@ -164,14 +160,8 @@ public class CompactionOffloadManager {
return compactionServerList.get((int) index);
}
-  private AsyncCompactionServerService getCsStub(final ServerName sn) throws IOException {
-    AsyncCompactionServerService csStub = this.csStubs.get(sn);
-    if (csStub == null) {
-      LOG.debug("New CS stub connection to {}", sn);
-      csStub = this.masterServices.getAsyncClusterConnection().getCompactionServerService(sn);
-      this.csStubs.put(sn, csStub);
-    }
-    return csStub;
+  private AsyncCompactionServerService getCsStub(final ServerName sn) {
+    return this.masterServices.getAsyncClusterConnection().getCompactionServerService(sn);
}
public CompactResponse requestCompaction(CompactRequest request) throws ServiceException {
@@ -180,9 +170,10 @@ public class CompactionOffloadManager {
ProtobufUtil.toString(request), targetCompactionServer);
try {
FutureUtils.get(getCsStub(targetCompactionServer).requestCompaction(request));
+ return CompactResponse.newBuilder().build();
} catch (Throwable t) {
LOG.error("requestCompaction from master to CS error: {}", t);
+ throw new ServiceException(t);
}
- return null;
}
}
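
The master-side change above makes requestCompaction block on the async stub and propagate any failure as a ServiceException instead of returning null. A self-contained sketch of that pattern (ServiceException is modeled locally; rpc.join() stands in for FutureUtils.get):

    import java.util.concurrent.CompletableFuture;

    class OffloadSketch {
      static class ServiceException extends Exception {
        ServiceException(Throwable cause) { super(cause); }
      }

      /** Blocks on the async RPC and surfaces any failure instead of returning null. */
      static String requestCompaction(CompletableFuture<String> rpc) throws ServiceException {
        try {
          return rpc.join();             // stands in for FutureUtils.get(...)
        } catch (Throwable t) {
          throw new ServiceException(t); // the caller sees the real cause
        }
      }
    }
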
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
index 441b18b..9c431f9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -30,10 +30,10 @@ import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
import java.util.function.IntSupplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
@@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.StealJobQueue;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -87,12 +86,9 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
private final HRegionServer server;
private final Configuration conf;
- private volatile ThreadPoolExecutor longCompactions;
- private volatile ThreadPoolExecutor shortCompactions;
+ private final CompactThreadControl compactThreadControl;
private volatile ThreadPoolExecutor splits;
- private volatile ThroughputController compactionThroughputController;
-
private volatile boolean compactionsEnabled;
/**
* Splitting should not take place if the total number of regions exceed this.
@@ -105,12 +101,14 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
this.server = server;
this.conf = server.getConfiguration();
this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true);
- createCompactionExecutors();
+ this.regionSplitLimit =
+      conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
+    int largeThreads =
+      Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
+    int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
+    compactThreadControl =
+      new CompactThreadControl(server, largeThreads, smallThreads, COMPARATOR, REJECTION);
createSplitExcecutors();
-
- // compaction throughput controller
- this.compactionThroughputController =
- CompactionThroughputControllerFactory.create(server, conf);
}
private void createSplitExcecutors() {
@@ -120,65 +118,18 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
      new ThreadFactoryBuilder().setNameFormat(n + "-splits-%d").setDaemon(true).build());
}
- private void createCompactionExecutors() {
-    this.regionSplitLimit =
-      conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
-
-    int largeThreads =
-      Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
-    int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
-
-    // if we have throttle threads, make sure the user also specified size
-    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
-
-    final String n = Thread.currentThread().getName();
-
-    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
-    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
-      stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d")
-        .setDaemon(true).build());
-    this.longCompactions.setRejectedExecutionHandler(new Rejection());
-    this.longCompactions.prestartAllCoreThreads();
-    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
-      stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder()
-        .setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
-    this.shortCompactions.setRejectedExecutionHandler(new Rejection());
- }
-
@Override
public String toString() {
- return "compactionQueue=(longCompactions="
- + longCompactions.getQueue().size() + ":shortCompactions="
- + shortCompactions.getQueue().size() + ")"
+ return compactThreadControl.toString()
+ ", splitQueue=" + splits.getQueue().size();
}
public String dumpQueue() {
- StringBuilder queueLists = new StringBuilder();
- queueLists.append("Compaction/Split Queue dump:\n");
- queueLists.append(" LargeCompation Queue:\n");
- BlockingQueue<Runnable> lq = longCompactions.getQueue();
- Iterator<Runnable> it = lq.iterator();
- while (it.hasNext()) {
- queueLists.append(" " + it.next().toString());
- queueLists.append("\n");
- }
-
- if (shortCompactions != null) {
- queueLists.append("\n");
- queueLists.append(" SmallCompation Queue:\n");
- lq = shortCompactions.getQueue();
- it = lq.iterator();
- while (it.hasNext()) {
- queueLists.append(" " + it.next().toString());
- queueLists.append("\n");
- }
- }
-
+ StringBuilder queueLists = compactThreadControl.dumpQueue();
queueLists.append("\n");
queueLists.append(" Split Queue:\n");
- lq = splits.getQueue();
- it = lq.iterator();
+ BlockingQueue<Runnable> lq = splits.getQueue();
+ Iterator<Runnable> it = lq.iterator();
while (it.hasNext()) {
queueLists.append(" " + it.next().toString());
queueLists.append("\n");
@@ -230,12 +181,15 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
}
private void interrupt() {
- longCompactions.shutdownNow();
- shortCompactions.shutdownNow();
+ compactThreadControl.getLongCompactions().shutdownNow();
+ compactThreadControl.getShortCompactions().shutdownNow();
}
private void reInitializeCompactionsExecutors() {
- createCompactionExecutors();
+ int largeThreads =
+      Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
+    int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
+    compactThreadControl.createCompactionExecutors(largeThreads, smallThreads, COMPARATOR);
}
private interface CompactionCompleteTracker {
@@ -355,18 +309,19 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
if (selectNow) {
      // compaction.get is safe as we will just return if selectNow is true but no compaction is
      // selected
-      pool = store.throttleCompaction(compaction.getRequest().getSize()) ? longCompactions
- : shortCompactions;
+ pool = store.throttleCompaction(compaction.getRequest().getSize())
+ ? compactThreadControl.getLongCompactions()
+ : compactThreadControl.getShortCompactions();
} else {
      // We assume that most compactions are small. So, put system compactions into small
// pool; we will do selection there, and move to large pool if necessary.
- pool = shortCompactions;
+ pool = compactThreadControl.getShortCompactions();
}
pool.execute(
      new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
region.incrementCompactionsQueuedCount();
if (LOG.isDebugEnabled()) {
- String type = (pool == shortCompactions) ? "Small " : "Large ";
+        String type = (pool == compactThreadControl.getShortCompactions()) ? "Small " : "Large ";
LOG.debug(type + "Compaction requested: " + (selectNow ?
compaction.toString() : "system")
+ (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; "
+ this);
}
@@ -407,8 +362,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
*/
void interruptIfNecessary() {
splits.shutdown();
- longCompactions.shutdown();
- shortCompactions.shutdown();
+ compactThreadControl.getLongCompactions().shutdown();
+ compactThreadControl.getShortCompactions().shutdown();
}
private void waitFor(ThreadPoolExecutor t, String name) {
@@ -429,8 +384,7 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
void join() {
waitFor(splits, "Split Thread");
- waitFor(longCompactions, "Large Compaction Thread");
- waitFor(shortCompactions, "Small Compaction Thread");
+ compactThreadControl.waitForStop();
}
/**
@@ -440,16 +394,17 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
* @return The current size of the regions queue.
*/
public int getCompactionQueueSize() {
-    return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
+ return compactThreadControl.getLongCompactions().getQueue().size()
+ + compactThreadControl.getShortCompactions().getQueue().size();
}
public int getLargeCompactionQueueSize() {
- return longCompactions.getQueue().size();
+ return compactThreadControl.getLongCompactions().getQueue().size();
}
public int getSmallCompactionQueueSize() {
- return shortCompactions.getQueue().size();
+ return compactThreadControl.getShortCompactions().getQueue().size();
}
public int getSplitQueueSize() {
@@ -591,12 +546,14 @@ public class CompactSplit implements CompactionRequester,
PropagatingConfigurati
// Now see if we are in correct pool for the size; if not, go to the
correct one.
// We might end up waiting for a while, so cancel the selection.
- ThreadPoolExecutor pool =
- store.throttleCompaction(c.getRequest().getSize()) ?
longCompactions : shortCompactions;
+ ThreadPoolExecutor pool =
store.throttleCompaction(c.getRequest().getSize())
+ ? compactThreadControl.getLongCompactions()
+ : compactThreadControl.getShortCompactions();
// Long compaction pool can process small job
// Short compaction pool should not process large job
- if (this.parent == shortCompactions && pool == longCompactions) {
+ if (this.parent == compactThreadControl.getShortCompactions()
+ && pool == compactThreadControl.getLongCompactions()) {
this.store.cancelRequestedCompaction(c);
this.parent = pool;
this.parent.execute(this);
@@ -611,13 +568,13 @@ public class CompactSplit implements CompactionRequester,
PropagatingConfigurati
tracker.beforeExecution(store);
try {
// Note: please don't put single-compaction logic here;
- // put it into region/store/etc. This is CST logic.
+ // put it into region/store/etc. This is CST logic.
long start = EnvironmentEdgeManager.currentTime();
- boolean completed =
- region.compact(c, store, compactionThroughputController, user);
+ boolean completed = region.compact(c, store,
+ compactThreadControl.getCompactionThroughputController(), user);
long now = EnvironmentEdgeManager.currentTime();
- LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " +
- this + "; duration=" + StringUtils.formatTimeDiff(now, start));
+ LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " +
this + "; duration="
+ + StringUtils.formatTimeDiff(now, start));
if (completed) {
// degenerate case: blocked regions require recursive enqueues
if (store.getCompactPriority() <= 0) {
@@ -671,18 +628,15 @@ public class CompactSplit implements CompactionRequester,
PropagatingConfigurati
/**
* Cleanup class to use when rejecting a compaction request from the queue.
*/
- private static class Rejection implements RejectedExecutionHandler {
- @Override
- public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
- if (runnable instanceof CompactionRunner) {
- CompactionRunner runner = (CompactionRunner) runnable;
- LOG.debug("Compaction Rejected: " + runner);
- if (runner.compaction != null) {
- runner.store.cancelRequestedCompaction(runner.compaction);
- }
+ private static final BiConsumer<Runnable, ThreadPoolExecutor> REJECTION =
(runnable, pool) -> {
+ if (runnable instanceof CompactionRunner) {
+ CompactionRunner runner = (CompactionRunner) runnable;
+ LOG.debug("Compaction Rejected: " + runner);
+ if (runner.compaction != null) {
+ runner.store.cancelRequestedCompaction(runner.compaction);
}
}
- }
+ };
/**
* {@inheritDoc}
@@ -697,31 +651,31 @@ public class CompactSplit implements CompactionRequester,
PropagatingConfigurati
int largeThreads = Math.max(1, newConf.getInt(
LARGE_COMPACTION_THREADS,
LARGE_COMPACTION_THREADS_DEFAULT));
- if (this.longCompactions.getCorePoolSize() != largeThreads) {
+ if (compactThreadControl.getLongCompactions().getCorePoolSize() !=
largeThreads) {
LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
- " from " + this.longCompactions.getCorePoolSize() + " to " +
+ " from " +
compactThreadControl.getLongCompactions().getCorePoolSize() + " to " +
largeThreads);
- if(this.longCompactions.getCorePoolSize() < largeThreads) {
- this.longCompactions.setMaximumPoolSize(largeThreads);
- this.longCompactions.setCorePoolSize(largeThreads);
+ if(compactThreadControl.getLongCompactions().getCorePoolSize() <
largeThreads) {
+
compactThreadControl.getLongCompactions().setMaximumPoolSize(largeThreads);
+
compactThreadControl.getLongCompactions().setCorePoolSize(largeThreads);
} else {
- this.longCompactions.setCorePoolSize(largeThreads);
- this.longCompactions.setMaximumPoolSize(largeThreads);
+
compactThreadControl.getLongCompactions().setCorePoolSize(largeThreads);
+
compactThreadControl.getLongCompactions().setMaximumPoolSize(largeThreads);
}
}
int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
SMALL_COMPACTION_THREADS_DEFAULT);
- if (this.shortCompactions.getCorePoolSize() != smallThreads) {
+ if (compactThreadControl.getShortCompactions().getCorePoolSize() !=
smallThreads) {
LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
- " from " + this.shortCompactions.getCorePoolSize() + " to " +
+ " from " +
compactThreadControl.getShortCompactions().getCorePoolSize() + " to " +
smallThreads);
- if(this.shortCompactions.getCorePoolSize() < smallThreads) {
- this.shortCompactions.setMaximumPoolSize(smallThreads);
- this.shortCompactions.setCorePoolSize(smallThreads);
+ if(compactThreadControl.getShortCompactions().getCorePoolSize() <
smallThreads) {
+
compactThreadControl.getShortCompactions().setMaximumPoolSize(smallThreads);
+
compactThreadControl.getShortCompactions().setCorePoolSize(smallThreads);
} else {
- this.shortCompactions.setCorePoolSize(smallThreads);
- this.shortCompactions.setMaximumPoolSize(smallThreads);
+
compactThreadControl.getShortCompactions().setCorePoolSize(smallThreads);
+
compactThreadControl.getShortCompactions().setMaximumPoolSize(smallThreads);
}
}
@@ -740,12 +694,12 @@ public class CompactSplit implements CompactionRequester,
PropagatingConfigurati
}
}
- ThroughputController old = this.compactionThroughputController;
+ ThroughputController old =
compactThreadControl.getCompactionThroughputController();
if (old != null) {
old.stop("configuration change");
}
- this.compactionThroughputController =
- CompactionThroughputControllerFactory.create(server, newConf);
+ compactThreadControl.setCompactionThroughputController(
+ CompactionThroughputControllerFactory.create(server, newConf));
// We change this atomically here instead of reloading the config in order
that upstream
// would be the only one with the flexibility to reload the config.
@@ -753,11 +707,11 @@ public class CompactSplit implements CompactionRequester,
PropagatingConfigurati
}
protected int getSmallCompactionThreadNum() {
- return this.shortCompactions.getCorePoolSize();
+ return compactThreadControl.getShortCompactions().getCorePoolSize();
}
protected int getLargeCompactionThreadNum() {
- return this.longCompactions.getCorePoolSize();
+ return compactThreadControl.getLongCompactions().getCorePoolSize();
}
protected int getSplitThreadNum() {
@@ -781,7 +735,7 @@ public class CompactSplit implements CompactionRequester,
PropagatingConfigurati
}
public ThroughputController getCompactionThroughputController() {
- return compactionThroughputController;
+ return compactThreadControl.getCompactionThroughputController();
}
/**
@@ -790,15 +744,15 @@ public class CompactSplit implements CompactionRequester,
PropagatingConfigurati
* from short compaction queue
*/
void shutdownLongCompactions(){
- this.longCompactions.shutdown();
+ compactThreadControl.getLongCompactions().shutdown();
}
public void clearLongCompactionsQueue() {
- longCompactions.getQueue().clear();
+ compactThreadControl.getLongCompactions().getQueue().clear();
}
public void clearShortCompactionsQueue() {
- shortCompactions.getQueue().clear();
+ compactThreadControl.getShortCompactions().getQueue().clear();
}
public boolean isCompactionsEnabled() {
@@ -814,14 +768,14 @@ public class CompactSplit implements CompactionRequester,
PropagatingConfigurati
* @return the longCompactions thread pool executor
*/
ThreadPoolExecutor getLongCompactions() {
- return longCompactions;
+ return compactThreadControl.getLongCompactions();
}
/**
* @return the shortCompactions thread pool executor
*/
ThreadPoolExecutor getShortCompactions() {
- return shortCompactions;
+ return compactThreadControl.getShortCompactions();
}
}
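The Rejection inner class above is replaced by a BiConsumer so that CompactSplit
supplies only the cleanup policy while CompactThreadControl owns the executors.
A rough sketch (names invented; not part of this patch) of why that shape works:

    // BiConsumer<Runnable, ThreadPoolExecutor> matches the signature of
    // RejectedExecutionHandler#rejectedExecution, so it adapts directly.
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.function.BiConsumer;

    class RejectionAdapterSketch {
      public static void main(String[] args) {
        BiConsumer<Runnable, ThreadPoolExecutor> rejection =
            (runnable, pool) -> System.out.println("Rejected: " + runnable);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60,
            TimeUnit.SECONDS, new SynchronousQueue<>());
        executor.setRejectedExecutionHandler(rejection::accept);
        executor.shutdown();
      }
    }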
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactThreadControl.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactThreadControl.java
new file mode 100644
index 0000000..7364771
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactThreadControl.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import
org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import
org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
+import org.apache.hadoop.hbase.util.StealJobQueue;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This class helps manage the compaction thread pools and the compaction
throughput controller.
+ * @see CompactSplit
+ * @see org.apache.hadoop.hbase.compactionserver.CompactionThreadManager
+ */
+@InterfaceAudience.Private
+public class CompactThreadControl {
+ private static final Logger LOG =
LoggerFactory.getLogger(CompactThreadControl.class);
+ private volatile ThreadPoolExecutor longCompactions;
+ private volatile ThreadPoolExecutor shortCompactions;
+ private volatile ThroughputController compactionThroughputController;
+ private BiConsumer<Runnable, ThreadPoolExecutor> rejection;
+
+ public CompactThreadControl(ThroughputControllerService server, int
largeThreads,
+ int smallThreads, Comparator<Runnable> cmp,
+ BiConsumer<Runnable, ThreadPoolExecutor> rejection) {
+ createCompactionExecutors(largeThreads, smallThreads, cmp);
+
+ // compaction throughput controller
+ this.compactionThroughputController =
+ CompactionThroughputControllerFactory.create(server,
server.getConfiguration());
+ // rejection handler invoked when a pool refuses a compaction request
+ this.rejection = rejection;
+ }
+
+ /**
+ * Cleanup class to use when rejecting a compaction request from the queue.
+ */
+ private class Rejection implements RejectedExecutionHandler {
+ @Override
+ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
+ rejection.accept(runnable, pool);
+ }
+ }
+
+ void createCompactionExecutors(int largeThreads, int smallThreads,
Comparator<Runnable> cmp) {
+ // both pool sizes must be positive
+ Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
+
+ final String n = Thread.currentThread().getName();
+
+ StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(cmp);
+ this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
60, TimeUnit.SECONDS,
+ stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n +
"-longCompactions-%d")
+ .setDaemon(true).build());
+ this.longCompactions.setRejectedExecutionHandler(new Rejection());
+ this.longCompactions.prestartAllCoreThreads();
+ this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
60, TimeUnit.SECONDS,
+ stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder()
+ .setNameFormat(n +
"-shortCompactions-%d").setDaemon(true).build());
+ this.shortCompactions.setRejectedExecutionHandler(new Rejection());
+ }
+
+ @Override
+ public String toString() {
+ return "compactionQueue=(longCompactions=" +
longCompactions.getQueue().size()
+ + ":shortCompactions=" + shortCompactions.getQueue().size() + ")";
+ }
+
+ public StringBuilder dumpQueue() {
+ StringBuilder queueLists = new StringBuilder();
+ queueLists.append("Compaction/Split Queue dump:\n");
+ queueLists.append(" LargeCompation Queue:\n");
+ BlockingQueue<Runnable> lq = longCompactions.getQueue();
+ Iterator<Runnable> it = lq.iterator();
+ while (it.hasNext()) {
+ queueLists.append(" " + it.next().toString());
+ queueLists.append("\n");
+ }
+
+ if (shortCompactions != null) {
+ queueLists.append("\n");
+ queueLists.append(" SmallCompation Queue:\n");
+ lq = shortCompactions.getQueue();
+ it = lq.iterator();
+ while (it.hasNext()) {
+ queueLists.append(" " + it.next().toString());
+ queueLists.append("\n");
+ }
+ }
+ return queueLists;
+ }
+
+ public ThreadPoolExecutor getLongCompactions() {
+ return longCompactions;
+ }
+
+ public ThreadPoolExecutor getShortCompactions() {
+ return shortCompactions;
+ }
+
+ void setCompactionThroughputController(ThroughputController
compactionThroughputController) {
+ this.compactionThroughputController = compactionThroughputController;
+ }
+
+ public ThroughputController getCompactionThroughputController() {
+ return compactionThroughputController;
+ }
+
+ public void waitForStop() {
+ waitForPoolStop(longCompactions, "Large Compaction Thread");
+ waitForPoolStop(shortCompactions, "Small Compaction Thread");
+ }
+
+ private void waitForPoolStop(ThreadPoolExecutor t, String name) {
+ if (t == null) {
+ return;
+ }
+ try {
+ t.shutdown();
+ t.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted waiting for " + name + " to finish...");
+ Thread.currentThread().interrupt();
+ t.shutdownNow();
+ }
+ }
+
+}
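A note on the wiring in createCompactionExecutors above: the long pool drains a
StealJobQueue while the short pool is fed from stealJobQueue.getStealFromQueue(),
so an idle long-compaction thread may steal a queued small job, but a short
thread never takes a large one. A reduced sketch (thread counts and the
comparator are placeholders):

    Comparator<Runnable> cmp = (a, b) -> 0;  // placeholder ordering
    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<>(cmp);
    ThreadPoolExecutor longPool = new ThreadPoolExecutor(2, 2, 60,
        TimeUnit.SECONDS, stealJobQueue);                     // may steal
    ThreadPoolExecutor shortPool = new ThreadPoolExecutor(4, 4, 60,
        TimeUnit.SECONDS, stealJobQueue.getStealFromQueue()); // stealable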
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 73234f1..cc34c6d 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -100,7 +100,7 @@ public class HRegionFileSystem {
* @param tableDir {@link Path} to where the table is being stored
* @param regionInfo {@link RegionInfo} for region
*/
- HRegionFileSystem(final Configuration conf, final FileSystem fs, final Path
tableDir,
+ public HRegionFileSystem(final Configuration conf, final FileSystem fs,
final Path tableDir,
final RegionInfo regionInfo) {
this.fs = fs;
this.conf = conf;
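The constructor above widens from package-private to public so code outside
org.apache.hadoop.hbase.regionserver (here, the compaction server) can open a
region's on-disk layout without going through an HRegion. A hypothetical call
site, with conf, fs, rootDir and regionInfo assumed in scope:

    Path tableDir = CommonFSUtils.getTableDir(rootDir, regionInfo.getTable());
    HRegionFileSystem regionFs =
        new HRegionFileSystem(conf, fs, tableDir, regionInfo);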
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index e49ad87..b2df9b7 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -327,8 +327,6 @@ public class HRegionServer extends AbstractServer implements
// Instance of the hbase executor executorService.
protected ExecutorService executorService;
- private volatile boolean dataFsOk;
- private HFileSystem dataFs;
private HFileSystem walFs;
// Go down hard. Used if file system becomes unavailable and also in
@@ -346,7 +344,6 @@ public class HRegionServer extends AbstractServer implements
private volatile boolean killed = false;
private volatile boolean shutDown = false;
-
private Path dataRootDir;
private Path walRootDir;
@@ -533,7 +530,6 @@ public class HRegionServer extends AbstractServer implements
super(conf, "RegionServer"); // thread name
TraceUtil.initTracer(conf);
try {
- this.dataFsOk = true;
this.eventLoopGroupConfig = setupNetty(this.conf);
MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
HFile.checkHFileVersion(this.conf);
@@ -2836,11 +2832,6 @@ public class HRegionServer extends AbstractServer
implements
return dataRootDir;
}
- @Override
- public FileSystem getFileSystem() {
- return dataFs;
- }
-
/**
* @return {@code true} when the data file system is available, {@code
false} otherwise.
*/
@@ -3279,24 +3270,6 @@ public class HRegionServer extends AbstractServer
implements
|| msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
}
- /**
- * Checks to see if the file system is still accessible. If not, sets
- * abortRequested and stopRequested
- *
- * @return false if file system is not available
- */
- boolean checkFileSystem() {
- if (this.dataFsOk && this.dataFs != null) {
- try {
- FSUtils.checkFileSystemAvailable(this.dataFs);
- } catch (IOException e) {
- abort("File System not available", e);
- this.dataFsOk = false;
- }
- }
- return this.dataFsOk;
- }
-
@Override
public void updateRegionFavoredNodesMapping(String encodedRegionName,
List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName>
favoredNodes) {
@@ -3503,6 +3476,12 @@ public class HRegionServer extends AbstractServer
implements
.build();
}
+ /**
+ * @return the max compaction pressure of all stores on this regionserver.
The value should be
+ * greater than or equal to 0.0, and any value greater than 1.0
means we enter the
+ * emergency state that some stores have too many store files.
+ * @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure()
+ */
@Override
public double getCompactionPressure() {
double max = 0;
@@ -3540,6 +3519,11 @@ public class HRegionServer extends AbstractServer
implements
return flushThroughputController;
}
+ /**
+ * @return the flush pressure of all stores on this regionserver. The value
should be greater than
+ * or equal to 0.0, and any value greater than 1.0 means we enter
the emergency state that
+ * global memstore size already exceeds lower limit.
+ */
@Override
public double getFlushPressure() {
if (getRegionServerAccounting() == null || cacheFlusher == null) {
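The javadoc moved onto the implementations above fixes the pressure contract:
values are >= 0.0 and anything past 1.0 signals an emergency. Illustratively, a
pressure-aware controller can turn that number into a throughput cap like this
(the bounds are invented; server is assumed in scope):

    double lower = 50L * 1024 * 1024;    // assumed 50 MB/s floor at pressure 0.0
    double higher = 100L * 1024 * 1024;  // assumed 100 MB/s at pressure 1.0
    double pressure = server.getCompactionPressure();
    double maxThroughput = pressure >= 1.0
        ? Double.MAX_VALUE                      // emergency: do not throttle
        : lower + (higher - lower) * pressure;  // otherwise interpolate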
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 9749576..7dc4c6e 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -240,7 +240,7 @@ public class HStore implements Store, HeapSize,
StoreConfigInformation,
* @param family HColumnDescriptor for this column
* @param confParam configuration object failed. Can be null.
*/
- protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
+ public HStore(final HRegion region, final ColumnFamilyDescriptor family,
final Configuration confParam, boolean warmup) throws IOException {
// 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index c718f15..1f51fe8 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -40,6 +40,7 @@ import
org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import
org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.hadoop.hbase.wal.WAL;
@@ -56,14 +57,16 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
* the code base.
*/
@InterfaceAudience.Private
-public interface RegionServerServices extends Server, MutableOnlineRegions,
FavoredNodesForRegion {
+public interface RegionServerServices
+ extends Server, MutableOnlineRegions, FavoredNodesForRegion,
ThroughputControllerService {
- /** @return the WAL for a particular region. Pass null for getting the
- * default (common) WAL */
+ /**
+ * @return the WAL for a particular region. Pass null for getting the
default (common) WAL
+ */
WAL getWAL(RegionInfo regionInfo) throws IOException;
- /** @return the List of WALs that are used by this server
- * Doesn't include the meta WAL
+ /**
+ * @return the List of WALs that are used by this server. Doesn't include the
meta WAL.
*/
List<WAL> getWALs() throws IOException;
@@ -229,27 +232,11 @@ public interface RegionServerServices extends Server,
MutableOnlineRegions, Favo
HeapMemoryManager getHeapMemoryManager();
/**
- * @return the max compaction pressure of all stores on this regionserver.
The value should be
- * greater than or equal to 0.0, and any value greater than 1.0
means we enter the
- * emergency state that some stores have too many store files.
- * @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure()
- */
- double getCompactionPressure();
-
- /**
* @return the controller to avoid flush too fast
*/
ThroughputController getFlushThroughputController();
/**
- * @return the flush pressure of all stores on this regionserver. The value
should be greater than
- * or equal to 0.0, and any value greater than 1.0 means we enter
the emergency state that
- * global memstore size already exceeds lower limit.
- */
- @Deprecated
- double getFlushPressure();
-
- /**
* @return the metrics tracker for the region server
*/
MetricsRegionServer getMetrics();
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java
index 45e7267..df6be3f 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java
@@ -19,11 +19,10 @@ package org.apache.hadoop.hbase.regionserver.throttle;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.util.ReflectionUtils;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public final class CompactionThroughputControllerFactory {
@@ -45,7 +44,7 @@ public final class CompactionThroughputControllerFactory {
private static final String
DEPRECATED_NAME_OF_NO_LIMIT_THROUGHPUT_CONTROLLER_CLASS =
"org.apache.hadoop.hbase.regionserver.compactions.NoLimitThroughputController";
- public static ThroughputController create(RegionServerServices server,
+ public static ThroughputController create(ThroughputControllerService server,
Configuration conf) {
Class<? extends ThroughputController> clazz =
getThroughputControllerClass(conf);
ThroughputController controller = ReflectionUtils.newInstance(clazz, conf);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java
index 4b1b261..c14e514 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver.throttle;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class NoLimitThroughputController implements ThroughputController {
@@ -27,7 +26,7 @@ public class NoLimitThroughputController implements
ThroughputController {
public static final NoLimitThroughputController INSTANCE = new
NoLimitThroughputController();
@Override
- public void setup(RegionServerServices server) {
+ public void setup(ThroughputControllerService server) {
}
@Override
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java
index 1c3952e..d37d7c3 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java
@@ -20,11 +20,10 @@ package org.apache.hadoop.hbase.regionserver.throttle;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
/**
* A throughput controller which uses the follow schema to limit throughput
@@ -74,7 +73,7 @@ public class PressureAwareCompactionThroughputController
extends PressureAwareTh
private long maxThroughputOffpeak;
@Override
- public void setup(final RegionServerServices server) {
+ public void setup(final ThroughputControllerService server) {
server.getChoreService().scheduleChore(
new ScheduledChore("CompactionThroughputTuner", this, tuningPeriod) {
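The setup signature above now takes the narrower ThroughputControllerService,
which still exposes the ChoreService the tuner chore needs. A reduced sketch of
that retuning pattern (the period and the tune() helper are hypothetical):

    server.getChoreService().scheduleChore(
        new ScheduledChore("ThroughputTuner", this, 60_000) {
          @Override
          protected void chore() {
            tune(server.getCompactionPressure()); // hypothetical helper
          }
        });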
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java
index 51e7b42..6183786 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java
@@ -20,11 +20,11 @@ package org.apache.hadoop.hbase.regionserver.throttle;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
+
/**
* A throughput controller which uses the follow schema to limit throughput
@@ -68,7 +68,7 @@ public class PressureAwareFlushThroughputController extends
PressureAwareThrough
10L * 1024 * 1024;// 10MB
@Override
- public void setup(final RegionServerServices server) {
+ public void setup(final ThroughputControllerService server) {
server.getChoreService().scheduleChore(
new ScheduledChore("FlushThroughputTuner", this, tuningPeriod,
this.tuningPeriod) {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
index 306df0b..a690433 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
@@ -23,12 +23,12 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public abstract class PressureAwareThroughputController extends Configured
implements
@@ -77,10 +77,11 @@ public abstract class PressureAwareThroughputController
extends Configured imple
private volatile double maxThroughput;
private volatile double maxThroughputPerOperation;
- protected final ConcurrentMap<String, ActiveOperation> activeOperations =
new ConcurrentHashMap<>();
+ protected final ConcurrentMap<String, ActiveOperation> activeOperations =
+ new ConcurrentHashMap<>();
@Override
- public abstract void setup(final RegionServerServices server);
+ public abstract void setup(final ThroughputControllerService server);
protected String throughputDesc(long deltaSize, long elapsedTime) {
return throughputDesc((double) deltaSize / elapsedTime * 1000);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
index 707d02d..0e8f3c4 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver.throttle;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
/**
* A utility that constrains the total throughput of one or more simultaneous
flows by
@@ -30,9 +29,9 @@ import
org.apache.hadoop.hbase.regionserver.RegionServerServices;
public interface ThroughputController extends Stoppable {
/**
- * Setup controller for the given region server.
+ * Setup controller for the given server.
*/
- void setup(RegionServerServices server);
+ void setup(ThroughputControllerService server);
/**
* Start the throughput controller.
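For context, the lifecycle this interface defines is start/control/finish:
control() may sleep to enforce the limit, and finish() belongs in a finally
block. A sketch of a caller (writeChunk is a hypothetical IO step):

    void throttledWrite(ThroughputController controller, int chunks)
        throws InterruptedException {
      String op = "compaction-region1";    // per-operation name (illustrative)
      controller.start(op);
      try {
        for (int i = 0; i < chunks; i++) {
          long written = writeChunk(i);    // hypothetical IO step
          controller.control(op, written); // sleeps if over the current limit
        }
      } finally {
        controller.finish(op);             // always finish, even on error
      }
    }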
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputControllerService.java
similarity index 51%
copy from
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
copy to
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputControllerService.java
index 707d02d..7e55c6d 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputControllerService.java
@@ -17,36 +17,17 @@
*/
package org.apache.hadoop.hbase.regionserver.throttle;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ChoreService;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
/**
- * A utility that constrains the total throughput of one or more simultaneous
flows by
- * sleeping when necessary.
+ * This interface provides the methods a {@link ThroughputController}
needs in order to run.
*/
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public interface ThroughputController extends Stoppable {
-
- /**
- * Setup controller for the given region server.
- */
- void setup(RegionServerServices server);
-
- /**
- * Start the throughput controller.
- */
- void start(String name);
-
- /**
- * Control the throughput. Will sleep if too fast.
- * @return the actual sleep time.
- */
- long control(String name, long size) throws InterruptedException;
-
- /**
- * Finish the controller. Should call this method in a finally block.
- */
- void finish(String name);
+@InterfaceAudience.Private
+public interface ThroughputControllerService {
+ ChoreService getChoreService();
+ double getCompactionPressure();
+ double getFlushPressure();
+ Configuration getConfiguration();
}
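Because the new interface only asks for a ChoreService, the two pressure gauges
and a Configuration, a throwaway implementation is enough to drive a controller
outside a full region server, e.g. in a unit test. A sketch (all names invented):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.ChoreService;
    import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;

    class StubThroughputSource implements ThroughputControllerService {
      private final Configuration conf = new Configuration();
      private final ChoreService chores = new ChoreService("stub");

      @Override public ChoreService getChoreService() { return chores; }
      @Override public double getCompactionPressure() { return 0.0; } // idle
      @Override public double getFlushPressure() { return 0.0; }      // idle
      @Override public Configuration getConfiguration() { return conf; }
    }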
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionFilesCache.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionFilesCache.java
new file mode 100644
index 0000000..6836d91
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionFilesCache.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.compactionserver;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@Category(SmallTests.class)
+public class TestCompactionFilesCache {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestCompactionFilesCache.class);
+ private static TableName tableName =
TableName.valueOf("testCompactionFilesCache");
+ private static byte[] family = Bytes.toBytes("A");
+ private static CompactionFilesCache storage;
+ private static RegionInfo regionInfo =
RegionInfoBuilder.newBuilder(tableName).build();
+ private static ColumnFamilyDescriptor cfd =
+ ColumnFamilyDescriptorBuilder.newBuilder(family).build();
+
+ private static class TestStorageThread extends Thread {
+ List<String> fileNameList;
+ boolean flag;
+
+ TestStorageThread(List<String> files, boolean flag) {
+ this.fileNameList = files;
+ this.flag = flag;
+ }
+
+ @Override
+ public void run() {
+ if (flag) {
+ storage.addSelectedFiles(regionInfo, cfd, fileNameList);
+ } else {
+ storage.removeSelectedFiles(regionInfo, cfd, fileNameList);
+ }
+ }
+ }
+
+ @Before
+ public void cleanUpStorage() {
+ storage = new CompactionFilesCache();
+ }
+
+ @Test
+ public void testSelectedFilesStore() {
+ List<String> fileNames = Lists.newArrayList("1");
+ storage.addSelectedFiles(regionInfo, cfd, fileNames);
+ // cannot add an already-selected file again
+ Assert.assertFalse(storage.addSelectedFiles(regionInfo, cfd, fileNames));
+
+ List<String> fileNames2 = Lists.newArrayList("2");
+ Assert.assertTrue(storage.addSelectedFiles(regionInfo, cfd, fileNames2));
+
+ List<String> fileNames3 = Lists.newArrayList("3", "2");
+ Assert.assertFalse(storage.addSelectedFiles(regionInfo, cfd, fileNames3));
+
+ storage.removeSelectedFiles(regionInfo, cfd, fileNames2);
+ Assert.assertTrue(storage.addSelectedFiles(regionInfo, cfd, fileNames3));
+
+ Set<String> selectedFiles = storage.getSelectedStoreFiles(regionInfo, cfd);
+ Assert.assertEquals(3, selectedFiles.size());
+ }
+
+ @Test
+ public void testCompactedFilesStore() {
+ List<String> fileNames = Lists.newArrayList("1");
+ storage.addCompactedFiles(regionInfo, cfd, fileNames);
+ // adding a compacted file twice is not checked (this should not happen)
+ Assert.assertTrue(storage.addCompactedFiles(regionInfo, cfd, fileNames));
+
+ List<String> fileNames2 = Lists.newArrayList("3", "2");
+ Assert.assertTrue(storage.addCompactedFiles(regionInfo, cfd, fileNames2));
+
+ Set<String> compactedFiles = storage.getCompactedStoreFiles(regionInfo,
cfd);
+ Assert.assertEquals(3, compactedFiles.size());
+
+ Set<String> storeFileNames = Sets.newHashSet("3");
+ storage.cleanupCompactedFiles(regionInfo, cfd, storeFileNames);
+ compactedFiles = storage.getCompactedStoreFiles(regionInfo, cfd);
+ Assert.assertEquals(1, compactedFiles.size());
+ }
+
+ @Test
+ public void testSelectedFilesStoreMultiThread() {
+ int threadNum = 50;
+ TestStorageThread[] testStorageThreads = new TestStorageThread[threadNum];
+ for (int i = 0; i < threadNum; i++) {
+ testStorageThreads[i] =
+ new TestStorageThread(new ArrayList<>(Arrays.asList("" + i)), true);
+ }
+ for (int i = 0; i < threadNum; i++) {
+ testStorageThreads[i].start();
+ }
+ for (int i = 0; i < threadNum; i++) {
+ try {
+ testStorageThreads[i].join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag
+ }
+ }
+ Assert.assertEquals(threadNum, storage.getSelectedStoreFiles(regionInfo,
cfd).size());
+ for (int i = 0; i < threadNum; i++) {
+ testStorageThreads[i] =
+ new TestStorageThread(new ArrayList<>(Arrays.asList("" + i)), false);
+ }
+ for (int i = 0; i < threadNum; i++) {
+ testStorageThreads[i].start();
+ }
+ for (int i = 0; i < threadNum; i++) {
+ try {
+ testStorageThreads[i].join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag
+ }
+ }
+ Assert.assertEquals(0, storage.getSelectedStoreFiles(regionInfo,
cfd).size());
+ }
+
+}
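The assertions above pin down addSelectedFiles as all-or-nothing: a batch is
rejected if any member is already selected, so two compactions can never claim
the same file. A sketch of bookkeeping with those semantics (illustrative, not
the actual CompactionFilesCache code):

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class SelectedFilesSketch {
      private final Set<String> selected = new HashSet<>();

      synchronized boolean addSelectedFiles(List<String> files) {
        if (!Collections.disjoint(selected, files)) {
          return false;            // some member already selected; reject batch
        }
        selected.addAll(files);
        return true;
      }

      synchronized void removeSelectedFiles(List<String> files) {
        selected.removeAll(files); // compaction finished or aborted
      }
    }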
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
index b5da939..1362640 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
@@ -17,22 +17,38 @@
*/
package org.apache.hadoop.hbase.compactionserver;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.io.IOException;
+import java.util.ArrayList;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.compaction.CompactionOffloadManager;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.testclassification.CompactionServerTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -43,6 +59,7 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
@Category({CompactionServerTests.class, MediumTests.class})
public class TestCompactionServer {
@@ -87,18 +104,147 @@ public class TestCompactionServer {
TEST_UTIL.deleteTableIfAny(TABLENAME);
}
+ private void doPutRecord(int start, int end, boolean flush) throws Exception
{
+ Table h = TEST_UTIL.getConnection().getTable(TABLENAME);
+ for (int i = start; i <= end; i++) {
+ Put p = new Put(Bytes.toBytes(i));
+ p.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(COL), Bytes.toBytes(i));
+ h.put(p);
+ if (i % 100 == 0 && flush) {
+ TEST_UTIL.flush(TABLENAME);
+ }
+ }
+ h.close();
+ }
+
+ private void doFillRecord(int start, int end, byte[] value) throws Exception
{
+ Table h = TEST_UTIL.getConnection().getTable(TABLENAME);
+ for (int i = start; i <= end; i++) {
+ Put p = new Put(Bytes.toBytes(i));
+ p.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(COL), value);
+ h.put(p);
+ }
+ h.close();
+ }
+
+ private void verifyRecord(int start, int end, boolean exist) throws
Exception {
+ Table h = TEST_UTIL.getConnection().getTable(TABLENAME);
+ for (int i = start; i <= end; i++) {
+ Get get = new Get(Bytes.toBytes(i));
+ Result r = h.get(get);
+ if (exist) {
+ assertArrayEquals(Bytes.toBytes(i), r.getValue(Bytes.toBytes(FAMILY),
Bytes.toBytes(COL)));
+ } else {
+ assertNull(r.getValue(Bytes.toBytes(FAMILY), Bytes.toBytes(COL)));
+ }
+ }
+ h.close();
+ }
+
+ @Test
+ public void testCompaction() throws Exception {
+ TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
+ doPutRecord(1, 1000, true);
+ int hFileCount = 0;
+ for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
+ hFileCount +=
region.getStore(Bytes.toBytes(FAMILY)).getStorefilesCount();
+ }
+ assertEquals(10, hFileCount);
+ TEST_UTIL.getAdmin().compactionSwitch(true, new ArrayList<>());
+ TEST_UTIL.compact(TABLENAME, true);
+ Thread.sleep(5000);
+ TEST_UTIL.waitFor(60000,
+ () -> COMPACTION_SERVER.requestCount.sum() > 0 &&
COMPACTION_SERVER.compactionThreadManager
+ .getRunningCompactionTasks().values().size() == 0);
+ hFileCount = 0;
+ for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
+ hFileCount +=
region.getStore(Bytes.toBytes(FAMILY)).getStorefilesCount();
+ }
+ assertEquals(1, hFileCount);
+ verifyRecord(1, 1000, true);
+ }
+
+ @Test
+ public void testCompactionWithVersions() throws Exception {
+ TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
+ ColumnFamilyDescriptor cfd =
+
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(FAMILY)).setMaxVersions(3).build();
+ TableDescriptor modifiedtableDescriptor =
+
TableDescriptorBuilder.newBuilder(TABLENAME).setColumnFamily(cfd).build();
+ TEST_UTIL.getAdmin().modifyTable(modifiedtableDescriptor);
+ TEST_UTIL.waitTableAvailable(TABLENAME);
+ doFillRecord(1, 500, RandomUtils.nextBytes(20));
+ doFillRecord(1, 500, RandomUtils.nextBytes(20));
+ doFillRecord(1, 500, RandomUtils.nextBytes(20));
+ TEST_UTIL.flush(TABLENAME);
+ doPutRecord(1, 500, true);
+
+ int kVCount = 0;
+ for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
+ for (HStoreFile hStoreFile :
region.getStore(Bytes.toBytes(FAMILY)).getStorefiles()) {
+ kVCount +=
hStoreFile.getReader().getHFileReader().getTrailer().getEntryCount();
+ }
+ }
+ assertEquals(2000, kVCount);
+ TEST_UTIL.getAdmin().compactionSwitch(true, new ArrayList<>());
+ TEST_UTIL.compact(TABLENAME, true);
+
+ TEST_UTIL.waitFor(60000, () -> {
+ int hFileCount = 0;
+ for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME))
{
+ hFileCount +=
region.getStore(Bytes.toBytes(FAMILY)).getStorefilesCount();
+
+ }
+ return hFileCount == 1;
+ });
+
+ kVCount = 0;
+ for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
+ for (HStoreFile hStoreFile :
region.getStore(Bytes.toBytes(FAMILY)).getStorefiles()) {
+ kVCount +=
hStoreFile.getReader().getHFileReader().getTrailer().getEntryCount();
+ }
+ }
+ assertEquals(1500, kVCount);
+ verifyRecord(1, 500, true);
+ }
+
+
+ @Test
+ public void testCompactionServerDown() throws Exception {
+ TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
+ COMPACTION_SERVER.stop("test");
+ TEST_UTIL.waitFor(60000,
+ () -> MASTER.getCompactionOffloadManager().getOnlineServersList().size()
== 0);
+ doPutRecord(1, 1000, true);
+ int hFileCount = 0;
+ for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
+ hFileCount +=
region.getStore(Bytes.toBytes(FAMILY)).getStorefilesCount();
+ }
+ assertEquals(10, hFileCount);
+ TEST_UTIL.getAdmin().compactionSwitch(true, new ArrayList<>());
+ TEST_UTIL.compact(TABLENAME, true);
+ Thread.sleep(5000);
+ TEST_UTIL.waitFor(60000, () -> {
+ int hFile = 0;
+ for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME))
{
+ hFile += region.getStore(Bytes.toBytes(FAMILY)).getStorefilesCount();
+ }
+ return hFile == 1;
+ });
+ verifyRecord(1, 1000, true);
+ }
@Test
public void testCompactionServerReport() throws Exception {
CompactionOffloadManager compactionOffloadManager =
MASTER.getCompactionOffloadManager();
TEST_UTIL.waitFor(60000, () ->
!compactionOffloadManager.getOnlineServers().isEmpty()
- && null !=
compactionOffloadManager.getOnlineServers().get(COMPACTION_SERVER_NAME));
+ && null !=
compactionOffloadManager.getOnlineServers().get(COMPACTION_SERVER_NAME));
// invoke compact
TEST_UTIL.compact(TABLENAME, false);
TEST_UTIL.waitFor(60000,
- () -> COMPACTION_SERVER.rpcServices.requestCount.sum() > 0
- && COMPACTION_SERVER.rpcServices.requestCount.sum() ==
compactionOffloadManager
-
.getOnlineServers().get(COMPACTION_SERVER_NAME).getTotalNumberOfRequests());
+ () -> COMPACTION_SERVER.requestCount.sum() > 0
+ && COMPACTION_SERVER.requestCount.sum() ==
compactionOffloadManager.getOnlineServers()
+ .get(COMPACTION_SERVER_NAME).getTotalNumberOfRequests());
}
@Test