Repository: hbase
Updated Branches:
  refs/heads/0.98 f39c53057 -> b902c9e4d


http://git-wip-us.apache.org/repos/asf/hbase/blob/b902c9e4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
new file mode 100644
index 0000000..1497f67
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static 
org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
+import static 
org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyStoreFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
+import 
org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Drives {@link DateTieredCompactor} against a mocked {@link Store} and a canned scanner.
+ * The class is parameterized so that every test runs once with
+ * "hbase.regionserver.compaction.private.readers" set to true and once set to false.
+ */
+@RunWith(Parameterized.class)
+@Category(SmallTests.class)
+public class TestDateTieredCompactor {
+
+  /** Used for both arguments of the table name and for the column family name. */
+  private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
+
+  private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
+
+  // Input cells; their long arguments (100..400) match the window boundaries used in test().
+  private static final KeyValue KV_A = new KeyValue(Bytes.toBytes("aaa"), 100L);
+
+  private static final KeyValue KV_B = new KeyValue(Bytes.toBytes("bbb"), 200L);
+
+  private static final KeyValue KV_C = new KeyValue(Bytes.toBytes("ccc"), 300L);
+
+  private static final KeyValue KV_D = new KeyValue(Bytes.toBytes("ddd"), 400L);
+
+  /**
+   * Supplies the two parameter sets for the runner.
+   * @return one single-element array holding true and one holding false
+   */
+  @Parameters(name = "{index}: usePrivateReaders={0}")
+  public static Iterable<Object[]> data() {
+    Object[] privateReadersOn = { true };
+    Object[] privateReadersOff = { false };
+    return Arrays.asList(privateReadersOn, privateReadersOff);
+  }
+
+  /** Injected by the parameterized runner; copied into the configuration of each compactor. */
+  @Parameter
+  public boolean usePrivateReaders;
+
+  /**
+   * Builds a compactor whose store is a mock and whose scanner replays the given cells.
+   * @param writers answer used for every {@code createWriterInTmp} call on the mocked store
+   * @param input cells the canned scanner is constructed from
+   * @param storefiles files reported by the mocked store; also used to derive the max sequence id
+   * @return a compactor with both {@code createScanner} overloads overridden to return the
+   *         canned scanner
+   */
+  private DateTieredCompactor createCompactor(StoreFileWritersCapture writers,
+      final KeyValue[] input, List<StoreFile> storefiles) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
+    final Scanner cannedScanner = new Scanner(input);
+
+    HColumnDescriptor family = new HColumnDescriptor(NAME_OF_THINGS);
+    ScanInfo scanInfo = new ScanInfo(family, Long.MAX_VALUE, 0, new KVComparator());
+
+    // Stub only what the compactor is expected to ask the store for.
+    final Store storeMock = mock(Store.class);
+    when(storeMock.getFamily()).thenReturn(family);
+    when(storeMock.getScanInfo()).thenReturn(scanInfo);
+    when(storeMock.getComparator()).thenReturn(new KVComparator());
+    when(storeMock.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
+    when(storeMock.getFileSystem()).thenReturn(mock(FileSystem.class));
+    when(storeMock.areWritesEnabled()).thenReturn(true);
+    when(storeMock.getStorefiles()).thenReturn(storefiles);
+    when(storeMock.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
+      anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
+    long maxSeqId = StoreFile.getMaxSequenceIdInList(storefiles);
+    when(storeMock.getMaxSequenceId()).thenReturn(maxSeqId);
+
+    return new DateTieredCompactor(conf, storeMock) {
+      @Override
+      protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
+          ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
+        return cannedScanner;
+      }
+
+      @Override
+      protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
+          long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
+          byte[] dropDeletesToRow) throws IOException {
+        return cannedScanner;
+      }
+    };
+  }
+
+  /**
+   * Compacts one of two dummy store files and checks what the captured writers received.
+   * @param input cells fed to the compactor
+   * @param boundaries window boundaries; all but the last element are passed to the compactor,
+   *          the complete list is passed to the writer verification
+   * @param output expected cells per output file
+   * @param allFiles forwarded to the writer verification; when true the number of returned
+   *          paths must also equal {@code output.length}
+   */
+  private void verify(KeyValue[] input, List<Long> boundaries, KeyValue[][] output,
+      boolean allFiles) throws Exception {
+    StoreFileWritersCapture writers = new StoreFileWritersCapture();
+    StoreFile sf1 = createDummyStoreFile(1L);
+    StoreFile sf2 = createDummyStoreFile(2L);
+    DateTieredCompactor compactor = createCompactor(writers, input, Arrays.asList(sf1, sf2));
+
+    // The store knows about both files, but only the first one is selected for compaction.
+    CompactionRequest request = new CompactionRequest(Arrays.asList(sf1));
+    List<Long> lowerBoundaries = boundaries.subList(0, boundaries.size() - 1);
+    List<Path> paths = compactor.compact(request, lowerBoundaries,
+      NoLimitCompactionThroughputController.INSTANCE, null);
+
+    writers.verifyKvs(output, allFiles, boundaries);
+    if (!allFiles) {
+      return;
+    }
+    assertEquals(output.length, paths.size());
+  }
+
+  /** Varargs shorthand that returns its arguments as an array. */
+  private static <T> T[] a(T... a) {
+    return a;
+  }
+
+  @Test
+  public void test() throws Exception {
+    // One cell per window, every window expected to produce a file.
+    List<Long> fourWindows = Arrays.asList(100L, 200L, 300L, 400L, 500L);
+    KeyValue[][] oneCellPerFile = a(a(KV_A), a(KV_B), a(KV_C), a(KV_D));
+    verify(a(KV_A, KV_B, KV_C, KV_D), fourWindows, oneCellPerFile, true);
+
+    // A single split at 200 between the minimum and maximum long values.
+    List<Long> splitAt200 = Arrays.asList(Long.MIN_VALUE, 200L, Long.MAX_VALUE);
+    KeyValue[][] firstCellThenRest = a(a(KV_A), a(KV_B, KV_C, KV_D));
+    verify(a(KV_A, KV_B, KV_C, KV_D), splitAt200, firstCellThenRest, false);
+
+    // A single window spanning the whole long range.
+    List<Long> oneWindow = Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE);
+    KeyValue[][] everythingTogether = new KeyValue[][] { a(KV_A, KV_B, KV_C, KV_D) };
+    verify(a(KV_A, KV_B, KV_C, KV_D), oneWindow, everythingTogether, false);
+  }
+
+  @Test
+  public void testEmptyOutputFile() throws Exception {
+    StoreFileWritersCapture writers = new StoreFileWritersCapture();
+    CompactionRequest request = createDummyRequest();
+    List<StoreFile> requestFiles = new ArrayList<StoreFile>(request.getFiles());
+    DateTieredCompactor compactor = createCompactor(writers, new KeyValue[0], requestFiles);
+
+    List<Long> wholeRange = Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE);
+    List<Path> paths = compactor.compact(request, wholeRange,
+      NoLimitCompactionThroughputController.INSTANCE, null);
+
+    // With no input cells, exactly one file is still expected: empty, but carrying metadata.
+    assertEquals(1, paths.size());
+    List<StoreFileWritersCapture.Writer> captured = writers.getWriters();
+    assertEquals(1, captured.size());
+    StoreFileWritersCapture.Writer onlyWriter = captured.get(0);
+    assertTrue(onlyWriter.kvs.isEmpty());
+    assertTrue(onlyWriter.hasMetadata);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b902c9e4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index 9d2e674..debf742 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.regionserver.BloomType;
@@ -63,8 +64,8 @@ import 
org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
 import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
 import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
