Repository: hbase
Updated Branches:
  refs/heads/branch-1 3e3276d7f -> 6e5e5d8cc


HBASE-12562 Handling memory pressure for secondary region replicas

Conflicts:
	hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6e5e5d8c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6e5e5d8c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6e5e5d8c

Branch: refs/heads/branch-1
Commit: 6e5e5d8ccef17c8d38f3284204bebd20f92525bd
Parents: 3e3276d
Author: Enis Soztutar <e...@apache.org>
Authored: Fri Mar 6 14:32:05 2015 -0800
Committer: Enis Soztutar <e...@apache.org>
Committed: Fri Mar 6 14:48:58 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/io/FileLink.java    |   3 +
 .../hadoop/hbase/regionserver/HRegion.java      | 156 +++++++++++++++----
 .../hbase/regionserver/MemStoreFlusher.java     |  85 ++++++++--
 .../hbase/util/ServerRegionReplicaUtil.java     |  29 ++++
 .../regionserver/TestHRegionReplayEvents.java   |  99 ++++++++++++
 5 files changed, 333 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6e5e5d8c/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
index 67153ae..1c5a593 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
@@ -476,6 +476,9 @@ public class FileLink {
 
   @Override
   public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
     // Assumes that the ordering of locations between objects are the same. This is true for the
     // current subclasses already (HFileLink, WALLink). Otherwise, we may have to sort the locations
     // or keep them presorted

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e5e5d8c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index af43382..14df928 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -4470,7 +4470,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * @param flush the flush descriptor
    * @throws IOException
    */
-  private void dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
+  private long dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
+    long totalFreedSize = 0;
     this.updatesLock.writeLock().lock();
     try {
       mvcc.waitForPreviousTransactionsComplete();
@@ -4484,10 +4485,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         // Prepare flush (take a snapshot) and then abort (drop the snapshot)
         if (store == null ) {
           for (Store s : stores.values()) {
-            dropStoreMemstoreContentsForSeqId(s, currentSeqId);
+            totalFreedSize += doDropStoreMemstoreContentsForSeqId(s, currentSeqId);
           }
         } else {
-          dropStoreMemstoreContentsForSeqId(store, currentSeqId);
+          totalFreedSize += doDropStoreMemstoreContentsForSeqId(store, currentSeqId);
         }
       } else {
         LOG.info(getRegionInfo().getEncodedName() + " : "
@@ -4497,13 +4498,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     } finally {
       this.updatesLock.writeLock().unlock();
     }
+    return totalFreedSize;
   }
 
-  private void dropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) throws IOException {
-    this.addAndGetGlobalMemstoreSize(-s.getFlushableSize());
+  private long doDropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) throws IOException {
+    long snapshotSize = s.getFlushableSize();
+    this.addAndGetGlobalMemstoreSize(-snapshotSize);
     StoreFlushContext ctx = s.createFlushContext(currentSeqId);
     ctx.prepare();
     ctx.abort();
+    return snapshotSize;
   }
 
   private void replayWALFlushAbortMarker(FlushDescriptor flush) {
@@ -4624,27 +4628,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
         // if all stores ended up dropping their snapshots, we can safely drop the
         // prepareFlushResult
-        if (writestate.flushing) {
-          boolean canDrop = true;
-          for (Entry<byte[], StoreFlushContext> entry
-              : prepareFlushResult.storeFlushCtxs.entrySet()) {
-            Store store = getStore(entry.getKey());
-            if (store == null) {
-              continue;
-            }
-            if (store.getSnapshotSize() > 0) {
-              canDrop = false;
-            }
-          }
-
-          // this means that all the stores in the region has finished flushing, but the WAL marker
-          // may not have been written or we did not receive it yet.
-          if (canDrop) {
-            writestate.flushing = false;
-            this.prepareFlushResult = null;
-          }
-        }
-
+        dropPrepareFlushIfPossible();
 
         // advance the mvcc read point so that the new flushed file is visible.
         // there may be some in-flight transactions, but they won't be made visible since they are
@@ -4746,6 +4730,127 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
   }
 
