This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new def9ac7  HBASE-23286 Improve MTTR: Split WAL to HFile (#820)
def9ac7 is described below

commit def9ac7c4586b5414371b4ff3c971de40e6d9483
Author: Guanghao Zhang <[email protected]>
AuthorDate: Fri Dec 27 15:59:23 2019 +0800

    HBASE-23286 Improve MTTR: Split WAL to HFile (#820)
    
     Signed-off-by: Duo Zhang <[email protected]>
---
 .../java/org/apache/hadoop/hbase/HConstants.java   |  14 +-
 .../apache/hadoop/hbase/regionserver/CellSet.java  |   2 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  28 ++
 .../java/org/apache/hadoop/hbase/util/FSUtils.java |   7 +-
 .../hbase/wal/BoundedRecoveredEditsOutputSink.java |   8 +-
 .../wal/BoundedRecoveredHFilesOutputSink.java      | 240 ++++++++++++
 .../org/apache/hadoop/hbase/wal/OutputSink.java    |   4 +-
 .../hadoop/hbase/wal/RecoveredEditsOutputSink.java |   6 +-
 .../org/apache/hadoop/hbase/wal/WALSplitUtil.java  |  61 ++-
 .../org/apache/hadoop/hbase/wal/WALSplitter.java   |  15 +-
 .../regionserver/wal/AbstractTestWALReplay.java    |   4 +-
 .../org/apache/hadoop/hbase/wal/TestWALSplit.java  |   9 +-
 .../hadoop/hbase/wal/TestWALSplitToHFile.java      | 408 +++++++++++++++++++++
 13 files changed, 779 insertions(+), 27 deletions(-)
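
A note for readers skimming the diff: this change adds an alternative WAL splitting
output path that writes recovered data directly as HFiles (under a new
recovered.hfiles directory) instead of recovered.edits files, gated by the new
configuration key hbase.wal.split.to.hfile (default false). Below is a minimal
sketch of enabling it programmatically, mirroring what the new TestWALSplitToHFile
test does; the class name EnableWalSplitToHFile is illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class EnableWalSplitToHFile {
      public static void main(String[] args) {
        // Same key and default as BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE
        // in the diff below. When true, WALSplitter wires up a
        // BoundedRecoveredHFilesOutputSink instead of the recovered.edits sinks.
        Configuration conf = HBaseConfiguration.create();
        conf.setBoolean("hbase.wal.split.to.hfile", true);
        System.out.println(conf.getBoolean("hbase.wal.split.to.hfile", false));
      }
    }

The same key would normally be set in hbase-site.xml on the servers that perform
WAL splitting.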

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index a1fc898..132d3e0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -51,11 +51,19 @@ public final class HConstants {
 
   /** Used as a magic return value while optimized index key feature enabled(HBASE-7845) */
   public final static int INDEX_KEY_MAGIC = -2;
+
   /*
-     * Name of directory that holds recovered edits written by the wal log
-     * splitting code, one per region
-     */
+   * Name of directory that holds recovered edits written by the wal log
+   * splitting code, one per region
+   */
   public static final String RECOVERED_EDITS_DIR = "recovered.edits";
+
+  /*
+   * Name of directory that holds recovered hfiles written by the wal log
+   * splitting code, one per region
+   */
+  public static final String RECOVERED_HFILES_DIR = "recovered.hfiles";
+
   /**
    * The first four bytes of Hadoop RPC connections
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
index 5190d7b..d74655d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
@@ -52,7 +52,7 @@ public class CellSet implements NavigableSet<Cell>  {
 
   private final int numUniqueKeys;
 
-  CellSet(final CellComparator c) {
+  public CellSet(final CellComparator c) {
     this.delegatee = new ConcurrentSkipListMap<>(c.getSimpleComparator());
     this.numUniqueKeys = UNKNOWN_NUM_UNIQUES;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index bcd0ebd..5711f6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -979,6 +979,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // Recover any edits if available.
         maxSeqId = Math.max(maxSeqId,
           replayRecoveredEditsIfAny(maxSeqIdInStores, reporter, status));
+        // Recover any hfiles if available
+        maxSeqId = Math.max(maxSeqId, loadRecoveredHFilesIfAny(stores));
         // Make sure mvcc is up to max.
         this.mvcc.advanceTo(maxSeqId);
       } finally {
@@ -5375,6 +5377,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  private long loadRecoveredHFilesIfAny(Collection<HStore> stores) throws IOException {
+    Path regionDir = getWALRegionDir();
+    long maxSeqId = -1;
+    for (HStore store : stores) {
+      String familyName = store.getColumnFamilyName();
+      FileStatus[] files =
+          WALSplitUtil.getRecoveredHFiles(fs.getFileSystem(), regionDir, familyName);
+      if (files != null && files.length != 0) {
+        for (FileStatus file : files) {
+          store.assertBulkLoadHFileOk(file.getPath());
+          Pair<Path, Path> pair = store.preBulkLoadHFile(file.getPath().toString(), -1);
+          store.bulkLoadHFile(Bytes.toBytes(familyName), pair.getFirst().toString(),
+              pair.getSecond());
+          maxSeqId =
+              Math.max(maxSeqId, WALSplitUtil.getSeqIdForRecoveredHFile(file.getPath().getName()));
+        }
+        if (this.rsServices != null && store.needsCompaction()) {
+          this.rsServices.getCompactionRequestor()
+              .requestCompaction(this, store, "load recovered hfiles request compaction",
+                  Store.PRIORITY_USER + 1, CompactionLifeCycleTracker.DUMMY, null);
+        }
+      }
+    }
+    return maxSeqId;
+  }
+
   /**
    * Be careful, this method will drop all data in the memstore of this region.
    * Currently, this method is used to drop memstore to prevent memory leak
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index d26bd81..b89a4c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -1045,7 +1045,12 @@ public abstract class FSUtils extends CommonFSUtils {
   }
 
   public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
-    return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
+    return getRegionDirFromTableDir(tableDir,
+        ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
+  }
+
+  public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) {
+    return new Path(tableDir, encodedRegionName);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
index 795192b..1f8c195 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
@@ -23,11 +23,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -45,7 +47,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
 
   // Since the splitting process may create multiple output files, we need a map
   // to track the output count of each region.
-  private ConcurrentHashMap<byte[], Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
+  private ConcurrentMap<String, Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
   // Need a counter to track the opening writers.
   private final AtomicInteger openingWritersNum = new AtomicInteger(0);
 
@@ -68,7 +70,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
     if (writer != null) {
       openingWritersNum.incrementAndGet();
       writer.writeRegionEntries(entries);
-      regionEditsWrittenMap.compute(buffer.encodedRegionName,
+      regionEditsWrittenMap.compute(Bytes.toString(buffer.encodedRegionName),
         (k, v) -> v == null ? writer.editsWritten : v + writer.editsWritten);
       List<IOException> thrown = new ArrayList<>();
       Path dst = closeRecoveredEditsWriter(writer, thrown);
@@ -125,7 +127,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
   }
 
   @Override
-  public Map<byte[], Long> getOutputCounts() {
+  public Map<String, Long> getOutputCounts() {
     return regionEditsWrittenMap;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
new file mode 100644
index 0000000..96ab8e6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.CellSet;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class BoundedRecoveredHFilesOutputSink extends OutputSink {
+  private static final Logger LOG = LoggerFactory.getLogger(BoundedRecoveredHFilesOutputSink.class);
+
+  public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile";
+  public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false;
+
+  private final WALSplitter walSplitter;
+  private final Map<TableName, TableDescriptor> tableDescCache;
+  private Connection connection;
+  private Admin admin;
+  private FileSystem rootFS;
+
+  // Since the splitting process may create multiple output files, we need a map
+  // to track the output count of each region.
+  private ConcurrentMap<String, Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
+  // Need a counter to track the opening writers.
+  private final AtomicInteger openingWritersNum = new AtomicInteger(0);
+
+  public BoundedRecoveredHFilesOutputSink(WALSplitter walSplitter,
+    WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
+    super(controller, entryBuffers, numWriters);
+    this.walSplitter = walSplitter;
+    tableDescCache = new HashMap<>();
+  }
+
+  @Override
+  void startWriterThreads() throws IOException {
+    connection = ConnectionFactory.createConnection(walSplitter.conf);
+    admin = connection.getAdmin();
+    rootFS = FSUtils.getRootDirFileSystem(walSplitter.conf);
+    super.startWriterThreads();
+  }
+
+  @Override
+  void append(RegionEntryBuffer buffer) throws IOException {
+    Map<String, CellSet> familyCells = new HashMap<>();
+    Map<String, Long> familySeqIds = new HashMap<>();
+    boolean isMetaTable = buffer.tableName.equals(META_TABLE_NAME);
+    for (WAL.Entry entry : buffer.entryBuffer) {
+      long seqId = entry.getKey().getSequenceId();
+      List<Cell> cells = entry.getEdit().getCells();
+      for (Cell cell : cells) {
+        if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+          continue;
+        }
+        String familyName = Bytes.toString(CellUtil.cloneFamily(cell));
+        // comparator need to be specified for meta
+        familyCells.computeIfAbsent(familyName, key -> new CellSet(
+          isMetaTable ? CellComparatorImpl.META_COMPARATOR : CellComparator.getInstance()))
+          .add(cell);
+        familySeqIds.compute(familyName, (k, v) -> v == null ? seqId : Math.max(v, seqId));
+      }
+    }
+
+    // The key point is create a new writer for each column family, write edits then close writer.
+    String regionName = Bytes.toString(buffer.encodedRegionName);
+    for (Map.Entry<String, CellSet> cellsEntry : familyCells.entrySet()) {
+      String familyName = cellsEntry.getKey();
+      StoreFileWriter writer = createRecoveredHFileWriter(buffer.tableName, regionName,
+        familySeqIds.get(familyName), familyName, isMetaTable);
+      openingWritersNum.incrementAndGet();
+      try {
+        for (Cell cell : cellsEntry.getValue()) {
+          writer.append(cell);
+        }
+        regionEditsWrittenMap.compute(Bytes.toString(buffer.encodedRegionName),
+          (k, v) -> v == null ? buffer.entryBuffer.size() : v + buffer.entryBuffer.size());
+        splits.add(writer.getPath());
+        openingWritersNum.decrementAndGet();
+      } finally {
+        writer.close();
+      }
+    }
+  }
+
+  @Override
+  public List<Path> close() throws IOException {
+    boolean isSuccessful = true;
+    try {
+      isSuccessful &= finishWriterThreads();
+    } finally {
+      isSuccessful &= writeRemainingEntryBuffers();
+    }
+    IOUtils.closeQuietly(admin);
+    IOUtils.closeQuietly(connection);
+    return isSuccessful ? splits : null;
+  }
+
+  /**
+   * Write out the remaining RegionEntryBuffers and close the writers.
+   *
+   * @return true when there is no error.
+   */
+  private boolean writeRemainingEntryBuffers() throws IOException {
+    for (EntryBuffers.RegionEntryBuffer buffer : entryBuffers.buffers.values()) {
+      closeCompletionService.submit(() -> {
+        append(buffer);
+        return null;
+      });
+    }
+    boolean progressFailed = false;
+    try {
+      for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
+        Future<Void> future = closeCompletionService.take();
+        future.get();
+        if (!progressFailed && reporter != null && !reporter.progress()) {
+          progressFailed = true;
+        }
+      }
+    } catch (InterruptedException e) {
+      IOException iie = new InterruptedIOException();
+      iie.initCause(e);
+      throw iie;
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    } finally {
+      closeThreadPool.shutdownNow();
+    }
+    return !progressFailed;
+  }
+
+  @Override
+  public Map<String, Long> getOutputCounts() {
+    return regionEditsWrittenMap;
+  }
+
+  @Override
+  public int getNumberOfRecoveredRegions() {
+    return regionEditsWrittenMap.size();
+  }
+
+  @Override
+  int getNumOpenWriters() {
+    return openingWritersNum.get();
+  }
+
+  @Override
+  boolean keepRegionEvent(Entry entry) {
+    return false;
+  }
+
+  private StoreFileWriter createRecoveredHFileWriter(TableName tableName, String regionName,
+      long seqId, String familyName, boolean isMetaTable) throws IOException {
+    Path outputFile = WALSplitUtil
+      .getRegionRecoveredHFilePath(tableName, regionName, familyName, seqId,
+        walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.conf, rootFS);
+    checkPathValid(outputFile);
+    StoreFileWriter.Builder writerBuilder =
+        new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, rootFS)
+            .withFilePath(outputFile);
+    HFileContextBuilder hFileContextBuilder = new HFileContextBuilder();
+    if (isMetaTable) {
+      writerBuilder.withComparator(CellComparatorImpl.META_COMPARATOR);
+    } else {
+      configContextForNonMetaWriter(tableName, familyName, hFileContextBuilder, writerBuilder);
+    }
+    return writerBuilder.withFileContext(hFileContextBuilder.build()).build();
+  }
+
+  private void configContextForNonMetaWriter(TableName tableName, String familyName,
+      HFileContextBuilder hFileContextBuilder, StoreFileWriter.Builder writerBuilder)
+      throws IOException {
+    if (!tableDescCache.containsKey(tableName)) {
+      tableDescCache.put(tableName, admin.getDescriptor(tableName));
+    }
+    TableDescriptor tableDesc = tableDescCache.get(tableName);
+    ColumnFamilyDescriptor cfd = tableDesc.getColumnFamily(Bytes.toBytesBinary(familyName));
+    hFileContextBuilder.withCompression(cfd.getCompressionType()).withBlockSize(cfd.getBlocksize())
+        .withCompressTags(cfd.isCompressTags()).withDataBlockEncoding(cfd.getDataBlockEncoding());
+    writerBuilder.withBloomType(cfd.getBloomFilterType())
+        .withComparator(CellComparatorImpl.COMPARATOR);
+  }
+
+  private void checkPathValid(Path outputFile) throws IOException {
+    if (rootFS.exists(outputFile)) {
+      LOG.warn("this file {} may be left after last failed split ", 
outputFile);
+      if (!rootFS.delete(outputFile, false)) {
+        LOG.warn("delete old generated HFile {} failed", outputFile);
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
index 4472f62..57db381 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
@@ -81,7 +81,7 @@ abstract class OutputSink {
   /**
    * Start the threads that will pump data from the entryBuffers to the output files.
    */
-  synchronized void startWriterThreads() {
+  void startWriterThreads() throws IOException {
     for (int i = 0; i < numThreads; i++) {
       WriterThread t = new WriterThread(controller, entryBuffers, this, i);
       t.start();
@@ -137,7 +137,7 @@ abstract class OutputSink {
   /**
    * @return a map from encoded region ID to the number of edits written out for that region.
    */
-  abstract Map<byte[], Long> getOutputCounts();
+  abstract Map<String, Long> getOutputCounts();
 
   /**
    * @return number of regions we've recovered
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
index ffe805f..d338980 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
@@ -135,10 +135,10 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
   }
 
   @Override
-  public Map<byte[], Long> getOutputCounts() {
-    TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+  public Map<String, Long> getOutputCounts() {
+    TreeMap<String, Long> ret = new TreeMap<>();
     for (Map.Entry<String, RecoveredEditsWriter> entry : writers.entrySet()) {
-      ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
+      ret.put(entry.getKey(), entry.getValue().editsWritten);
     }
     return ret;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
index ca0d8ef..8011a8b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
@@ -28,6 +28,7 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -165,7 +166,7 @@ public final class WALSplitUtil {
    * RECOVERED_EDITS_DIR under the region creating it if necessary.
    * @param tableName the table name
    * @param encodedRegionName the encoded region name
-   * @param sedId the sequence id which used to generate file name
+   * @param seqId the sequence id which used to generate file name
    * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
    * @param tmpDirName of the directory used to sideline old recovered edits file
    * @param conf configuration
@@ -174,7 +175,7 @@ public final class WALSplitUtil {
    */
   @SuppressWarnings("deprecation")
   @VisibleForTesting
-  static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long sedId,
+  static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long seqId,
       String fileNameBeingSplit, String tmpDirName, Configuration conf) throws IOException {
     FileSystem walFS = FSUtils.getWALFileSystem(conf);
     Path tableDir = FSUtils.getWALTableDir(conf, tableName);
@@ -203,7 +204,7 @@ public final class WALSplitUtil {
     // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
     // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
     // region's replayRecoveredEdits will not delete it
-    String fileName = formatRecoveredEditsFileName(sedId);
+    String fileName = formatRecoveredEditsFileName(seqId);
     fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
     return new Path(dir, fileName);
   }
@@ -563,4 +564,58 @@ public final class WALSplitUtil {
 
     return mutations;
   }
+
+  /**
+   * Path to a file under recovered.hfiles directory of the region's column family: e.g.
+   * /hbase/some_table/2323432434/cf/recovered.hfiles/2332-wal. This method also ensures existence
+   * of recovered.hfiles directory under the region's column family, creating it if necessary.
+   *
+   * @param tableName          the table name
+   * @param encodedRegionName  the encoded region name
+   * @param familyName         the column family name
+   * @param seqId              the sequence id which used to generate file name
+   * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name
+   * @param conf               configuration
+   * @param rootFS             the root file system
+   * @return Path to file into which to dump split log edits.
+   */
+  static Path getRegionRecoveredHFilePath(TableName tableName, String encodedRegionName,
+    String familyName, long seqId, String fileNameBeingSplit, Configuration conf, FileSystem rootFS)
+    throws IOException {
+    Path rootDir = FSUtils.getRootDir(conf);
+    Path regionDir =
+      FSUtils.getRegionDirFromTableDir(FSUtils.getTableDir(rootDir, tableName), encodedRegionName);
+    Path dir = getStoreDirRecoveredHFilesDir(regionDir, familyName);
+
+    if (!rootFS.exists(dir) && !rootFS.mkdirs(dir)) {
+      LOG.warn("mkdir failed on {}, region {}, column family {}", dir, 
encodedRegionName,
+        familyName);
+    }
+
+    String fileName = formatRecoveredHFileName(seqId, fileNameBeingSplit);
+    return new Path(dir, fileName);
+  }
+
+  private static String formatRecoveredHFileName(long seqId, String fileNameBeingSplit) {
+    return String.format("%019d", seqId) + "-" + fileNameBeingSplit;
+  }
+
+  public static long getSeqIdForRecoveredHFile(String fileName) {
+    return Long.parseLong(fileName.split("-")[0]);
+  }
+
+  /**
+   * @param regionDir  This regions directory in the filesystem
+   * @param familyName The column family name
+   * @return The directory that holds recovered hfiles for the region's column family
+   */
+  private static Path getStoreDirRecoveredHFilesDir(final Path regionDir, String familyName) {
+    return new Path(new Path(regionDir, familyName), HConstants.RECOVERED_HFILES_DIR);
+  }
+
+  public static FileStatus[] getRecoveredHFiles(final FileSystem rootFS,
+      final Path regionDir, String familyName) throws IOException {
+    Path dir = getStoreDirRecoveredHFilesDir(regionDir, familyName);
+    return FSUtils.listStatus(rootFS, dir);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index a435c78..8a66c17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.wal;
 
+import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.DEFAULT_WAL_SPLIT_TO_HFILE;
+import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
 import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile;
 
 import java.io.EOFException;
@@ -112,7 +114,7 @@ public class WALSplitter {
 
   @VisibleForTesting
   WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
-      LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) {
+    LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) {
     this.conf = HBaseConfiguration.create(conf);
     String codecClassName =
         conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
@@ -129,16 +131,21 @@ public class WALSplitter {
 
     // if we limit the number of writers opened for sinking recovered edits
     boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
+    boolean splitToHFile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
     long bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
     int numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);
-    if (splitWriterCreationBounded) {
+
+    if (splitToHFile) {
+      entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
+      outputSink =
+          new BoundedRecoveredHFilesOutputSink(this, controller, entryBuffers, numWriterThreads);
+    } else if (splitWriterCreationBounded) {
       entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
       outputSink =
           new BoundedRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
     } else {
       entryBuffers = new EntryBuffers(controller, bufferSize);
-      outputSink =
-          new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
+      outputSink = new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index c909bf1..4421453 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -642,7 +642,7 @@ public abstract class AbstractTestWALReplay {
   // Only throws exception if throwExceptionWhenFlushing is set true.
   public static class CustomStoreFlusher extends DefaultStoreFlusher {
     // Switch between throw and not throw exception in flush
-    static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
+    public static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
 
     public CustomStoreFlusher(Configuration conf, HStore store) {
       super(conf, store);
@@ -1173,7 +1173,7 @@ public abstract class AbstractTestWALReplay {
     wal.sync();
   }
 
-  static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
+  public static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
       EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException {
     List<Put> puts = new ArrayList<>();
     for (int j = 0; j < count; j++) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index 8ddd0ea..33d5477 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -1064,11 +1064,10 @@ public class TestWALSplit {
     logSplitter.splitLogFile(fs.getFileStatus(logPath), null);
 
     // Verify number of written edits per region
-    Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
-    for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
-      LOG.info("Got " + entry.getValue() + " output edits for region " +
-          Bytes.toString(entry.getKey()));
-      assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
+    Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
+    for (Map.Entry<String, Long> entry : outputCounts.entrySet()) {
+      LOG.info("Got " + entry.getValue() + " output edits for region " + 
entry.getKey());
+      assertEquals((long) entry.getValue(), numFakeEdits / regions.size());
     }
     assertEquals("Should have as many outputs as regions", regions.size(), 
outputCounts.size());
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
new file mode 100644
index 0000000..e1ea036
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
@@ -0,0 +1,408 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
+import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ RegionServerTests.class, LargeTests.class })
+public class TestWALSplitToHFile {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestWALSplitToHFile.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class);
+  static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
+  private Path rootDir = null;
+  private String logName;
+  private Path oldLogDir;
+  private Path logDir;
+  private FileSystem fs;
+  private Configuration conf;
+  private WALFactory wals;
+
+  @Rule
+  public final TestName TEST_NAME = new TestName();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setBoolean(WAL_SPLIT_TO_HFILE, true);
+    UTIL.startMiniCluster(3);
+    Path hbaseRootDir = UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
+    LOG.info("hbase.rootdir=" + hbaseRootDir);
+    FSUtils.setRootDir(conf, hbaseRootDir);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
+    this.fs = UTIL.getDFSCluster().getFileSystem();
+    this.rootDir = FSUtils.getRootDir(this.conf);
+    this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    String serverName =
+        ServerName.valueOf(TEST_NAME.getMethodName() + "-manual", 16010, System.currentTimeMillis())
+            .toString();
+    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
+    this.logDir = new Path(this.rootDir, logName);
+    if (UTIL.getDFSCluster().getFileSystem().exists(this.rootDir)) {
+      UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
+    }
+    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    this.wals.close();
+    UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
+  }
+
+  /*
+   * @param p Directory to cleanup
+   */
+  private void deleteDir(final Path p) throws IOException {
+    if (this.fs.exists(p)) {
+      if (!this.fs.delete(p, true)) {
+        throw new IOException("Failed remove of " + p);
+      }
+    }
+  }
+
+  private TableDescriptor createBasic3FamilyTD(final TableName tableName) throws IOException {
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("a")).build());
+    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("b")).build());
+    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c")).build());
+    TableDescriptor td = builder.build();
+    UTIL.getAdmin().createTable(td);
+    return td;
+  }
+
+  private WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
+    FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
+    wal.init();
+    return wal;
+  }
+
+  /**
+   * Test writing edits into an HRegion, closing it, splitting logs, opening
+   * Region again.  Verify seqids.
+   */
+  @Test
+  public void testReplayEditsWrittenViaHRegion()
+      throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
+    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+    final TableDescriptor td = createBasic3FamilyTD(tableName);
+    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
+    final Path basedir = FSUtils.getTableDir(this.rootDir, tableName);
+    deleteDir(basedir);
+    final byte[] rowName = tableName.getName();
+    final int countPerFamily = 10;
+
+    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
+    HBaseTestingUtility.closeRegionAndWAL(region3);
+    // Write countPerFamily edits into the three families.  Do a flush on one
+    // of the families during the load of edits so its seqid is not same as
+    // others to test we do right thing when different seqids.
+    WAL wal = createWAL(this.conf, rootDir, logName);
+    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
+    long seqid = region.getOpenSeqNum();
+    boolean first = true;
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      addRegionEdits(rowName, cfd.getName(), countPerFamily, this.ee, region, "x");
+      if (first) {
+        // If first, so we have at least one family w/ different seqid to rest.
+        region.flush(true);
+        first = false;
+      }
+    }
+    // Now assert edits made it in.
+    final Get g = new Get(rowName);
+    Result result = region.get(g);
+    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
+    // Now close the region (without flush), split the log, reopen the region and assert that
+    // replay of log has the correct effect, that our seqids are calculated correctly so
+    // all edits in logs are seen as 'stale'/old.
+    region.close(true);
+    wal.shutdown();
+    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
+    WAL wal2 = createWAL(this.conf, rootDir, logName);
+    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
+    long seqid2 = region2.getOpenSeqNum();
+    assertTrue(seqid + result.size() < seqid2);
+    final Result result1b = region2.get(g);
+    assertEquals(result.size(), result1b.size());
+
+    // Next test.  Add more edits, then 'crash' this region by stealing its wal
+    // out from under it and assert that replay of the log adds the edits back
+    // correctly when region is opened again.
+    for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
+      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
+    }
+    // Get count of edits.
+    final Result result2 = region2.get(g);
+    assertEquals(2 * result.size(), result2.size());
+    wal2.sync();
+    final Configuration newConf = HBaseConfiguration.create(this.conf);
+    User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString());
+    user.runAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(conf), conf, wals);
+        FileSystem newFS = FileSystem.get(newConf);
+        // Make a new wal for new region open.
+        WAL wal3 = createWAL(newConf, rootDir, logName);
+        HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, ri, td, null);
+        long seqid3 = region3.initialize();
+        Result result3 = region3.get(g);
+        // Assert that count of cells is same as before crash.
+        assertEquals(result2.size(), result3.size());
+
+        // I can't close wal1.  Its been appropriated when we split.
+        region3.close();
+        wal3.close();
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Test that we recover correctly when there is a failure in between the
+   * flushes. i.e. Some stores got flushed but others did not.
+   * Unfortunately, there is no easy hook to flush at a store level. The way
+   * we get around this is by flushing at the region level, and then deleting
+   * the recently flushed store file for one of the Stores. This would put us
+   * back in the situation where all but that store got flushed and the region
+   * died.
+   * We restart Region again, and verify that the edits were replayed.
+   */
+  @Test
+  public void testReplayEditsAfterPartialFlush()
+      throws IOException, SecurityException, IllegalArgumentException {
+    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
+    final Path basedir = FSUtils.getTableDir(this.rootDir, tableName);
+    deleteDir(basedir);
+    final byte[] rowName = tableName.getName();
+    final int countPerFamily = 10;
+    final TableDescriptor td = createBasic3FamilyTD(tableName);
+    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
+    HBaseTestingUtility.closeRegionAndWAL(region3);
+    // Write countPerFamily edits into the three families.  Do a flush on one
+    // of the families during the load of edits so its seqid is not same as
+    // others to test we do right thing when different seqids.
+    WAL wal = createWAL(this.conf, rootDir, logName);
+    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
+    long seqid = region.getOpenSeqNum();
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      addRegionEdits(rowName, cfd.getName(), countPerFamily, this.ee, region, "x");
+    }
+
+    // Now assert edits made it in.
+    final Get g = new Get(rowName);
+    Result result = region.get(g);
+    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
+
+    // Let us flush the region
+    region.flush(true);
+    region.close(true);
+    wal.shutdown();
+
+    // delete the store files in the second column family to simulate a failure
+    // in between the flushcache();
+    // we have 3 families. killing the middle one ensures that taking the maximum
+    // will make us fail.
+    int cf_count = 0;
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      cf_count++;
+      if (cf_count == 2) {
+        region.getRegionFileSystem().deleteFamily(cfd.getNameAsString());
+      }
+    }
+
+    // Let us try to split and recover
+    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
+    WAL wal2 = createWAL(this.conf, rootDir, logName);
+    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
+    long seqid2 = region2.getOpenSeqNum();
+    assertTrue(seqid + result.size() < seqid2);
+
+    final Result result1b = region2.get(g);
+    assertEquals(result.size(), result1b.size());
+  }
+
+  /**
+   * Test that we could recover the data correctly after aborting flush. In the
+   * test, first we abort flush after writing some data, then writing more data
+   * and flush again, at last verify the data.
+   */
+  @Test
+  public void testReplayEditsAfterAbortingFlush() throws IOException {
+    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
+    final Path basedir = FSUtils.getTableDir(this.rootDir, tableName);
+    deleteDir(basedir);
+    final TableDescriptor td = createBasic3FamilyTD(tableName);
+    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
+    HBaseTestingUtility.closeRegionAndWAL(region3);
+    // Write countPerFamily edits into the three families. Do a flush on one
+    // of the families during the load of edits so its seqid is not same as
+    // others to test we do right thing when different seqids.
+    WAL wal = createWAL(this.conf, rootDir, logName);
+    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
+    Mockito.doReturn(false).when(rsServices).isAborted();
+    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 
10));
+    when(rsServices.getConfiguration()).thenReturn(conf);
+    Configuration customConf = new Configuration(this.conf);
+    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
+        AbstractTestWALReplay.CustomStoreFlusher.class.getName());
+    HRegion region = HRegion.openHRegion(this.rootDir, ri, td, wal, customConf, rsServices, null);
+    int writtenRowCount = 10;
+    List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
+    for (int i = 0; i < writtenRowCount; i++) {
+      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
+      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
+          Bytes.toBytes("val"));
+      region.put(put);
+    }
+
+    // Now assert edits made it in.
+    RegionScanner scanner = region.getScanner(new Scan());
+    assertEquals(writtenRowCount, getScannedCount(scanner));
+
+    // Let us flush the region
+    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
+    try {
+      region.flush(true);
+      fail("Injected exception hasn't been thrown");
+    } catch (IOException e) {
+      LOG.info("Expected simulated exception when flushing region, {}", 
e.getMessage());
+      // simulated to abort server
+      Mockito.doReturn(true).when(rsServices).isAborted();
+      region.setClosing(false); // region normally does not accept writes after
+      // DroppedSnapshotException. We mock around it for this test.
+    }
+    // writing more data
+    int moreRow = 10;
+    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
+      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
+      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
+          Bytes.toBytes("val"));
+      region.put(put);
+    }
+    writtenRowCount += moreRow;
+    // call flush again
+    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
+    try {
+      region.flush(true);
+    } catch (IOException t) {
+      LOG.info(
+          "Expected exception when flushing region because server is stopped," 
+ t.getMessage());
+    }
+
+    region.close(true);
+    wal.shutdown();
+
+    // Let us try to split and recover
+    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
+    WAL wal2 = createWAL(this.conf, rootDir, logName);
+    Mockito.doReturn(false).when(rsServices).isAborted();
+    HRegion region2 = HRegion.openHRegion(this.rootDir, ri, td, wal2, this.conf, rsServices, null);
+    scanner = region2.getScanner(new Scan());
+    assertEquals(writtenRowCount, getScannedCount(scanner));
+  }
+
+  private int getScannedCount(RegionScanner scanner) throws IOException {
+    int scannedCount = 0;
+    List<Cell> results = new ArrayList<>();
+    while (true) {
+      boolean existMore = scanner.next(results);
+      if (!results.isEmpty()) {
+        scannedCount++;
+      }
+      if (!existMore) {
+        break;
+      }
+      results.clear();
+    }
+    return scannedCount;
+  }
+}
