Apache9 commented on a change in pull request #3425:
URL: https://github.com/apache/hbase/pull/3425#discussion_r663373257
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
##########
@@ -19,41 +18,446 @@
package org.apache.hadoop.hbase.compactionserver;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.CompactThreadControl;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Pair;
+
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest.Builder;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
+
+/**
+ * CompactionThreadManager reuse {@link HStore#selectCompaction}, {@link HStore#throttleCompaction},
+ * {@link CompactionContext#compact}, {@link CompactThreadControl}, which are core logic of
+ * compaction.
+ */
@InterfaceAudience.Private
-public class CompactionThreadManager {
+public class CompactionThreadManager implements ThroughputControllerService {
private static Logger LOG = LoggerFactory.getLogger(CompactionThreadManager.class);
+ // Configuration key for the large compaction threads.
+ private final static String LARGE_COMPACTION_THREADS =
+ "hbase.compaction.server.thread.compaction.large";
+ private final static int LARGE_COMPACTION_THREADS_DEFAULT = 10;
+ // Configuration key for the small compaction threads.
+ private final static String SMALL_COMPACTION_THREADS =
+ "hbase.compaction.server.thread.compaction.small";
+ private final static int SMALL_COMPACTION_THREADS_DEFAULT = 50;
private final Configuration conf;
- private final ConcurrentMap<ServerName, AsyncRegionServerAdmin> rsAdmins =
- new ConcurrentHashMap<>();
private final HCompactionServer server;
+ private HFileSystem fs;
+ private Path rootDir;
+ private FSTableDescriptors tableDescriptors;
+ private CompactThreadControl compactThreadControl;
+ private ConcurrentHashMap<String, CompactionTask> runningCompactionTasks =
+ new ConcurrentHashMap<>();
+ private static CompactionFilesCache compactionFilesCache = new CompactionFilesCache();
- public CompactionThreadManager(final Configuration conf, HCompactionServer server) {
+ CompactionThreadManager(final Configuration conf, HCompactionServer server) {
this.conf = conf;
this.server = server;
+ try {
+ this.fs = new HFileSystem(this.conf, true);
+ this.rootDir = CommonFSUtils.getRootDir(this.conf);
+ this.tableDescriptors = new FSTableDescriptors(conf);
+ int largeThreads =
+ Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
+ int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
+ compactThreadControl = new CompactThreadControl(this, largeThreads, smallThreads,
+ COMPACTION_TASK_COMPARATOR, REJECTION);
+ } catch (Throwable t) {
+ LOG.error("Failed construction CompactionThreadManager", t);
+ }
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ @Override
+ public ChoreService getChoreService() {
+ return server.getChoreService();
+ }
+
+ @Override
+ public double getCompactionPressure() {
+ double max = 0;
+ for (CompactionTask task : getRunningCompactionTasks().values()) {
+ double normCount = task.getStore().getCompactionPressure();
+ if (normCount > max) {
+ max = normCount;
+ }
+ }
+ return max;
+ }
+
+ @Override
+ public double getFlushPressure() {
+ return 0;
+ }
+
+ public void requestCompaction(CompactionTask compactionTask) {
+ try {
+ selectFileAndExecuteTask(compactionTask);
+ } catch (Throwable e) {
+ LOG.error("Failed requestCompaction {}", compactionTask, e);
+ }
+ }
+
+ private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOException {
+ ServerName rsServerName = compactionTask.getRsServerName();
+ RegionInfo regionInfo = compactionTask.getRegionInfo();
+ ColumnFamilyDescriptor cfd = compactionTask.getCfd();
+ String logStr = compactionTask.toString();
+ MonitoredTask status =
+ TaskMonitor.get().createStatus("Compacting region: " + regionInfo.getRegionNameAsString()
+ + ", family: " + cfd.getNameAsString() + " from RS: " + rsServerName);
+ status.enableStatusJournal(false);
+ // 1. select compaction and check compaction context is present
+ LOG.info("Start select compaction {}", compactionTask);
+ status.setStatus("Start select compaction");
+ HStore store;
+ CompactionContext compactionContext;
+ Pair<Boolean, List<String>> updateSelectedFilesCacheResult;
+ // the synchronized ensure file in store, selected files in cache, compacted files in cache,
+ // the three has consistent state, we need this condition to guarantee correct selection
+ synchronized (compactionFilesCache.getCompactedStoreFilesAsLock(regionInfo, cfd)) {
+ synchronized (compactionFilesCache.getSelectedStoreFilesAsLock(regionInfo, cfd)) {
+ Pair<HStore, Optional<CompactionContext>> pair = selectCompaction(regionInfo, cfd,
+ compactionTask.isRequestMajor(), compactionTask.getPriority(), status, logStr);
+ store = pair.getFirst();
+ Optional<CompactionContext> compaction = pair.getSecond();
+ if (!compaction.isPresent()) {
+ store.close();
+ LOG.info("Compaction context is empty: {}", compactionTask);
+ status.abort("Compaction context is empty and return");
+ return;
+ }
+ compactionContext = compaction.get();
+ // 2. update compactionFilesCache
+ updateSelectedFilesCacheResult =
+ updateStorageAfterSelectCompaction(regionInfo, cfd, compactionContext, status, logStr);
+ } // end of synchronized selected files
+ } // end of synchronized compacted files
+ if (!updateSelectedFilesCacheResult.getFirst()) {
+ store.close();
+ return;
+ }
+ List<String> selectedFileNames = updateSelectedFilesCacheResult.getSecond();
+ compactionTask.setHStore(store);
+ compactionTask.setCompactionContext(compactionContext);
+ compactionTask.setSelectedFileNames(selectedFileNames);
+ compactionTask.setMonitoredTask(status);
+ compactionTask.setPriority(compactionContext.getRequest().getPriority());
+ // 3. execute a compaction task
+ ThreadPoolExecutor pool;
+ pool = store.throttleCompaction(compactionContext.getRequest().getSize())
+ ? compactThreadControl.getLongCompactions()
+ : compactThreadControl.getShortCompactions();
+ pool.submit(new CompactionTaskRunner(compactionTask));
+ }
+
+ /**
+ * Open store, and select compaction context
+ * @return Store and CompactionContext
+ */
+ Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo regionInfo,
+ ColumnFamilyDescriptor cfd, boolean major, int priority, MonitoredTask status, String logStr)
+ throws IOException {
+ status.setStatus("Open store");
+ tableDescriptors.get(regionInfo.getTable());
+ HStore store = getStore(conf, fs, rootDir, tableDescriptors.get(regionInfo.getTable()),
+ regionInfo, cfd.getNameAsString());
+
+ // CompactedHFilesDischarger only run on regionserver, so compactionserver does not have
+ // opportunity to clean compacted file at that time, we clean compacted files here
+ compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
+ store.getStorefiles().stream().map(sf -> sf.getPath().getName()).collect(Collectors.toSet()));
+ if (major) {
+ status.setStatus("Trigger major compaction");
+ store.triggerMajorCompaction();
+ }
+ // get current compacting and compacted files, NOTE: these files are file names only, don't
+ // include paths.
+ status.setStatus("Get current compacting and compacted files from compactionFilesCache");
+ Set<String> compactingFiles = compactionFilesCache.getSelectedStoreFiles(regionInfo, cfd);
+ Set<String> compactedFiles = compactionFilesCache.getCompactedStoreFiles(regionInfo, cfd);
+ Set<String> excludeFiles = new HashSet<>(compactingFiles);
+ excludeFiles.addAll(compactedFiles);
+ // Convert files names to store files
+ status.setStatus("Convert current compacting and compacted files to store files");
+ List<HStoreFile> excludeStoreFiles = getExcludedStoreFiles(store, excludeFiles);
+ LOG.info(
+ "Start select store: {}, excludeFileNames: {}, excluded: {}, compacting: {}, compacted: {}",
+ logStr, excludeFiles.size(), excludeStoreFiles.size(), compactingFiles.size(),
+ compactedFiles.size());
+ status.setStatus("Select store files to compaction, major: " + major);
+ Optional<CompactionContext> compaction = store.selectCompaction(priority,
+ CompactionLifeCycleTracker.DUMMY, null, excludeStoreFiles);
+ LOG.info("After select store: {}, if compaction context is present: {}", logStr,
+ compaction.isPresent());
+ return new Pair<>(store, compaction);
+ }
+
+ /**
+ * Mark files in compaction context as selected in compactionFilesCache
+ * @return True if success, otherwise if files are already in selected compactionFilesCache
+ */
+ private Pair<Boolean, List<String>> updateStorageAfterSelectCompaction(RegionInfo regionInfo,
+ ColumnFamilyDescriptor cfd, CompactionContext compactionContext, MonitoredTask status,
+ String logStr) {
+ LOG.info("Start update compactionFilesCache after select compaction: {}", logStr);
+ // save selected files to compactionFilesCache
+ List<String> selectedFilesNames = new ArrayList<>();
+ for (HStoreFile selectFile : compactionContext.getRequest().getFiles()) {
+ selectedFilesNames.add(selectFile.getFileInfo().getPath().getName());
+ }
+ if (compactionFilesCache.addSelectedFiles(regionInfo, cfd, selectedFilesNames)) {
+ LOG.info("Update compactionFilesCache after select compaction success: {}", logStr);
+ status.setStatus("Update compactionFilesCache after select compaction success");
+ return new Pair<>(Boolean.TRUE, selectedFilesNames);
+ } else {
+ //should not happen
+ LOG.info("selected files are already in store and return: {}", logStr);
+ status.abort("Selected files are already in compactionFilesCache and return");
+ return new Pair<>(Boolean.FALSE, Collections.EMPTY_LIST);
+ }
+ }
+
+ /**
+ * Execute compaction in the process of compaction server
+ */
+ private void doCompaction(CompactionTask compactionTask) throws IOException {
+ RegionInfo regionInfo = compactionTask.getRegionInfo();
+ ColumnFamilyDescriptor cfd = compactionTask.getCfd();
+ HStore store = compactionTask.getStore();
+ CompactionContext compactionContext = compactionTask.getCompactionContext();
+ List<String> selectedFileNames = compactionTask.getSelectedFileNames();
+ MonitoredTask status = compactionTask.getStatus();
+ try {
+ LOG.info("Start compact store: {}, cf: {}, compaction context: {}", store, cfd,
+ compactionContext);
+ List<Path> newFiles =
+ compactionContext.compact(compactThreadControl.getCompactionThroughputController(), null);
+ LOG.info("Finish compact store: {}, cf: {}, new files: {}", store, cfd, newFiles);
+ List<String> newFileNames = new ArrayList<>();
+ for (Path newFile : newFiles) {
+ newFileNames.add(newFile.getName());
+ }
+ reportCompactionCompleted(compactionTask, newFileNames, status);
+ } finally {
+ status.setStatus("Remove selected files");
+ LOG.info("Remove selected files: {}", compactionTask);
+ compactionFilesCache.removeSelectedFiles(regionInfo, cfd, selectedFileNames);
+ }
}
- private AsyncRegionServerAdmin getRsAdmin(final ServerName sn) throws IOException {
- AsyncRegionServerAdmin admin = this.rsAdmins.get(sn);
- if (admin == null) {
- LOG.debug("New RS admin connection to {}", sn);
- admin = this.server.getAsyncClusterConnection().getRegionServerAdmin(sn);
- this.rsAdmins.put(sn, admin);
+ /**
+ * Report compaction completed to RS
+ * @return True if report to RS success, otherwise false
+ */
+ private boolean reportCompactionCompleted(CompactionTask task, List<String> newFiles,
+ MonitoredTask status) throws IOException {
+ ServerName rsServerName = task.getRsServerName();
+ RegionInfo regionInfo = task.getRegionInfo();
+ ColumnFamilyDescriptor cfd = task.getCfd();
+ List<String> selectedFileNames = task.getSelectedFileNames();
+ boolean newForceMajor = task.getStore().getForceMajor();
+ Builder builder =
+ CompleteCompactionRequest.newBuilder().setRegionInfo(ProtobufUtil.toRegionInfo(regionInfo))
+ .setFamily(ProtobufUtil.toColumnFamilySchema(cfd)).setNewForceMajor(newForceMajor);
+ // use file name only, dose not include path, because the size of protobuf is too big
+ for (String selectFile : selectedFileNames) {
+ builder.addSelectedFiles(selectFile);
+ }
+ for (String newFile : newFiles) {
+ builder.addNewFiles(newFile);
+ }
+ CompleteCompactionRequest completeCompactionRequest = builder.build();
+ AsyncRegionServerAdmin rsAdmin = getRsAdmin(rsServerName);
+ try {
+ status
+ .setStatus("Report complete compaction to RS: " + rsServerName + ", selected file size: "
+ + selectedFileNames.size() + ", new file size: " + newFiles.size());
+ LOG.info("Report complete compaction: {}, selectedFileSize: {}, newFileSize: {}", task,
+ completeCompactionRequest.getSelectedFilesList().size(),
+ completeCompactionRequest.getNewFilesList().size());
+ CompleteCompactionResponse completeCompactionResponse =
+ FutureUtils.get(rsAdmin.completeCompaction(completeCompactionRequest));
+ if (completeCompactionResponse.getSuccess()) {
+ status.markComplete("Report to RS succeeded and RS accepted");
+ // move selected files to compacted files
+ compactionFilesCache.addCompactedFiles(regionInfo, cfd, selectedFileNames);
+ LOG.info("Compaction manager request complete compaction success. {}", task);
+ } else {
+ //TODO: maybe region is move, we need get latest regionserver name and retry
+ status.abort("Report to RS succeeded but RS denied");
+ LOG.warn("Compaction manager request complete compaction fail. {}", task);
+ }
+ return true;
+ } catch (IOException e) {
+ //TODO: rpc call broken, add retry
+ status.abort("Report to RS failed");
+ LOG.error("Compaction manager request complete compaction error. {}", task, e);
+ return false;
}
- return admin;
}
- public void requestCompaction() {
+ private List<HStoreFile> getExcludedStoreFiles(HStore store, Set<String> excludeFileNames) {
+ Collection<HStoreFile> storefiles = store.getStorefiles();
+ List<HStoreFile> storeFiles = new ArrayList<>();
+ for (HStoreFile storefile : storefiles) {
+ String name = storefile.getPath().getName();
+ if (excludeFileNames.contains(name)) {
+ storeFiles.add(storefile);
+ }
+ }
+ return storeFiles;
+ }
+
+ private HStore getStore(final Configuration conf, final FileSystem fs, final Path rootDir,
+ final TableDescriptor htd, final RegionInfo hri, final String familyName) throws IOException {
+ HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs,
+ CommonFSUtils.getTableDir(rootDir, htd.getTableName()), hri);
+ HRegion region = new HRegion(regionFs, null, conf, htd, null);
+ HStore store = new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false);
+ OptionalLong maxSequenceId = store.getMaxSequenceId();
+ LOG.info("store max sequence id: {}", maxSequenceId.orElse(0));
+ region.getMVCC().advanceTo(maxSequenceId.orElse(0));
Review comment:
       Why do we need to do this?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
##########
@@ -19,41 +18,446 @@
+ if (!updateSelectedFilesCacheResult.getFirst()) {
+ store.close();
+ return;
+ }
+ List<String> selectedFileNames = updateSelectedFilesCacheResult.getSecond();
+ compactionTask.setHStore(store);
Review comment:
       Do we have any other places where we set these fields separately? If not, I suggest we introduce a single method to set them all at once.
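       For example, a single initializer on CompactionTask could look roughly like this (the method name here is only an illustration, not something that already exists in this PR):

```java
// Hypothetical convenience method on CompactionTask: bundles the fields that are
// currently populated one by one after a compaction has been selected.
public void markSelected(HStore store, CompactionContext context,
    List<String> selectedFileNames, MonitoredTask status) {
  setHStore(store);
  setCompactionContext(context);
  setSelectedFileNames(selectedFileNames);
  setMonitoredTask(status);
  setPriority(context.getRequest().getPriority());
}
```

       Then selectFileAndExecuteTask would make one call instead of five separate setter calls.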
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactThreadControl.java
##########
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
+import org.apache.hadoop.hbase.util.StealJobQueue;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This class help manage compaction thread pools and compaction throughput controller,
+ * @see CompactSplit
+ * @see org.apache.hadoop.hbase.compactionserver.CompactionThreadManager
+ */
+@InterfaceAudience.Private
+public class CompactThreadControl {
+ private static final Logger LOG = LoggerFactory.getLogger(CompactThreadControl.class);
+ private volatile ThreadPoolExecutor longCompactions;
+ private volatile ThreadPoolExecutor shortCompactions;
+ private volatile ThroughputController compactionThroughputController;
+ private BiConsumer<Runnable, ThreadPoolExecutor> rejection;
+
+ public CompactThreadControl(ThroughputControllerService server, int largeThreads,
+ int smallThreads, Comparator<Runnable> cmp,
+ BiConsumer<Runnable, ThreadPoolExecutor> rejection) {
+ createCompactionExecutors(largeThreads, smallThreads, cmp);
+
+ // compaction throughput controller
+ this.compactionThroughputController =
+ CompactionThroughputControllerFactory.create(server, server.getConfiguration());
+ // compaction throughput controller
+ this.rejection = rejection;
+ }
+
+ /**
+ * Cleanup class to use when rejecting a compaction request from the queue.
+ */
+ private class Rejection implements RejectedExecutionHandler {
+ @Override
+ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
+ rejection.accept(runnable, pool);
+ }
+ }
+
+ void createCompactionExecutors(int largeThreads, int smallThreads, Comparator<Runnable> cmp) {
+ // if we have throttle threads, make sure the user also specified size
+ Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
+
+ final String n = Thread.currentThread().getName();
+
+ StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(cmp);
+ this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
+ stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d")
+ .setDaemon(true).build());
+ this.longCompactions.setRejectedExecutionHandler(new Rejection());
+ this.longCompactions.prestartAllCoreThreads();
+ this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
+ stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder()
+ .setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
+ this.shortCompactions.setRejectedExecutionHandler(new Rejection());
+ }
+
+ @Override
+ public String toString() {
+ return "compactionQueue=(longCompactions=" + longCompactions.getQueue().size()
+ + ":shortCompactions=" + shortCompactions.getQueue().size() + ")";
+ }
+
+ public StringBuilder dumpQueue() {
+ StringBuilder queueLists = new StringBuilder();
+ queueLists.append("Compaction/Split Queue dump:\n");
+ queueLists.append(" LargeCompation Queue:\n");
+ BlockingQueue<Runnable> lq = longCompactions.getQueue();
+ Iterator<Runnable> it = lq.iterator();
+ while (it.hasNext()) {
+ queueLists.append(" " + it.next().toString());
+ queueLists.append("\n");
+ }
+
+ if (shortCompactions != null) {
+ queueLists.append("\n");
+ queueLists.append(" SmallCompation Queue:\n");
+ lq = shortCompactions.getQueue();
+ it = lq.iterator();
+ while (it.hasNext()) {
+ queueLists.append(" " + it.next().toString());
+ queueLists.append("\n");
+ }
+ }
+ return queueLists;
+ }
+
+ public ThreadPoolExecutor getLongCompactions() {
+ return longCompactions;
+ }
+
+ public ThreadPoolExecutor getShortCompactions() {
+ return shortCompactions;
+ }
+
+ public void
+ setCompactionThroughputController(ThroughputController compactionThroughputController) {
+ this.compactionThroughputController = compactionThroughputController;
+ }
+
+ public ThroughputController getCompactionThroughputController() {
+ return compactionThroughputController;
+ }
+
+ public void waitForStop() {
+ waitForPoolStop(longCompactions, "Large Compaction Thread");
+ waitForPoolStop(shortCompactions, "Small Compaction Thread");
+ }
+
+ private void waitForPoolStop(ThreadPoolExecutor t, String name) {
+ if (t == null) {
+ return;
+ }
+ try {
+ t.shutdown();
+ t.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted waiting for " + name + " to finish...");
Review comment:
Reset the interrupted state?
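       Something like the usual pattern, for example:

```java
} catch (InterruptedException ie) {
  LOG.warn("Interrupted waiting for " + name + " to finish...");
  // Restore the interrupt flag so callers further up the stack can still observe it.
  Thread.currentThread().interrupt();
}
```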
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
##########
@@ -231,6 +247,17 @@ public void run() {
abort(prefix + t.getMessage(), t);
}
}
+ stopChores();
+ if (this.compactionThreadManager != null) {
+ this.compactionThreadManager.waitForStop();
+ }
+
Review comment:
       nit: useless empty line.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactThreadControl.java
##########
@@ -0,0 +1,156 @@
+ public void
+ setCompactionThroughputController(ThroughputController compactionThroughputController) {
Review comment:
       This line break looks a bit strange. Formatting issue?
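       Assuming the 100-character line limit is what forced the wrap, the more conventional formatting would be something like:

```java
public void setCompactionThroughputController(
    ThroughputController compactionThroughputController) {
  this.compactionThroughputController = compactionThroughputController;
}
```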
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
##########
@@ -3540,6 +3546,13 @@ public ThroughputController getFlushThroughputController() {
   return flushThroughputController;
 }
+ /**
+ * @return the flush pressure of all stores on this regionserver. The value should be greater than
+ * or equal to 0.0, and any value greater than 1.0 means we enter the emergency state that
+ * global memstore size already exceeds lower limit.
+ * @deprecated Since 2.0.0
Review comment:
Still not sure why this method is deprecated... Is it possible to remove
it?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
##########
@@ -19,41 +18,446 @@
+ private HStore getStore(final Configuration conf, final FileSystem fs, final Path rootDir,
+ final TableDescriptor htd, final RegionInfo hri, final String familyName) throws IOException {
+ HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs,
+ CommonFSUtils.getTableDir(rootDir, htd.getTableName()), hri);
+ HRegion region = new HRegion(regionFs, null, conf, htd, null);
+ HStore store = new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false);
Review comment:
OK, we always need a region when constructing a store... Not your fault,
maybe we need to find a way to decouple them...
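       Purely as a sketch of the kind of decoupling I mean (this interface does not exist today, it is only an illustration): if HStore depended on a narrow host interface instead of a full HRegion, a compaction server could supply a lightweight implementation without constructing a region object:

```java
// Hypothetical minimal surface that a store actually needs from its host.
interface StoreHost {
  Configuration getConfiguration();
  TableDescriptor getTableDescriptor();
  RegionInfo getRegionInfo();
  HRegionFileSystem getRegionFileSystem();
}
```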
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
##########
@@ -19,41 +18,446 @@
package org.apache.hadoop.hbase.compactionserver;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.CompactThreadControl;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import
org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Pair;
+
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest.Builder;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
+
+/**
+ * CompactionThreadManager reuse {@link HStore#selectCompaction}, {@link
HStore#throttleCompaction},
+ * {@link CompactionContext#compact}, {@link CompactThreadControl}, which are
core logic of
+ * compaction.
+ */
@InterfaceAudience.Private
-public class CompactionThreadManager {
+public class CompactionThreadManager implements ThroughputControllerService {
private static Logger LOG = LoggerFactory.getLogger(CompactionThreadManager.class);
+ // Configuration key for the large compaction threads.
+ private final static String LARGE_COMPACTION_THREADS =
+ "hbase.compaction.server.thread.compaction.large";
+ private final static int LARGE_COMPACTION_THREADS_DEFAULT = 10;
+ // Configuration key for the small compaction threads.
+ private final static String SMALL_COMPACTION_THREADS =
+ "hbase.compaction.server.thread.compaction.small";
+ private final static int SMALL_COMPACTION_THREADS_DEFAULT = 50;
private final Configuration conf;
- private final ConcurrentMap<ServerName, AsyncRegionServerAdmin> rsAdmins =
- new ConcurrentHashMap<>();
private final HCompactionServer server;
+ private HFileSystem fs;
+ private Path rootDir;
+ private FSTableDescriptors tableDescriptors;
+ private CompactThreadControl compactThreadControl;
+ private ConcurrentHashMap<String, CompactionTask> runningCompactionTasks =
+ new ConcurrentHashMap<>();
+ private static CompactionFilesCache compactionFilesCache = new CompactionFilesCache();
- public CompactionThreadManager(final Configuration conf, HCompactionServer server) {
+ CompactionThreadManager(final Configuration conf, HCompactionServer server) {
this.conf = conf;
this.server = server;
+ try {
+ this.fs = new HFileSystem(this.conf, true);
+ this.rootDir = CommonFSUtils.getRootDir(this.conf);
+ this.tableDescriptors = new FSTableDescriptors(conf);
+ int largeThreads =
+ Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
+ int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
+ compactThreadControl = new CompactThreadControl(this, largeThreads, smallThreads,
+ COMPACTION_TASK_COMPARATOR, REJECTION);
+ } catch (Throwable t) {
+ LOG.error("Failed to construct CompactionThreadManager", t);
+ }
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ @Override
+ public ChoreService getChoreService() {
+ return server.getChoreService();
+ }
+
+ @Override
+ public double getCompactionPressure() {
+ double max = 0;
+ for (CompactionTask task : getRunningCompactionTasks().values()) {
+ double normCount = task.getStore().getCompactionPressure();
+ if (normCount > max) {
+ max = normCount;
+ }
+ }
+ return max;
+ }
+
+ @Override
+ public double getFlushPressure() {
+ return 0;
+ }
+
+ public void requestCompaction(CompactionTask compactionTask) {
+ try {
+ selectFileAndExecuteTask(compactionTask);
+ } catch (Throwable e) {
+ LOG.error("Failed requestCompaction {}", compactionTask, e);
+ }
+ }
+
+ private void selectFileAndExecuteTask(CompactionTask compactionTask) throws IOException {
+ ServerName rsServerName = compactionTask.getRsServerName();
+ RegionInfo regionInfo = compactionTask.getRegionInfo();
+ ColumnFamilyDescriptor cfd = compactionTask.getCfd();
+ String logStr = compactionTask.toString();
+ MonitoredTask status =
+ TaskMonitor.get().createStatus("Compacting region: " + regionInfo.getRegionNameAsString()
+ + ", family: " + cfd.getNameAsString() + " from RS: " + rsServerName);
+ status.enableStatusJournal(false);
+ // 1. select compaction and check compaction context is present
+ LOG.info("Start select compaction {}", compactionTask);
+ status.setStatus("Start select compaction");
+ HStore store;
+ CompactionContext compactionContext;
+ Pair<Boolean, List<String>> updateSelectedFilesCacheResult;
+ // the synchronized blocks ensure that the files in the store, the selected files in the cache
+ // and the compacted files in the cache are in a consistent state; we need this condition to
+ // guarantee a correct selection
+ synchronized (compactionFilesCache.getCompactedStoreFilesAsLock(regionInfo, cfd)) {
+ synchronized (compactionFilesCache.getSelectedStoreFilesAsLock(regionInfo, cfd)) {
+ Pair<HStore, Optional<CompactionContext>> pair = selectCompaction(regionInfo, cfd,
+ compactionTask.isRequestMajor(), compactionTask.getPriority(), status, logStr);
+ store = pair.getFirst();
+ Optional<CompactionContext> compaction = pair.getSecond();
+ if (!compaction.isPresent()) {
+ store.close();
+ LOG.info("Compaction context is empty: {}", compactionTask);
+ status.abort("Compaction context is empty and return");
+ return;
+ }
+ compactionContext = compaction.get();
+ // 2. update compactionFilesCache
+ updateSelectedFilesCacheResult =
+ updateStorageAfterSelectCompaction(regionInfo, cfd, compactionContext, status, logStr);
+ } // end of synchronized selected files
+ } // end of synchronized compacted files
+ if (!updateSelectedFilesCacheResult.getFirst()) {
+ store.close();
+ return;
+ }
+ List<String> selectedFileNames = updateSelectedFilesCacheResult.getSecond();
+ compactionTask.setHStore(store);
+ compactionTask.setCompactionContext(compactionContext);
+ compactionTask.setSelectedFileNames(selectedFileNames);
+ compactionTask.setMonitoredTask(status);
+ compactionTask.setPriority(compactionContext.getRequest().getPriority());
+ // 3. execute a compaction task
+ ThreadPoolExecutor pool;
+ pool = store.throttleCompaction(compactionContext.getRequest().getSize())
+ ? compactThreadControl.getLongCompactions()
+ : compactThreadControl.getShortCompactions();
+ pool.submit(new CompactionTaskRunner(compactionTask));
+ }
+
+ /**
+ * Open store, and select compaction context
+ * @return Store and CompactionContext
+ */
+ Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo regionInfo,
+ ColumnFamilyDescriptor cfd, boolean major, int priority, MonitoredTask status, String logStr)
+ throws IOException {
+ status.setStatus("Open store");
+ tableDescriptors.get(regionInfo.getTable());
+ HStore store = getStore(conf, fs, rootDir, tableDescriptors.get(regionInfo.getTable()),
+ regionInfo, cfd.getNameAsString());
+
+ // CompactedHFilesDischarger only runs on the regionserver, so the compaction server has no
+ // opportunity to clean up compacted files at that time; we clean compacted files here
+ compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
+ store.getStorefiles().stream().map(sf -> sf.getPath().getName()).collect(Collectors.toSet()));
+ if (major) {
+ status.setStatus("Trigger major compaction");
+ store.triggerMajorCompaction();
+ }
+ // get current compacting and compacted files, NOTE: these are file names only and don't
+ // include paths.
+ status.setStatus("Get current compacting and compacted files from compactionFilesCache");
+ Set<String> compactingFiles = compactionFilesCache.getSelectedStoreFiles(regionInfo, cfd);
+ Set<String> compactedFiles = compactionFilesCache.getCompactedStoreFiles(regionInfo, cfd);
+ Set<String> excludeFiles = new HashSet<>(compactingFiles);
+ excludeFiles.addAll(compactedFiles);
+ // Convert files names to store files
+ status.setStatus("Convert current compacting and compacted files to store
files");
+ List<HStoreFile> excludeStoreFiles = getExcludedStoreFiles(store,
excludeFiles);
+ LOG.info(
+ "Start select store: {}, excludeFileNames: {}, excluded: {}, compacting:
{}, compacted: {}",
+ logStr, excludeFiles.size(), excludeStoreFiles.size(),
compactingFiles.size(),
+ compactedFiles.size());
+ status.setStatus("Select store files to compaction, major: " + major);
+ Optional<CompactionContext> compaction = store.selectCompaction(priority,
+ CompactionLifeCycleTracker.DUMMY, null, excludeStoreFiles);
+ LOG.info("After select store: {}, if compaction context is present: {}",
logStr,
+ compaction.isPresent());
+ return new Pair<>(store, compaction);
+ }
+
+ /**
+ * Mark files in compaction context as selected in compactionFilesCache
+ * @return True on success, false if the files are already selected in the compactionFilesCache
+ */
+ private Pair<Boolean, List<String>> updateStorageAfterSelectCompaction(RegionInfo regionInfo,
+ ColumnFamilyDescriptor cfd, CompactionContext compactionContext, MonitoredTask status,
+ String logStr) {
+ LOG.info("Start update compactionFilesCache after select compaction: {}", logStr);
+ // save selected files to compactionFilesCache
+ List<String> selectedFilesNames = new ArrayList<>();
+ for (HStoreFile selectFile : compactionContext.getRequest().getFiles()) {
+ selectedFilesNames.add(selectFile.getFileInfo().getPath().getName());
+ }
+ if (compactionFilesCache.addSelectedFiles(regionInfo, cfd, selectedFilesNames)) {
+ LOG.info("Update compactionFilesCache after select compaction success: {}", logStr);
+ status.setStatus("Update compactionFilesCache after select compaction success");
+ return new Pair<>(Boolean.TRUE, selectedFilesNames);
+ } else {
+ // should not happen
+ LOG.info("Selected files are already in store and return: {}", logStr);
+ status.abort("Selected files are already in compactionFilesCache and return");
+ return new Pair<>(Boolean.FALSE, Collections.emptyList());
+ }
+ }
+
+ /**
+ * Execute compaction in the compaction server process
+ */
+ private void doCompaction(CompactionTask compactionTask) throws IOException {
+ RegionInfo regionInfo = compactionTask.getRegionInfo();
+ ColumnFamilyDescriptor cfd = compactionTask.getCfd();
+ HStore store = compactionTask.getStore();
+ CompactionContext compactionContext = compactionTask.getCompactionContext();
+ List<String> selectedFileNames = compactionTask.getSelectedFileNames();
+ MonitoredTask status = compactionTask.getStatus();
+ try {
+ LOG.info("Start compact store: {}, cf: {}, compaction context: {}",
store, cfd,
+ compactionContext);
+ List<Path> newFiles =
+
compactionContext.compact(compactThreadControl.getCompactionThroughputController(),
null);
+ LOG.info("Finish compact store: {}, cf: {}, new files: {}", store, cfd,
newFiles);
+ List<String> newFileNames = new ArrayList<>();
+ for (Path newFile : newFiles) {
+ newFileNames.add(newFile.getName());
+ }
+ reportCompactionCompleted(compactionTask, newFileNames, status);
+ } finally {
+ status.setStatus("Remove selected files");
+ LOG.info("Remove selected files: {}", compactionTask);
+ compactionFilesCache.removeSelectedFiles(regionInfo, cfd, selectedFileNames);
+ }
}
- private AsyncRegionServerAdmin getRsAdmin(final ServerName sn) throws IOException {
- AsyncRegionServerAdmin admin = this.rsAdmins.get(sn);
- if (admin == null) {
- LOG.debug("New RS admin connection to {}", sn);
- admin = this.server.getAsyncClusterConnection().getRegionServerAdmin(sn);
- this.rsAdmins.put(sn, admin);
+ /**
+ * Report compaction completed to RS
+ * @return True if the report RPC to the RS succeeded, false if the RPC failed
+ */
+ private boolean reportCompactionCompleted(CompactionTask task, List<String> newFiles,
+ MonitoredTask status) throws IOException {
+ ServerName rsServerName = task.getRsServerName();
+ RegionInfo regionInfo = task.getRegionInfo();
+ ColumnFamilyDescriptor cfd = task.getCfd();
+ List<String> selectedFileNames = task.getSelectedFileNames();
+ boolean newForceMajor = task.getStore().getForceMajor();
+ Builder builder =
+ CompleteCompactionRequest.newBuilder().setRegionInfo(ProtobufUtil.toRegionInfo(regionInfo))
+ .setFamily(ProtobufUtil.toColumnFamilySchema(cfd)).setNewForceMajor(newForceMajor);
+ // use file names only, do not include paths, because otherwise the protobuf message is too big
+ for (String selectFile : selectedFileNames) {
+ builder.addSelectedFiles(selectFile);
+ }
+ for (String newFile : newFiles) {
+ builder.addNewFiles(newFile);
+ }
+ CompleteCompactionRequest completeCompactionRequest = builder.build();
+ AsyncRegionServerAdmin rsAdmin = getRsAdmin(rsServerName);
+ try {
+ status
+ .setStatus("Report complete compaction to RS: " + rsServerName + ",
selected file size: "
+ + selectedFileNames.size() + ", new file size: " +
newFiles.size());
+ LOG.info("Report complete compaction: {}, selectedFileSize: {},
newFileSize: {}", task,
+ completeCompactionRequest.getSelectedFilesList().size(),
+ completeCompactionRequest.getNewFilesList().size());
+ CompleteCompactionResponse completeCompactionResponse =
+
FutureUtils.get(rsAdmin.completeCompaction(completeCompactionRequest));
+ if (completeCompactionResponse.getSuccess()) {
+ status.markComplete("Report to RS succeeded and RS accepted");
+ // move selected files to compacted files
+ compactionFilesCache.addCompactedFiles(regionInfo, cfd, selectedFileNames);
+ LOG.info("Compaction manager request complete compaction success. {}", task);
+ } else {
+ // TODO: maybe the region has moved, we need to get the latest regionserver name and retry
+ status.abort("Report to RS succeeded but RS denied");
+ LOG.warn("Compaction manager request complete compaction failed. {}", task);
+ }
+ return true;
+ } catch (IOException e) {
+ // TODO: the RPC call failed, we should add a retry
+ status.abort("Report to RS failed");
+ LOG.error("Compaction manager request complete compaction error. {}",
task, e);
+ return false;
}
- return admin;
}
- public void requestCompaction() {
+ private List<HStoreFile> getExcludedStoreFiles(HStore store, Set<String> excludeFileNames) {
+ Collection<HStoreFile> storefiles = store.getStorefiles();
+ List<HStoreFile> storeFiles = new ArrayList<>();
+ for (HStoreFile storefile : storefiles) {
+ String name = storefile.getPath().getName();
+ if (excludeFileNames.contains(name)) {
+ storeFiles.add(storefile);
+ }
+ }
+ return storeFiles;
+ }
+
+ private HStore getStore(final Configuration conf, final FileSystem fs, final Path rootDir,
+ final TableDescriptor htd, final RegionInfo hri, final String familyName) throws IOException {
+ HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs,
+ CommonFSUtils.getTableDir(rootDir, htd.getTableName()), hri);
+ HRegion region = new HRegion(regionFs, null, conf, htd, null);
+ HStore store = new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false);
+ OptionalLong maxSequenceId = store.getMaxSequenceId();
+ LOG.info("store max sequence id: {}", maxSequenceId.orElse(0));
+ region.getMVCC().advanceTo(maxSequenceId.orElse(0));
+ return store;
+ }
+
+ private AsyncRegionServerAdmin getRsAdmin(final ServerName sn) {
+ return server.getAsyncClusterConnection().getRegionServerAdmin(sn);
+ }
+
+ ConcurrentHashMap<String, CompactionTask> getRunningCompactionTasks() {
+ return runningCompactionTasks;
+ }
+
+ void waitForStop() {
+ compactThreadControl.waitForStop();
+ }
+
+ private void executeCompaction(CompactionTask compactionTask) {
+ try {
+ String taskName = compactionTask.getRsServerName() + "-"
+ + compactionTask.getRegionInfo().getRegionNameAsString() + "-"
+ + compactionTask.getCfd().getNameAsString() + "-" + System.currentTimeMillis();
+ compactionTask.setTaskName(taskName);
+ runningCompactionTasks.put(compactionTask.getTaskName(), compactionTask);
+ doCompaction(compactionTask);
+ } catch (Throwable e) {
+ LOG.error("Execute compaction task error: {}", compactionTask, e);
Review comment:
What happens if we reach here? Other than this error log?
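For context, a rough sketch of what the catch block could do beyond logging, under a few assumptions: `reportCompactionFailed` is a made-up name and not an existing RPC, and the removal from `runningCompactionTasks` in a finally block is only a suggestion in case this hunk does not already do it further down. The accessors used (`getStatus()`, `getTaskName()`, `status.abort(...)`) all appear elsewhere in this patch.

```java
private void executeCompaction(CompactionTask compactionTask) {
  String taskName = compactionTask.getRsServerName() + "-"
      + compactionTask.getRegionInfo().getRegionNameAsString() + "-"
      + compactionTask.getCfd().getNameAsString() + "-" + System.currentTimeMillis();
  compactionTask.setTaskName(taskName);
  runningCompactionTasks.put(taskName, compactionTask);
  try {
    doCompaction(compactionTask);
  } catch (Throwable e) {
    LOG.error("Execute compaction task error: {}", compactionTask, e);
    // surface the failure in the task monitor instead of leaving the task "running"
    MonitoredTask status = compactionTask.getStatus();
    if (status != null) {
      status.abort("Compaction failed: " + e.getMessage());
    }
    // hypothetical follow-up: tell the RS the attempt failed so it can drop the files it
    // marked as compacting and possibly re-dispatch, instead of waiting for a timeout
    // reportCompactionFailed(compactionTask, e);
  } finally {
    // make sure a failed task does not stay in the running-task map forever
    runningCompactionTasks.remove(taskName);
  }
}
```

Otherwise, as far as I can tell from this hunk, a failure here only leaves the error log line and whatever cleanup doCompaction's own finally block performs.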
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]