Repository: hbase
Updated Branches:
  refs/heads/branch-1 063247bf3 -> 4fa0e3274


http://git-wip-us.apache.org/repos/asf/hbase/blob/4fa0e327/hbase-protocol/src/main/protobuf/WAL.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto
index 88e94f4..f14d5f4 100644
--- a/hbase-protocol/src/main/protobuf/WAL.proto
+++ b/hbase-protocol/src/main/protobuf/WAL.proto
@@ -89,13 +89,36 @@ message CompactionDescriptor {
   required bytes table_name = 1; // TODO: WALKey already stores these, might remove
   required bytes encoded_region_name = 2;
   required bytes family_name = 3;
-  repeated string compaction_input = 4;
+  repeated string compaction_input = 4; // relative to store dir
   repeated string compaction_output = 5;
-  required string store_home_dir = 6;
+  required string store_home_dir = 6; // relative to region dir
   optional bytes  region_name = 7; // full region name
 }
 
 /**
+ * Special WAL entry that holds everything related to a flush.
+ */
+message FlushDescriptor {
+  enum FlushAction {
+    START_FLUSH = 0;
+    COMMIT_FLUSH = 1;
+    ABORT_FLUSH = 2;
+  }
+
+  message StoreFlushDescriptor {
+    required bytes family_name = 1;
+    required string store_home_dir = 2; //relative to region dir
+  repeated string flush_output = 3; // relative to store dir (if this is a COMMIT_FLUSH)
+  }
+
+  required FlushAction action = 1;
+  required bytes table_name = 2;
+  required bytes encoded_region_name = 3;
+  optional uint64 flush_sequence_number = 4;
+  repeated StoreFlushDescriptor store_flushes = 5;
+}
+
+/**
  * A trailer that is appended to the end of a properly closed HLog WAL file.
  * If missing, this is either a legacy or a corrupted WAL file.
  */
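
For reference, a minimal sketch (not part of the patch) of building and
parsing the new FlushDescriptor message. The builder and parseFrom methods
are generated by protoc from the fields above; the table, region, and
family values here are placeholders:

    import com.google.protobuf.ByteString;
    import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
    import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
    import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;

    public class FlushDescriptorSketch {
      public static void main(String[] args) throws Exception {
        FlushDescriptor start = FlushDescriptor.newBuilder()
            .setAction(FlushAction.START_FLUSH)
            .setTableName(ByteString.copyFromUtf8("testtable"))       // placeholder
            .setEncodedRegionName(ByteString.copyFromUtf8("abc123"))  // placeholder
            .setFlushSequenceNumber(42L)
            .addStoreFlushes(StoreFlushDescriptor.newBuilder()
                .setFamilyName(ByteString.copyFromUtf8("family"))
                .setStoreHomeDir("family"))  // relative to region dir, as noted above
            .build();
        // The descriptor travels through the WAL as bytes and is parsed on replay.
        byte[] wire = start.toByteArray();
        FlushDescriptor replayed = FlushDescriptor.parseFrom(wire);
        System.out.println(replayed.getAction());  // START_FLUSH
      }
    }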

http://git-wip-us.apache.org/repos/asf/hbase/blob/4fa0e327/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 6bce6a8..a0ac336 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -113,10 +114,13 @@ import org.apache.hadoop.hbase.ipc.RpcCallContext;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -1729,8 +1733,11 @@ public class HRegion implements HeapSize { // , Writable{
     status.setStatus("Preparing to flush by snapshotting stores in " +
       getRegionInfo().getEncodedName());
     List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
+    TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
+        Bytes.BYTES_COMPARATOR);
     long flushSeqId = -1L;
 
+    long trxId = 0;
     try {
       try {
         w = mvcc.beginMemstoreInsert();
@@ -1754,12 +1761,39 @@ public class HRegion implements HeapSize { // , Writable{
         for (Store s : stores.values()) {
           totalFlushableSize += s.getFlushableSize();
           storeFlushCtxs.add(s.createFlushContext(flushSeqId));
+          committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
+        }
+
+        // write the snapshot start to WAL
+        if (wal != null) {
+          FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
+            getRegionInfo(), flushSeqId, committedFiles);
+          trxId = HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
+            desc, sequenceId, false); // no sync. Sync is below where we do not hold the updates lock
         }
 
         // Prepare flush (take a snapshot)
         for (StoreFlushContext flush : storeFlushCtxs) {
           flush.prepare();
         }
+      } catch (IOException ex) {
+        if (wal != null) {
+          if (trxId > 0) { // check whether we have already written START_FLUSH to WAL
+            try {
+              FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
+                getRegionInfo(), flushSeqId, committedFiles);
+              HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
+                desc, sequenceId, false);
+            } catch (Throwable t) {
+              LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
+                  StringUtils.stringifyException(t));
+              // ignore this since we will be aborting the RS with DSE.
+            }
+          }
+          // we have called wal.startCacheFlush(), now we have to abort it
+          wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
+          throw ex; // let upper layers deal with it.
+        }
       } finally {
         this.updatesLock.writeLock().unlock();
       }
@@ -1767,9 +1801,16 @@ public class HRegion implements HeapSize { // , Writable{
         ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
       status.setStatus(s);
       if (LOG.isTraceEnabled()) LOG.trace(s);
-      // sync unflushed WAL changes when deferred log sync is enabled
+      // sync unflushed WAL changes
       // see HBASE-8208 for details
-      if (wal != null && !shouldSyncLog()) wal.sync();
+      if (wal != null) {
+        try {
+          wal.sync(); // ensure that flush marker is sync'ed
+        } catch (IOException ioe) {
+          LOG.warn("Unexpected exception while log.sync(), ignoring. Exception: "
+              + StringUtils.stringifyException(ioe));
+        }
+      }
 
       // wait for all in-progress transactions to commit to HLog before
       // we can start the flush. This prevents
@@ -1806,16 +1847,27 @@ public class HRegion implements HeapSize { // , Writable{
 
       // Switch snapshot (in memstore) -> new hfile (thus causing
       // all the store scanners to reset/reseek).
+      Iterator<Store> it = stores.values().iterator(); // stores.values() and storeFlushCtxs have
+      // same order
       for (StoreFlushContext flush : storeFlushCtxs) {
         boolean needsCompaction = flush.commit(status);
         if (needsCompaction) {
           compactionRequested = true;
         }
+        committedFiles.put(it.next().getFamily().getName(), flush.getCommittedFiles());
       }
       storeFlushCtxs.clear();
 
       // Set down the memstore size by amount of flush.
       this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
+
+      if (wal != null) {
+        // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
+        FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
+          getRegionInfo(), flushSeqId, committedFiles);
+        HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
+          desc, sequenceId, true);
+      }
     } catch (Throwable t) {
       // An exception here means that the snapshot was not persisted.
       // The hlog needs to be replayed so its content is restored to memstore.
@@ -1824,6 +1876,16 @@ public class HRegion implements HeapSize { // , Writable{
       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
       // all and sundry.
       if (wal != null) {
+        try {
+          FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
+            getRegionInfo(), flushSeqId, committedFiles);
+          HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
+            desc, sequenceId, false);
+        } catch (Throwable ex) {
+          LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
+              StringUtils.stringifyException(ex));
+          // ignore this since we will be aborting the RS with DSE.
+        }
         wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
       }
       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
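
In outline, the HRegion changes above order the markers as follows: append
START_FLUSH without sync while holding the updates lock, snapshot the
stores, sync after releasing the lock, commit each store flush, then append
a sync'ed COMMIT_FLUSH; failures fall back to a best-effort ABORT_FLUSH. A
self-contained model of that control flow (hypothetical Wal type, not the
HBase API):

    import java.io.IOException;

    public class FlushMarkerOrderingSketch {
      interface Wal {
        void appendMarker(String action, boolean sync) throws IOException;
      }

      static void flush(Wal wal, Runnable prepareSnapshots) throws IOException {
        long trxId = 0;
        try {
          wal.appendMarker("START_FLUSH", false); // no sync under the updates lock
          trxId = 1;
          prepareSnapshots.run();                 // snapshot the memstores
        } catch (IOException ex) {
          if (trxId > 0) {                        // START_FLUSH already went out
            try {
              wal.appendMarker("ABORT_FLUSH", false);
            } catch (IOException ignored) {
              // best effort; the real code just logs here
            }
          }
          throw ex; // plain IOException: the region server need not abort
        }
        // updates lock released here; the WAL is synced so START_FLUSH is durable
        try {
          wal.appendMarker("COMMIT_FLUSH", true); // sync'ed commit marker
        } catch (IOException ex) {
          try {
            wal.appendMarker("ABORT_FLUSH", false);
          } catch (IOException ignored) {
            // best effort; the real code then aborts the RS via DroppedSnapshotException
          }
          throw new IllegalStateException("dropped snapshot", ex); // stand-in for DSE
        }
      }

      public static void main(String[] args) throws IOException {
        flush(new Wal() {
          public void appendMarker(String action, boolean sync) {
            System.out.println(action + (sync ? " [sync]" : ""));
          }
        }, new Runnable() { public void run() { } });
      }
    }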

http://git-wip-us.apache.org/repos/asf/hbase/blob/4fa0e327/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index e059fe8..3f5729a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2035,6 +2035,7 @@ public class HStore implements Store {
     private long cacheFlushSeqNum;
     private MemStoreSnapshot snapshot;
     private List<Path> tempFiles;
+    private List<Path> committedFiles;
 
     private StoreFlusherImpl(long cacheFlushSeqNum) {
       this.cacheFlushSeqNum = cacheFlushSeqNum;
@@ -2047,6 +2048,7 @@ public class HStore implements Store {
     @Override
     public void prepare() {
       this.snapshot = memstore.snapshot();
+      committedFiles = new ArrayList<Path>(1);
     }
 
     @Override
@@ -2079,14 +2081,20 @@ public class HStore implements Store {
         }
       }
 
-      if (HStore.this.getCoprocessorHost() != null) {
-        for (StoreFile sf : storeFiles) {
+      for (StoreFile sf : storeFiles) {
+        if (HStore.this.getCoprocessorHost() != null) {
           HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
         }
+        committedFiles.add(sf.getPath());
       }
       // Add new file to store files.  Clear snapshot too while we have the Store write lock.
       return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
     }
+
+    @Override
+    public List<Path> getCommittedFiles() {
+      return committedFiles;
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/4fa0e327/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
index 193a811..fdf1f1e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
@@ -19,8 +19,10 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 
 /**
@@ -61,4 +63,10 @@ interface StoreFlushContext {
    * @throws IOException
    */
   boolean commit(MonitoredTask status) throws IOException;
+
+  /**
+   * Returns the newly committed files from the flush. Called only if commit returns true
+   * @return a list of Paths for new files
+   */
+  List<Path> getCommittedFiles();
 }
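
A toy illustration (outside HBase) of the caller pattern this addition
enables, as used in the HRegion hunk above: prepare every store, commit
every store, then record each store's committed files. The FlushContext
type below is a simplified stand-in for StoreFlushContext:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class CommittedFilesSketch {
      interface FlushContext {
        void prepare();
        boolean commit();                 // true if a compaction is now needed
        List<String> getCommittedFiles(); // valid after commit
      }

      public static void main(String[] args) {
        Map<String, FlushContext> stores = new TreeMap<String, FlushContext>();
        stores.put("family", new FlushContext() {
          private final List<String> files = new ArrayList<String>();
          public void prepare() { }  // the snapshot would be taken here
          public boolean commit() { files.add("hfile-1"); return false; }
          public List<String> getCommittedFiles() { return files; }
        });

        Map<String, List<String>> committedFiles = new TreeMap<String, List<String>>();
        for (FlushContext ctx : stores.values()) {
          ctx.prepare();
        }
        for (Map.Entry<String, FlushContext> e : stores.entrySet()) {
          e.getValue().commit();
          committedFiles.put(e.getKey(), e.getValue().getCommittedFiles());
        }
        System.out.println(committedFiles); // {family=[hfile-1]}
      }
    }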

http://git-wip-us.apache.org/repos/asf/hbase/blob/4fa0e327/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
index a0707f7..2c4652b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.util.FSUtils;
 
 import com.google.protobuf.TextFormat;
@@ -268,4 +269,19 @@ public class HLogUtil {
       LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
     }
   }
+
+  /**
+   * Write a flush marker indicating the start, abort, or completion of a region flush
+   */
+  public static long writeFlushMarker(HLog log, HTableDescriptor htd, HRegionInfo info,
+      final FlushDescriptor f, AtomicLong sequenceId, boolean sync) throws IOException {
+    TableName tn = TableName.valueOf(f.getTableName().toByteArray());
+    HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
+    long trx = log.appendNoSync(htd, info, key, WALEdit.createFlushWALEdit(info, f), sequenceId, false, null);
+    if (sync) log.sync(trx);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
+    }
+    return trx;
+  }
 }
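
The returned transaction id and the sync flag mirror the two call sites in
HRegion: START_FLUSH is appended with sync=false while the updates lock is
held and made durable by a later wal.sync(), while COMMIT_FLUSH passes
sync=true. A stand-alone sketch of that deferred-sync pattern (hypothetical
Log type, not the HLog API):

    import java.util.concurrent.atomic.AtomicLong;

    public class DeferredSyncSketch {
      static class Log {
        private final AtomicLong appended = new AtomicLong();
        private long syncedUpTo;

        long appendNoSync(String record) { // cheap append, returns a transaction id
          long trx = appended.incrementAndGet();
          System.out.println("append " + trx + ": " + record);
          return trx;
        }

        synchronized void sync(long trx) { // make everything up to trx durable
          if (trx > syncedUpTo) {
            syncedUpTo = trx;
            System.out.println("synced up to " + trx);
          }
        }
      }

      public static void main(String[] args) {
        Log log = new Log();
        long startTrx = log.appendNoSync("START_FLUSH"); // sync deferred (lock held)
        log.sync(startTrx);                              // sync once the lock is released
        long commitTrx = log.appendNoSync("COMMIT_FLUSH");
        log.sync(commitTrx);                             // the sync=true case
      }
    }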

http://git-wip-us.apache.org/repos/asf/hbase/blob/4fa0e327/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index 24d9d6d..f684d7d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -36,8 +36,10 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.io.Writable;
 
 
@@ -83,6 +85,8 @@ public class WALEdit implements Writable, HeapSize {
   public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
   static final byte [] METAROW = Bytes.toBytes("METAROW");
   static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
+  static final byte [] FLUSH = Bytes.toBytes("HBASE::FLUSH");
+
   private final int VERSION_2 = -1;
   private final boolean isReplay;
 
@@ -112,6 +116,10 @@ public class WALEdit implements Writable, HeapSize {
     return Bytes.equals(METAFAMILY, f);
   }
 
+  public static boolean isMetaEditFamily(Cell cell) {
+    return CellUtil.matchingFamily(cell, METAFAMILY);
+  }
+
   /**
   * @return True when current WALEdit is created by log replay. Replication skips WALEdits from
    *         replay.
@@ -256,6 +264,19 @@ public class WALEdit implements Writable, HeapSize {
     return sb.toString();
   }
 
+  public static WALEdit createFlushWALEdit(HRegionInfo hri, FlushDescriptor f) {
+    KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, FLUSH,
+      EnvironmentEdgeManager.currentTimeMillis(), f.toByteArray());
+    return new WALEdit().add(kv);
+  }
+
+  public static FlushDescriptor getFlushDescriptor(Cell cell) throws IOException {
+    if (CellUtil.matchingColumn(cell, METAFAMILY, FLUSH)) {
+      return FlushDescriptor.parseFrom(cell.getValue());
+    }
+    return null;
+  }
+
   /**
   * Create a compaction WALEdit
    * @param c
@@ -264,7 +285,7 @@ public class WALEdit implements Writable, HeapSize {
   public static WALEdit createCompaction(final HRegionInfo hri, final CompactionDescriptor c) {
     byte [] pbbytes = c.toByteArray();
     KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, COMPACTION,
-      System.currentTimeMillis(), pbbytes);
+      EnvironmentEdgeManager.currentTimeMillis(), pbbytes);
     return new WALEdit().add(kv); //replication scope null so that this won't be replicated
   }
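
Together with createFlushWALEdit above, these helpers give flush markers a
write/read round trip through the WAL. A sketch of the replay side
(hypothetical replay() helper; the WALEdit passed in would come from a WAL
or recovered-edits reader):

    import java.io.IOException;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

    public class FlushEditReplaySketch {
      static FlushDescriptor replay(WALEdit edit) throws IOException {
        for (KeyValue kv : edit.getKeyValues()) {
          // Flush (and compaction) markers live in the special METAFAMILY family.
          if (WALEdit.isMetaEditFamily(kv)) {
            FlushDescriptor desc = WALEdit.getFlushDescriptor(kv);
            if (desc != null) {
              return desc; // a flush marker: apply START/COMMIT/ABORT accordingly
            }
          }
        }
        return null; // a normal data edit
      }
    }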
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/4fa0e327/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 15e530a..6e050a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -35,10 +35,12 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -111,6 +113,9 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
 import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
 import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
@@ -136,6 +141,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 
 import com.google.common.collect.Lists;
@@ -787,6 +793,228 @@ public class TestHRegion {
   }
 
   @Test
+  public void testFlushMarkers() throws Exception {
+    // tests that flush markers are written to WAL and handled when replaying recovered edits
+    String method = name.getMethodName();
+    TableName tableName = TableName.valueOf(method);
+    byte[] family = Bytes.toBytes("family");
+    Path logDir = TEST_UTIL.getDataTestDirOnTestFS("testRecoveredEditsIgnoreFlushMarkers.log");
+    HLog hlog = HLogFactory.createHLog(FILESYSTEM, logDir, UUID.randomUUID().toString(),
+      TEST_UTIL.getConfiguration());
+
+    this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
+      HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, hlog, family);
+    try {
+      Path regiondir = region.getRegionFileSystem().getRegionDir();
+      FileSystem fs = region.getRegionFileSystem().getFileSystem();
+      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
+
+      long maxSeqId = 3;
+      long minSeqId = 0;
+
+      for (long i = minSeqId; i < maxSeqId; i++) {
+        Put put = new Put(Bytes.toBytes(i));
+        put.add(family, Bytes.toBytes(i), Bytes.toBytes(i));
+        region.put(put);
+        region.flushcache();
+      }
+
+      // this will create a region with 3 files from flush
+      assertEquals(3, region.getStore(family).getStorefilesCount());
+      List<String> storeFiles = new ArrayList<String>(3);
+      for (StoreFile sf : region.getStore(family).getStorefiles()) {
+        storeFiles.add(sf.getPath().getName());
+      }
+
+      // now verify that the flush markers are written
+      hlog.close();
+      HLog.Reader reader = HLogFactory.createReader(fs,
+        fs.listStatus(fs.listStatus(logDir)[0].getPath())[0].getPath(),
+        TEST_UTIL.getConfiguration());
+
+      List<HLog.Entry> flushDescriptors = new ArrayList<HLog.Entry>();
+      long lastFlushSeqId = -1;
+      while (true) {
+        HLog.Entry entry = reader.next();
+        if (entry == null) {
+          break;
+        }
+        Cell cell = entry.getEdit().getKeyValues().get(0);
+        if (WALEdit.isMetaEditFamily(cell)) {
+          FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell);
+          assertNotNull(flushDesc);
+          assertArrayEquals(tableName.getName(), flushDesc.getTableName().toByteArray());
+          if (flushDesc.getAction() == FlushAction.START_FLUSH) {
+            assertTrue(flushDesc.getFlushSequenceNumber() > lastFlushSeqId);
+          } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
+            assertTrue(flushDesc.getFlushSequenceNumber() == lastFlushSeqId);
+          }
+          lastFlushSeqId = flushDesc.getFlushSequenceNumber();
+          assertArrayEquals(regionName, flushDesc.getEncodedRegionName().toByteArray());
+          assertEquals(1, flushDesc.getStoreFlushesCount()); //only one store
+          StoreFlushDescriptor storeFlushDesc = flushDesc.getStoreFlushes(0);
+          assertArrayEquals(family, storeFlushDesc.getFamilyName().toByteArray());
+          assertEquals("family", storeFlushDesc.getStoreHomeDir());
+          if (flushDesc.getAction() == FlushAction.START_FLUSH) {
+            assertEquals(0, storeFlushDesc.getFlushOutputCount());
+          } else {
+            assertEquals(1, storeFlushDesc.getFlushOutputCount()); //only one file from flush
+            assertTrue(storeFiles.contains(storeFlushDesc.getFlushOutput(0)));
+          }
+
+          flushDescriptors.add(entry);
+        }
+      }
+
+      assertEquals(3 * 2, flushDescriptors.size()); // START_FLUSH and COMMIT_FLUSH per flush
+
+      // now write those markers to the recovered edits again.
+
+      Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
+
+      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
+      fs.create(recoveredEdits);
+      HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
+
+      for (HLog.Entry entry : flushDescriptors) {
+        writer.append(entry);
+      }
+      writer.close();
+
+      // close the region now, and reopen again
+      region.close();
+      region = HRegion.openHRegion(region, null);
+
+      // now check whether we can read back the data from the region
+      for (long i = minSeqId; i < maxSeqId; i++) {
+        Get get = new Get(Bytes.toBytes(i));
+        Result result = region.get(get);
+        byte[] value = result.getValue(family, Bytes.toBytes(i));
+        assertArrayEquals(Bytes.toBytes(i), value);
+      }
+    } finally {
+      HRegion.closeHRegion(this.region);
+      this.region = null;
+    }
+  }
+
+  class IsFlushWALMarker extends ArgumentMatcher<WALEdit> {
+    volatile FlushAction[] actions;
+    public IsFlushWALMarker(FlushAction... actions) {
+      this.actions = actions;
+    }
+    @Override
+    public boolean matches(Object edit) {
+      List<KeyValue> kvs = ((WALEdit)edit).getKeyValues();
+      if (kvs.isEmpty()) {
+        return false;
+      }
+      if (WALEdit.isMetaEditFamily(kvs.get(0))) {
+        FlushDescriptor desc = null;
+        try {
+          desc = WALEdit.getFlushDescriptor(kvs.get(0));
+        } catch (IOException e) {
+          LOG.warn(e);
+          return false;
+        }
+        if (desc != null) {
+          for (FlushAction action : actions) {
+            if (desc.getAction() == action) {
+              return true;
+            }
+          }
+        }
+      }
+      return false;
+    }
+    public IsFlushWALMarker set(FlushAction... actions) {
+      this.actions = actions;
+      return this;
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testFlushMarkersWALFail() throws Exception {
+    // test the cases where the WAL append for flush markers fail.
+    String method = name.getMethodName();
+    TableName tableName = TableName.valueOf(method);
+    byte[] family = Bytes.toBytes("family");
+
+    // spy an actual WAL implementation to throw exception (was not able to mock)
+    Path logDir = TEST_UTIL.getDataTestDirOnTestFS("testRecoveredEditsIgnoreFlushMarkers.log");
+    HLog hlog = spy(HLogFactory.createHLog(FILESYSTEM, logDir, UUID.randomUUID().toString(),
+      TEST_UTIL.getConfiguration()));
+
+    this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
+      HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, hlog, family);
+    try {
+      int i = 0;
+      Put put = new Put(Bytes.toBytes(i));
+      put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal
+      put.add(family, Bytes.toBytes(i), Bytes.toBytes(i));
+      region.put(put);
+
+      // 1. Test case where START_FLUSH throws exception
+      IsFlushWALMarker isFlushWALMarker = new IsFlushWALMarker(FlushAction.START_FLUSH);
+
+      // throw exceptions if the WalEdit is a start flush action
+      when(hlog.appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(),
+        (WALEdit)argThat(isFlushWALMarker), (AtomicLong)any(), Mockito.anyBoolean(),
+        (List<KeyValue>)any()))
+          .thenThrow(new IOException("Fail to append flush marker"));
+
+      // start cache flush will throw exception
+      try {
+        region.flushcache();
+        fail("This should have thrown exception");
+      } catch (DroppedSnapshotException unexpected) {
+        // this should not be a dropped snapshot exception. Meaning that RS will not abort
+        throw unexpected;
+      } catch (IOException expected) {
+        // expected
+      }
+
+      // 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception
+      isFlushWALMarker.set(FlushAction.COMMIT_FLUSH);
+
+      try {
+        region.flushcache();
+        fail("This should have thrown exception");
+      } catch (DroppedSnapshotException expected) {
+        // we expect this exception, since we were able to write the snapshot, but failed to
+        // write the flush marker to WAL
+      } catch (IOException unexpected) {
+        throw unexpected;
+      }
+
+      region.close();
+      this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
+        HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, hlog, family);
+      region.put(put);
+
+      // 3. Test case where ABORT_FLUSH will throw exception.
+      // Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with
+      // DroppedSnapshotException. Below COMMIT_FLUSH will cause flush to abort
+      isFlushWALMarker.set(FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH);
+
+      try {
+        region.flushcache();
+        fail("This should have thrown exception");
+      } catch (DroppedSnapshotException expected) {
+        // we expect this exception, since we were able to write the snapshot, but failed to
+        // write the flush marker to WAL
+      } catch (IOException unexpected) {
+        throw unexpected;
+      }
+
+    } finally {
+      HRegion.closeHRegion(this.region);
+      this.region = null;
+    }
+  }
+
+  @Test
   public void testGetWhileRegionClose() throws IOException {
     TableName tableName = TableName.valueOf(name.getMethodName());
     Configuration hc = initSplit();