+  /**
+   * If all stores ended up dropping their snapshots, we can safely drop the prepareFlushResult
+   */
+  private void dropPrepareFlushIfPossible() {
+    if (writestate.flushing) {
+      boolean canDrop = true;
+      if (prepareFlushResult.storeFlushCtxs != null) {
+        for (Entry<byte[], StoreFlushContext> entry
+            : prepareFlushResult.storeFlushCtxs.entrySet()) {
+          Store store = getStore(entry.getKey());
+          if (store == null) {
+            continue;
+          }
+          if (store.getSnapshotSize() > 0) {
+            canDrop = false;
+            break;
+          }
+        }
+      }
+
+      // this means that all the stores in the region have finished flushing, but the WAL marker
+      // may not have been written or we did not receive it yet.
+      if (canDrop) {
+        writestate.flushing = false;
+        this.prepareFlushResult = null;
+      }
+    }
+  }
+
+  /**
+   * Checks the underlying store files, and opens the files that have not
+   * been opened, and removes the store file readers for store files no longer
+   * available. Mainly used by secondary region replicas to keep up to date with
+   * the primary region files or open new flushed files and drop their memstore snapshots in case
+   * of memory pressure.
+   * @return true if any memstore memory was freed; always false for the default (primary) replica
+   * @throws IOException if refreshing the store files or dropping memstore contents fails
+   */
+  boolean refreshStoreFiles() throws IOException {
+    if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
+      return false; // if primary nothing to do
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(getRegionInfo().getEncodedName() + " : "
+          + "Refreshing store files to see whether we can free up memstore");
+    }
+
+    long totalFreedSize = 0;
+
+    long smallestSeqIdInStores = Long.MAX_VALUE;
+
+    startRegionOperation(); // obtain region close lock
+    try {
+      synchronized (writestate) {
+        for (Store store : getStores().values()) {
+          // TODO: some stores might see new data from flush, while others do not which
+          // MIGHT break atomic edits across column families.
+          long maxSeqIdBefore = store.getMaxSequenceId();
+
+          // refresh the store files. This is similar to observing a region open wal marker.
+          store.refreshStoreFiles();
+
+          long storeSeqId = store.getMaxSequenceId();
+          if (storeSeqId < smallestSeqIdInStores) {
+            smallestSeqIdInStores = storeSeqId;
+          }
+
+          // see whether we can drop the memstore or the snapshot
+          if (storeSeqId > maxSeqIdBefore) {
+
+            if (writestate.flushing) {
+              // only drop memstore snapshots if they are smaller than last flush for the store
+              if (this.prepareFlushResult.flushOpSeqId <= storeSeqId) {
+                StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
+                    null : this.prepareFlushResult.storeFlushCtxs.get(store.getFamily().getName());
+                if (ctx != null) {
+                  long snapshotSize = store.getFlushableSize();
+                  ctx.abort();
+                  this.addAndGetGlobalMemstoreSize(-snapshotSize);
+                  this.prepareFlushResult.storeFlushCtxs.remove(store.getFamily().getName());
+                  totalFreedSize += snapshotSize;
+                }
+              }
+            }
+
+            // Drop the memstore contents if they are now smaller than the latest seen flushed file
+            totalFreedSize += dropMemstoreContentsForSeqId(storeSeqId, store);
+          }
+        }
+
+        // if all stores ended up dropping their snapshots, we can safely drop the
+        // prepareFlushResult
+        dropPrepareFlushIfPossible();
+
+        // advance the mvcc read point so that the new flushed files are visible.
+        // there may be some in-flight transactions, but they won't be made visible since they are
+        // either greater than flush seq number or they were already picked up via flush.
+        for (Store s : getStores().values()) {
+          getMVCC().advanceMemstoreReadPointIfNeeded(s.getMaxMemstoreTS());
+        }
+
+        // smallestSeqIdInStores is the seqId that we have a corresponding hfile for. We can safely
+        // skip all edits that are to be replayed in the future that have a smaller seqId
+        // than this. We are updating lastReplayedOpenRegionSeqId so that we can skip all edits
+        // that we have picked the flush files for
+        if (this.lastReplayedOpenRegionSeqId < smallestSeqIdInStores) {
+          this.lastReplayedOpenRegionSeqId = smallestSeqIdInStores;
+        }
+      }
+      // C. Finally notify anyone waiting on memstore to clear:
+      // e.g. checkResources().
+      synchronized (this) {
+        notifyAll(); // FindBugs NN_NAKED_NOTIFY
+      }
+      return totalFreedSize > 0;
+    } finally {
+      closeRegionOperation();
+    }
+  }
+
   /** Checks whether the given regionName is either equal to our region, or that
    * the regionName is the primary region to our corresponding range for the secondary replica.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e5e5d8c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 88c62bb..fb90881 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.apache.hadoop.util.StringUtils.humanReadableInt;
+
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.management.ManagementFactory;
@@ -45,11 +46,15 @@ import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
@@ -70,6 +75,7 @@ import com.google.common.base.Preconditions;
 class MemStoreFlusher implements FlushRequester {
   static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
 
+  private final Configuration conf;
   // These two data members go together.  Any entry in the one must have
   // a corresponding entry in the other.
   private final BlockingQueue<FlushQueueEntry> flushQueue =
@@ -100,6 +106,7 @@ class MemStoreFlusher implements FlushRequester {
   public MemStoreFlusher(final Configuration conf,
       final HRegionServer server) {
     super();
+    this.conf = conf;
     this.server = server;
     this.threadWakeFrequency =
       conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
@@ -138,6 +145,9 @@ class MemStoreFlusher implements FlushRequester {
 
     Set<HRegion> excludedRegions = new HashSet<HRegion>();
 
+    double secondaryMultiplier
+      = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);
+
     boolean flushedOne = false;
     while (!flushedOne) {
       // Find the biggest region that doesn't have too many storefiles
@@ -147,8 +157,11 @@ class MemStoreFlusher implements FlushRequester {
       // Find the biggest region, total, even if it might have too many flushes.
       HRegion bestAnyRegion = getBiggestMemstoreRegion(
           regionsBySize, excludedRegions, false);
+      // Find the biggest region that is a secondary region
+      HRegion bestRegionReplica = getBiggestMemstoreOfRegionReplica(regionsBySize,
+        excludedRegions);
 
-      if (bestAnyRegion == null) {
+      if (bestAnyRegion == null && bestRegionReplica == null) {
         LOG.error("Above memory mark but there are no flushable regions!");
         return false;
       }
@@ -177,18 +190,37 @@ class MemStoreFlusher implements FlushRequester {
         }
       }
 
-      Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
-
-      LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
-          + "Total Memstore size="
-          + humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize())
-          + ", Region memstore size="
-          + humanReadableInt(regionToFlush.memstoreSize.get()));
-      flushedOne = flushRegion(regionToFlush, true, true);
-      if (!flushedOne) {
-        LOG.info("Excluding unflushable region " + regionToFlush +
-          " - trying to find a different region to flush.");
-        excludedRegions.add(regionToFlush);
+      Preconditions.checkState(
+        (regionToFlush != null && regionToFlush.memstoreSize.get() > 0) ||
+        (bestRegionReplica != null && bestRegionReplica.memstoreSize.get() > 0));
+
+      if (regionToFlush == null ||
+          (bestRegionReplica != null &&
+           ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
+           (bestRegionReplica.memstoreSize.get()
+               > secondaryMultiplier * regionToFlush.memstoreSize.get()))) {
+        LOG.info("Refreshing storefiles of region " + bestRegionReplica +
+          " due to global heap pressure. memstore size=" + StringUtils.humanReadableInt(
+            server.getRegionServerAccounting().getGlobalMemstoreSize()));
+        flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
+        if (!flushedOne) {
+          LOG.info("Excluding secondary region " + bestRegionReplica +
+              " - trying to find a different region to refresh files.");
+          excludedRegions.add(bestRegionReplica);
+        }
+      } else {
+        LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
+            + "Total Memstore size="
+            + humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize())
+            + ", Region memstore size="
+            + humanReadableInt(regionToFlush.memstoreSize.get()));
+        flushedOne = flushRegion(regionToFlush, true, true);
+
+        if (!flushedOne) {
+          LOG.info("Excluding unflushable region " + regionToFlush +
+              " - trying to find a different region to flush.");
+          excludedRegions.add(regionToFlush);
+        }
       }
     }
     return true;
@@ -281,6 +313,43 @@ class MemStoreFlusher implements FlushRequester {
     return null;
   }
 
+  /**
+   * Returns the first secondary (non-default) replica in {@code regionsBySize} that is not in
+   * {@code excludedRegions}, or null if there is none. Presumably this is the biggest such
+   * replica, assuming the map iterates in descending memstore-size order (TODO confirm).
+   */
+  private HRegion getBiggestMemstoreOfRegionReplica(SortedMap<Long, HRegion> regionsBySize,
+      Set<HRegion> excludedRegions) {
+    synchronized (regionsInQueue) {
+      for (HRegion region : regionsBySize.values()) {
+        if (excludedRegions.contains(region)) {
+          continue;
+        }
+
+        if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
+          continue;
+        }
+
+        return region;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Refreshes the store files of the given secondary region replica so that memstore contents
+   * covered by newly picked-up flushed files can be dropped.
+   * @return true if some memory was freed; false otherwise, including when the refresh failed
+   */
+  private boolean refreshStoreFilesAndReclaimMemory(HRegion region) {
+    try {
+      return region.refreshStoreFiles();
+    } catch (IOException e) {
+      LOG.warn("Refreshing store files failed with exception", e);
+    }
+    return false;
+  }
+
   /**
    * Return true if global memory usage is above the high watermark
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e5e5d8c/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index 710698b..4551722 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -69,6 +69,25 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
   private static final boolean DEFAULT_REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH = true;
 
   /**
+   * Enables or disables refreshing store files of secondary region replicas when the memory is
+   * above the global memstore lower limit. Refreshing the store files means that we will do a file
+   * list of the primary region's store files, and pick up new files. Also depending on the store
+   * files, we can drop some memstore contents which will free up memory.
+   */
+  public static final String REGION_REPLICA_STORE_FILE_REFRESH
+    = "hbase.region.replica.storefile.refresh";
+  private static final boolean DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH = true;
+
+  /**
+   * The multiplier to use when we want to refresh a secondary region instead of flushing a primary
+   * region. Default value assumes that for doing the file refresh, the biggest secondary should be
+   * 4 times bigger than the biggest primary.
+   */
+  public static final String REGION_REPLICA_STORE_FILE_REFRESH_MEMSTORE_MULTIPLIER
+    = "hbase.region.replica.storefile.refresh.memstore.multiplier";
+  private static final double DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH_MEMSTORE_MULTIPLIER = 4;
+
+  /**
    * Returns the regionInfo object to use for interacting with the file system.
    * @return An HRegionInfo object to interact with the filesystem
    */
@@ -163,6 +182,24 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
       DEFAULT_REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH);
   }
 