-import 
org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture;
 import 
org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
+import 
org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -73,11 +74,16 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 import org.mockito.ArgumentMatcher;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
+@RunWith(Parameterized.class)
 @Category(SmallTests.class)
 public class TestStripeCompactionPolicy {
   private static final byte[] KEY_A = Bytes.toBytes("aaa");
@@ -97,6 +103,13 @@ public class TestStripeCompactionPolicy {
   private final static int defaultInitialCount = 1;
   private static long defaultTtl = 1000 * 1000;
 
+  @Parameters(name = "{index}: usePrivateReaders={0}")
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[] { true }, new Object[] { false });
+  }
+
+  @Parameter
+  public boolean usePrivateReaders;
   @Test
   public void testNoStripesFromFlush() throws Exception {
     Configuration conf = HBaseConfiguration.create();
@@ -562,9 +575,10 @@ public class TestStripeCompactionPolicy {
   protected void verifyFlush(StripeCompactionPolicy policy, 
StripeInformationProvider si,
       KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws 
IOException {
     StoreFileWritersCapture writers = new StoreFileWritersCapture();
-    StripeStoreFlusher.StripeFlushRequest req = policy.selectFlush(si, 
input.length);
+    StripeStoreFlusher.StripeFlushRequest req = policy.selectFlush(new 
KVComparator(), si,
+      input.length);
     StripeMultiFileWriter mw = req.createWriter();
-    mw.init(null, writers, new KeyValue.KVComparator());
+    mw.init(null, writers);
     for (KeyValue kv : input) {
       mw.append(kv);
     }
@@ -726,6 +740,7 @@ public class TestStripeCompactionPolicy {
     when(sf.getReader()).thenReturn(r);
     when(sf.createReader(anyBoolean())).thenReturn(r);
     when(sf.createReader()).thenReturn(r);
+    when(sf.cloneForReader()).thenReturn(sf);
     return sf;
   }
 
@@ -738,7 +753,7 @@ public class TestStripeCompactionPolicy {
     
when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey);
   }
 
-  private static StripeCompactor createCompactor() throws Exception {
+  private StripeCompactor createCompactor() throws Exception {
     HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo"));
     StoreFileWritersCapture writers = new StoreFileWritersCapture();
     Store store = mock(Store.class);
@@ -751,6 +766,7 @@ public class TestStripeCompactionPolicy {
         anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
 
     Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean("hbase.regionserver.compaction.private.readers", 
usePrivateReaders);
     final Scanner scanner = new Scanner();
     return new StripeCompactor(conf, store) {
       @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/b902c9e4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
new file mode 100644
index 0000000..fa73448
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static 
org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
+import static 
org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
+import 
org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestStripeCompactor {
+  private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
+  private static final TableName TABLE_NAME = 
TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
+
+  private static final byte[] KEY_B = Bytes.toBytes("bbb");
+  private static final byte[] KEY_C = Bytes.toBytes("ccc");
+  private static final byte[] KEY_D = Bytes.toBytes("ddd");
+
+  private static final KeyValue KV_A = kvAfter(Bytes.toBytes("aaa"));
+  private static final KeyValue KV_B = kvAfter(KEY_B);
+  private static final KeyValue KV_C = kvAfter(KEY_C);
+  private static final KeyValue KV_D = kvAfter(KEY_D);
+
+  private static KeyValue kvAfter(byte[] key) {
+    return new KeyValue(Arrays.copyOf(key, key.length + 1), 0L);
+  }
+
+  private static <T> T[] a(T... a) {
+    return a;
+  }
+
+  private static KeyValue[] e() {
+    return TestStripeCompactor.<KeyValue> a();
+  }
+
+  @Test
+  public void testBoundaryCompactions() throws Exception {
+    // General verification
+    verifyBoundaryCompaction(a(KV_A, KV_A, KV_B, KV_B, KV_C, KV_D),
+      a(OPEN_KEY, KEY_B, KEY_D, OPEN_KEY), a(a(KV_A, KV_A), a(KV_B, KV_B, 
KV_C), a(KV_D)));
+    verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_C, KEY_D), a(a(KV_B), 
a(KV_C)));
+    verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_D), new KeyValue[][] 
{ a(KV_B, KV_C) });
+  }
+
+  @Test
+  public void testBoundaryCompactionEmptyFiles() throws Exception {
+    // No empty file if there're already files.
+    verifyBoundaryCompaction(a(KV_B), a(KEY_B, KEY_C, KEY_D, OPEN_KEY), 
a(a(KV_B), null, null),
+      null, null, false);
+    verifyBoundaryCompaction(a(KV_A, KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D),
+      a(a(KV_A), null, a(KV_C)), null, null, false);
+    // But should be created if there are no file.
+    verifyBoundaryCompaction(e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, 
null, e()), null,
+      null, false);
+    // In major range if there's major range.
+    verifyBoundaryCompaction(e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, 
e(), null), KEY_B,
+      KEY_C, false);
+    verifyBoundaryCompaction(e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(e(), 
e(), null), OPEN_KEY,
+      KEY_C, false);
+    // Major range should have files regardless of KVs.
+    verifyBoundaryCompaction(a(KV_A), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, 
OPEN_KEY),
+      a(a(KV_A), e(), e(), null), KEY_B, KEY_D, false);
+    verifyBoundaryCompaction(a(KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, 
OPEN_KEY),
+      a(null, null, a(KV_C), e()), KEY_C, OPEN_KEY, false);
+
+  }
+
+  public static void verifyBoundaryCompaction(KeyValue[] input, byte[][] 
boundaries,
+      KeyValue[][] output) throws Exception {
+    verifyBoundaryCompaction(input, boundaries, output, null, null, true);
+  }
+
+  public static void verifyBoundaryCompaction(KeyValue[] input, byte[][] 
boundaries,
+      KeyValue[][] output, byte[] majorFrom, byte[] majorTo, boolean allFiles) 
throws Exception {
+    StoreFileWritersCapture writers = new StoreFileWritersCapture();
+    StripeCompactor sc = createCompactor(writers, input);
+    List<Path> paths = sc.compact(createDummyRequest(), 
Arrays.asList(boundaries), majorFrom,
+      majorTo, NoLimitCompactionThroughputController.INSTANCE, null);
+    writers.verifyKvs(output, allFiles, true);
+    if (allFiles) {
+      assertEquals(output.length, paths.size());
+      writers.verifyBoundaries(boundaries);
+    }
+  }
+
+  @Test
+  public void testSizeCompactions() throws Exception {
+    // General verification with different sizes.
+    verifySizeCompaction(a(KV_A, KV_A, KV_B, KV_C, KV_D), 3, 2, OPEN_KEY, 
OPEN_KEY,
+      a(a(KV_A, KV_A), a(KV_B, KV_C), a(KV_D)));
+    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 4, 1, OPEN_KEY, OPEN_KEY,
+      a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)));
+    verifySizeCompaction(a(KV_B, KV_C), 2, 1, KEY_B, KEY_D, a(a(KV_B), 
a(KV_C)));
+    // Verify row boundaries are preserved.
+    verifySizeCompaction(a(KV_A, KV_A, KV_A, KV_C, KV_D), 3, 2, OPEN_KEY, 
OPEN_KEY,
+      a(a(KV_A, KV_A, KV_A), a(KV_C, KV_D)));
+    verifySizeCompaction(a(KV_A, KV_B, KV_B, KV_C), 3, 1, OPEN_KEY, OPEN_KEY,
+      a(a(KV_A), a(KV_B, KV_B), a(KV_C)));
+    // Too much data, count limits the number of files.
+    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 2, 1, OPEN_KEY, OPEN_KEY,
+      a(a(KV_A), a(KV_B, KV_C, KV_D)));
+    verifySizeCompaction(a(KV_A, KV_B, KV_C), 1, Long.MAX_VALUE, OPEN_KEY, 
KEY_D,
+      new KeyValue[][] { a(KV_A, KV_B, KV_C) });
+    // Too little data/large count, no extra files.
+    verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), Integer.MAX_VALUE, 2, 
OPEN_KEY, OPEN_KEY,
+      a(a(KV_A, KV_B), a(KV_C, KV_D)));
+  }
+
+  public static void verifySizeCompaction(KeyValue[] input, int targetCount, 
long targetSize,
+      byte[] left, byte[] right, KeyValue[][] output) throws Exception {
+    StoreFileWritersCapture writers = new StoreFileWritersCapture();
+    StripeCompactor sc = createCompactor(writers, input);
+    List<Path> paths = sc.compact(createDummyRequest(), targetCount, 
targetSize, left, right, null,
+      null, NoLimitCompactionThroughputController.INSTANCE, null);
+    assertEquals(output.length, paths.size());
+    writers.verifyKvs(output, true, true);
+    List<byte[]> boundaries = new ArrayList<byte[]>();
+    boundaries.add(left);
+    for (int i = 1; i < output.length; ++i) {
+      boundaries.add(output[i][0].getRow());
+    }
+    boundaries.add(right);
+    writers.verifyBoundaries(boundaries.toArray(new byte[][] {}));
+  }
+
+  private static StripeCompactor createCompactor(StoreFileWritersCapture 
writers, KeyValue[] input)
+      throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean("hbase.regionserver.compaction.private.readers", false);
+    final Scanner scanner = new Scanner(input);
+
+    // Create store mock that is satisfactory for compactor.
+    HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
+    ScanInfo si = new ScanInfo(col, Long.MAX_VALUE, 0, new KVComparator());
+    Store store = mock(Store.class);
+    when(store.getFamily()).thenReturn(col);
+    when(store.getScanInfo()).thenReturn(si);
+    when(store.areWritesEnabled()).thenReturn(true);
+    when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
+    when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
+    when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), 
anyBoolean(),
+      anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
+    when(store.getComparator()).thenReturn(new KVComparator());
+
+    return new StripeCompactor(conf, store) {
+      @Override
+      protected InternalScanner createScanner(Store store, 
List<StoreFileScanner> scanners,
+          long smallestReadPoint, long earliestPutTs, byte[] 
dropDeletesFromRow,
+          byte[] dropDeletesToRow) throws IOException {
+        return scanner;
+      }
+
+      @Override
+      protected InternalScanner createScanner(Store store, 
List<StoreFileScanner> scanners,
+          ScanType scanType, long smallestReadPoint, long earliestPutTs) 
throws IOException {
+        return scanner;
+      }
+    };
+  }
+}

Reply via email to