HBASE-19528 - Major Compaction Tool

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4b3b627a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4b3b627a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4b3b627a

Branch: refs/heads/HBASE-19064
Commit: 4b3b627abe34f8426fddd914e941d12f79da0752
Parents: 7c318ce
Author: Rahul Gidwani <chu...@apache.org>
Authored: Thu Jan 25 12:47:50 2018 -0800
Committer: Rahul Gidwani <chu...@apache.org>
Committed: Wed Jan 31 10:18:03 2018 -0800

----------------------------------------------------------------------
 .../compaction/ClusterCompactionQueues.java     | 137 +++++++
 .../util/compaction/MajorCompactionRequest.java | 171 +++++++++
 .../hbase/util/compaction/MajorCompactor.java   | 379 +++++++++++++++++++
 .../compaction/MajorCompactionRequestTest.java  | 166 ++++++++
 .../util/compaction/MajorCompactorTest.java     |  81 ++++
 5 files changed, 934 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3b627a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/ClusterCompactionQueues.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/ClusterCompactionQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/ClusterCompactionQueues.java
new file mode 100644
index 0000000..c0d34d9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/ClusterCompactionQueues.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@InterfaceAudience.Private
+class ClusterCompactionQueues {
+
+  private final Map<ServerName, List<MajorCompactionRequest>> compactionQueues;
+  private final Set<ServerName> compactingServers;
+  private final ReadWriteLock lock;
+  private final int concurrentServers;
+
+  ClusterCompactionQueues(int concurrentServers) {
+    this.concurrentServers = concurrentServers;
+
+    this.compactionQueues = Maps.newHashMap();
+    this.lock = new ReentrantReadWriteLock();
+    this.compactingServers = Sets.newHashSet();
+  }
+
+  void addToCompactionQueue(ServerName serverName, MajorCompactionRequest info) {
+    this.lock.writeLock().lock();
+    try {
+      List<MajorCompactionRequest> result = this.compactionQueues.get(serverName);
+      if (result == null) {
+        result = Lists.newArrayList();
+        compactionQueues.put(serverName, result);
+      }
+      result.add(info);
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+  }
+
+  boolean hasWorkItems() {
+    lock.readLock().lock();
+    try {
+      return !this.compactionQueues.values().stream().allMatch(List::isEmpty);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  int getCompactionRequestsLeftToFinish() {
+    lock.readLock().lock();
+    try {
+      int size = 0;
+      for (List<MajorCompactionRequest> queue : compactionQueues.values()) {
+        size += queue.size();
+      }
+      return size;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @VisibleForTesting List<MajorCompactionRequest> getQueue(ServerName serverName) {
+    lock.readLock().lock();
+    try {
+      return compactionQueues.get(serverName);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  MajorCompactionRequest reserveForCompaction(ServerName serverName) {
+    lock.writeLock().lock();
+    try {
+      if (!compactionQueues.get(serverName).isEmpty()) {
+        compactingServers.add(serverName);
+        return compactionQueues.get(serverName).remove(0);
+      }
+      return null;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  void releaseCompaction(ServerName serverName) {
+    lock.writeLock().lock();
+    try {
+      compactingServers.remove(serverName);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  boolean atCapacity() {
+    lock.readLock().lock();
+    try {
+      return compactingServers.size() >= concurrentServers;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  Optional<ServerName> getLargestQueueFromServersNotCompacting() {
+    lock.readLock().lock();
+    try {
+      return compactionQueues.entrySet().stream()
+          .filter(entry -> !compactingServers.contains(entry.getKey()))
+          .max(Map.Entry.comparingByValue(
+            (o1, o2) -> Integer.compare(o1.size(), o2.size()))).map(Map.Entry::getKey);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+}
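
The queues above assume one scheduling thread that reserves work and worker
threads that only release it. A minimal drain-loop sketch, run synchronously
for clarity; the class name QueueUsageSketch and the sleep interval are
hypothetical, not part of this commit:

    package org.apache.hadoop.hbase.util.compaction;

    import java.util.Optional;
    import org.apache.hadoop.hbase.ServerName;

    class QueueUsageSketch {
      void drain(ClusterCompactionQueues queues) throws InterruptedException {
        while (queues.hasWorkItems()) {
          if (queues.atCapacity()) {
            Thread.sleep(1000); // the permitted number of servers are already compacting
            continue;
          }
          Optional<ServerName> server = queues.getLargestQueueFromServersNotCompacting();
          if (!server.isPresent()) {
            Thread.sleep(1000); // all remaining work belongs to busy servers
            continue;
          }
          MajorCompactionRequest next = queues.reserveForCompaction(server.get());
          if (next == null) {
            continue; // the queue emptied between the peek and the reserve
          }
          try {
            // fire the major compaction for `next` here; MajorCompactor below hands
            // it to an ExecutorService worker instead of running it inline
          } finally {
            queues.releaseCompaction(server.get());
          }
        }
      }
    }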

http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3b627a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
new file mode 100644
index 0000000..51b2b9d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@InterfaceAudience.Private
+class MajorCompactionRequest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionRequest.class);
+
+  private final Configuration configuration;
+  private final RegionInfo region;
+  private Set<String> stores;
+  private final long timestamp;
+
+  @VisibleForTesting
+  MajorCompactionRequest(Configuration configuration, RegionInfo region,
+      Set<String> stores, long timestamp) {
+    this.configuration = configuration;
+    this.region = region;
+    this.stores = stores;
+    this.timestamp = timestamp;
+  }
+
+  static Optional<MajorCompactionRequest> newRequest(Configuration configuration, RegionInfo info,
+      Set<String> stores, long timestamp) throws IOException {
+    MajorCompactionRequest request =
+        new MajorCompactionRequest(configuration, info, stores, timestamp);
+    return request.createRequest(configuration, stores);
+  }
+
+  RegionInfo getRegion() {
+    return region;
+  }
+
+  Set<String> getStores() {
+    return stores;
+  }
+
+  void setStores(Set<String> stores) {
+    this.stores = stores;
+  }
+
+  @VisibleForTesting
+  Optional<MajorCompactionRequest> createRequest(Configuration configuration,
+      Set<String> stores) throws IOException {
+    Set<String> familiesToCompact = getStoresRequiringCompaction(stores);
+    MajorCompactionRequest request = null;
+    if (!familiesToCompact.isEmpty()) {
+      request = new MajorCompactionRequest(configuration, region, familiesToCompact, timestamp);
+    }
+    return Optional.ofNullable(request);
+  }
+
+  Set<String> getStoresRequiringCompaction(Set<String> requestedStores) throws IOException {
+    try(Connection connection = getConnection(configuration)) {
+      HRegionFileSystem fileSystem = getFileSystem(connection);
+      Set<String> familiesToCompact = Sets.newHashSet();
+      for (String family : requestedStores) {
+        // do we have any store files?
+        Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
+        if (storeFiles == null) {
+          LOG.info("Excluding store: " + family + " for compaction for region: 
 " + fileSystem
+              .getRegionInfo().getEncodedName(), " has no store files");
+          continue;
+        }
+        // check for reference files
+        if (fileSystem.hasReferences(family) && familyHasReferenceFile(fileSystem, family)) {
+          familiesToCompact.add(family);
+          LOG.info("Including store: " + family + " with: " + storeFiles.size()
+              + " files for compaction for region: " + 
fileSystem.getRegionInfo().getEncodedName());
+          continue;
+        }
+        // check store file timestamps
+        boolean includeStore = false;
+        for (StoreFileInfo storeFile : storeFiles) {
+          if (storeFile.getModificationTime() < timestamp) {
+            LOG.info("Including store: " + family + " with: " + 
storeFiles.size()
+                + " files for compaction for region: "
+                + fileSystem.getRegionInfo().getEncodedName());
+            familiesToCompact.add(family);
+            includeStore = true;
+            break;
+          }
+        }
+        if (!includeStore) {
+          LOG.info("Excluding store: " + family + " for compaction for region: 
 " + fileSystem
+              .getRegionInfo().getEncodedName(), " already compacted");
+        }
+      }
+      return familiesToCompact;
+    }
+  }
+
+  @VisibleForTesting
+  Connection getConnection(Configuration configuration) throws IOException {
+    return ConnectionFactory.createConnection(configuration);
+  }
+
+  private boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family)
+      throws IOException {
+    List<Path> referenceFiles =
+        getReferenceFilePaths(fileSystem.getFileSystem(), fileSystem.getStoreDir(family));
+    for (Path referenceFile : referenceFiles) {
+      FileStatus status = fileSystem.getFileSystem().getFileLinkStatus(referenceFile);
+      if (status.getModificationTime() < timestamp) {
+        LOG.info("Including store: " + family + " for compaction for region:  
" + fileSystem
+            .getRegionInfo().getEncodedName() + " (reference store files)");
+        return true;
+      }
+    }
+    return false;
+
+  }
+
+  @VisibleForTesting
+  List<Path> getReferenceFilePaths(FileSystem fileSystem, Path familyDir)
+      throws IOException {
+    return FSUtils.getReferenceFilePaths(fileSystem, familyDir);
+  }
+
+  @VisibleForTesting
+  HRegionFileSystem getFileSystem(Connection connection) throws IOException {
+    Admin admin = connection.getAdmin();
+    return HRegionFileSystem.openRegionFromFileSystem(admin.getConfiguration(),
+        FSUtils.getCurrentFileSystem(admin.getConfiguration()),
+        FSUtils.getTableDir(FSUtils.getRootDir(admin.getConfiguration()), region.getTable()),
+        region, true);
+  }
+
+  @Override
+  public String toString() {
+    return "region: " + region.getEncodedName() + " store(s): " + stores;
+  }
+}
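
newRequest() above acts as a filter: it only yields a request when at least
one of the asked-for stores still has a store file (or referenced file) older
than the timestamp cutoff. A minimal caller sketch; RequestSketch and the
family names "a" and "b" are assumed placeholders:

    package org.apache.hadoop.hbase.util.compaction;

    import java.io.IOException;
    import java.util.Optional;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.RegionInfo;
    import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

    class RequestSketch {
      Optional<MajorCompactionRequest> forRegion(Configuration conf, RegionInfo region)
          throws IOException {
        // only store files written before "now" are considered for compaction
        long cutoff = System.currentTimeMillis();
        return MajorCompactionRequest.newRequest(conf, region, Sets.newHashSet("a", "b"), cutoff);
      }
    }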

http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3b627a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
new file mode 100644
index 0000000..c3372bb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.CompactionState;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
+import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class MajorCompactor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MajorCompactor.class);
+  private static final Set<MajorCompactionRequest> ERRORS = ConcurrentHashMap.newKeySet();
+
+  private final ClusterCompactionQueues clusterCompactionQueues;
+  private final long timestamp;
+  private final Set<String> storesToCompact;
+  private final ExecutorService executor;
+  private final long sleepForMs;
+  private final Connection connection;
+  private final TableName tableName;
+
+  public MajorCompactor(Configuration conf, TableName tableName, Set<String> storesToCompact,
+      int concurrency, long timestamp, long sleepForMs) throws IOException {
+    this.connection = ConnectionFactory.createConnection(conf);
+    this.tableName = tableName;
+    this.timestamp = timestamp;
+    this.storesToCompact = storesToCompact;
+    this.executor = Executors.newFixedThreadPool(concurrency);
+    this.clusterCompactionQueues = new ClusterCompactionQueues(concurrency);
+    this.sleepForMs = sleepForMs;
+  }
+
+  public void compactAllRegions() throws Exception {
+    List<Future<?>> futures = Lists.newArrayList();
+    while (clusterCompactionQueues.hasWorkItems() || !futuresComplete(futures)) {
+      while (clusterCompactionQueues.atCapacity()) {
+        LOG.debug("Waiting for servers to complete Compactions");
+        Thread.sleep(sleepForMs);
+      }
+      Optional<ServerName> serverToProcess =
+          clusterCompactionQueues.getLargestQueueFromServersNotCompacting();
+      if (serverToProcess.isPresent() && clusterCompactionQueues.hasWorkItems()) {
+        ServerName serverName = serverToProcess.get();
+        // check to see if the region has moved... if so we have to enqueue it again with
+        // the proper serverName
+        MajorCompactionRequest request = clusterCompactionQueues.reserveForCompaction(serverName);
+
+        ServerName currentServer = connection.getRegionLocator(tableName)
+            .getRegionLocation(request.getRegion().getStartKey()).getServerName();
+
+        if (!currentServer.equals(serverName)) {
+          // add it back to the queue with the correct server; it should be picked up in the future.
+          LOG.info("Server changed for region: " + request.getRegion().getEncodedName() + " from: "
+              + serverName + " to: " + currentServer + " re-queuing request");
+          clusterCompactionQueues.addToCompactionQueue(currentServer, request);
+          clusterCompactionQueues.releaseCompaction(serverName);
+        } else {
+          LOG.info("Firing off compaction request for server: " + serverName + 
", " + request
+              + " total queue size left: " + clusterCompactionQueues
+              .getCompactionRequestsLeftToFinish());
+          futures.add(executor.submit(new Compact(serverName, request)));
+        }
+      } else {
+        // haven't assigned anything so we sleep.
+        Thread.sleep(sleepForMs);
+      }
+    }
+    LOG.info("All compactions have completed");
+  }
+
+  private boolean futuresComplete(List<Future<?>> futures) {
+    futures.removeIf(Future::isDone);
+    return futures.isEmpty();
+  }
+
+  public void shutdown() throws Exception {
+    executor.shutdown();
+    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+    if (!ERRORS.isEmpty()) {
+      StringBuilder builder =
+          new StringBuilder().append("Major compaction failed, there were: ").append(ERRORS.size())
+              .append(" regions / stores that failed compacting\n")
+              .append("Failed compaction requests\n").append("--------------------------\n")
+              .append(Joiner.on("\n").join(ERRORS));
+      LOG.error(builder.toString());
+    } else {
+      LOG.info("All regions major compacted successfully");
+    }
+    if (connection != null) {
+      connection.close();
+    }
+  }
+
+  @VisibleForTesting void initializeWorkQueues() throws IOException {
+    if (storesToCompact.isEmpty()) {
+      connection.getTable(tableName).getDescriptor().getColumnFamilyNames()
+          .forEach(a -> storesToCompact.add(Bytes.toString(a)));
+      LOG.info("No family specified, will execute for all families");
+    }
+    LOG.info(
+        "Initializing compaction queues for table: " + tableName + " with cf: " + storesToCompact);
+    List<HRegionLocation> regionLocations =
+        connection.getRegionLocator(tableName).getAllRegionLocations();
+    for (HRegionLocation location : regionLocations) {
+      Optional<MajorCompactionRequest> request = MajorCompactionRequest
+          .newRequest(connection.getConfiguration(), location.getRegion(), storesToCompact,
+              timestamp);
+      request.ifPresent(majorCompactionRequest -> clusterCompactionQueues
+          .addToCompactionQueue(location.getServerName(), majorCompactionRequest));
+    }
+  }
+
+  class Compact implements Runnable {
+
+    private final ServerName serverName;
+    private final MajorCompactionRequest request;
+
+    Compact(ServerName serverName, MajorCompactionRequest request) {
+      this.serverName = serverName;
+      this.request = request;
+    }
+
+    @Override public void run() {
+      try {
+        compactAndWait(request);
+      } catch (NotServingRegionException e) {
+        // this region has split or merged
+        LOG.warn("Region is invalid, requesting updated regions", e);
+        // let's update the cluster compaction queues with these newly created regions.
+        addNewRegions();
+      } catch (Exception e) {
+        LOG.warn("Error compacting:", e);
+      } finally {
+        clusterCompactionQueues.releaseCompaction(serverName);
+      }
+    }
+
+    void compactAndWait(MajorCompactionRequest request) throws Exception {
+      Admin admin = connection.getAdmin();
+      try {
+        // only make the request if the region is not already major compacting
+        if (!isCompacting(request)) {
+          Set<String> stores = request.getStoresRequiringCompaction(storesToCompact);
+          if (!stores.isEmpty()) {
+            request.setStores(stores);
+            for (String store : request.getStores()) {
+              admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
+                  Bytes.toBytes(store));
+            }
+          }
+        }
+        while (isCompacting(request)) {
+          Thread.sleep(sleepForMs);
+          LOG.debug("Waiting for compaction to complete for region: " + 
request.getRegion()
+              .getEncodedName());
+        }
+      } finally {
+        // Make sure to wait for the CompactedFileDischarger chore to do its work
+        int waitForArchive = connection.getConfiguration()
+            .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 
1000);
+        Thread.sleep(waitForArchive);
+        // check if compaction completed successfully, otherwise put that request back in the
+        // proper queue
+        Set<String> storesRequiringCompaction =
+            request.getStoresRequiringCompaction(storesToCompact);
+        if (!storesRequiringCompaction.isEmpty()) {
+          // this happens when a region server is marked as dead, flushes a store file and
+          // the new regionserver doesn't pick it up because it's accounted for in the WAL replay,
+          // thus you have more store files on the filesystem than the regionserver knows about.
+          boolean regionHasNotMoved = connection.getRegionLocator(tableName)
+              .getRegionLocation(request.getRegion().getStartKey()).getServerName()
+              .equals(serverName);
+          if (regionHasNotMoved) {
+            LOG.error("Not all store files were compacted, this may be due to 
the regionserver not "
+                + "being aware of all store files.  Will not reattempt 
compacting, " + request);
+            ERRORS.add(request);
+          } else {
+            request.setStores(storesRequiringCompaction);
+            clusterCompactionQueues.addToCompactionQueue(serverName, request);
+            LOG.info("Compaction failed for the following stores: " + 
storesRequiringCompaction
+                + " region: " + request.getRegion().getEncodedName());
+          }
+        } else {
+          LOG.info("Compaction complete for region: " + 
request.getRegion().getEncodedName()
+              + " -> cf(s): " + request.getStores());
+        }
+      }
+    }
+  }
+
+  private boolean isCompacting(MajorCompactionRequest request) throws Exception {
+    CompactionState compactionState = connection.getAdmin()
+        .getCompactionStateForRegion(request.getRegion().getEncodedNameAsBytes());
+    return compactionState.equals(CompactionState.MAJOR) || compactionState
+        .equals(CompactionState.MAJOR_AND_MINOR);
+  }
+
+  private void addNewRegions() {
+    try {
+      List<HRegionLocation> locations =
+          connection.getRegionLocator(tableName).getAllRegionLocations();
+      for (HRegionLocation location : locations) {
+        if (location.getRegion().getRegionId() > timestamp) {
+          Optional<MajorCompactionRequest> compactionRequest = MajorCompactionRequest
+              .newRequest(connection.getConfiguration(), location.getRegion(), storesToCompact,
+                  timestamp);
+          compactionRequest.ifPresent(request -> clusterCompactionQueues
+              .addToCompactionQueue(location.getServerName(), request));
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Options options = new Options();
+    options.addOption(
+        Option.builder("table")
+            .required()
+            .desc("table name")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("cf")
+            .optionalArg(true)
+            .desc("column families: comma separated eg: a,b,c")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("servers")
+            .required()
+            .desc("Concurrent servers compacting")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("minModTime").
+            desc("Compact if store files have modification time < minModTime")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("zk")
+            .optionalArg(true)
+            .desc("zk quorum")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("rootDir")
+            .optionalArg(true)
+            .desc("hbase.rootDir")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("sleep")
+            .desc("Time to sleepForMs (ms) for checking compaction status per 
region and available "
+                + "work queues: default 30s")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("retries")
+        .desc("Max # of retries for a compaction request," + " defaults to 3")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("dryRun")
+            .desc("Dry run, will just output a list of regions that require 
compaction based on "
+            + "parameters passed")
+            .hasArg(false)
+            .build()
+    );
+
+    final CommandLineParser cmdLineParser = new DefaultParser();
+    CommandLine commandLine = null;
+    try {
+      commandLine = cmdLineParser.parse(options, args);
+    } catch (ParseException parseException) {
+      System.out.println(
+          "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
+              + parseException);
+      printUsage(options);
+      return; // commandLine was never assigned; bail out instead of NPE-ing below
+    }
+    String tableName = commandLine.getOptionValue("table");
+    String cf = commandLine.getOptionValue("cf", null);
+    Set<String> families = Sets.newHashSet();
+    if (cf != null) {
+      Iterables.addAll(families, Splitter.on(",").split(cf));
+    }
+
+
+    Configuration configuration = HBaseConfiguration.create();
+    int concurrency = Integer.parseInt(commandLine.getOptionValue("servers"));
+    long minModTime = Long.parseLong(
+        commandLine.getOptionValue("minModTime", String.valueOf(System.currentTimeMillis())));
+    String quorum =
+        commandLine.getOptionValue("zk", configuration.get(HConstants.ZOOKEEPER_QUORUM));
+    String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR));
+    long sleep = Long.valueOf(commandLine.getOptionValue("sleep", Long.toString(30000)));
+
+    configuration.set(HConstants.HBASE_DIR, rootDir);
+    configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum);
+
+    MajorCompactor compactor =
+        new MajorCompactor(configuration, TableName.valueOf(tableName), families, concurrency,
+            minModTime, sleep);
+
+    compactor.initializeWorkQueues();
+    if (!commandLine.hasOption("dryRun")) {
+      compactor.compactAllRegions();
+    }
+    compactor.shutdown();
+  }
+
+  private static void printUsage(final Options options) {
+    String header = "\nUsage instructions\n\n";
+    String footer = "\n";
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp(MajorCompactor.class.getSimpleName(), header, options, footer, true);
+  }
+
+}
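
The tool is normally launched via main() above with options such as -table,
-servers, -cf, -minModTime, -sleep and -dryRun. Because initializeWorkQueues()
is package-private, a programmatic driver has to live in the same package;
a minimal sketch, where CompactorSketch, the table name "mytable" and the
tuning values are assumed placeholders:

    package org.apache.hadoop.hbase.util.compaction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

    class CompactorSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // family "a", 5 servers at a time, files older than "now", poll every 30s
        MajorCompactor compactor = new MajorCompactor(conf, TableName.valueOf("mytable"),
            Sets.newHashSet("a"), 5, System.currentTimeMillis(), 30000);
        compactor.initializeWorkQueues();
        compactor.compactAllRegions(); // blocks until every per-server queue drains
        compactor.shutdown();
      }
    }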

http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3b627a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java
new file mode 100644
index 0000000..c5ce4e3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@Category({SmallTests.class})
+public class MajorCompactionRequestTest {
+
+  private static final HBaseTestingUtility UTILITY = new HBaseTestingUtility();
+  private static final String FAMILY = "a";
+  private Path rootRegionDir;
+  private Path regionStoreDir;
+
+  @Before public void setUp() throws Exception {
+    rootRegionDir = UTILITY.getDataTestDirOnTestFS("MajorCompactionRequestTest");
+    regionStoreDir = new Path(rootRegionDir, FAMILY);
+  }
+
+  @Test public void testStoresNeedingCompaction() throws Exception {
+    // store files older than timestamp
+    List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 5, 10);
+    MajorCompactionRequest request = makeMockRequest(100, storeFiles, false);
+    Optional<MajorCompactionRequest> result =
+        request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
+    assertTrue(result.isPresent());
+
+    // store files newer than timestamp
+    storeFiles = mockStoreFiles(regionStoreDir, 5, 101);
+    request = makeMockRequest(100, storeFiles, false);
+    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
+    assertFalse(result.isPresent());
+  }
+
+  @Test public void testIfWeHaveNewReferenceFilesButOldStoreFiles() throws Exception {
+    // this tests that reference files that are new, but whose referenced store files
+    // have older timestamps, still get compacted.
+    TableName table = TableName.valueOf("MajorCompactorTest");
+    TableDescriptor htd = UTILITY.createTableDescriptor(table, Bytes.toBytes(FAMILY));
+    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
+    HRegion region =
+        HBaseTestingUtility.createRegionAndWAL(hri, rootRegionDir, UTILITY.getConfiguration(), htd);
+
+    Configuration configuration = mock(Configuration.class);
+    // the reference file timestamp is newer
+    List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 4, 101);
+    List<Path> paths = storeFiles.stream().map(StoreFileInfo::getPath).collect(Collectors.toList());
+    // the files that are referenced are older, thus we still compact.
+    HRegionFileSystem fileSystem =
+        mockFileSystem(region.getRegionInfo(), true, storeFiles, 50);
+    MajorCompactionRequest majorCompactionRequest = spy(new MajorCompactionRequest(configuration,
+        region.getRegionInfo(), Sets.newHashSet(FAMILY), 100));
+    doReturn(mock(Connection.class)).when(majorCompactionRequest).getConnection(eq(configuration));
+    doReturn(paths).when(majorCompactionRequest).getReferenceFilePaths(any(FileSystem.class),
+        any(Path.class));
+    doReturn(fileSystem).when(majorCompactionRequest).getFileSystem(any(Connection.class));
+    Set<String> result = majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a"));
+    assertEquals(FAMILY, Iterables.getOnlyElement(result));
+  }
+
+  private HRegionFileSystem mockFileSystem(RegionInfo info, boolean hasReferenceFiles,
+      List<StoreFileInfo> storeFiles) throws IOException {
+    long timestamp = storeFiles.stream().findFirst().get().getModificationTime();
+    return mockFileSystem(info, hasReferenceFiles, storeFiles, timestamp);
+  }
+
+  private HRegionFileSystem mockFileSystem(RegionInfo info, boolean hasReferenceFiles,
+      List<StoreFileInfo> storeFiles, long referenceFileTimestamp) throws IOException {
+    FileSystem fileSystem = mock(FileSystem.class);
+    if (hasReferenceFiles) {
+      FileStatus fileStatus = mock(FileStatus.class);
+      doReturn(referenceFileTimestamp).when(fileStatus).getModificationTime();
+      doReturn(fileStatus).when(fileSystem).getFileLinkStatus(isA(Path.class));
+    }
+    HRegionFileSystem mockSystem = mock(HRegionFileSystem.class);
+    doReturn(info).when(mockSystem).getRegionInfo();
+    doReturn(regionStoreDir).when(mockSystem).getStoreDir(FAMILY);
+    doReturn(hasReferenceFiles).when(mockSystem).hasReferences(anyString());
+    doReturn(storeFiles).when(mockSystem).getStoreFiles(anyString());
+    doReturn(fileSystem).when(mockSystem).getFileSystem();
+    return mockSystem;
+  }
+
+  private List<StoreFileInfo> mockStoreFiles(Path regionStoreDir, int howMany, long timestamp)
+      throws IOException {
+    List<StoreFileInfo> infos = Lists.newArrayList();
+    int i = 0;
+    while (i < howMany) {
+      StoreFileInfo storeFileInfo = mock(StoreFileInfo.class);
+      doReturn(timestamp).doReturn(timestamp).when(storeFileInfo).getModificationTime();
+      doReturn(new Path(regionStoreDir, RandomStringUtils.randomAlphabetic(10))).when(storeFileInfo)
+          .getPath();
+      infos.add(storeFileInfo);
+      i++;
+    }
+    return infos;
+  }
+
+  private MajorCompactionRequest makeMockRequest(long timestamp, List<StoreFileInfo> storeFiles,
+      boolean references) throws IOException {
+    Configuration configuration = mock(Configuration.class);
+    RegionInfo regionInfo = mock(RegionInfo.class);
+    when(regionInfo.getEncodedName()).thenReturn("HBase");
+    when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
+    MajorCompactionRequest request =
+        new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a"), timestamp);
+    MajorCompactionRequest spy = spy(request);
+    HRegionFileSystem fileSystem = mockFileSystem(regionInfo, references, storeFiles);
+    doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));
+    doReturn(mock(Connection.class)).when(spy).getConnection(eq(configuration));
+    return spy;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3b627a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTest.java
new file mode 100644
index 0000000..3fb37ec
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.compaction;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+@Category({ MiscTests.class, MediumTests.class })
+public class MajorCompactorTest {
+
+  public static final byte[] FAMILY = Bytes.toBytes("a");
+  private HBaseTestingUtility utility;
+
+  @Before public void setUp() throws Exception {
+    utility = new HBaseTestingUtility();
+    utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 10);
+    utility.startMiniCluster();
+  }
+
+  @After public void tearDown() throws Exception {
+    utility.shutdownMiniCluster();
+  }
+
+  @Test public void testCompactingATable() throws Exception {
+    TableName tableName = TableName.valueOf("MajorCompactorTest");
+    utility.createMultiRegionTable(tableName, FAMILY, 5);
+    utility.waitTableAvailable(tableName);
+    Connection connection = utility.getConnection();
+    Table table = connection.getTable(tableName);
+    // write data and flush multiple store files:
+    for (int i = 0; i < 5; i++) {
+      utility.loadRandomRows(table, FAMILY, 50, 100);
+      utility.flush(tableName);
+    }
+    table.close();
+    int numberOfRegions = utility.getAdmin().getRegions(tableName).size();
+    int numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    // before major compaction, the table should have more store files than regions.
+    assertTrue(numberOfRegions < numHFiles);
+
+    MajorCompactor compactor =
+        new MajorCompactor(utility.getConfiguration(), tableName,
+            Sets.newHashSet(Bytes.toString(FAMILY)), 1, System.currentTimeMillis(), 200);
+    compactor.initializeWorkQueues();
+    compactor.compactAllRegions();
+    compactor.shutdown();
+
+    // verify that the store has been completely major compacted.
+    numberOfRegions = utility.getAdmin().getRegions(tableName).size();
+    numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    assertEquals(numHFiles, numberOfRegions);
+  }
+}
\ No newline at end of file