+  /**
+   * Returns whether refreshing the store files of secondary region replicas is enabled, as
+   * configured by {@link #REGION_REPLICA_STORE_FILE_REFRESH} (default true).
+   */
+  public static boolean isRegionReplicaStoreFileRefreshEnabled(Configuration conf) {
+    return conf.getBoolean(REGION_REPLICA_STORE_FILE_REFRESH,
+      DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH);
+  }
+
+  /**
+   * Returns the secondary-vs-primary memstore size multiplier, as configured by
+   * {@link #REGION_REPLICA_STORE_FILE_REFRESH_MEMSTORE_MULTIPLIER} (default 4).
+   */
+  public static double getRegionReplicaStoreFileRefreshMultiplier(Configuration conf) {
+    return conf.getDouble(REGION_REPLICA_STORE_FILE_REFRESH_MEMSTORE_MULTIPLIER,
+      DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH_MEMSTORE_MULTIPLIER);
+  }
+
   /**
    * Return the peer id used for replicating to secondary region replicas
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e5e5d8c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 14d898f..35ed253 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -1326,6 +1327,104 @@ public class TestHRegionReplayEvents {
     secondaryRegion.get(new Get(Bytes.toBytes(0)));
   }
 
+  @Test
+  public void testRefreshStoreFiles() throws IOException {
+    assertEquals(0, primaryRegion.getStoreFileList(families).size());
+    assertEquals(0, secondaryRegion.getStoreFileList(families).size());
+
+    // Test case 1: refresh with an empty region
+    secondaryRegion.refreshStoreFiles();
+    assertEquals(0, secondaryRegion.getStoreFileList(families).size());
+
+    // do one flush
+    putDataWithFlushes(primaryRegion, 100, 100, 0);
+    int numRows = 100;
+
+    // refresh the store file list, and ensure that the files are picked up.
+    secondaryRegion.refreshStoreFiles();
+    assertPathListsEqual(primaryRegion.getStoreFileList(families),
+      secondaryRegion.getStoreFileList(families));
+    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
+
+    LOG.info("-- Verifying edits from secondary");
+    verifyData(secondaryRegion, 0, numRows, cq, families);
+
+    // Test case 2: 3 more flushes
+    putDataWithFlushes(primaryRegion, 100, 300, 0);
+    numRows = 300;
+
+    // refresh the store file list, and ensure that the files are picked up.
+    secondaryRegion.refreshStoreFiles();
+    assertPathListsEqual(primaryRegion.getStoreFileList(families),
+      secondaryRegion.getStoreFileList(families));
+    assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());
+
+    LOG.info("-- Verifying edits from secondary");
+    verifyData(secondaryRegion, 0, numRows, cq, families);
+
+    if (FSUtils.WINDOWS) {
+      // compaction cannot move files while they are open in secondary on windows. Skip remaining.
+      return;
+    }
+
+    // Test case 3: compact primary files
+    primaryRegion.compactStores();
+    secondaryRegion.refreshStoreFiles();
+    assertPathListsEqual(primaryRegion.getStoreFileList(families),
+      secondaryRegion.getStoreFileList(families));
+    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
+
+    LOG.info("-- Verifying edits from secondary");
+    verifyData(secondaryRegion, 0, numRows, cq, families);
+
+    LOG.info("-- Replaying edits in secondary");
+
+    // Test case 4: replay some edits, ensure that memstore is dropped.
+    assertEquals(0, secondaryRegion.getMemstoreSize().get());
+    putDataWithFlushes(primaryRegion, 400, 400, 0);
+    numRows = 400;
+
+    reader = createWALReaderForPrimary();
+    while (true) {
+      WAL.Entry entry = reader.next();
+      if (entry == null) {
+        break;
+      }
+      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
+      if (flush != null) {
+        // do not replay flush
+      } else {
+        replayEdit(secondaryRegion, entry);
+      }
+    }
+
+    assertTrue(secondaryRegion.getMemstoreSize().get() > 0);
+
+    secondaryRegion.refreshStoreFiles();
+
+    assertEquals(0, secondaryRegion.getMemstoreSize().get());
+
+    LOG.info("-- Verifying edits from primary");
+    verifyData(primaryRegion, 0, numRows, cq, families);
+    LOG.info("-- Verifying edits from secondary");
+    verifyData(secondaryRegion, 0, numRows, cq, families);
+  }
+
+  /**
+   * Paths can be qualified or not. This does the assertion using String->Path conversion.
+   */
+  private void assertPathListsEqual(List<String> list1, List<String> list2) {
+    List<Path> l1 = new ArrayList<>(list1.size());
+    for (String path : list1) {
+      l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
+    }
+    List<Path> l2 = new ArrayList<>(list2.size());
+    for (String path : list2) {
+      l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
+    }
+    assertEquals(l1, l2);
+  }
+
   private void disableReads(HRegion region) {
     region.setReadsEnabled(false);
     try {

Reply via email to