HBASE-15389 Write out multiple files when compaction

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b902c9e4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b902c9e4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b902c9e4

Branch: refs/heads/0.98
Commit: b902c9e4d2dacc99bdb7b3327aebf8a5a1aa2211
Parents: f39c530
Author: zhangduo <[email protected]>
Authored: Fri Mar 25 16:00:32 2016 +0800
Committer: zhangduo <[email protected]>
Committed: Fri Mar 25 16:00:32 2016 +0800

----------------------------------------------------------------------
 .../regionserver/AbstractMultiFileWriter.java   | 119 +++++++
 .../regionserver/DateTieredMultiFileWriter.java |  83 +++++
 .../hadoop/hbase/regionserver/HStore.java       |   3 +-
 .../apache/hadoop/hbase/regionserver/Store.java |   6 +-
 .../hadoop/hbase/regionserver/StoreFile.java    |   7 +
 .../regionserver/StripeMultiFileWriter.java     | 206 +++++-------
 .../hbase/regionserver/StripeStoreFlusher.java  |  34 +-
 .../AbstractMultiOutputCompactor.java           | 154 +++++++++
 .../regionserver/compactions/Compactor.java     |   4 +-
 .../compactions/DateTieredCompactor.java        |  84 +++++
 .../compactions/DefaultCompactor.java           |   4 +-
 .../compactions/StripeCompactionPolicy.java     |  13 +-
 .../compactions/StripeCompactor.java            | 153 +++------
 .../hbase/regionserver/TestStripeCompactor.java | 311 -------------------
 .../regionserver/compactions/TestCompactor.java | 211 +++++++++++++
 .../compactions/TestDateTieredCompactor.java    | 168 ++++++++++
 .../compactions/TestStripeCompactionPolicy.java |  24 +-
 .../compactions/TestStripeCompactor.java        | 206 ++++++++++++
 18 files changed, 1222 insertions(+), 568 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b902c9e4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
new file mode 100644
index 0000000..15e3295
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink;
+
+/**
+ * Base class for cell sink that separates the provided cells into multiple 
files.
+ */
[email protected]
+public abstract class AbstractMultiFileWriter implements CellSink {
+
+  private static final Log LOG = 
LogFactory.getLog(AbstractMultiFileWriter.class);
+
+  /** Factory that is used to produce single StoreFile.Writer-s */
+  protected WriterFactory writerFactory;
+
+  /** Source scanner that is tracking KV count; may be null if source is not 
StoreScanner */
+  protected StoreScanner sourceScanner;
+
+  public interface WriterFactory {
+    public StoreFile.Writer createWriter() throws IOException;
+  }
+
+  /**
+   * Initializes multi-writer before usage.
+   * @param sourceScanner Optional store scanner to obtain the information 
about read progress.
+   * @param factory Factory used to produce individual file writers.
+   */
+  public void init(StoreScanner sourceScanner, WriterFactory factory) {
+    this.writerFactory = factory;
+    this.sourceScanner = sourceScanner;
+  }
+
+  /**
+   * Commit all writers.
+   * <p>
+   * Notice that here we use the same <code>maxSeqId</code> for all output files since we have
+   * not found an easy way to obtain enough sequence ids for different output files in some
+   * corner cases. See the comments in HBASE-15400 for more details.
+   */
+  public List<Path> commitWriters(long maxSeqId, boolean isMajor) throws 
IOException {
+    preCommitWriters();
+    Collection<StoreFile.Writer> writers = this.writers();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Commit " + writers.size() + " writers");
+    }
+    List<Path> paths = new ArrayList<Path>();
+    for (Writer writer : writers) {
+      if (writer == null) {
+        continue;
+      }
+      writer.appendMetadata(maxSeqId, isMajor);
+      preCloseWriter(writer);
+      paths.add(writer.getPath());
+      writer.close();
+    }
+    return paths;
+  }
+
+  /**
+   * Close all writers without throwing any exception. This is usually used when a compaction
+   * fails.
+   */
+  public List<Path> abortWriters() {
+    List<Path> paths = new ArrayList<Path>();
+    for (StoreFile.Writer writer : writers()) {
+      try {
+        if (writer != null) {
+          paths.add(writer.getPath());
+          writer.close();
+        }
+      } catch (Exception ex) {
+        LOG.error("Failed to close the writer after an unfinished 
compaction.", ex);
+      }
+    }
+    return paths;
+  }
+
+  protected abstract Collection<StoreFile.Writer> writers();
+
+  /**
+   * Subclasses override this method to be called at the end of a successful sequence of
+   * appends; all appends are processed before this method is called.
+   */
+  protected void preCommitWriters() throws IOException {
+  }
+
+  /**
+   * Subclasses override this method to be called before we close the given writer. Usually you
+   * can append extra metadata to the writer.
+   */
+  protected void preCloseWriter(StoreFile.Writer writer) throws IOException {
+  }
+}
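
(Illustration, not part of this patch.) The lifecycle a caller is expected to follow with this class is: init() with a WriterFactory, append() the cells, then commitWriters() on success or abortWriters() on failure. A minimal sketch, assuming a caller that already has a concrete multi-file writer and a list of cells (all names below are hypothetical):

  List<Path> writeAll(AbstractMultiFileWriter writer, StoreScanner sourceScanner,
      AbstractMultiFileWriter.WriterFactory factory, List<KeyValue> cells, long maxSeqId,
      boolean isMajor) throws IOException {
    // sourceScanner may be null if the source is not a StoreScanner
    writer.init(sourceScanner, factory);
    boolean succeeded = false;
    try {
      for (KeyValue kv : cells) {
        // the concrete subclass routes each cell to the appropriate per-file writer
        writer.append(kv);
      }
      succeeded = true;
    } finally {
      if (!succeeded) {
        // close quietly; the caller deletes the returned leftover paths
        writer.abortWriters();
      }
    }
    // the same maxSeqId is used for every output file (see HBASE-15400)
    return writer.commitWriters(maxSeqId, isMajor);
  }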

http://git-wip-us.apache.org/repos/asf/hbase/blob/b902c9e4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
new file mode 100644
index 0000000..b688915
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+
+/**
+ * Class for a cell sink that separates the provided cells into multiple files for date tiered
+ * compaction.
+ */
[email protected]
+public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
+
+  private final NavigableMap<Long, StoreFile.Writer> lowerBoundary2Writer
+    = new TreeMap<Long, StoreFile.Writer>();
+
+  private final boolean needEmptyFile;
+
+  /**
+   * @param needEmptyFile whether we need to create an empty store file if we have not written
+   *          out anything.
+   */
+  public DateTieredMultiFileWriter(List<Long> lowerBoundaries, boolean 
needEmptyFile) {
+    for (Long boundary : lowerBoundaries) {
+      lowerBoundary2Writer.put(boundary, null);
+    }
+    this.needEmptyFile = needEmptyFile;
+  }
+
+  @Override
+  public void append(KeyValue kv) throws IOException {
+    Map.Entry<Long, StoreFile.Writer> entry = 
lowerBoundary2Writer.floorEntry(kv.getTimestamp());
+    StoreFile.Writer writer = entry.getValue();
+    if (writer == null) {
+      writer = writerFactory.createWriter();
+      lowerBoundary2Writer.put(entry.getKey(), writer);
+    }
+    writer.append(kv);
+  }
+
+  @Override
+  protected Collection<Writer> writers() {
+    return lowerBoundary2Writer.values();
+  }
+
+  @Override
+  protected void preCommitWriters() throws IOException {
+    if (!needEmptyFile) {
+      return;
+    }
+    for (StoreFile.Writer writer : lowerBoundary2Writer.values()) {
+      if (writer != null) {
+        return;
+      }
+    }
+    // we haven't written out any data, create an empty file to retain metadata
+    lowerBoundary2Writer.put(lowerBoundary2Writer.firstKey(), 
writerFactory.createWriter());
+  }
+}
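
(Illustration, not part of this patch.) The append() above routes each cell with a floorEntry lookup: the cell goes to the writer registered at the largest lower boundary that does not exceed its timestamp, and writers are created lazily on first use. A small sketch of the routing rule with made-up boundaries:

  // Hypothetical boundaries and timestamps, for illustration only.
  NavigableMap<Long, String> tierForBoundary = new TreeMap<Long, String>();
  tierForBoundary.put(0L, "tier-0");
  tierForBoundary.put(100L, "tier-100");
  tierForBoundary.put(200L, "tier-200");
  // A cell with timestamp 150 lands in the tier whose lower boundary is 100.
  assert "tier-100".equals(tierForBoundary.floorEntry(150L).getValue());
  // A timestamp below the smallest boundary has no floor entry, so the supplied
  // lowerBoundaries are expected to start low enough to cover every cell.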

http://git-wip-us.apache.org/repos/asf/hbase/blob/b902c9e4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 9c9c1da..ee0e708 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -440,7 +440,8 @@ public class HStore implements Store {
   /**
    * @return The maximum sequence id in all store files. Used for log replay.
    */
-  long getMaxSequenceId() {
+  @Override
+  public long getMaxSequenceId() {
     return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/b902c9e4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index a110ad3..1f94b49 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
@@ -303,6 +302,11 @@ public interface Store extends HeapSize, 
StoreConfigInformation {
   HColumnDescriptor getFamily();
 
   /**
+   * @return The maximum sequence id in all store files.
+   */
+  long getMaxSequenceId();
+
+  /**
    * @return The maximum memstoreTS in all store files.
    */
   long getMaxMemstoreTS();

http://git-wip-us.apache.org/repos/asf/hbase/blob/b902c9e4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index f222379..25e2e78 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -231,6 +231,13 @@ public class StoreFile {
   }
 
   /**
+   * Clone a StoreFile for opening a private reader.
+   */
+  public StoreFile cloneForReader() {
+    return new StoreFile(this);
+  }
+
+  /**
    * @return the StoreFile object associated to this StoreFile.
    *         null if the StoreFile is not a reference.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/b902c9e4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
index 13eb5ec..9f3917b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
@@ -25,46 +25,29 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
-import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Base class for cell sink that separates the provided cells into multiple 
files.
+ * Base class for cell sink that separates the provided cells into multiple 
files for stripe
+ * compaction.
  */
 @InterfaceAudience.Private
-public abstract class StripeMultiFileWriter implements Compactor.CellSink {
-  private static final Log LOG = 
LogFactory.getLog(StripeMultiFileWriter.class);
+public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
 
-  /** Factory that is used to produce single StoreFile.Writer-s */
-  protected WriterFactory writerFactory;
-  protected KVComparator comparator;
+  private static final Log LOG = 
LogFactory.getLog(StripeMultiFileWriter.class);
 
+  protected final KVComparator comparator;
   protected List<StoreFile.Writer> existingWriters;
   protected List<byte[]> boundaries;
-  /** Source scanner that is tracking KV count; may be null if source is not 
StoreScanner */
-  protected StoreScanner sourceScanner;
 
   /** Whether to write stripe metadata */
   private boolean doWriteStripeMetadata = true;
 
-  public interface WriterFactory {
-    public StoreFile.Writer createWriter() throws IOException;
-  }
-
-  /**
-   * Initializes multi-writer before usage.
-   * @param sourceScanner Optional store scanner to obtain the information 
about read progress.
-   * @param factory Factory used to produce individual file writers.
-   * @param comparator Comparator used to compare rows.
-   */
-  public void init(StoreScanner sourceScanner, WriterFactory factory, 
KVComparator comparator)
-      throws IOException {
-    this.writerFactory = factory;
-    this.sourceScanner = sourceScanner;
+  public StripeMultiFileWriter(KVComparator comparator) {
     this.comparator = comparator;
   }
 
@@ -72,41 +55,23 @@ public abstract class StripeMultiFileWriter implements 
Compactor.CellSink {
     this.doWriteStripeMetadata = false;
   }
 
-  public List<Path> commitWriters(long maxSeqId, boolean isMajor) throws 
IOException {
-    assert this.existingWriters != null;
-    commitWritersInternal();
-    assert this.boundaries.size() == (this.existingWriters.size() + 1);
-    LOG.debug((this.doWriteStripeMetadata ? "W" : "Not w")
-      + "riting out metadata for " + this.existingWriters.size() + " writers");
-    List<Path> paths = new ArrayList<Path>();
-    for (int i = 0; i < this.existingWriters.size(); ++i) {
-      StoreFile.Writer writer = this.existingWriters.get(i);
-      if (writer == null) continue; // writer was skipped due to 0 KVs
-      if (doWriteStripeMetadata) {
-        writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, 
this.boundaries.get(i));
-        writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, 
this.boundaries.get(i + 1));
-      }
-      writer.appendMetadata(maxSeqId, isMajor);
-      paths.add(writer.getPath());
-      writer.close();
-    }
-    this.existingWriters = null;
-    return paths;
+  @Override
+  protected Collection<Writer> writers() {
+    return existingWriters;
   }
 
-  public List<Path> abortWriters() {
-    assert this.existingWriters != null;
-    List<Path> paths = new ArrayList<Path>();
-    for (StoreFile.Writer writer : this.existingWriters) {
-      try {
-        paths.add(writer.getPath());
-        writer.close();
-      } catch (Exception ex) {
-        LOG.error("Failed to close the writer after an unfinished 
compaction.", ex);
+  @Override
+  protected void preCloseWriter(Writer writer) throws IOException {
+    if (doWriteStripeMetadata) {
+      LOG.debug("Write stripe metadata for " + writer.getPath().toString());
+      int index = existingWriters.indexOf(writer);
+      writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, 
boundaries.get(index));
+      writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, 
boundaries.get(index + 1));
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skip writing stripe metadata for " + 
writer.getPath().toString());
       }
     }
-    this.existingWriters = null;
-    return paths;
   }
 
   /**
@@ -116,12 +81,12 @@ public abstract class StripeMultiFileWriter implements 
Compactor.CellSink {
    * @param rowOffset Offset for row.
    * @param rowLength Length for row.
    */
-  protected void sanityCheckLeft(
-      byte[] left, byte[] row, int rowOffset, int rowLength) throws 
IOException {
-    if (StripeStoreFileManager.OPEN_KEY != left &&
-        comparator.compareRows(row, rowOffset, rowLength, left, 0, 
left.length) < 0) {
+  protected void sanityCheckLeft(byte[] left, byte[] row, int rowOffset, int 
rowLength)
+      throws IOException {
+    if (StripeStoreFileManager.OPEN_KEY != left
+        && comparator.compareRows(row, rowOffset, rowLength, left, 0, 
left.length) < 0) {
       String error = "The first row is lower than the left boundary of [" + 
Bytes.toString(left)
-        + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
+          + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
       LOG.error(error);
       throw new IOException(error);
     }
@@ -134,10 +99,10 @@ public abstract class StripeMultiFileWriter implements 
Compactor.CellSink {
    * @param rowOffset Offset for row.
    * @param rowLength Length for row.
    */
-  protected void sanityCheckRight(
-      byte[] right, byte[] row, int rowOffset, int rowLength) throws 
IOException {
-    if (StripeStoreFileManager.OPEN_KEY != right &&
-        comparator.compareRows(row, rowOffset, rowLength, right, 0, 
right.length) >= 0) {
+  protected void sanityCheckRight(byte[] right, byte[] row, int rowOffset, int 
rowLength)
+      throws IOException {
+    if (StripeStoreFileManager.OPEN_KEY != right
+        && comparator.compareRows(row, rowOffset, rowLength, right, 0, 
right.length) >= 0) {
       String error = "The last row is higher or equal than the right boundary 
of ["
           + Bytes.toString(right) + "]: [" + Bytes.toString(row, rowOffset, 
rowLength) + "]";
       LOG.error(error);
@@ -146,15 +111,9 @@ public abstract class StripeMultiFileWriter implements 
Compactor.CellSink {
   }
 
   /**
-   * Subclasses override this method to be called at the end of a successful 
sequence of
-   * append; all appends are processed before this method is called.
-   */
-  protected abstract void commitWritersInternal() throws IOException;
-
-  /**
-   * MultiWriter that separates the cells based on fixed row-key boundaries.
-   * All the KVs between each pair of neighboring boundaries from the list 
supplied to ctor
-   * will end up in one file, and separate from all other such pairs.
+   * MultiWriter that separates the cells based on fixed row-key boundaries. 
All the KVs between
+   * each pair of neighboring boundaries from the list supplied to ctor will 
end up in one file, and
+   * separate from all other such pairs.
    */
   public static class BoundaryMultiWriter extends StripeMultiFileWriter {
     private StoreFile.Writer currentWriter;
@@ -167,27 +126,26 @@ public abstract class StripeMultiFileWriter implements 
Compactor.CellSink {
 
     /**
      * @param targetBoundaries The boundaries on which writers/files are 
separated.
-     * @param majorRangeFrom Major range is the range for which at least one 
file should be
-     *                       written (because all files are included in 
compaction).
-     *                       majorRangeFrom is the left boundary.
+     * @param majorRangeFrom Major range is the range for which at least one 
file should be written
+     *          (because all files are included in compaction). majorRangeFrom 
is the left boundary.
      * @param majorRangeTo The right boundary of majorRange (see 
majorRangeFrom).
      */
-    public BoundaryMultiWriter(List<byte[]> targetBoundaries,
+    public BoundaryMultiWriter(KVComparator comparator, List<byte[]> 
targetBoundaries,
         byte[] majorRangeFrom, byte[] majorRangeTo) throws IOException {
-      super();
+      super(comparator);
       this.boundaries = targetBoundaries;
       this.existingWriters = new 
ArrayList<StoreFile.Writer>(this.boundaries.size() - 1);
       // "major" range (range for which all files are included) boundaries, if 
any,
       // must match some target boundaries, let's find them.
-      assert  (majorRangeFrom == null) == (majorRangeTo == null);
+      assert (majorRangeFrom == null) == (majorRangeTo == null);
       if (majorRangeFrom != null) {
         majorRangeFromIndex = (majorRangeFrom == 
StripeStoreFileManager.OPEN_KEY) ? 0
-          : Collections.binarySearch(this.boundaries, majorRangeFrom, 
Bytes.BYTES_COMPARATOR);
+            : Collections.binarySearch(this.boundaries, majorRangeFrom, 
Bytes.BYTES_COMPARATOR);
         majorRangeToIndex = (majorRangeTo == StripeStoreFileManager.OPEN_KEY) 
? boundaries.size()
-          : Collections.binarySearch(this.boundaries, majorRangeTo, 
Bytes.BYTES_COMPARATOR);
+            : Collections.binarySearch(this.boundaries, majorRangeTo, 
Bytes.BYTES_COMPARATOR);
         if (this.majorRangeFromIndex < 0 || this.majorRangeToIndex < 0) {
-          throw new IOException("Major range does not match writer boundaries: 
[" +
-              Bytes.toString(majorRangeFrom) + "] [" + 
Bytes.toString(majorRangeTo) + "]; from "
+          throw new IOException("Major range does not match writer boundaries: 
["
+              + Bytes.toString(majorRangeFrom) + "] [" + 
Bytes.toString(majorRangeTo) + "]; from "
               + majorRangeFromIndex + " to " + majorRangeToIndex);
         }
       }
@@ -197,8 +155,8 @@ public abstract class StripeMultiFileWriter implements 
Compactor.CellSink {
     public void append(KeyValue kv) throws IOException {
       if (currentWriter == null && existingWriters.isEmpty()) {
         // First append ever, do a sanity check.
-        sanityCheckLeft(this.boundaries.get(0),
-            kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
+        sanityCheckLeft(this.boundaries.get(0), kv.getRowArray(), 
kv.getRowOffset(),
+          kv.getRowLength());
       }
       prepareWriterFor(kv);
       currentWriter.append(kv);
@@ -207,20 +165,20 @@ public abstract class StripeMultiFileWriter implements 
Compactor.CellSink {
     }
 
     private boolean isKvAfterCurrentWriter(KeyValue kv) {
-      return ((currentWriterEndKey != StripeStoreFileManager.OPEN_KEY) &&
-            (comparator.compareRows(kv.getRowArray(), kv.getRowOffset(), 
kv.getRowLength(),
-                currentWriterEndKey, 0, currentWriterEndKey.length) >= 0));
+      return ((currentWriterEndKey != StripeStoreFileManager.OPEN_KEY)
+          && (comparator.compareRows(kv.getRowArray(), kv.getRowOffset(), 
kv.getRowLength(),
+            currentWriterEndKey, 0, currentWriterEndKey.length) >= 0));
     }
 
     @Override
-    protected void commitWritersInternal() throws IOException {
+    protected void preCommitWriters() throws IOException {
       stopUsingCurrentWriter();
       while (existingWriters.size() < boundaries.size() - 1) {
         createEmptyWriter();
       }
       if (lastKv != null) {
-        sanityCheckRight(boundaries.get(boundaries.size() - 1),
-            lastKv.getRowArray(), lastKv.getRowOffset(), 
lastKv.getRowLength());
+        sanityCheckRight(boundaries.get(boundaries.size() - 1), 
lastKv.getRowArray(),
+          lastKv.getRowOffset(), lastKv.getRowLength());
       }
     }
 
@@ -240,14 +198,13 @@ public abstract class StripeMultiFileWriter implements 
Compactor.CellSink {
     }
 
     /**
-     * Called if there are no cells for some stripe.
-     * We need to have something in the writer list for this stripe, so that 
writer-boundary
-     * list indices correspond to each other. We can insert null in the writer 
list for that
-     * purpose, except in the following cases where we actually need a file:
-     * 1) If we are in range for which we are compacting all the files, we 
need to create an
-     * empty file to preserve stripe metadata.
-     * 2) If we have not produced any file at all for this compactions, and 
this is the
-     * last chance (the last stripe), we need to preserve last seqNum (see 
also HBASE-6059).
+     * Called if there are no cells for some stripe. We need to have something 
in the writer list
+     * for this stripe, so that writer-boundary list indices correspond to 
each other. We can insert
+     * null in the writer list for that purpose, except in the following cases 
where we actually
+     * need a file: 1) If we are in range for which we are compacting all the 
files, we need to
+     * create an empty file to preserve stripe metadata. 2) If we have not 
produced any file at all
+     * for this compaction, and this is the last chance (the last stripe), we 
need to preserve last
+     * seqNum (see also HBASE-6059).
      */
     private void createEmptyWriter() throws IOException {
       int index = existingWriters.size();
@@ -257,12 +214,12 @@ public abstract class StripeMultiFileWriter implements 
Compactor.CellSink {
       boolean needEmptyFile = isInMajorRange || isLastWriter;
       existingWriters.add(needEmptyFile ? writerFactory.createWriter() : null);
       hasAnyWriter |= needEmptyFile;
-      currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
-          ? null : boundaries.get(existingWriters.size() + 1);
+      currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size()) 
? null
+          : boundaries.get(existingWriters.size() + 1);
     }
 
     private void checkCanCreateWriter() throws IOException {
-      int maxWriterCount =  boundaries.size() - 1;
+      int maxWriterCount = boundaries.size() - 1;
       assert existingWriters.size() <= maxWriterCount;
       if (existingWriters.size() >= maxWriterCount) {
         throw new IOException("Cannot create any more writers (created " + 
existingWriters.size()
@@ -279,16 +236,15 @@ public abstract class StripeMultiFileWriter implements 
Compactor.CellSink {
         kvsInCurrentWriter = 0;
       }
       currentWriter = null;
-      currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
-          ? null : boundaries.get(existingWriters.size() + 1);
+      currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size()) 
? null
+          : boundaries.get(existingWriters.size() + 1);
     }
   }
 
   /**
-   * MultiWriter that separates the cells based on target cell number per file 
and file count.
-   * New file is started every time the target number of KVs is reached, 
unless the fixed
-   * count of writers has already been created (in that case all the remaining 
KVs go into
-   * the last writer).
+   * MultiWriter that separates the cells based on target cell number per file 
and file count. New
+   * file is started every time the target number of KVs is reached, unless 
the fixed count of
+   * writers has already been created (in that case all the remaining KVs go 
into the last writer).
    */
   public static class SizeMultiWriter extends StripeMultiFileWriter {
     private int targetCount;
@@ -309,8 +265,9 @@ public abstract class StripeMultiFileWriter implements 
Compactor.CellSink {
      * @param left The left boundary of the first writer.
      * @param right The right boundary of the last writer.
      */
-    public SizeMultiWriter(int targetCount, long targetKvs, byte[] left, 
byte[] right) {
-      super();
+    public SizeMultiWriter(KVComparator comparator, int targetCount, long 
targetKvs, byte[] left,
+        byte[] right) {
+      super(comparator);
       this.targetCount = targetCount;
       this.targetKvs = targetKvs;
       this.left = left;
@@ -331,10 +288,10 @@ public abstract class StripeMultiFileWriter implements 
Compactor.CellSink {
         doCreateWriter = true;
       } else if (lastRowInCurrentWriter != null
           && !comparator.matchingRows(kv.getRowArray(), kv.getRowOffset(), 
kv.getRowLength(),
-              lastRowInCurrentWriter, 0, lastRowInCurrentWriter.length)) {
+            lastRowInCurrentWriter, 0, lastRowInCurrentWriter.length)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Stopping to use a writer after [" + 
Bytes.toString(lastRowInCurrentWriter)
-              + "] row; wrote out "  + kvsInCurrentWriter + " kvs");
+              + "] row; wrote out " + kvsInCurrentWriter + " kvs");
         }
         lastRowInCurrentWriter = null;
         kvsInCurrentWriter = 0;
@@ -357,33 +314,32 @@ public abstract class StripeMultiFileWriter implements 
Compactor.CellSink {
       kvsSeen = kvsInCurrentWriter;
       if (this.sourceScanner != null) {
         kvsSeen = Math.max(kvsSeen,
-            this.sourceScanner.getEstimatedNumberOfKvsScanned() - 
kvsSeenInPrevious);
+          this.sourceScanner.getEstimatedNumberOfKvsScanned() - 
kvsSeenInPrevious);
       }
 
       // If we are not already waiting for opportunity to close, start waiting 
if we can
       // create any more writers and if the current one is too big.
-      if (lastRowInCurrentWriter == null
-          && existingWriters.size() < targetCount
+      if (lastRowInCurrentWriter == null && existingWriters.size() < 
targetCount
           && kvsSeen >= targetKvs) {
         lastRowInCurrentWriter = kv.getRow(); // make a copy
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Preparing to start a new writer after [" + Bytes.toString(
-              lastRowInCurrentWriter) + "] row; observed " + kvsSeen + " kvs 
and wrote out "
-              + kvsInCurrentWriter + " kvs");
+          LOG.debug("Preparing to start a new writer after ["
+              + Bytes.toString(lastRowInCurrentWriter) + "] row; observed " + 
kvsSeen
+              + " kvs and wrote out " + kvsInCurrentWriter + " kvs");
         }
       }
     }
 
     @Override
-    protected void commitWritersInternal() throws IOException {
+    protected void preCommitWriters() throws IOException {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Stopping with "  + kvsInCurrentWriter + " kvs in last 
writer" +
-            ((this.sourceScanner == null) ? "" : ("; observed estimated "
-                + this.sourceScanner.getEstimatedNumberOfKvsScanned() + " KVs 
total")));
+        LOG.debug("Stopping with " + kvsInCurrentWriter + " kvs in last writer"
+            + ((this.sourceScanner == null) ? ""
+                : ("; observed estimated " + 
this.sourceScanner.getEstimatedNumberOfKvsScanned()
+                    + " KVs total")));
       }
       if (lastKv != null) {
-        sanityCheckRight(
-            right, lastKv.getRowArray(), lastKv.getRowOffset(), 
lastKv.getRowLength());
+        sanityCheckRight(right, lastKv.getRowArray(), lastKv.getRowOffset(), 
lastKv.getRowLength());
       }
 
       // When expired stripes were going to be merged into one, and if no 
writer was created during
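
(Illustration, not part of this patch.) After this change both stripe writers take the comparator through their constructors instead of through init(). A hedged sketch of constructing each one, with made-up boundaries and a hypothetical store variable:

  // BoundaryMultiWriter: cells are split at fixed row boundaries. With the four
  // boundaries below, rows before "b" go to file 0, ["b", "d") to file 1, and
  // rows from "d" onward to file 2.
  List<byte[]> boundaries = Arrays.asList(StripeStoreFileManager.OPEN_KEY,
      Bytes.toBytes("b"), Bytes.toBytes("d"), StripeStoreFileManager.OPEN_KEY);
  StripeMultiFileWriter byBoundary = new StripeMultiFileWriter.BoundaryMultiWriter(
      store.getComparator(), boundaries, null, null);

  // SizeMultiWriter: a new file is started roughly every targetKvs cells, up to
  // targetCount files; the last file absorbs any overflow.
  StripeMultiFileWriter bySize = new StripeMultiFileWriter.SizeMultiWriter(
      store.getComparator(), 3 /* targetCount */, 1000L /* targetKvs */,
      StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY);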

http://git-wip-us.apache.org/repos/asf/hbase/blob/b902c9e4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index a52b1bf..6e0ab22 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -28,15 +28,14 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
-import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
-import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -73,7 +72,8 @@ public class StripeStoreFlusher extends StoreFlusher {
     }
 
     // Let policy select flush method.
-    StripeFlushRequest req = this.policy.selectFlush(this.stripes, kvCount);
+    StripeFlushRequest req = this.policy.selectFlush(store.getComparator(), 
this.stripes,
+      kvCount);
 
     long flushedBytes = 0;
     boolean success = false;
@@ -82,7 +82,7 @@ public class StripeStoreFlusher extends StoreFlusher {
       mw = req.createWriter(); // Writer according to the policy.
       StripeMultiFileWriter.WriterFactory factory = 
createWriterFactory(tracker, kvCount);
       StoreScanner storeScanner = (scanner instanceof StoreScanner) ? 
(StoreScanner)scanner : null;
-      mw.init(storeScanner, factory, store.getComparator());
+      mw.init(storeScanner, factory);
 
       synchronized (flushLock) {
         flushedBytes = performFlush(scanner, mw, smallestReadPoint);
@@ -128,10 +128,17 @@ public class StripeStoreFlusher extends StoreFlusher {
 
   /** Stripe flush request wrapper that writes a non-striped file. */
   public static class StripeFlushRequest {
+
+    protected final KVComparator comparator;
+
+    public StripeFlushRequest(KVComparator comparator) {
+      this.comparator = comparator;
+    }
+
     @VisibleForTesting
     public StripeMultiFileWriter createWriter() throws IOException {
-      StripeMultiFileWriter writer =
-          new StripeMultiFileWriter.SizeMultiWriter(1, Long.MAX_VALUE, 
OPEN_KEY, OPEN_KEY);
+      StripeMultiFileWriter writer = new 
StripeMultiFileWriter.SizeMultiWriter(comparator, 1,
+          Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
       writer.setNoStripeMetadata();
       return writer;
     }
@@ -142,13 +149,15 @@ public class StripeStoreFlusher extends StoreFlusher {
     private final List<byte[]> targetBoundaries;
 
     /** @param targetBoundaries New files should be written with these 
boundaries. */
-    public BoundaryStripeFlushRequest(List<byte[]> targetBoundaries) {
+    public BoundaryStripeFlushRequest(KVComparator comparator, List<byte[]> 
targetBoundaries) {
+      super(comparator);
       this.targetBoundaries = targetBoundaries;
     }
 
     @Override
     public StripeMultiFileWriter createWriter() throws IOException {
-      return new StripeMultiFileWriter.BoundaryMultiWriter(targetBoundaries, 
null, null);
+      return new StripeMultiFileWriter.BoundaryMultiWriter(comparator, 
targetBoundaries, null,
+          null);
     }
   }
 
@@ -162,15 +171,16 @@ public class StripeStoreFlusher extends StoreFlusher {
      * @param targetKvs The KV count of each segment. If targetKvs*targetCount 
is less than
      *                  total number of kvs, all the overflow data goes into 
the last stripe.
      */
-    public SizeStripeFlushRequest(int targetCount, long targetKvs) {
+    public SizeStripeFlushRequest(KVComparator comparator, int targetCount, 
long targetKvs) {
+      super(comparator);
       this.targetCount = targetCount;
       this.targetKvs = targetKvs;
     }
 
     @Override
     public StripeMultiFileWriter createWriter() throws IOException {
-      return new StripeMultiFileWriter.SizeMultiWriter(
-          this.targetCount, this.targetKvs, OPEN_KEY, OPEN_KEY);
+      return new StripeMultiFileWriter.SizeMultiWriter(comparator, 
this.targetCount, this.targetKvs,
+          OPEN_KEY, OPEN_KEY);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b902c9e4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
new file mode 100644
index 0000000..1381162
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
+import 
org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter.WriterFactory;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.security.User;
+
+import com.google.common.io.Closeables;
+
+/**
+ * Base class for implementing a Compactor which will generate multiple output 
files after
+ * compaction.
+ */
[email protected]
+public abstract class AbstractMultiOutputCompactor<T extends 
AbstractMultiFileWriter>
+    extends Compactor {
+
+  private static final Log LOG = 
LogFactory.getLog(AbstractMultiOutputCompactor.class);
+
+  public AbstractMultiOutputCompactor(Configuration conf, Store store) {
+    super(conf, store);
+  }
+
+  protected interface InternalScannerFactory {
+
+    ScanType getScanType(CompactionRequest request);
+
+    InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType 
scanType,
+        FileDetails fd, long smallestReadPoint) throws IOException;
+  }
+
+  protected List<Path> compact(T writer, final CompactionRequest request,
+      InternalScannerFactory scannerFactory, CompactionThroughputController 
throughputController,
+      User user) throws IOException {
+    final FileDetails fd = getFileDetails(request.getFiles(), 
request.isMajor());
+    this.progress = new CompactionProgress(fd.maxKeyCount);
+
+    // Find the smallest read point across all the Scanners.
+    long smallestReadPoint = getSmallestReadPoint();
+
+    List<StoreFileScanner> scanners;
+    Collection<StoreFile> readersToClose;
+    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", 
true)) {
+      // clone all StoreFiles, so we'll do the compaction on an independent 
copy of StoreFiles,
+      // HFiles, and their readers
+      readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
+      for (StoreFile f : request.getFiles()) {
+        readersToClose.add(f.cloneForReader());
+      }
+      scanners = createFileScanners(readersToClose, smallestReadPoint,
+        store.throttleCompaction(request.getSize()));
+    } else {
+      readersToClose = Collections.emptyList();
+      scanners = createFileScanners(request.getFiles(), smallestReadPoint,
+        store.throttleCompaction(request.getSize()));
+    }
+    InternalScanner scanner = null;
+    boolean finished = false;
+    try {
+      /* Include deletes, unless we are doing a major compaction */
+      ScanType scanType = scannerFactory.getScanType(request);
+      scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, 
scanners);
+      if (scanner == null) {
+        scanner = scannerFactory.createScanner(scanners, scanType, fd, 
smallestReadPoint);
+      }
+      scanner = postCreateCoprocScanner(request, scanType, scanner, user);
+      if (scanner == null) {
+        // NULL scanner returned from coprocessor hooks means skip normal 
processing.
+        return new ArrayList<Path>();
+      }
+      // Create the writer factory for compactions.
+      final boolean needMvcc = fd.maxMVCCReadpoint >= smallestReadPoint;
+      WriterFactory writerFactory = new WriterFactory() {
+        @Override
+        public Writer createWriter() throws IOException {
+          return store.createWriterInTmp(fd.maxKeyCount, 
compactionCompression, true, needMvcc,
+            fd.maxTagsLength > 0, store.throttleCompaction(request.getSize()));
+        }
+      };
+      // Prepare multi-writer, and perform the compaction using scanner and 
writer.
+      // It is ok here if storeScanner is null.
+      StoreScanner storeScanner
+        = (scanner instanceof StoreScanner) ? (StoreScanner) scanner : null;
+      writer.init(storeScanner, writerFactory);
+      finished = performCompaction(scanner, writer, smallestReadPoint, 
throughputController);
+      if (!finished) {
+        throw new InterruptedIOException("Aborting compaction of store " + 
store + " in region "
+            + store.getRegionInfo().getRegionNameAsString() + " because it was 
interrupted.");
+      }
+    } finally {
+      Closeables.close(scanner, true);
+      for (StoreFile f : readersToClose) {
+        try {
+          f.closeReader(true);
+        } catch (IOException e) {
+          LOG.warn("Exception closing " + f, e);
+        }
+      }
+      if (!finished) {
+        FileSystem fs = store.getFileSystem();
+        for (Path leftoverFile : writer.abortWriters()) {
+          try {
+            fs.delete(leftoverFile, false);
+          } catch (IOException e) {
+            LOG.error("Failed to delete the leftover file " + leftoverFile
+                + " after an unfinished compaction.",
+              e);
+          }
+        }
+      }
+    }
+    assert finished : "We should have exited the method on all error paths";
+    return commitMultiWriter(writer, fd, request);
+  }
+
+  protected abstract List<Path> commitMultiWriter(T writer, FileDetails fd,
+      CompactionRequest request) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b902c9e4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 1109cb1..dc13ad9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -58,8 +58,8 @@ import org.apache.hadoop.util.StringUtils;
 public abstract class Compactor {
   private static final Log LOG = LogFactory.getLog(Compactor.class);
   protected CompactionProgress progress;
-  protected Configuration conf;
-  protected Store store;
+  protected final Configuration conf;
+  protected final Store store;
 
   private int compactionKVMax;
   protected Compression.Algorithm compactionCompression;

http://git-wip-us.apache.org/repos/asf/hbase/blob/b902c9e4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
new file mode 100644
index 0000000..c143bec
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.security.User;
+
+/**
+ * This compactor will generate StoreFiles for different time ranges.
+ */
[email protected]
+public class DateTieredCompactor extends 
AbstractMultiOutputCompactor<DateTieredMultiFileWriter> {
+
+  private static final Log LOG = LogFactory.getLog(DateTieredCompactor.class);
+
+  public DateTieredCompactor(Configuration conf, Store store) {
+    super(conf, store);
+  }
+
+  private boolean needEmptyFile(CompactionRequest request) {
+    // if we are going to compact the last N files, then we need to emit an 
empty file to retain the
+    // maxSeqId if we haven't written out anything.
+    return store.getMaxSequenceId() == 
StoreFile.getMaxSequenceIdInList(request.getFiles());
+  }
+
+  public List<Path> compact(final CompactionRequest request, List<Long> 
lowerBoundaries,
+      CompactionThroughputController throughputController, User user) throws 
IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Executing compaction with " + lowerBoundaries.size()
+          + "windows, lower boundaries: " + lowerBoundaries);
+    }
+
+    DateTieredMultiFileWriter writer = new 
DateTieredMultiFileWriter(lowerBoundaries,
+        needEmptyFile(request));
+    return compact(writer, request, new InternalScannerFactory() {
+
+      @Override
+      public ScanType getScanType(CompactionRequest request) {
+        return request.isMajor() ? ScanType.COMPACT_DROP_DELETES : 
ScanType.COMPACT_RETAIN_DELETES;
+      }
+
+      @Override
+      public InternalScanner createScanner(List<StoreFileScanner> scanners, 
ScanType scanType,
+          FileDetails fd, long smallestReadPoint) throws IOException {
+        return DateTieredCompactor.this.createScanner(store, scanners, 
scanType, smallestReadPoint,
+          fd.earliestPutTs);
+      }
+    }, throughputController, user);
+  }
+
+  @Override
+  protected List<Path> commitMultiWriter(DateTieredMultiFileWriter writer, 
FileDetails fd,
+      CompactionRequest request) throws IOException {
+    return writer.commitWriters(fd.maxSeqId, request.isMajor());
+  }
+}
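
(Illustration, not part of this patch.) A hedged sketch of how a compaction engine might invoke the new compactor; conf, store, request, throughputController and the window boundaries are placeholders for objects the engine would normally supply:

  DateTieredCompactor compactor = new DateTieredCompactor(conf, store);
  // One lower boundary per output window, oldest first. Cells are routed by timestamp
  // via DateTieredMultiFileWriter, and an empty file is emitted if needed so the
  // store's maxSeqId is retained.
  List<Long> lowerBoundaries = Arrays.asList(Long.MIN_VALUE, oldWindowStartMs, newWindowStartMs);
  List<Path> newFiles = compactor.compact(request, lowerBoundaries, throughputController, null);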

http://git-wip-us.apache.org/repos/asf/hbase/blob/b902c9e4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 68ba419..c4de5e3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -63,10 +63,10 @@ public class DefaultCompactor extends Compactor {
     Collection<StoreFile> readersToClose;
     if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", 
true)) {
       // clone all StoreFiles, so we'll do the compaction on a independent 
copy of StoreFiles,
-      // HFileFiles, and their readers
+      // HFiles, and their readers
       readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
       for (StoreFile f : request.getFiles()) {
-        readersToClose.add(new StoreFile(f));
+        readersToClose.add(f.cloneForReader());
       }
       scanners = createFileScanners(readersToClose, smallestReadPoint,
           store.throttleCompaction(request.getSize()));

http://git-wip-us.apache.org/repos/asf/hbase/blob/b902c9e4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
index ebd20dd..54a08ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
@@ -27,9 +27,10 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreUtils;
@@ -83,18 +84,20 @@ public class StripeCompactionPolicy extends 
CompactionPolicy {
         request, OPEN_KEY, OPEN_KEY, targetKvsAndCount.getSecond(), 
targetKvsAndCount.getFirst());
   }
 
-  public StripeStoreFlusher.StripeFlushRequest selectFlush(
+  public StripeStoreFlusher.StripeFlushRequest selectFlush(KVComparator 
comparator,
       StripeInformationProvider si, int kvCount) {
     if (this.config.isUsingL0Flush()) {
-      return new StripeStoreFlusher.StripeFlushRequest(); // L0 is used, 
return dumb request.
+      // L0 is used, return dumb request.
+      return new StripeStoreFlusher.StripeFlushRequest(comparator);
     }
     if (si.getStripeCount() == 0) {
       // No stripes - start with the requisite count, derive KVs per stripe.
       int initialCount = this.config.getInitialCount();
-      return new StripeStoreFlusher.SizeStripeFlushRequest(initialCount, 
kvCount / initialCount);
+      return new StripeStoreFlusher.SizeStripeFlushRequest(comparator, 
initialCount,
+          kvCount / initialCount);
     }
     // There are stripes - do according to the boundaries.
-    return new 
StripeStoreFlusher.BoundaryStripeFlushRequest(si.getStripeBoundaries());
+    return new StripeStoreFlusher.BoundaryStripeFlushRequest(comparator, 
si.getStripeBoundaries());
   }
 
   public StripeCompactionRequest selectCompaction(StripeInformationProvider si,

http://git-wip-us.apache.org/repos/asf/hbase/blob/b902c9e4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 630ff2d..38c8353 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -18,9 +18,6 @@
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -28,35 +25,56 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
-import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * This is the placeholder for stripe compactor. The implementation,
- * as well as the proper javadoc, will be added in HBASE-7967.
+ * This is the placeholder for stripe compactor. The implementation, as well 
as the proper javadoc,
+ * will be added in HBASE-7967.
  */
 @InterfaceAudience.Private
-public class StripeCompactor extends Compactor {
+public class StripeCompactor extends 
AbstractMultiOutputCompactor<StripeMultiFileWriter> {
   private static final Log LOG = LogFactory.getLog(StripeCompactor.class);
+
   public StripeCompactor(Configuration conf, Store store) {
     super(conf, store);
   }
 
-  public List<Path> compact(CompactionRequest request, List<byte[]> 
targetBoundaries,
-    byte[] majorRangeFromRow, byte[] majorRangeToRow,
-    CompactionThroughputController throughputController) throws IOException {
-    return compact(request, targetBoundaries, majorRangeFromRow, 
majorRangeToRow,
-      throughputController, null);
+  private final class StripeInternalScannerFactory implements 
InternalScannerFactory {
+
+    private final byte[] majorRangeFromRow;
+
+    private final byte[] majorRangeToRow;
+
+    public StripeInternalScannerFactory(byte[] majorRangeFromRow, byte[] 
majorRangeToRow) {
+      this.majorRangeFromRow = majorRangeFromRow;
+      this.majorRangeToRow = majorRangeToRow;
+    }
+
+    @Override
+    public ScanType getScanType(CompactionRequest request) {
+      // If majorRangeFromRow and majorRangeToRow are not null, then we will 
not use the return
+      // value to create InternalScanner. See the createScanner method below. 
The return value is
+      // also used when calling coprocessor hooks.
+      return ScanType.COMPACT_RETAIN_DELETES;
+    }
+
+    @Override
+    public InternalScanner createScanner(List<StoreFileScanner> scanners, 
ScanType scanType,
+        FileDetails fd, long smallestReadPoint) throws IOException {
+      return (majorRangeFromRow == null)
+          ? StripeCompactor.this.createScanner(store, scanners, scanType, 
smallestReadPoint,
+            fd.earliestPutTs)
+          : StripeCompactor.this.createScanner(store, scanners, 
smallestReadPoint, fd.earliestPutTs,
+            majorRangeFromRow, majorRangeToRow);
+    }
   }
+
   public List<Path> compact(CompactionRequest request, List<byte[]> 
targetBoundaries,
       byte[] majorRangeFromRow, byte[] majorRangeToRow,
       CompactionThroughputController throughputController, User user) throws 
IOException {
@@ -69,106 +87,31 @@ public class StripeCompactor extends Compactor {
       LOG.debug(sb.toString());
     }
     StripeMultiFileWriter writer = new 
StripeMultiFileWriter.BoundaryMultiWriter(
-        targetBoundaries, majorRangeFromRow, majorRangeToRow);
-    return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
-      throughputController, user);
-  }
-
-  public List<Path> compact(CompactionRequest request, int targetCount, long 
targetSize,
-    byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] 
majorRangeToRow,
-    CompactionThroughputController throughputController) throws IOException {
-    return compact(request, targetCount, targetSize, left, right, 
majorRangeFromRow,
-      majorRangeToRow, throughputController, null);
+        store.getComparator(), targetBoundaries, majorRangeFromRow, 
majorRangeToRow);
+    return compact(writer, request,
+      new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow), 
throughputController,
+      user);
   }
 
   public List<Path> compact(CompactionRequest request, int targetCount, long 
targetSize,
       byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] 
majorRangeToRow,
       CompactionThroughputController throughputController, User user) throws 
IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Executing compaction with " + targetSize
-          + " target file size, no more than " + targetCount + " files, in ["
-          + Bytes.toString(left) + "] [" + Bytes.toString(right) + "] range");
+      LOG.debug(
+        "Executing compaction with " + targetSize + " target file size, no 
more than " + targetCount
+            + " files, in [" + Bytes.toString(left) + "] [" + 
Bytes.toString(right) + "] range");
     }
-    StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
+    StripeMultiFileWriter writer = new 
StripeMultiFileWriter.SizeMultiWriter(store.getComparator(),
         targetCount, targetSize, left, right);
-    return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
-      throughputController, user);
+    return compact(writer, request,
+      new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow), 
throughputController,
+      user);
   }
 
-  private List<Path> compactInternal(StripeMultiFileWriter mw, final 
CompactionRequest request,
-      byte[] majorRangeFromRow, byte[] majorRangeToRow,
-      CompactionThroughputController throughputController, User user) throws 
IOException {
-    final Collection<StoreFile> filesToCompact = request.getFiles();
-    final FileDetails fd = getFileDetails(filesToCompact, request.isMajor());
-    this.progress = new CompactionProgress(fd.maxKeyCount);
-
-    long smallestReadPoint = getSmallestReadPoint();
-    List<StoreFileScanner> scanners = createFileScanners(filesToCompact,
-        smallestReadPoint, store.throttleCompaction(request.getSize()));
-
-    boolean finished = false;
-    InternalScanner scanner = null;
-    try {
-      // Get scanner to use.
-      ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES;
-      scanner = preCreateCoprocScanner(request, coprocScanType, 
fd.earliestPutTs, scanners, user);
-      if (scanner == null) {
-        scanner = (majorRangeFromRow == null)
-            ? createScanner(store, scanners,
-                ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, 
fd.earliestPutTs)
-            : createScanner(store, scanners,
-                smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, 
majorRangeToRow);
-      }
-      scanner = postCreateCoprocScanner(request, coprocScanType, scanner, 
user);
-      if (scanner == null) {
-        // NULL scanner returned from coprocessor hooks means skip normal 
processing.
-        return new ArrayList<Path>();
-      }
-
-      // Create the writer factory for compactions.
-      final boolean needMvcc = fd.maxMVCCReadpoint >= smallestReadPoint;
-      final Compression.Algorithm compression = 
store.getFamily().getCompactionCompression();
-      StripeMultiFileWriter.WriterFactory factory = new 
StripeMultiFileWriter.WriterFactory() {
-        @Override
-        public Writer createWriter() throws IOException {
-          return store.createWriterInTmp(
-              fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 
0,
-              store.throttleCompaction(request.getSize()));
-        }
-      };
-
-      // Prepare multi-writer, and perform the compaction using scanner and 
writer.
-      // It is ok here if storeScanner is null.
-      StoreScanner storeScanner = (scanner instanceof StoreScanner) ? 
(StoreScanner)scanner : null;
-      mw.init(storeScanner, factory, store.getComparator());
-      finished = performCompaction(scanner, mw, smallestReadPoint, 
throughputController);
-      if (!finished) {
-        throw new InterruptedIOException( "Aborting compaction of store " + 
store +
-            " in region " + store.getRegionInfo().getRegionNameAsString() +
-            " because it was interrupted.");
-      }
-    } finally {
-      if (scanner != null) {
-        try {
-          scanner.close();
-        } catch (Throwable t) {
-          // Don't fail the compaction if this fails.
-          LOG.error("Failed to close scanner after compaction.", t);
-        }
-      }
-      if (!finished) {
-        for (Path leftoverFile : mw.abortWriters()) {
-          try {
-            store.getFileSystem().delete(leftoverFile, false);
-          } catch (Exception ex) {
-            LOG.error("Failed to delete the leftover file after an unfinished 
compaction.", ex);
-          }
-        }
-      }
-    }
-
-    assert finished : "We should have exited the method on all error paths";
-    List<Path> newFiles = mw.commitWriters(fd.maxSeqId, request.isMajor());
+  @Override
+  protected List<Path> commitMultiWriter(StripeMultiFileWriter writer, 
FileDetails fd,
+      CompactionRequest request) throws IOException {
+    List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor());
     assert !newFiles.isEmpty() : "Should have produced an empty file to 
preserve metadata.";
     return newFiles;
   }
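
With the refactor above, StripeCompactor no longer carries its own copy of the
compaction loop: it extends AbstractMultiOutputCompactor and supplies only an
InternalScannerFactory plus the commitMultiWriter() step, while scanner setup,
the coprocessor hooks and the abort/cleanup path that used to sit in
compactInternal() move into the shared base class. A hedged sketch of the
shape a subclass now takes; the class name ExampleCompactor and its
getScanType() policy are invented for illustration, and only the overridden
method signatures come from the diff.

    package org.apache.hadoop.hbase.regionserver.compactions;

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.regionserver.InternalScanner;
    import org.apache.hadoop.hbase.regionserver.ScanType;
    import org.apache.hadoop.hbase.regionserver.Store;
    import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
    import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;

    public class ExampleCompactor extends AbstractMultiOutputCompactor<StripeMultiFileWriter> {

      public ExampleCompactor(Configuration conf, Store store) {
        super(conf, store);
      }

      private final class ExampleScannerFactory implements InternalScannerFactory {
        @Override
        public ScanType getScanType(CompactionRequest request) {
          // Invented policy for the sketch: drop deletes only on major compactions.
          return request.isMajor() ? ScanType.COMPACT_DROP_DELETES
              : ScanType.COMPACT_RETAIN_DELETES;
        }

        @Override
        public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
            FileDetails fd, long smallestReadPoint) throws IOException {
          // Delegate to the base Compactor scanner, as StripeCompactor does above.
          return ExampleCompactor.this.createScanner(store, scanners, scanType,
            smallestReadPoint, fd.earliestPutTs);
        }
      }

      @Override
      protected List<Path> commitMultiWriter(StripeMultiFileWriter writer, FileDetails fd,
          CompactionRequest request) throws IOException {
        // Commit all per-stripe writers and hand the resulting paths back to the caller.
        return writer.commitWriters(fd.maxSeqId, request.isMajor());
      }
    }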

http://git-wip-us.apache.org/repos/asf/hbase/blob/b902c9e4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
deleted file mode 100644
index fb123d5..0000000
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-import static 
org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_START_KEY;
-import static 
org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_END_KEY;
-import static 
org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.TreeMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.KVComparator;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import 
org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
-import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-
-@Category(SmallTests.class)
-public class TestStripeCompactor {
-  private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
-  private static final TableName TABLE_NAME = 
TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
-
-  private static final byte[] KEY_B = Bytes.toBytes("bbb");
-  private static final byte[] KEY_C = Bytes.toBytes("ccc");
-  private static final byte[] KEY_D = Bytes.toBytes("ddd");
-
-  private static final KeyValue KV_A = kvAfter(Bytes.toBytes("aaa"));
-  private static final KeyValue KV_B = kvAfter(KEY_B);
-  private static final KeyValue KV_C = kvAfter(KEY_C);
-  private static final KeyValue KV_D = kvAfter(KEY_D);
-
-  private static KeyValue kvAfter(byte[] key) {
-    return new KeyValue(Arrays.copyOf(key, key.length + 1), 0L);
-  }
-
-  private static <T> T[] a(T... a) {
-    return a;
-  }
-
-  private static KeyValue[] e() {
-    return TestStripeCompactor.<KeyValue>a();
-  }
-
-  @Test
-  public void testBoundaryCompactions() throws Exception {
-    // General verification
-    verifyBoundaryCompaction(a(KV_A, KV_A, KV_B, KV_B, KV_C, KV_D),
-        a(OPEN_KEY, KEY_B, KEY_D, OPEN_KEY), a(a(KV_A, KV_A), a(KV_B, KV_B, 
KV_C), a(KV_D)));
-    verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_C, KEY_D), a(a(KV_B), 
a(KV_C)));
-    verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_D), new KeyValue[][] 
{ a(KV_B, KV_C) });
-  }
-
-  @Test
-  public void testBoundaryCompactionEmptyFiles() throws Exception {
-    // No empty file if there're already files.
-    verifyBoundaryCompaction(
-        a(KV_B), a(KEY_B, KEY_C, KEY_D, OPEN_KEY), a(a(KV_B), null, null), 
null, null, false);
-    verifyBoundaryCompaction(a(KV_A, KV_C),
-        a(OPEN_KEY, KEY_B, KEY_C, KEY_D), a(a(KV_A), null, a(KV_C)), null, 
null, false);
-    // But should be created if there are no file.
-    verifyBoundaryCompaction(
-        e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, null, e()), null, 
null, false);
-    // In major range if there's major range.
-    verifyBoundaryCompaction(
-        e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, e(), null), KEY_B, 
KEY_C, false);
-    verifyBoundaryCompaction(
-        e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(e(), e(), null), OPEN_KEY, 
KEY_C, false);
-    // Major range should have files regardless of KVs.
-    verifyBoundaryCompaction(a(KV_A), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, 
OPEN_KEY),
-        a(a(KV_A), e(), e(), null), KEY_B, KEY_D, false);
-    verifyBoundaryCompaction(a(KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, 
OPEN_KEY),
-        a(null, null, a(KV_C), e()), KEY_C, OPEN_KEY, false);
-
-  }
-
-  public static void verifyBoundaryCompaction(
-      KeyValue[] input, byte[][] boundaries, KeyValue[][] output) throws 
Exception {
-    verifyBoundaryCompaction(input, boundaries, output, null, null, true);
-  }
-
-  public static void verifyBoundaryCompaction(KeyValue[] input, byte[][] 
boundaries,
-      KeyValue[][] output, byte[] majorFrom, byte[] majorTo, boolean allFiles)
-          throws Exception {
-    StoreFileWritersCapture writers = new StoreFileWritersCapture();
-    StripeCompactor sc = createCompactor(writers, input);
-    List<Path> paths =
-        sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, 
majorTo,
-          NoLimitCompactionThroughputController.INSTANCE);
-    writers.verifyKvs(output, allFiles, true);
-    if (allFiles) {
-      assertEquals(output.length, paths.size());
-      writers.verifyBoundaries(boundaries);
-    }
-  }
-
-  @Test
-  public void testSizeCompactions() throws Exception {
-    // General verification with different sizes.
-    verifySizeCompaction(a(KV_A, KV_A, KV_B, KV_C, KV_D), 3, 2, OPEN_KEY, 
OPEN_KEY,
-        a(a(KV_A, KV_A), a(KV_B, KV_C), a(KV_D)));
-    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 4, 1, OPEN_KEY, OPEN_KEY,
-        a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)));
-    verifySizeCompaction(a(KV_B, KV_C), 2, 1, KEY_B, KEY_D, a(a(KV_B), 
a(KV_C)));
-    // Verify row boundaries are preserved.
-    verifySizeCompaction(a(KV_A, KV_A, KV_A, KV_C, KV_D), 3, 2, OPEN_KEY, 
OPEN_KEY,
-        a(a(KV_A, KV_A, KV_A), a(KV_C, KV_D)));
-    verifySizeCompaction(a(KV_A, KV_B, KV_B, KV_C), 3, 1, OPEN_KEY, OPEN_KEY,
-        a(a(KV_A), a(KV_B, KV_B), a(KV_C)));
-    // Too much data, count limits the number of files.
-    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 2, 1, OPEN_KEY, OPEN_KEY,
-        a(a(KV_A), a(KV_B, KV_C, KV_D)));
-    verifySizeCompaction(a(KV_A, KV_B, KV_C), 1, Long.MAX_VALUE, OPEN_KEY, 
KEY_D,
-        new KeyValue[][] { a(KV_A, KV_B, KV_C) });
-    // Too little data/large count, no extra files.
-    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), Integer.MAX_VALUE, 2, 
OPEN_KEY, OPEN_KEY,
-        a(a(KV_A, KV_B), a(KV_C, KV_D)));
-  }
-
-  public static void verifySizeCompaction(KeyValue[] input, int targetCount, 
long targetSize,
-      byte[] left, byte[] right, KeyValue[][] output) throws Exception {
-    StoreFileWritersCapture writers = new StoreFileWritersCapture();
-    StripeCompactor sc = createCompactor(writers, input);
-    List<Path> paths =
-        sc.compact(createDummyRequest(), targetCount, targetSize, left, right, 
null, null,
-          NoLimitCompactionThroughputController.INSTANCE);
-    assertEquals(output.length, paths.size());
-    writers.verifyKvs(output, true, true);
-    List<byte[]> boundaries = new ArrayList<byte[]>();
-    boundaries.add(left);
-    for (int i = 1; i < output.length; ++i) {
-      boundaries.add(output[i][0].getRow());
-    }
-    boundaries.add(right);
-    writers.verifyBoundaries(boundaries.toArray(new byte[][] {}));
-  }
-
-  private static StripeCompactor createCompactor(
-      StoreFileWritersCapture writers, KeyValue[] input) throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-    final Scanner scanner = new Scanner(input);
-
-    // Create store mock that is satisfactory for compactor.
-    HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
-    ScanInfo si = new ScanInfo(col, Long.MAX_VALUE, 0, new KVComparator());
-    Store store = mock(Store.class);
-    when(store.getFamily()).thenReturn(col);
-    when(store.getScanInfo()).thenReturn(si);
-    when(store.areWritesEnabled()).thenReturn(true);
-    when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
-    when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
-    when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class),
-        anyBoolean(), anyBoolean(), anyBoolean(), 
anyBoolean())).thenAnswer(writers);
-    when(store.getComparator()).thenReturn(new KVComparator());
-
-    return new StripeCompactor(conf, store) {
-      @Override
-      protected InternalScanner createScanner(Store store, 
List<StoreFileScanner> scanners,
-          long smallestReadPoint, long earliestPutTs, byte[] 
dropDeletesFromRow,
-          byte[] dropDeletesToRow) throws IOException {
-        return scanner;
-      }
-
-      @Override
-      protected InternalScanner createScanner(Store store, 
List<StoreFileScanner> scanners,
-          ScanType scanType, long smallestReadPoint, long earliestPutTs) 
throws IOException {
-        return scanner;
-      }
-    };
-  }
-
-  private static CompactionRequest createDummyRequest() throws Exception {
-    // "Files" are totally unused, it's Scanner class below that gives 
compactor fake KVs.
-    // But compaction depends on everything under the sun, so stub everything 
with dummies.
-    StoreFile sf = mock(StoreFile.class);
-    StoreFile.Reader r = mock(StoreFile.Reader.class);
-    when(r.length()).thenReturn(1L);
-    when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
-    when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
-    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), 
anyLong()))
-      .thenReturn(mock(StoreFileScanner.class));
-    when(sf.getReader()).thenReturn(r);
-    when(sf.createReader()).thenReturn(r);
-    when(sf.createReader(anyBoolean())).thenReturn(r);
-    return new CompactionRequest(Arrays.asList(sf));
-  }
-
-  private static class Scanner implements InternalScanner {
-    private final ArrayList<KeyValue> kvs;
-    public Scanner(KeyValue... kvs) {
-      this.kvs = new ArrayList<KeyValue>(Arrays.asList(kvs));
-    }
-
-    @Override
-    public boolean next(List<Cell> results) throws IOException {
-      if (kvs.isEmpty()) return false;
-      results.add(kvs.remove(0));
-      return !kvs.isEmpty();
-    }
-    @Override
-    public boolean next(List<Cell> result, int limit) throws IOException {
-      return next(result);
-    }
-    @Override
-    public void close() throws IOException {}
-  }
-
-  // StoreFile.Writer has private ctor and is unwieldy, so this has to be 
convoluted.
-  public static class StoreFileWritersCapture implements
-    Answer<StoreFile.Writer>, StripeMultiFileWriter.WriterFactory {
-    public static class Writer {
-      public ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
-      public TreeMap<byte[], byte[]> data = new TreeMap<byte[], 
byte[]>(Bytes.BYTES_COMPARATOR);
-    }
-
-    private List<Writer> writers = new ArrayList<Writer>();
-
-    @Override
-    public StoreFile.Writer createWriter() throws IOException {
-      final Writer realWriter = new Writer();
-      writers.add(realWriter);
-      StoreFile.Writer writer = mock(StoreFile.Writer.class);
-      doAnswer(new Answer<Object>() {
-        public Object answer(InvocationOnMock invocation) {
-          return realWriter.kvs.add((KeyValue)invocation.getArguments()[0]);
-        }}).when(writer).append(any(KeyValue.class));
-      doAnswer(new Answer<Object>() {
-        public Object answer(InvocationOnMock invocation) {
-          Object[] args = invocation.getArguments();
-          return realWriter.data.put((byte[])args[0], (byte[])args[1]);
-        }}).when(writer).appendFileInfo(any(byte[].class), any(byte[].class));
-      return writer;
-    }
-
-    @Override
-    public StoreFile.Writer answer(InvocationOnMock invocation) throws 
Throwable {
-      return createWriter();
-    }
-
-    public void verifyKvs(KeyValue[][] kvss, boolean allFiles, boolean 
requireMetadata) {
-      if (allFiles) {
-        assertEquals(kvss.length, writers.size());
-      }
-      int skippedWriters = 0;
-      for (int i = 0; i < kvss.length; ++i) {
-        KeyValue[] kvs = kvss[i];
-        if (kvs != null) {
-          Writer w = writers.get(i - skippedWriters);
-          if (requireMetadata) {
-            assertNotNull(w.data.get(STRIPE_START_KEY));
-            assertNotNull(w.data.get(STRIPE_END_KEY));
-          } else {
-            assertNull(w.data.get(STRIPE_START_KEY));
-            assertNull(w.data.get(STRIPE_END_KEY));
-          }
-          assertEquals(kvs.length, w.kvs.size());
-          for (int j = 0; j < kvs.length; ++j) {
-            assertEquals(kvs[j], w.kvs.get(j));
-          }
-        } else {
-          assertFalse(allFiles);
-          ++skippedWriters;
-        }
-      }
-    }
-
-    public void verifyBoundaries(byte[][] boundaries) {
-      assertEquals(boundaries.length - 1, writers.size());
-      for (int i = 0; i < writers.size(); ++i) {
-        assertArrayEquals("i = " + i, boundaries[i], 
writers.get(i).data.get(STRIPE_START_KEY));
-        assertArrayEquals("i = " + i, boundaries[i + 1], 
writers.get(i).data.get(STRIPE_END_KEY));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b902c9e4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
new file mode 100644
index 0000000..5453e07
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static 
org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_END_KEY;
+import static 
org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_START_KEY;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestCompactor {
+
+  public static StoreFile createDummyStoreFile(long maxSequenceId) throws 
Exception {
+    // "Files" are totally unused, it's Scanner class below that gives 
compactor fake KVs.
+    // But compaction depends on everything under the sun, so stub everything 
with dummies.
+    StoreFile sf = mock(StoreFile.class);
+    StoreFile.Reader r = mock(StoreFile.Reader.class);
+    when(r.length()).thenReturn(1L);
+    when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
+    when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
+    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), 
anyLong()))
+        .thenReturn(mock(StoreFileScanner.class));
+    when(sf.getReader()).thenReturn(r);
+    when(sf.createReader()).thenReturn(r);
+    when(sf.createReader(anyBoolean())).thenReturn(r);
+    when(sf.cloneForReader()).thenReturn(sf);
+    when(sf.getMaxSequenceId()).thenReturn(maxSequenceId);
+    return sf;
+  }
+
+  public static CompactionRequest createDummyRequest() throws Exception {
+    return new CompactionRequest(Arrays.asList(createDummyStoreFile(1L)));
+  }
+
+  // StoreFile.Writer has private ctor and is unwieldy, so this has to be 
convoluted.
+  public static class StoreFileWritersCapture
+      implements Answer<StoreFile.Writer>, 
AbstractMultiFileWriter.WriterFactory {
+    public static class Writer {
+      public ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
+      public TreeMap<byte[], byte[]> data = new TreeMap<byte[], 
byte[]>(Bytes.BYTES_COMPARATOR);
+      public boolean hasMetadata;
+    }
+
+    private List<Writer> writers = new ArrayList<Writer>();
+
+    @Override
+    public StoreFile.Writer createWriter() throws IOException {
+      final Writer realWriter = new Writer();
+      writers.add(realWriter);
+      StoreFile.Writer writer = mock(StoreFile.Writer.class);
+      doAnswer(new Answer<Object>() {
+        public Object answer(InvocationOnMock invocation) {
+          return realWriter.kvs.add((KeyValue) invocation.getArguments()[0]);
+        }
+      }).when(writer).append(any(KeyValue.class));
+      doAnswer(new Answer<Object>() {
+        public Object answer(InvocationOnMock invocation) {
+          Object[] args = invocation.getArguments();
+          return realWriter.data.put((byte[]) args[0], (byte[]) args[1]);
+        }
+      }).when(writer).appendFileInfo(any(byte[].class), any(byte[].class));
+      doAnswer(new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          realWriter.hasMetadata = true;
+          return null;
+        }
+      }).when(writer).appendMetadata(any(Long.class), any(boolean.class));
+      doAnswer(new Answer<Path>() {
+        @Override
+        public Path answer(InvocationOnMock invocation) throws Throwable {
+          return new Path("foo");
+        }
+      }).when(writer).getPath();
+      return writer;
+    }
+
+    @Override
+    public StoreFile.Writer answer(InvocationOnMock invocation) throws 
Throwable {
+      return createWriter();
+    }
+
+    public void verifyKvs(KeyValue[][] kvss, boolean allFiles, boolean 
requireMetadata) {
+      if (allFiles) {
+        assertEquals(kvss.length, writers.size());
+      }
+      int skippedWriters = 0;
+      for (int i = 0; i < kvss.length; ++i) {
+        KeyValue[] kvs = kvss[i];
+        if (kvs != null) {
+          Writer w = writers.get(i - skippedWriters);
+          if (requireMetadata) {
+            assertNotNull(w.data.get(STRIPE_START_KEY));
+            assertNotNull(w.data.get(STRIPE_END_KEY));
+          } else {
+            assertNull(w.data.get(STRIPE_START_KEY));
+            assertNull(w.data.get(STRIPE_END_KEY));
+          }
+          assertEquals(kvs.length, w.kvs.size());
+          for (int j = 0; j < kvs.length; ++j) {
+            assertEquals(kvs[j], w.kvs.get(j));
+          }
+        } else {
+          assertFalse(allFiles);
+          ++skippedWriters;
+        }
+      }
+    }
+
+    public void verifyBoundaries(byte[][] boundaries) {
+      assertEquals(boundaries.length - 1, writers.size());
+      for (int i = 0; i < writers.size(); ++i) {
+        assertArrayEquals("i = " + i, boundaries[i], 
writers.get(i).data.get(STRIPE_START_KEY));
+        assertArrayEquals("i = " + i, boundaries[i + 1], 
writers.get(i).data.get(STRIPE_END_KEY));
+      }
+    }
+
+    public void verifyKvs(KeyValue[][] kvss, boolean allFiles, List<Long> 
boundaries) {
+      if (allFiles) {
+        assertEquals(kvss.length, writers.size());
+      }
+      int skippedWriters = 0;
+      for (int i = 0; i < kvss.length; ++i) {
+        KeyValue[] kvs = kvss[i];
+        if (kvs != null) {
+          Writer w = writers.get(i - skippedWriters);
+          assertEquals(kvs.length, w.kvs.size());
+          for (int j = 0; j < kvs.length; ++j) {
+            assertTrue(kvs[j].getTimestamp() >= boundaries.get(i));
+            assertTrue(kvs[j].getTimestamp() < boundaries.get(i + 1));
+            assertEquals(kvs[j], w.kvs.get(j));
+          }
+        } else {
+          assertFalse(allFiles);
+          ++skippedWriters;
+        }
+      }
+    }
+
+    public List<Writer> getWriters() {
+      return writers;
+    }
+  }
+
+  public static class Scanner implements InternalScanner {
+    private final ArrayList<KeyValue> kvs;
+
+    public Scanner(KeyValue... kvs) {
+      this.kvs = new ArrayList<KeyValue>(Arrays.asList(kvs));
+    }
+
+    @Override
+    public boolean next(List<Cell> results) throws IOException {
+      if (kvs.isEmpty()) return false;
+      results.add(kvs.remove(0));
+      return !kvs.isEmpty();
+    }
+
+    @Override
+    public boolean next(List<Cell> result, int limit) throws IOException {
+      return next(result);
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+  }
+}
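
TestCompactor collects the mock scaffolding that used to live inside
TestStripeCompactor (the dummy store files, the StoreFileWritersCapture writer
factory and the fake Scanner) so the relocated TestStripeCompactor and the new
TestDateTieredCompactor can share it. A hedged usage sketch of how a test
typically wires these helpers to a mocked Store, abbreviated from the stubbing
in the deleted test above; the variable names are illustrative and the run of
the compactor under test is elided.

    // Capture every StoreFile.Writer the store hands out, then assert on it.
    StoreFileWritersCapture writers = new StoreFileWritersCapture();
    Store store = mock(Store.class);
    when(store.getComparator()).thenReturn(new KVComparator());
    when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
        anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
    CompactionRequest request = TestCompactor.createDummyRequest();
    // ... run the compactor under test, feeding it a TestCompactor.Scanner(inputKvs) ...
    writers.verifyKvs(expectedOutput, true, true);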
