HBASE-15400 Use DateTieredCompactor for Date Tiered Compaction (Clara Xiong)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f60fc9d1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f60fc9d1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f60fc9d1

Branch: refs/heads/HBASE-14850
Commit: f60fc9d1a0625970aa2fd14d29e4c1266f9571b7
Parents: d393603
Author: tedyu <yuzhih...@gmail.com>
Authored: Thu Apr 7 14:58:59 2016 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Thu Apr 7 14:58:59 2016 -0700

----------------------------------------------------------------------
 .../regionserver/DateTieredStoreEngine.java     | 102 ++++++
 .../hadoop/hbase/regionserver/HStore.java       |   2 +-
 .../hadoop/hbase/regionserver/StoreFile.java    |  34 +-
 .../compactions/CompactionConfiguration.java    |  10 +
 .../compactions/CompactionPolicy.java           |   2 +-
 .../compactions/CompactionRequest.java          |  16 +-
 .../compactions/DateTieredCompactionPolicy.java | 358 +++++++++++++------
 .../DateTieredCompactionRequest.java            |  44 +++
 .../compactions/ExploringCompactionPolicy.java  |   4 +-
 .../compactions/FIFOCompactionPolicy.java       |   5 +-
 .../compactions/RatioBasedCompactionPolicy.java | 318 ++++------------
 .../compactions/SortedCompactionPolicy.java     | 239 +++++++++++++
 .../compactions/StripeCompactionPolicy.java     |   3 +-
 .../hbase/regionserver/MockStoreFile.java       |  40 ++-
 .../TestDateTieredCompactionPolicy.java         | 325 +++++++++++++++++
 .../compactions/EverythingPolicy.java           |   2 +-
 16 files changed, 1102 insertions(+), 402 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
new file mode 100644
index 0000000..773baab
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.User;
+
+/**
+ * HBASE-15400 This store engine allows us to store data in date tiered layout with exponential
+ * sizing so that the more recent data has more granularity. Time-range scan will perform the
+ * best with most recent data. When data reach maxAge, they are compacted in fixed-size time
+ * windows for TTL and archiving. Please refer to design spec for more details.
+ * https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/edit#heading=h.uk6y5pd3oqgx
+ */
+@InterfaceAudience.Private
+public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
+  DateTieredCompactionPolicy, DateTieredCompactor, DefaultStoreFileManager> {
+  @Override
+  public boolean needsCompaction(List<StoreFile> filesCompacting) {
+    return compactionPolicy.needsCompaction(storeFileManager.getStorefiles(),
+      filesCompacting);
+  }
+
+  @Override
+  public CompactionContext createCompaction() throws IOException {
+    return new DateTieredCompactionContext();
+  }
+
+  @Override
+  protected void createComponents(Configuration conf, Store store, CellComparator kvComparator)
+      throws IOException {
+    this.compactionPolicy = new DateTieredCompactionPolicy(conf, store);
+    this.storeFileManager = new DefaultStoreFileManager(kvComparator, conf,
+        compactionPolicy.getConf());
+    this.storeFlusher = new DefaultStoreFlusher(conf, store);
+    this.compactor = new DateTieredCompactor(conf, store);
+  }
+
+  private final class DateTieredCompactionContext extends CompactionContext {
+
+    @Override
+    public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
+      return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStorefiles(),
+        filesCompacting);
+    }
+
+    @Override
+    public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
+        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
+      request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(), filesCompacting,
+        isUserCompaction, mayUseOffPeak, forceMajor);
+      return request != null;
+    }
+
+    @Override
+    public void forceSelect(CompactionRequest request) {
+      if (!(request instanceof DateTieredCompactionRequest)) {
+        throw new IllegalArgumentException("DateTieredCompactionRequest is expected. Actual: "
+            + request.getClass().getCanonicalName());
+      }
+      super.forceSelect(request);
+    }
+
+    public List<Path> compact(ThroughputController throughputController, User user)
+        throws IOException {
+      if (request instanceof DateTieredCompactionRequest) {
+        return compactor.compact(request, ((DateTieredCompactionRequest) request).getBoundaries(),
+          throughputController, user);
+      } else {
+        throw new IllegalArgumentException("DateTieredCompactionRequest is expected. Actual: "
+          + request.getClass().getCanonicalName());
+      }
+    }
+  }
+}

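For illustration only, a minimal sketch of how the new engine could be enabled on a column
family; the "hbase.hstore.engine.class" store-engine key and the family name "d" are
assumptions for the example, not part of this patch.

    import org.apache.hadoop.hbase.HColumnDescriptor;

    public class DateTieredEngineExample {
      public static void main(String[] args) {
        // Hypothetical column family "d"; point its store engine at the new class.
        HColumnDescriptor cf = new HColumnDescriptor("d");
        cf.setConfiguration("hbase.hstore.engine.class",
            "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine");
        System.out.println(cf);
      }
    }
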
http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 7321028..9524c5b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1521,7 +1521,7 @@ public class HStore implements Store {
         return false;
       }
     }
-    return storeEngine.getCompactionPolicy().isMajorCompaction(
+    return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction(
         this.storeEngine.getStoreFileManager().getStorefiles());
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 868bee0..21ae417 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -18,6 +18,12 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
+
 import java.io.DataInput;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -62,12 +68,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.WritableUtils;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Ordering;
-
 /**
  * A Store data file.  Stores usually have one or more of these files.  They
  * are produced by flushing the memstore to disk.  To
@@ -384,7 +384,7 @@ public class StoreFile {
    * is turned off, fall back to BULKLOAD_TIME_KEY.
    * @return true if this storefile was created by bulk load.
    */
-  boolean isBulkLoadResult() {
+  public boolean isBulkLoadResult() {
     boolean bulkLoadedHFile = false;
     String fileName = this.getPath().getName();
     int startPos = fileName.indexOf("SeqId_");
@@ -1777,6 +1777,19 @@ public class StoreFile {
           Ordering.natural().onResultOf(new GetPathName())
       ));
 
+    /**
+     * Comparator for time-aware compaction. SeqId is still the first
+     *   ordering criterion to maintain MVCC.
+     */
+    public static final Comparator<StoreFile> SEQ_ID_MAX_TIMESTAMP =
+      Ordering.compound(ImmutableList.of(
+        Ordering.natural().onResultOf(new GetSeqId()),
+        Ordering.natural().onResultOf(new GetMaxTimestamp()),
+        Ordering.natural().onResultOf(new GetFileSize()).reverse(),
+        Ordering.natural().onResultOf(new GetBulkTime()),
+        Ordering.natural().onResultOf(new GetPathName())
+      ));
+
     private static class GetSeqId implements Function<StoreFile, Long> {
       @Override
       public Long apply(StoreFile sf) {
@@ -1811,5 +1824,12 @@ public class StoreFile {
         return sf.getPath().getName();
       }
     }
+
+    private static class GetMaxTimestamp implements Function<StoreFile, Long> {
+      @Override
+      public Long apply(StoreFile sf) {
+        return sf.getMaximumTimestamp() == null? (Long)Long.MAX_VALUE : sf.getMaximumTimestamp();
+      }
+    }
   }
 }

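The new SEQ_ID_MAX_TIMESTAMP comparator is what the date tiered policy further down in this
patch uses to order candidates before partitioning them into time windows; a minimal usage
sketch with a hypothetical caller:

    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.hbase.regionserver.StoreFile;

    class TieredSortSketch {
      // Order files by seqId first (to preserve MVCC), then by max timestamp.
      static void sortForTieredSelection(List<StoreFile> storeFileList) {
        Collections.sort(storeFileList, StoreFile.Comparators.SEQ_ID_MAX_TIMESTAMP);
      }
    }
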
http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
index 9bb4c77..97cc404 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
@@ -80,6 +80,8 @@ public class CompactionConfiguration {
     "hbase.hstore.compaction.date.tiered.incoming.window.min";
   public static final String COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY =
     "hbase.hstore.compaction.date.tiered.window.policy.class";
+  public static final String SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY =
+    "hbase.hstore.compaction.date.tiered.single.output.for.minor.compaction";
 
   private static final Class<? extends RatioBasedCompactionPolicy>
     DEFAULT_TIER_COMPACTION_POLICY_CLASS = ExploringCompactionPolicy.class;
@@ -105,6 +107,7 @@ public class CompactionConfiguration {
   private final int windowsPerTier;
   private final int incomingWindowMin;
   private final String compactionPolicyForTieredWindow;
+  private final boolean singleOutputForMinorCompaction;
 
   CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
     this.conf = conf;
@@ -134,6 +137,9 @@ public class CompactionConfiguration {
     incomingWindowMin = conf.getInt(INCOMING_WINDOW_MIN_KEY, 6);
     compactionPolicyForTieredWindow = conf.get(COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY,
         DEFAULT_TIER_COMPACTION_POLICY_CLASS.getName());
+    singleOutputForMinorCompaction = conf.getBoolean(SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY,
+      true);
+
     LOG.info(this);
   }
 
@@ -274,4 +280,8 @@ public class CompactionConfiguration {
   public String getCompactionPolicyForTieredWindow() {
     return compactionPolicyForTieredWindow;
   }
+
+  public boolean useSingleOutputForMinorCompaction() {
+    return singleOutputForMinorCompaction;
+  }
 }

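The date tiered keys visible in this file can be set in hbase-site.xml or programmatically;
a sketch using only the keys shown above (the values are arbitrary examples, not
recommendations):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class DateTieredConfigSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Minimum number of files in the incoming window before it is compacted.
        conf.setInt("hbase.hstore.compaction.date.tiered.incoming.window.min", 6);
        // Compaction policy applied inside each window (ExploringCompactionPolicy is the default).
        conf.set("hbase.hstore.compaction.date.tiered.window.policy.class",
            "org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy");
        // Emit a single output file for minor compactions (the new key added by this patch).
        conf.setBoolean(
            "hbase.hstore.compaction.date.tiered.single.output.for.minor.compaction", true);
      }
    }
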
http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
index 3b24189..b7a788c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
@@ -45,7 +45,7 @@ public abstract class CompactionPolicy {
    * @param filesToCompact Files to compact. Can be null.
    * @return True if we should run a major compaction.
    */
-  public abstract boolean isMajorCompaction(
+  public abstract boolean shouldPerformMajorCompaction(
     final Collection<StoreFile> filesToCompact) throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
index 268bb09..9e7f6a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
@@ -18,6 +18,12 @@
  */
 package org.apache.hadoop.hbase.regionserver.compactions;
 
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+
 import java.util.ArrayList;
 import java.util.Collection;
 
@@ -31,12 +37,6 @@ import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.StringUtils;
 
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
-
 /**
  * This class holds all logical details necessary to run a compaction.
  */
@@ -76,6 +76,10 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
     recalculateSize();
   }
 
+  public void updateFiles(Collection<StoreFile> files) {
+    this.filesToCompact = files;
+  }
+
   /**
   * Called before compaction is executed by CompactSplitThread; for use by coproc subclasses.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
index 9f65e6e..d61af42 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
@@ -1,4 +1,5 @@
 /**
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,11 +20,11 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.PeekingIterator;
+import com.google.common.math.LongMath;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -35,9 +36,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -50,14 +55,13 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
  * 3. Improve TTL efficiency.
  * Perfect fit for the use cases that:
 * 1. has mostly date-based data write and scan and a focus on the most recent data.
- * 2. never or rarely deletes data. Out-of-order writes are handled gracefully. Time range
- * overlapping among store files is tolerated and the performance impact is minimized. Configuration
- * can be set at hbase-site or overriden at per-table or per-column-famly level by hbase shell.
- * Design spec is at
+ * Out-of-order writes are handled gracefully. Time range overlapping among store files is
+ * tolerated and the performance impact is minimized. Configuration can be set at hbase-site
+ * or overridden at per-table or per-column-family level by hbase shell. Design spec is at
 * https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
+public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
   private static final Log LOG = LogFactory.getLog(DateTieredCompactionPolicy.class);
 
   private RatioBasedCompactionPolicy compactionPolicyPerWindow;
@@ -67,111 +71,112 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
     super(conf, storeConfigInfo);
     try {
       compactionPolicyPerWindow =
-          ReflectionUtils.instantiateWithCustomCtor(comConf.getCompactionPolicyForTieredWindow(),
-            new Class[] { Configuration.class, StoreConfigInformation.class }, new Object[] { conf,
-              storeConfigInfo });
+        ReflectionUtils.instantiateWithCustomCtor(comConf.getCompactionPolicyForTieredWindow(),
+          new Class[] { Configuration.class, StoreConfigInformation.class }, new Object[] { conf,
+            storeConfigInfo });
     } catch (Exception e) {
       throw new IOException("Unable to load configured compaction policy '"
           + comConf.getCompactionPolicyForTieredWindow() + "'", e);
     }
   }
 
-  @Override
-  public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
-    // Never do major compaction unless forced
-    return false;
-  }
-
-  @Override
   /**
-   * Heuristics for guessing whether we need compaction.
+   * Heuristics for guessing whether we need minor compaction.
    */
-  public boolean needsCompaction(final Collection<StoreFile> storeFiles,
-      final List<StoreFile> filesCompacting) {
-    return needsCompaction(storeFiles, filesCompacting, EnvironmentEdgeManager.currentTime());
-  }
-
+  @Override
   @VisibleForTesting
   public boolean needsCompaction(final Collection<StoreFile> storeFiles,
-      final List<StoreFile> filesCompacting, long now) {
-    if (!super.needsCompaction(storeFiles, filesCompacting)) {
-      return false;
-    }
-
+      final List<StoreFile> filesCompacting) {
     ArrayList<StoreFile> candidates = new ArrayList<StoreFile>(storeFiles);
-    candidates = filterBulk(candidates);
-    candidates = skipLargeFiles(candidates, true);
     try {
-      candidates = applyCompactionPolicy(candidates, true, false, now);
+      return selectMinorCompaction(candidates, false, true) != null;
     } catch (Exception e) {
       LOG.error("Can not check for compaction: ", e);
       return false;
     }
-
-    return candidates != null && candidates.size() >= comConf.getMinFilesToCompact();
   }
 
-  /**
-   * Could return null if no candidates are found
-   */
-  @Override
-  public ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
-      boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
-    return applyCompactionPolicy(candidates, mayUseOffPeak, mayBeStuck,
-      EnvironmentEdgeManager.currentTime());
-  }
+  public boolean shouldPerformMajorCompaction(final Collection<StoreFile> filesToCompact)
+    throws IOException {
+    long mcTime = getNextMajorCompactTime(filesToCompact);
+    if (filesToCompact == null || mcTime == 0) {
+      return false;
+    }
 
-  /**
-   * Input candidates are sorted from oldest to newest by seqId. Could return null if no candidates
-   * are found.
-   */
-  @VisibleForTesting
-  public ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
-      boolean mayUseOffPeak, boolean mayBeStuck, long now) throws IOException {
-    Iterable<StoreFile> candidatesInWindow =
-      filterOldStoreFiles(Lists.newArrayList(candidates), comConf.getMaxStoreFileAgeMillis(), now);
-
-    List<ArrayList<StoreFile>> buckets =
-        partitionFilesToBuckets(candidatesInWindow, comConf.getBaseWindowMillis(),
-          comConf.getWindowsPerTier(), now);
-    LOG.debug("Compaction buckets are: " + buckets);
-    if (buckets.size() >= storeConfigInfo.getBlockingFileCount()) {
-      LOG.warn("Number of compaction buckets:" +  buckets.size()
-        + ", exceeds blocking file count setting: "
-        + storeConfigInfo.getBlockingFileCount()
-        + ", either increase hbase.hstore.blockingStoreFiles or "
-        + "reduce the number of tiered compaction windows");
+    // TODO: Use better method for determining stamp of last major (HBASE-2990)
+    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
+    long now = EnvironmentEdgeManager.currentTime();
+    if (lowTimestamp <= 0L || lowTimestamp >= (now - mcTime)) {
+      return false;
     }
 
-    return newestBucket(buckets, comConf.getIncomingWindowMin(), now,
-      comConf.getBaseWindowMillis(), mayUseOffPeak);
-  }
+    long cfTTL = this.storeConfigInfo.getStoreFileTtl();
+    HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
+    long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);
+    List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, oldestToCompact, now);
+    boolean[] filesInWindow = new boolean[boundaries.size()];
 
-  /**
-   * @param buckets the list of buckets, sorted from newest to oldest, from which to return the
-   *          newest bucket within thresholds.
-   * @param incomingWindowThreshold minimum number of storeFiles in a bucket to qualify.
-   * @param maxThreshold maximum number of storeFiles to compact at once (the returned bucket will
-   *          be trimmed down to this).
-   * @return a bucket (a list of store files within a window to be compacted).
-   * @throws IOException error
-   */
-  private ArrayList<StoreFile> newestBucket(List<ArrayList<StoreFile>> buckets,
-      int incomingWindowThreshold, long now, long baseWindowMillis, boolean mayUseOffPeak)
-      throws IOException {
-    Window incomingWindow = getInitialWindow(now, baseWindowMillis);
-    for (ArrayList<StoreFile> bucket : buckets) {
-      int minThreshold =
-          incomingWindow.compareToTimestamp(bucket.get(0).getMaximumTimestamp()) <= 0 ? comConf
-              .getIncomingWindowMin() : comConf.getMinFilesToCompact();
-      compactionPolicyPerWindow.setMinThreshold(minThreshold);
-      ArrayList<StoreFile> candidates =
-          compactionPolicyPerWindow.applyCompactionPolicy(bucket, mayUseOffPeak, false);
-      if (candidates != null && !candidates.isEmpty()) {
-        return candidates;
+    for (StoreFile file: filesToCompact) {
+      Long minTimestamp = file.getMinimumTimestamp();
+      long oldest = (minTimestamp == null) ? (Long)Long.MIN_VALUE : now - minTimestamp.longValue();
+      if (cfTTL != HConstants.FOREVER && oldest >= cfTTL) {
+        LOG.debug("Major compaction triggered on store " + this
+          + "; for TTL maintenance");
+        return true;
       }
+      if (!file.isMajorCompaction() || file.isBulkLoadResult()) {
+        LOG.debug("Major compaction triggered on store " + this
+          + ", because there are new files and time since last major compaction "
+          + (now - lowTimestamp) + "ms");
+        return true;
+      }
+
+      int lowerWindowIndex = Collections.binarySearch(boundaries,
+        minTimestamp == null ? (Long)Long.MAX_VALUE : minTimestamp);
+      int upperWindowIndex = Collections.binarySearch(boundaries,
+        file.getMaximumTimestamp() == null ? (Long)Long.MAX_VALUE : file.getMaximumTimestamp());
+      if (lowerWindowIndex != upperWindowIndex) {
+        LOG.debug("Major compaction triggered on store " + this + "; because file "
+          + file.getPath() + " has data with timestamps cross window boundaries");
+        return true;
+      } else if (filesInWindow[upperWindowIndex]) {
+        LOG.debug("Major compaction triggered on store " + this +
+          "; because there are more than one file in some windows");
+        return true;
+      } else {
+        filesInWindow[upperWindowIndex] = true;
+      }
+      hdfsBlocksDistribution.add(file.getHDFSBlockDistribution());
     }
-    return null;
+
+    float blockLocalityIndex = hdfsBlocksDistribution
+        .getBlockLocalityIndex(RSRpcServices.getHostname(comConf.conf, false));
+    if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
+      LOG.debug("Major compaction triggered on store " + this
+        + "; to make hdfs blocks local, current blockLocalityIndex is "
+        + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
+      return true;
+    }
+
+    LOG.debug("Skipping major compaction of " + this +
+      ", because the files are already major compacted");
+    return false;
+  }
+
+  @Override
+  protected CompactionRequest createCompactionRequest(ArrayList<StoreFile> candidateSelection,
+    boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
+    CompactionRequest result = tryingMajor ? selectMajorCompaction(candidateSelection)
+      : selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck);
+    LOG.debug("Generated compaction request: " + result);
+    return result;
+  }
+
+  public CompactionRequest selectMajorCompaction(ArrayList<StoreFile> candidateSelection) {
+    long now = EnvironmentEdgeManager.currentTime();
+    long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);
+    return new DateTieredCompactionRequest(candidateSelection,
+      this.getCompactBoundariesForMajor(candidateSelection, oldestToCompact, now));
   }
 
   /**
@@ -179,63 +184,134 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
   * current file has a maxTimestamp older than last known maximum, treat this file as it carries
   * the last known maximum. This way both seqId and timestamp are in the same order. If files carry
   * the same maxTimestamps, they are ordered by seqId. We then reverse the list so they are ordered
-   * by seqId and maxTimestamp in decending order and build the time windows. All the out-of-order
+   * by seqId and maxTimestamp in descending order and build the time windows. All the out-of-order
   * data into the same compaction windows, guaranteeing contiguous compaction based on sequence id.
    */
-  private static List<ArrayList<StoreFile>> partitionFilesToBuckets(Iterable<StoreFile> storeFiles,
-      long baseWindowSizeMillis, int windowsPerTier, long now) {
-    List<ArrayList<StoreFile>> buckets = Lists.newArrayList();
-    Window window = getInitialWindow(now, baseWindowSizeMillis);
+  public CompactionRequest selectMinorCompaction(ArrayList<StoreFile> candidateSelection,
+      boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
+    long now = EnvironmentEdgeManager.currentTime();
+    long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);
+
+    // Make sure the store files is sorted by SeqId then maxTimestamp
+    List<StoreFile> storeFileList = Lists.newArrayList(filterOldStoreFiles(candidateSelection,
+      oldestToCompact));
+    Collections.sort(storeFileList, StoreFile.Comparators.SEQ_ID_MAX_TIMESTAMP);
 
     List<Pair<StoreFile, Long>> storefileMaxTimestampPairs =
-        Lists.newArrayListWithCapacity(Iterables.size(storeFiles));
+        Lists.newArrayListWithCapacity(Iterables.size(storeFileList));
     long maxTimestampSeen = Long.MIN_VALUE;
-    for (StoreFile storeFile : storeFiles) {
+    for (StoreFile storeFile : storeFileList) {
       // if there is out-of-order data,
       // we put them in the same window as the last file in increasing order
-      maxTimestampSeen = Math.max(maxTimestampSeen, storeFile.getMaximumTimestamp());
+      maxTimestampSeen = Math.max(maxTimestampSeen,
+        storeFile.getMaximumTimestamp() == null? Long.MIN_VALUE : storeFile.getMaximumTimestamp());
       storefileMaxTimestampPairs.add(new Pair<StoreFile, Long>(storeFile, maxTimestampSeen));
     }
-
     Collections.reverse(storefileMaxTimestampPairs);
+
+    Window window = getIncomingWindow(now, comConf.getBaseWindowMillis());
+    int minThreshold = comConf.getIncomingWindowMin();
     PeekingIterator<Pair<StoreFile, Long>> it =
         Iterators.peekingIterator(storefileMaxTimestampPairs.iterator());
-
     while (it.hasNext()) {
       int compResult = window.compareToTimestamp(it.peek().getSecond());
       if (compResult > 0) {
         // If the file is too old for the window, switch to the next window
-        window = window.nextWindow(windowsPerTier);
+        window = window.nextWindow(comConf.getWindowsPerTier(),
+          oldestToCompact);
+        minThreshold = comConf.getMinFilesToCompact();
       } else {
         // The file is within the target window
-        ArrayList<StoreFile> bucket = Lists.newArrayList();
-        // Add all files in the same window to current bucket. For incoming window
+        ArrayList<StoreFile> fileList = Lists.newArrayList();
+        // Add all files in the same window. For incoming window
         // we tolerate files with future data although it is sub-optimal
         while (it.hasNext() && window.compareToTimestamp(it.peek().getSecond()) <= 0) {
-          bucket.add(it.next().getFirst());
+          fileList.add(it.next().getFirst());
         }
-        if (!bucket.isEmpty()) {
-          buckets.add(bucket);
+        if (fileList.size() >= minThreshold) {
+          LOG.debug("Processing files: " + fileList + " for window: " + window);
+          DateTieredCompactionRequest request = generateCompactionRequest(fileList, window,
+            mayUseOffPeak, mayBeStuck, minThreshold);
+          if (request != null) {
+            return request;
+          }
         }
       }
     }
+    // A non-null file list is expected by HStore
+    return new CompactionRequest(Collections.<StoreFile> emptyList());
+  }
+
+  private DateTieredCompactionRequest generateCompactionRequest(ArrayList<StoreFile> storeFiles,
+      Window window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold)
+    throws IOException {
+    // The files has to be in ascending order for ratio-based compaction to work right
+    // and removeExcessFile to exclude youngest files.
+    Collections.reverse(storeFiles);
+
+    // Compact everything in the window if have more files than comConf.maxBlockingFiles
+    compactionPolicyPerWindow.setMinThreshold(minThreshold);
+    ArrayList<StoreFile> storeFileSelection = mayBeStuck ? storeFiles
+      : compactionPolicyPerWindow.applyCompactionPolicy(storeFiles, mayUseOffPeak, false);
+    if (storeFileSelection != null && !storeFileSelection.isEmpty()) {
+      // If there is any file in the window excluded from compaction,
+      // only one file will be output from compaction.
+      boolean singleOutput = storeFiles.size() != storeFileSelection.size() ||
+        comConf.useSingleOutputForMinorCompaction();
+      List<Long> boundaries = getCompactionBoundariesForMinor(window, singleOutput);
+      DateTieredCompactionRequest result = new DateTieredCompactionRequest(storeFileSelection,
+        boundaries);
+      return result;
+    }
+    return null;
+  }
+
+  /**
+   * Return a list of boundaries for multiple compaction output
+   *   in ascending order.
+   */
+  private List<Long> getCompactBoundariesForMajor(Collection<StoreFile> filesToCompact,
+    long oldestToCompact, long now) {
+    long minTimestamp = Long.MAX_VALUE;
+    for (StoreFile file : filesToCompact) {
+      minTimestamp = Math.min(minTimestamp,
+        file.getMinimumTimestamp() == null? Long.MAX_VALUE : file.getMinimumTimestamp());
+    }
 
-    return buckets;
+    List<Long> boundaries = new ArrayList<Long>();
+
+    // Add startMillis of all windows between now and min timestamp
+    for (Window window = getIncomingWindow(now, comConf.getBaseWindowMillis());
+      window.compareToTimestamp(minTimestamp) > 0;
+      window = window.nextWindow(comConf.getWindowsPerTier(), oldestToCompact)) {
+      boundaries.add(window.startMillis());
+    }
+    boundaries.add(Long.MIN_VALUE);
+    Collections.reverse(boundaries);
+    return boundaries;
+  }
+
+  /**
+   * @return a list of boundaries for multiple compaction output
+   *   from minTimestamp to maxTimestamp.
+   */
+  private static List<Long> getCompactionBoundariesForMinor(Window window, boolean singleOutput) {
+    List<Long> boundaries = new ArrayList<Long>();
+    boundaries.add(Long.MIN_VALUE);
+    if (!singleOutput) {
+      boundaries.add(window.startMillis());
+    }
+    return boundaries;
   }
 
   /**
    * Removes all store files with max timestamp older than (current - maxAge).
    * @param storeFiles all store files to consider
   * @param maxAge the age in milliseconds when a store file stops participating in compaction.
-   * @param now current time. store files with max timestamp less than (now - maxAge) are filtered.
   * @return a list of storeFiles with the store file older than maxAge excluded
    */
-  private static Iterable<StoreFile> filterOldStoreFiles(List<StoreFile> storeFiles, long maxAge,
-      long now) {
-    if (maxAge == 0) {
-      return ImmutableList.of();
-    }
-    final long cutoff = now - maxAge;
+  private static Iterable<StoreFile> filterOldStoreFiles(List<StoreFile> storeFiles,
+    final long cutoff) {
     return Iterables.filter(storeFiles, new Predicate<StoreFile>() {
       @Override
       public boolean apply(StoreFile storeFile) {
@@ -243,13 +319,24 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
         if (storeFile == null) {
           return false;
         }
-        return storeFile.getMaximumTimestamp() >= cutoff;
+        Long maxTimestamp = storeFile.getMaximumTimestamp();
+        return maxTimestamp == null ? true : maxTimestamp >= cutoff;
       }
     });
   }
 
-  private static Window getInitialWindow(long now, long timeUnit) {
-    return new Window(timeUnit, now / timeUnit);
+  private static Window getIncomingWindow(long now, long baseWindowMillis) {
+    return new Window(baseWindowMillis, now / baseWindowMillis);
+  }
+
+  private static long getOldestToCompact(long maxAgeMillis, long now) {
+    try {
+      return LongMath.checkedSubtract(now, maxAgeMillis);
+    } catch (ArithmeticException ae) {
+      LOG.warn("Value for " + CompactionConfiguration.MAX_AGE_MILLIS_KEY + ": " + maxAgeMillis
+        + ". All the files will be eligible for minor compaction.");
+      return Long.MIN_VALUE;
+    }
   }
 
   /**
@@ -268,7 +355,7 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
     private final long divPosition;
 
     private Window(long baseWindowMillis, long divPosition) {
-      this.windowMillis = baseWindowMillis;
+      windowMillis = baseWindowMillis;
       this.divPosition = divPosition;
     }
 
@@ -279,6 +366,13 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
      *         or after than the timestamp.
      */
     public int compareToTimestamp(long timestamp) {
+      if (timestamp < 0) {
+        try {
+          timestamp = LongMath.checkedSubtract(timestamp, windowMillis - 1);
+        } catch (ArithmeticException ae) {
+          timestamp = Long.MIN_VALUE;
+        }
+      }
       long pos = timestamp / windowMillis;
       return divPosition == pos ? 0 : divPosition < pos ? -1 : 1;
     }
@@ -290,12 +384,42 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy {
      *          following those will be <code>tierBase</code> times as big.
      * @return The next window
      */
-    public Window nextWindow(int windowsPerTier) {
-      if (divPosition % windowsPerTier > 0) {
+    public Window nextWindow(int windowsPerTier, long oldestToCompact) {
+      // Don't promote to the next tier if there is not even 1 window at current tier
+      // or if the next window crosses the max age.
+      if (divPosition % windowsPerTier > 0 ||
+          startMillis() - windowMillis * windowsPerTier < oldestToCompact) {
         return new Window(windowMillis, divPosition - 1);
       } else {
         return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1);
       }
     }
+
+    /**
+     * Inclusive lower bound
+     */
+    public long startMillis() {
+      try {
+        return LongMath.checkedMultiply(windowMillis, divPosition);
+      } catch (ArithmeticException ae) {
+        return Long.MIN_VALUE;
+      }
+    }
+
+    /**
+     * Exclusive upper bound
+     */
+    public long endMillis() {
+      try {
+        return LongMath.checkedMultiply(windowMillis, (divPosition + 1));
+      } catch (ArithmeticException ae) {
+        return Long.MAX_VALUE;
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "[" + startMillis() + ", " + endMillis() + ")";
+    }
   }
 }
\ No newline at end of file

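For readers following the Window arithmetic above, a standalone sketch of the tiering
behaviour; names mirror the patch, but this is an illustration only (the overflow guards
and the new oldestToCompact cut-off are omitted), not the committed class.

    // Each window covers [windowMillis * divPosition, windowMillis * (divPosition + 1)).
    // Moving backwards in time, windows keep the base size until a tier boundary is
    // reached, then grow by a factor of windowsPerTier.
    final class WindowSketch {
      private final long windowMillis;
      private final long divPosition;

      WindowSketch(long windowMillis, long divPosition) {
        this.windowMillis = windowMillis;
        this.divPosition = divPosition;
      }

      static WindowSketch incoming(long now, long baseWindowMillis) {
        return new WindowSketch(baseWindowMillis, now / baseWindowMillis);
      }

      long startMillis() { return windowMillis * divPosition; }        // inclusive
      long endMillis()   { return windowMillis * (divPosition + 1); }  // exclusive

      WindowSketch next(int windowsPerTier) {
        if (divPosition % windowsPerTier > 0) {
          return new WindowSketch(windowMillis, divPosition - 1);
        }
        return new WindowSketch(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1);
      }

      public static void main(String[] args) {
        // Example: 6-hour base windows, 4 windows per tier.
        WindowSketch w = incoming(System.currentTimeMillis(), 6L * 60 * 60 * 1000);
        for (int i = 0; i < 8; i++) {
          System.out.println("[" + w.startMillis() + ", " + w.endMillis() + ")");
          w = w.next(4);
        }
      }
    }
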
http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java
new file mode 100644
index 0000000..b33663f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_DOESNT_OVERRIDE_EQUALS",
+  justification="It is intended to use the same equal method as superclass")
+public class DateTieredCompactionRequest extends CompactionRequest {
+  private List<Long> boundaries;
+
+  public DateTieredCompactionRequest(Collection<StoreFile> files, List<Long> boundaryList) {
+    super(files);
+    boundaries = boundaryList;
+  }
+
+  public List<Long> getBoundaries() {
+    return boundaries;
+  }
+
+  @Override
+  public String toString() {
+    return super.toString() + " boundaries=" + Arrays.toString(boundaries.toArray());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java
index c9d911d..f0cb5d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java
@@ -25,8 +25,8 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 
@@ -51,7 +51,7 @@ public class ExploringCompactionPolicy extends RatioBasedCompactionPolicy {
   }
 
   @Override
-  final ArrayList<StoreFile> applyCompactionPolicy(final ArrayList<StoreFile> candidates,
+  protected final ArrayList<StoreFile> applyCompactionPolicy(final ArrayList<StoreFile> candidates,
     final boolean mayUseOffPeak, final boolean mightBeStuck) throws IOException {
     return new ArrayList<StoreFile>(applyCompactionPolicy(candidates, mightBeStuck,
         mayUseOffPeak, comConf.getMinFilesToCompact(), comConf.getMaxFilesToCompact()));

http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java
index eace81f..d339898 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java
@@ -76,11 +76,12 @@ public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
   }
 
   @Override
-  public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
+  public boolean shouldPerformMajorCompaction(Collection<StoreFile> filesToCompact)
+    throws IOException {
     boolean isAfterSplit = StoreUtils.hasReferences(filesToCompact);
     if(isAfterSplit){
       LOG.info("Split detected, delegate to the parent policy.");
-      return super.isMajorCompaction(filesToCompact);
+      return super.shouldPerformMajorCompaction(filesToCompact);
     }
     return false;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
index 4533a9c..5600a4e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
@@ -16,14 +16,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,17 +33,13 @@ import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreUtils;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
-
 /**
  * The default algorithm for selecting files for compaction.
  * Combines the compaction configuration and the provisional file selection that
  * it's given to produce the list of suitable candidates for compaction.
  */
 @InterfaceAudience.Private
-public class RatioBasedCompactionPolicy extends CompactionPolicy {
+public class RatioBasedCompactionPolicy extends SortedCompactionPolicy {
   private static final Log LOG = LogFactory.getLog(RatioBasedCompactionPolicy.class);
 
   public RatioBasedCompactionPolicy(Configuration conf,
@@ -53,154 +47,73 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
     super(conf, storeConfigInfo);
   }
 
-  private ArrayList<StoreFile> getCurrentEligibleFiles(
-      ArrayList<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
-    // candidates = all storefiles not already in compaction queue
-    if (!filesCompacting.isEmpty()) {
-      // exclude all files older than the newest file we're currently
-      // compacting. this allows us to preserve contiguity (HBASE-2856)
-      StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
-      int idx = candidateFiles.indexOf(last);
-      Preconditions.checkArgument(idx != -1);
-      candidateFiles.subList(0, idx + 1).clear();
-    }
-    return candidateFiles;
-  }
-
-  public List<StoreFile> preSelectCompactionForCoprocessor(
-      final Collection<StoreFile> candidates, final List<StoreFile> filesCompacting) {
-    return getCurrentEligibleFiles(new ArrayList<StoreFile>(candidates), filesCompacting);
-  }
-
-  /**
-   * @param candidateFiles candidate files, ordered from oldest to newest by seqId. We rely on
-   *   DefaultStoreFileManager to sort the files by seqId to guarantee contiguous compaction based
-   *   on seqId for data consistency.
-   * @return subset copy of candidate list that meets compaction criteria
-   * @throws java.io.IOException
-   */
-  public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
-      final List<StoreFile> filesCompacting, final boolean isUserCompaction,
-      final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
-    // Preliminary compaction subject to filters
-    ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
-    // Stuck and not compacting enough (estimate). It is not guaranteed that we will be
-    // able to compact more if stuck and compacting, because ratio policy excludes some
-    // non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
-    int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
-    boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
-        >= storeConfigInfo.getBlockingFileCount();
-    candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
-    LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
-        filesCompacting.size() + " compacting, " + candidateSelection.size() +
-        " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
-
-    // If we can't have all files, we cannot do major anyway
-    boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
-    if (!(forceMajor && isAllFiles)) {
-      candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak);
-      isAllFiles = candidateFiles.size() == candidateSelection.size();
-    }
-
-    // Try a major compaction if this is a user-requested major compaction,
-    // or if we do not have too many files to compact and this was requested as a major compaction
-    boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction)
-        || (((forceMajor && isAllFiles) || 
isMajorCompaction(candidateSelection))
-          && (candidateSelection.size() < comConf.getMaxFilesToCompact()));
-    // Or, if there are any references among the candidates.
-    boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection);
-    if (!isTryingMajor && !isAfterSplit) {
-      // We're are not compacting all files, let's see what files are applicable
-      candidateSelection = filterBulk(candidateSelection);
-      candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
-      candidateSelection = checkMinFilesCriteria(candidateSelection);
-    }
-    candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, isTryingMajor);
-    // Now we have the final file list, so we can determine if we can do major/all files.
-    isAllFiles = (candidateFiles.size() == candidateSelection.size());
-    CompactionRequest result = new CompactionRequest(candidateSelection);
-    result.setOffPeak(!candidateSelection.isEmpty() && !isAllFiles && mayUseOffPeak);
-    result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles);
-    return result;
-  }
-
-  /**
-   * @param candidates pre-filtrate
-   * @return filtered subset
-   * exclude all files above maxCompactSize
-   * Also save all references. We MUST compact them
+  /*
+   * @param filesToCompact Files to compact. Can be null.
+   * @return True if we should run a major compaction.
    */
-  protected ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates,
-    boolean mayUseOffpeak) {
-    int pos = 0;
-    while (pos < candidates.size() && !candidates.get(pos).isReference()
-      && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) {
-      ++pos;
-    }
-    if (pos > 0) {
-      LOG.debug("Some files are too large. Excluding " + pos
-          + " files from compaction candidates");
-      candidates.subList(0, pos).clear();
+  @Override
+  public boolean shouldPerformMajorCompaction(final Collection<StoreFile> filesToCompact)
+    throws IOException {
+    boolean result = false;
+    long mcTime = getNextMajorCompactTime(filesToCompact);
+    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
+      return result;
     }
-    return candidates;
-  }
-
-  /**
-   * @param candidates pre-filtrate
-   * @return filtered subset
-   * exclude all bulk load files if configured
-   */
-  protected ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
-    candidates.removeAll(Collections2.filter(candidates,
-        new Predicate<StoreFile>() {
-          @Override
-          public boolean apply(StoreFile input) {
-            return input.excludeFromMinorCompaction();
+    // TODO: Use better method for determining stamp of last major (HBASE-2990)
+    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
+    long now = System.currentTimeMillis();
+    if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
+      // Major compaction time has elapsed.
+      long cfTTL = this.storeConfigInfo.getStoreFileTtl();
+      if (filesToCompact.size() == 1) {
+        // Single file
+        StoreFile sf = filesToCompact.iterator().next();
+        Long minTimestamp = sf.getMinimumTimestamp();
+        long oldest = (minTimestamp == null) ? Long.MIN_VALUE : now - minTimestamp.longValue();
+        if (sf.isMajorCompaction() && (cfTTL == HConstants.FOREVER || oldest < cfTTL)) {
+          float blockLocalityIndex =
+            sf.getHDFSBlockDistribution().getBlockLocalityIndex(
+            RSRpcServices.getHostname(comConf.conf, false));
+          if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
+            LOG.debug("Major compaction triggered on only store " + this
+              + "; to make hdfs blocks local, current blockLocalityIndex is "
+              + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
+            result = true;
+          } else {
+            LOG.debug("Skipping major compaction of " + this
+              + " because one (major) compacted file only, oldestTime " + oldest
+              + "ms is < TTL=" + cfTTL + " and blockLocalityIndex is " + blockLocalityIndex
+              + " (min " + comConf.getMinLocalityToForceCompact() + ")");
           }
-        }));
-    return candidates;
-  }
-
-  /**
-   * @param candidates pre-filtrate
-   * @return filtered subset
-   * take upto maxFilesToCompact from the start
-   */
-  private ArrayList<StoreFile> removeExcessFiles(ArrayList<StoreFile> candidates,
-      boolean isUserCompaction, boolean isMajorCompaction) {
-    int excess = candidates.size() - comConf.getMaxFilesToCompact();
-    if (excess > 0) {
-      if (isMajorCompaction && isUserCompaction) {
-        LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
-            " files because of a user-requested major compaction");
+        } else if (cfTTL != HConstants.FOREVER && oldest > cfTTL) {
+          LOG.debug("Major compaction triggered on store " + this
+            + ", because keyvalues outdated; time since last major compaction "
+            + (now - lowTimestamp) + "ms");
+          result = true;
+        }
       } else {
-        LOG.debug("Too many admissible files. Excluding " + excess
-          + " files from compaction candidates");
-        candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();
+        LOG.debug("Major compaction triggered on store " + this
+          + "; time since last major compaction " + (now - lowTimestamp) + "ms");
       }
+      result = true;
     }
-    return candidates;
+    return result;
   }
-  /**
-   * @param candidates pre-filtrate
-   * @return filtered subset
-   * forget the compactionSelection if we don't have enough files
-   */
-  protected ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates) {
-    int minFiles = comConf.getMinFilesToCompact();
-    if (candidates.size() < minFiles) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Not compacting files because we only have " + 
candidates.size() +
-          " files ready for compaction. Need " + minFiles + " to initiate.");
-      }
-      candidates.clear();
+
+  @Override
+  protected CompactionRequest createCompactionRequest(ArrayList<StoreFile>
+    candidateSelection, boolean tryingMajor, boolean mayUseOffPeak, boolean 
mayBeStuck)
+    throws IOException {
+    if (!tryingMajor) {
+      candidateSelection = filterBulk(candidateSelection);
+      candidateSelection = applyCompactionPolicy(candidateSelection, 
mayUseOffPeak, mayBeStuck);
+      candidateSelection = checkMinFilesCriteria(candidateSelection,
+        comConf.getMinFilesToCompact());
     }
-    return candidates;
+    return new CompactionRequest(candidateSelection);
   }
 
   /**
-    * @param candidates pre-filtrate
-    * @return filtered subset
     * -- Default minor compaction selection algorithm:
     * choose CompactSelection from candidates --
     * First exclude bulk-load files if indicated in configuration.
@@ -227,9 +140,11 @@ public class RatioBasedCompactionPolicy extends 
CompactionPolicy {
     *    | |  | |  | |  | |  _  | |
     *    | |  | |  | |  | | | | | |
     *    | |  | |  | |  | | | | | |
+    * @param candidates pre-filtrate
+    * @return filtered subset
     */
-  ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
-      boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
+  protected ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> 
candidates,
+    boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
     if (candidates.isEmpty()) {
       return candidates;
     }
@@ -276,114 +191,12 @@ public class RatioBasedCompactionPolicy extends 
CompactionPolicy {
     return candidates;
   }
 
-  /*
-   * @param filesToCompact Files to compact. Can be null.
-   * @return True if we should run a major compaction.
-   */
-  @Override
-  public boolean isMajorCompaction(final Collection<StoreFile> filesToCompact)
-      throws IOException {
-    boolean result = false;
-    long mcTime = getNextMajorCompactTime(filesToCompact);
-    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
-      return result;
-    }
-    // TODO: Use better method for determining stamp of last major (HBASE-2990)
-    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
-    long now = System.currentTimeMillis();
-    if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
-      // Major compaction time has elapsed.
-      long cfTtl = this.storeConfigInfo.getStoreFileTtl();
-      if (filesToCompact.size() == 1) {
-        // Single file
-        StoreFile sf = filesToCompact.iterator().next();
-        Long minTimestamp = sf.getMinimumTimestamp();
-        long oldest = (minTimestamp == null)
-            ? Long.MIN_VALUE
-            : now - minTimestamp.longValue();
-        if (sf.isMajorCompaction() &&
-            (cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
-          float blockLocalityIndex = 
sf.getHDFSBlockDistribution().getBlockLocalityIndex(
-              RSRpcServices.getHostname(comConf.conf, false)
-          );
-          if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Major compaction triggered on only store " + this +
-                  "; to make hdfs blocks local, current blockLocalityIndex is 
" +
-                  blockLocalityIndex + " (min " + 
comConf.getMinLocalityToForceCompact() +
-                  ")");
-            }
-            result = true;
-          } else {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Skipping major compaction of " + this +
-                  " because one (major) compacted file only, oldestTime " +
-                  oldest + "ms is < ttl=" + cfTtl + " and blockLocalityIndex 
is " +
-                  blockLocalityIndex + " (min " + 
comConf.getMinLocalityToForceCompact() +
-                  ")");
-            }
-          }
-        } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
-          LOG.debug("Major compaction triggered on store " + this +
-            ", because keyvalues outdated; time since last major compaction " +
-            (now - lowTimestamp) + "ms");
-          result = true;
-        }
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Major compaction triggered on store " + this +
-              "; time since last major compaction " + (now - lowTimestamp) + 
"ms");
-        }
-        result = true;
-      }
-    }
-    return result;
-  }
-
   /**
-   * Used calculation jitter
+   * A heuristic method to decide whether to schedule a compaction request
+   * @param storeFiles files in the store.
+   * @param filesCompacting files being scheduled to compact.
+   * @return true to schedule a request.
    */
-  private final Random random = new Random();
-
-  /**
-   * @param filesToCompact
-   * @return When to run next major compaction
-   */
-  public long getNextMajorCompactTime(final Collection<StoreFile> 
filesToCompact) {
-    // default = 24hrs
-    long ret = comConf.getMajorCompactionPeriod();
-    if (ret > 0) {
-      // default = 20% = +/- 4.8 hrs
-      double jitterPct = comConf.getMajorCompactionJitter();
-      if (jitterPct > 0) {
-        long jitter = Math.round(ret * jitterPct);
-        // deterministic jitter avoids a major compaction storm on restart
-        Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
-        if (seed != null) {
-          // Synchronized to ensure one user of random instance at a time.
-          double rnd = -1;
-          synchronized (this) {
-            this.random.setSeed(seed);
-            rnd = this.random.nextDouble();
-          }
-          ret += jitter - Math.round(2L * jitter * rnd);
-        } else {
-          ret = 0; // If seed is null, then no storefiles == no major 
compaction
-        }
-      }
-    }
-    return ret;
-  }
-
-  /**
-   * @param compactionSize Total size of some compaction
-   * @return whether this should be a large or small compaction
-   */
-  @Override
-  public boolean throttleCompaction(long compactionSize) {
-    return compactionSize > comConf.getThrottlePoint();
-  }
-
   public boolean needsCompaction(final Collection<StoreFile> storeFiles,
       final List<StoreFile> filesCompacting) {
     int numCandidates = storeFiles.size() - filesCompacting.size();
@@ -392,7 +205,6 @@ public class RatioBasedCompactionPolicy extends 
CompactionPolicy {
 
   /**
    * Overwrite min threshold for compaction
-   * @param minThreshold min to update to
    */
   public void setMinThreshold(int minThreshold) {
     comConf.setMinFilesToCompact(minThreshold);

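The applyCompactionPolicy javadoc above illustrates the ratio rule with the bar
chart: walking the candidates from oldest to newest, a file stays excluded while
it is larger than the combined size of the newer files scaled by the configured
ratio. A standalone simplification of that check (hypothetical names; the real
policy also enforces a minimum compact size floor and the min/max file-count
bounds) might look like:

  // Candidates are ordered oldest to newest, as guaranteed by seqId ordering.
  static int firstAdmissibleIndex(long[] sizes, double ratio) {
    long sumNewer = 0;
    for (long size : sizes) {
      sumNewer += size;                  // total size of all candidates
    }
    for (int i = 0; i < sizes.length; i++) {
      sumNewer -= sizes[i];              // combined size of files newer than i
      if (sizes[i] <= sumNewer * ratio) {
        return i;                        // files i..n-1 become the minor selection
      }
    }
    return sizes.length;                 // nothing admissible; selection stays empty
  }

With sizes {100, 25, 12, 12} and ratio 1.2, index 0 is skipped (100 > 49 * 1.2)
and the selection starts at index 1 (25 <= 24 * 1.2), matching the "skip the big
old file, compact the small new ones" behaviour the diagram describes.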
http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java
new file mode 100644
index 0000000..77b0af8
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required by applicable
+ * law or agreed to in writing, software distributed under the License is 
distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied. See the License
+ * for the specific language governing permissions and limitations under the 
License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreUtils;
+
+/**
+ * An abstract compaction policy that selects files in seq id order.
+ */
+@InterfaceAudience.Private
+public abstract class SortedCompactionPolicy extends CompactionPolicy {
+
+  private static final Log LOG = 
LogFactory.getLog(SortedCompactionPolicy.class);
+
+  public SortedCompactionPolicy(Configuration conf, StoreConfigInformation 
storeConfigInfo) {
+    super(conf, storeConfigInfo);
+  }
+
+  public List<StoreFile> preSelectCompactionForCoprocessor(final 
Collection<StoreFile> candidates,
+      final List<StoreFile> filesCompacting) {
+    return getCurrentEligibleFiles(new ArrayList<StoreFile>(candidates), 
filesCompacting);
+  }
+
+  /**
+   * @param candidateFiles candidate files, ordered from oldest to newest by 
seqId. We rely on
+   *   DefaultStoreFileManager to sort the files by seqId to guarantee 
contiguous compaction based
+   *   on seqId for data consistency.
+   * @return subset copy of candidate list that meets compaction criteria
+   */
+  public CompactionRequest selectCompaction(Collection<StoreFile> 
candidateFiles,
+      final List<StoreFile> filesCompacting, final boolean isUserCompaction,
+      final boolean mayUseOffPeak, final boolean forceMajor) throws 
IOException {
+    // Preliminary compaction subject to filters
+    ArrayList<StoreFile> candidateSelection = new 
ArrayList<StoreFile>(candidateFiles);
+    // Stuck and not compacting enough (estimate). It is not guaranteed that 
we will be
+    // able to compact more if stuck and compacting, because ratio policy 
excludes some
+    // non-compacting files from consideration during compaction (see 
getCurrentEligibleFiles).
+    int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
+    boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + 
futureFiles)
+        >= storeConfigInfo.getBlockingFileCount();
+
+    candidateSelection = getCurrentEligibleFiles(candidateSelection, 
filesCompacting);
+    LOG.debug("Selecting compaction from " + candidateFiles.size() + " store 
files, " +
+        filesCompacting.size() + " compacting, " + candidateSelection.size() +
+        " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
+
+    // If we can't have all files, we cannot do major anyway
+    boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
+    if (!(forceMajor && isAllFiles)) {
+      candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak);
+      isAllFiles = candidateFiles.size() == candidateSelection.size();
+    }
+
+    // Try a major compaction if this is a user-requested major compaction,
+    // or if we do not have too many files to compact and this was requested 
as a major compaction
+    boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction)
+        || (((forceMajor && isAllFiles) || 
shouldPerformMajorCompaction(candidateSelection))
+          && (candidateSelection.size() < comConf.getMaxFilesToCompact()));
+    // Or, if there are any references among the candidates.
+    boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection);
+
+    CompactionRequest result = createCompactionRequest(candidateSelection,
+      isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck);
+
+    ArrayList<StoreFile> filesToCompact = 
Lists.newArrayList(result.getFiles());
+    removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor);
+    result.updateFiles(filesToCompact);
+
+    isAllFiles = (candidateFiles.size() == filesToCompact.size());
+    result.setOffPeak(!filesToCompact.isEmpty() && !isAllFiles && 
mayUseOffPeak);
+    result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles);
+
+    return result;
+  }
+
+  protected abstract CompactionRequest 
createCompactionRequest(ArrayList<StoreFile>
+    candidateSelection, boolean tryingMajor, boolean mayUseOffPeak, boolean 
mayBeStuck)
+    throws IOException;
+
+  /*
+   * @param filesToCompact Files to compact. Can be null.
+   * @return True if we should run a major compaction.
+   */
+  public abstract boolean shouldPerformMajorCompaction(final 
Collection<StoreFile> filesToCompact)
+    throws IOException;
+
+  /**
+   * Random used for jitter calculation.
+   */
+  private final Random random = new Random();
+
+  /**
+   * @param filesToCompact
+   * @return When to run next major compaction
+   */
+  public long getNextMajorCompactTime(final Collection<StoreFile> 
filesToCompact) {
+    // default = 24hrs
+    long ret = comConf.getMajorCompactionPeriod();
+    if (ret > 0) {
+      // default = 20% = +/- 4.8 hrs
+      double jitterPct = comConf.getMajorCompactionJitter();
+      if (jitterPct > 0) {
+        long jitter = Math.round(ret * jitterPct);
+        // deterministic jitter avoids a major compaction storm on restart
+        Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
+        if (seed != null) {
+          // Synchronized to ensure one user of random instance at a time.
+          double rnd = -1;
+          synchronized (this) {
+            this.random.setSeed(seed);
+            rnd = this.random.nextDouble();
+          }
+          ret += jitter - Math.round(2L * jitter * rnd);
+        } else {
+          ret = 0; // If seed is null, then no storefiles == no major 
compaction
+        }
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * @param compactionSize Total size of some compaction
+   * @return whether this should be a large or small compaction
+   */
+  public boolean throttleCompaction(long compactionSize) {
+    return compactionSize > comConf.getThrottlePoint();
+  }
+
+  public abstract boolean needsCompaction(final Collection<StoreFile> 
storeFiles,
+    final List<StoreFile> filesCompacting);
+
+  protected ArrayList<StoreFile> getCurrentEligibleFiles(ArrayList<StoreFile> 
candidateFiles,
+      final List<StoreFile> filesCompacting) {
+    // candidates = all storefiles not already in compaction queue
+    if (!filesCompacting.isEmpty()) {
+      // exclude all files older than the newest file we're currently
+      // compacting. this allows us to preserve contiguity (HBASE-2856)
+      StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
+      int idx = candidateFiles.indexOf(last);
+      Preconditions.checkArgument(idx != -1);
+      candidateFiles.subList(0, idx + 1).clear();
+    }
+    return candidateFiles;
+  }
+
+  /**
+   * @param candidates pre-filtrate
+   * @return filtered subset exclude all files above maxCompactSize
+   *   Also save all references. We MUST compact them
+   */
+  protected ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> 
candidates,
+    boolean mayUseOffpeak) {
+    int pos = 0;
+    while (pos < candidates.size() && !candidates.get(pos).isReference()
+      && (candidates.get(pos).getReader().length() > 
comConf.getMaxCompactSize(mayUseOffpeak))) {
+      ++pos;
+    }
+    if (pos > 0) {
+      LOG.debug("Some files are too large. Excluding " + pos
+          + " files from compaction candidates");
+      candidates.subList(0, pos).clear();
+    }
+    return candidates;
+  }
+
+  /**
+   * @param candidates pre-filtrate
+   * @return filtered subset exclude all bulk load files if configured
+   */
+  protected ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
+    candidates.removeAll(Collections2.filter(candidates, new 
Predicate<StoreFile>() {
+      @Override
+      public boolean apply(StoreFile input) {
+        return input.excludeFromMinorCompaction();
+      }
+    }));
+    return candidates;
+  }
+
+  /**
+   * @param candidates pre-filtrate
+   */
+  protected void removeExcessFiles(ArrayList<StoreFile> candidates,
+      boolean isUserCompaction, boolean isMajorCompaction) {
+    int excess = candidates.size() - comConf.getMaxFilesToCompact();
+    if (excess > 0) {
+      if (isMajorCompaction && isUserCompaction) {
+        LOG.debug("Warning, compacting more than " + 
comConf.getMaxFilesToCompact()
+            + " files because of a user-requested major compaction");
+      } else {
+        LOG.debug("Too many admissible files. Excluding " + excess
+            + " files from compaction candidates");
+        candidates.subList(comConf.getMaxFilesToCompact(), 
candidates.size()).clear();
+      }
+    }
+  }
+
+  /**
+   * @param candidates pre-filtrate
+   * @return filtered subset forget the compactionSelection if we don't have 
enough files
+   */
+  protected ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> 
candidates,
+    int minFiles) {
+    if (candidates.size() < minFiles) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not compacting files because we only have " + 
candidates.size()
+            + " files ready for compaction. Need " + minFiles + " to 
initiate.");
+      }
+      candidates.clear();
+    }
+    return candidates;
+  }
+}
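A quick way to sanity-check the jitter arithmetic in getNextMajorCompactTime
above: with the defaults called out in its comments (24-hour period, 20% jitter)
the next major compaction is scheduled between roughly 19.2 and 28.8 hours out,
and because the seed is derived from the store files (see
StoreUtils.getDeterministicRandomSeed) the same offset is recomputed after a
restart instead of triggering a compaction storm. A self-contained sketch of
just that calculation (hypothetical class name, not HBase code):

  import java.util.Random;

  public class JitterSketch {
    // Mirrors the arithmetic above: period plus/minus up to (period * jitterPct).
    static long nextMajorCompactTime(long periodMs, double jitterPct, int seed) {
      long jitter = Math.round(periodMs * jitterPct);   // 20% of 24h ~= 4.8h
      Random random = new Random(seed);                 // seed derived from the store files
      double rnd = random.nextDouble();                 // uniform in [0, 1)
      return periodMs + jitter - Math.round(2L * jitter * rnd);
    }

    public static void main(String[] args) {
      long dayMs = 24L * 60 * 60 * 1000;
      // Prints a value in (dayMs - 4.8h, dayMs + 4.8h], stable for a given seed.
      System.out.println(nextMajorCompactTime(dayMs, 0.2, 42));
    }
  }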

http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
index e8a4340..ff1dd8e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
@@ -166,7 +166,8 @@ public class StripeCompactionPolicy extends 
CompactionPolicy {
   }
 
   @Override
-  public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) 
throws IOException {
+  public boolean shouldPerformMajorCompaction(Collection<StoreFile> 
filesToCompact)
+    throws IOException {
     return false; // there's never a major compaction!
   }
 

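The one-line change above is the mechanical side of the isMajorCompaction ->
shouldPerformMajorCompaction rename. The interesting part is the contract it now
belongs to: every sorted policy supplies the three hooks declared abstract in
SortedCompactionPolicy, while candidate ordering, large-file skipping, excess-file
trimming and off-peak handling stay in the base class. A minimal hypothetical
subclass (not part of this patch; the real Ratio, Exploring, FIFO and DateTiered
policies plug into the same three methods) would look like:

  package org.apache.hadoop.hbase.regionserver.compactions;

  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.List;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
  import org.apache.hadoop.hbase.regionserver.StoreFile;

  // Hypothetical, minimal policy showing the three abstract hooks; not in this commit.
  public class PassThroughCompactionPolicy extends SortedCompactionPolicy {

    public PassThroughCompactionPolicy(Configuration conf,
        StoreConfigInformation storeConfigInfo) {
      super(conf, storeConfigInfo);
    }

    @Override
    protected CompactionRequest createCompactionRequest(ArrayList<StoreFile> candidateSelection,
        boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
      // Accept whatever selectCompaction() pre-filtered; real policies narrow this further.
      return new CompactionRequest(candidateSelection);
    }

    @Override
    public boolean shouldPerformMajorCompaction(Collection<StoreFile> filesToCompact)
        throws IOException {
      return false; // like StripeCompactionPolicy above, never volunteer a major compaction
    }

    @Override
    public boolean needsCompaction(Collection<StoreFile> storeFiles,
        List<StoreFile> filesCompacting) {
      return storeFiles.size() - filesCompacting.size() >= comConf.getMinFilesToCompact();
    }
  }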
http://git-wip-us.apache.org/repos/asf/hbase/blob/f60fc9d1/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
index df039e7..32dc227 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
@@ -25,8 +25,11 @@ import java.util.TreeMap;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /** A mock used so our tests don't deal with actual StoreFiles */
 public class MockStoreFile extends StoreFile {
@@ -38,6 +41,9 @@ public class MockStoreFile extends StoreFile {
   byte[] splitPoint = null;
   TimeRangeTracker timeRangeTracker;
   long entryCount;
+  boolean isMajor;
+  HDFSBlocksDistribution hdfsBlocksDistribution;
+  long modificationTime;
 
   MockStoreFile(HBaseTestingUtility testUtil, Path testPath,
       long length, long ageInDisk, boolean isRef, long sequenceid) throws 
IOException {
@@ -47,6 +53,11 @@ public class MockStoreFile extends StoreFile {
     this.isRef = isRef;
     this.ageInDisk = ageInDisk;
     this.sequenceid = sequenceid;
+    this.isMajor = false;
+    hdfsBlocksDistribution = new HDFSBlocksDistribution();
+    hdfsBlocksDistribution.addHostsAndBlockWeight(
+      new String[] { RSRpcServices.getHostname(testUtil.getConfiguration(), 
false) }, 1);
+    modificationTime = EnvironmentEdgeManager.currentTime();
   }
 
   void setLength(long newLen) {
@@ -65,21 +76,20 @@ public class MockStoreFile extends StoreFile {
 
   @Override
   public boolean isMajorCompaction() {
-    return false;
+    return isMajor;
   }
 
-  @Override
-  public boolean isReference() {
-    return this.isRef;
+  public void setIsMajor(boolean isMajor) {
+    this.isMajor = isMajor;
   }
 
   @Override
-  boolean isBulkLoadResult() {
-    return false;
+  public boolean isReference() {
+    return this.isRef;
   }
 
   @Override
-  public boolean isCompactedAway() {
+  public boolean isBulkLoadResult() {
     return false;
   }
 
@@ -102,14 +112,22 @@ public class MockStoreFile extends StoreFile {
 
   public Long getMinimumTimestamp() {
     return (timeRangeTracker == null) ?
-        null :
-        timeRangeTracker.getMinimumTimestamp();
+      null : timeRangeTracker.getMinimumTimestamp();
   }
 
   public Long getMaximumTimestamp() {
     return (timeRangeTracker == null) ?
-        null :
-        timeRangeTracker.getMaximumTimestamp();
+      null : timeRangeTracker.getMaximumTimestamp();
+  }
+
+  @Override
+  public long getModificationTimeStamp() {
+    return modificationTime;
+  }
+
+  @Override
+  public HDFSBlocksDistribution getHDFSBlockDistribution() {
+    return hdfsBlocksDistribution;
   }
 
   @Override

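The MockStoreFile hunk above (truncated by the mail archive) gives the mock a
settable major-compaction flag, a real modification time, and an HDFS block
distribution that reports the local host. A hypothetical test helper in the same
package (names and argument values are assumptions, not part of the patch) could
use those hooks to stage the single-file TTL/locality branch of
shouldPerformMajorCompaction without real HDFS blocks:

  package org.apache.hadoop.hbase.regionserver;

  import java.io.IOException;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseTestingUtility;

  // Hypothetical test helper; MockStoreFile's constructor is package-private.
  public class MockStoreFileUsageSketch {
    static MockStoreFile stageLoneMajoredFile(HBaseTestingUtility util, Path path)
        throws IOException {
      // length=1024, ageInDisk=0, not a reference, seqId=1 -- arbitrary values
      MockStoreFile sf = new MockStoreFile(util, path, 1024L, 0L, false, 1L);
      sf.setIsMajor(true); // pretend it came from an earlier major compaction
      // The mock now reports the local hostname in its HDFS block distribution and
      // a real modification time, so the locality short-circuit can be exercised.
      return sf;
    }
  }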