This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new def9ac7 HBASE-23286 Improve MTTR: Split WAL to HFile (#820)
def9ac7 is described below
commit def9ac7c4586b5414371b4ff3c971de40e6d9483
Author: Guanghao Zhang <[email protected]>
AuthorDate: Fri Dec 27 15:59:23 2019 +0800
HBASE-23286 Improve MTTR: Split WAL to HFile (#820)
Signed-off-by: Duo Zhang <[email protected]>
---
.../java/org/apache/hadoop/hbase/HConstants.java | 14 +-
.../apache/hadoop/hbase/regionserver/CellSet.java | 2 +-
.../apache/hadoop/hbase/regionserver/HRegion.java | 28 ++
.../java/org/apache/hadoop/hbase/util/FSUtils.java | 7 +-
.../hbase/wal/BoundedRecoveredEditsOutputSink.java | 8 +-
.../wal/BoundedRecoveredHFilesOutputSink.java | 240 ++++++++++++
.../org/apache/hadoop/hbase/wal/OutputSink.java | 4 +-
.../hadoop/hbase/wal/RecoveredEditsOutputSink.java | 6 +-
.../org/apache/hadoop/hbase/wal/WALSplitUtil.java | 61 ++-
.../org/apache/hadoop/hbase/wal/WALSplitter.java | 15 +-
.../regionserver/wal/AbstractTestWALReplay.java | 4 +-
.../org/apache/hadoop/hbase/wal/TestWALSplit.java | 9 +-
.../hadoop/hbase/wal/TestWALSplitToHFile.java | 408 +++++++++++++++++++++
13 files changed, 779 insertions(+), 27 deletions(-)
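The new code path is opt-in: WALSplitter only wires up BoundedRecoveredHFilesOutputSink when hbase.wal.split.to.hfile is true, and the default is false (DEFAULT_WAL_SPLIT_TO_HFILE in the new sink below). A minimal sketch of enabling it programmatically; the EnableWalSplitToHFile class and its main method here are illustrative only, the configuration key is the one this patch introduces:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class EnableWalSplitToHFile {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Key and default come from BoundedRecoveredHFilesOutputSink in this patch:
        // WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile", DEFAULT_WAL_SPLIT_TO_HFILE = false.
        conf.setBoolean("hbase.wal.split.to.hfile", true);
        System.out.println("WAL split to HFile enabled: "
            + conf.getBoolean("hbase.wal.split.to.hfile", false));
      }
    }

The new TestWALSplitToHFile test enables the same key with conf.setBoolean(WAL_SPLIT_TO_HFILE, true) before starting its mini cluster.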
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index a1fc898..132d3e0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -51,11 +51,19 @@ public final class HConstants {
/** Used as a magic return value while optimized index key feature enabled(HBASE-7845) */
public final static int INDEX_KEY_MAGIC = -2;
+
/*
- * Name of directory that holds recovered edits written by the wal log
- * splitting code, one per region
- */
+ * Name of directory that holds recovered edits written by the wal log
+ * splitting code, one per region
+ */
public static final String RECOVERED_EDITS_DIR = "recovered.edits";
+
+ /*
+ * Name of directory that holds recovered hfiles written by the wal log
+ * splitting code, one per region
+ */
+ public static final String RECOVERED_HFILES_DIR = "recovered.hfiles";
+
/**
* The first four bytes of Hadoop RPC connections
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
index 5190d7b..d74655d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
@@ -52,7 +52,7 @@ public class CellSet implements NavigableSet<Cell> {
private final int numUniqueKeys;
- CellSet(final CellComparator c) {
+ public CellSet(final CellComparator c) {
this.delegatee = new ConcurrentSkipListMap<>(c.getSimpleComparator());
this.numUniqueKeys = UNKNOWN_NUM_UNIQUES;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index bcd0ebd..5711f6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -979,6 +979,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Recover any edits if available.
maxSeqId = Math.max(maxSeqId,
replayRecoveredEditsIfAny(maxSeqIdInStores, reporter, status));
+ // Recover any hfiles if available
+ maxSeqId = Math.max(maxSeqId, loadRecoveredHFilesIfAny(stores));
// Make sure mvcc is up to max.
this.mvcc.advanceTo(maxSeqId);
} finally {
@@ -5375,6 +5377,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
+ private long loadRecoveredHFilesIfAny(Collection<HStore> stores) throws IOException {
+ Path regionDir = getWALRegionDir();
+ long maxSeqId = -1;
+ for (HStore store : stores) {
+ String familyName = store.getColumnFamilyName();
+ FileStatus[] files =
+ WALSplitUtil.getRecoveredHFiles(fs.getFileSystem(), regionDir, familyName);
+ if (files != null && files.length != 0) {
+ for (FileStatus file : files) {
+ store.assertBulkLoadHFileOk(file.getPath());
+ Pair<Path, Path> pair = store.preBulkLoadHFile(file.getPath().toString(), -1);
+ store.bulkLoadHFile(Bytes.toBytes(familyName), pair.getFirst().toString(),
+ pair.getSecond());
+ maxSeqId =
+ Math.max(maxSeqId, WALSplitUtil.getSeqIdForRecoveredHFile(file.getPath().getName()));
+ }
+ if (this.rsServices != null && store.needsCompaction()) {
+ this.rsServices.getCompactionRequestor()
+ .requestCompaction(this, store, "load recovered hfiles request compaction",
+ Store.PRIORITY_USER + 1, CompactionLifeCycleTracker.DUMMY, null);
+ }
+ }
+ }
+ return maxSeqId;
+ }
+
/**
* Be careful, this method will drop all data in the memstore of this region.
* Currently, this method is used to drop memstore to prevent memory leak
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index d26bd81..b89a4c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -1045,7 +1045,12 @@ public abstract class FSUtils extends CommonFSUtils {
}
public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
- return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
+ return getRegionDirFromTableDir(tableDir,
+ ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
+ }
+
+ public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) {
+ return new Path(tableDir, encodedRegionName);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
index 795192b..1f8c195 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
@@ -23,11 +23,13 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -45,7 +47,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
// Since the splitting process may create multiple output files, we need a map
// to track the output count of each region.
- private ConcurrentHashMap<byte[], Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
+ private ConcurrentMap<String, Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
// Need a counter to track the opening writers.
private final AtomicInteger openingWritersNum = new AtomicInteger(0);
@@ -68,7 +70,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
if (writer != null) {
openingWritersNum.incrementAndGet();
writer.writeRegionEntries(entries);
- regionEditsWrittenMap.compute(buffer.encodedRegionName,
+ regionEditsWrittenMap.compute(Bytes.toString(buffer.encodedRegionName),
(k, v) -> v == null ? writer.editsWritten : v + writer.editsWritten);
List<IOException> thrown = new ArrayList<>();
Path dst = closeRecoveredEditsWriter(writer, thrown);
@@ -125,7 +127,7 @@ class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
}
@Override
- public Map<byte[], Long> getOutputCounts() {
+ public Map<String, Long> getOutputCounts() {
return regionEditsWrittenMap;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
new file mode 100644
index 0000000..96ab8e6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.CellSet;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
[email protected]
+public class BoundedRecoveredHFilesOutputSink extends OutputSink {
+ private static final Logger LOG =
LoggerFactory.getLogger(BoundedRecoveredHFilesOutputSink.class);
+
+ public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile";
+ public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false;
+
+ private final WALSplitter walSplitter;
+ private final Map<TableName, TableDescriptor> tableDescCache;
+ private Connection connection;
+ private Admin admin;
+ private FileSystem rootFS;
+
+ // Since the splitting process may create multiple output files, we need a map
+ // to track the output count of each region.
+ private ConcurrentMap<String, Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
+ // Need a counter to track the opening writers.
+ private final AtomicInteger openingWritersNum = new AtomicInteger(0);
+
+ public BoundedRecoveredHFilesOutputSink(WALSplitter walSplitter,
+ WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
+ super(controller, entryBuffers, numWriters);
+ this.walSplitter = walSplitter;
+ tableDescCache = new HashMap<>();
+ }
+
+ @Override
+ void startWriterThreads() throws IOException {
+ connection = ConnectionFactory.createConnection(walSplitter.conf);
+ admin = connection.getAdmin();
+ rootFS = FSUtils.getRootDirFileSystem(walSplitter.conf);
+ super.startWriterThreads();
+ }
+
+ @Override
+ void append(RegionEntryBuffer buffer) throws IOException {
+ Map<String, CellSet> familyCells = new HashMap<>();
+ Map<String, Long> familySeqIds = new HashMap<>();
+ boolean isMetaTable = buffer.tableName.equals(META_TABLE_NAME);
+ for (WAL.Entry entry : buffer.entryBuffer) {
+ long seqId = entry.getKey().getSequenceId();
+ List<Cell> cells = entry.getEdit().getCells();
+ for (Cell cell : cells) {
+ if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+ continue;
+ }
+ String familyName = Bytes.toString(CellUtil.cloneFamily(cell));
+ // comparator need to be specified for meta
+ familyCells.computeIfAbsent(familyName, key -> new CellSet(
+ isMetaTable ? CellComparatorImpl.META_COMPARATOR : CellComparator.getInstance()))
+ .add(cell);
+ familySeqIds.compute(familyName, (k, v) -> v == null ? seqId : Math.max(v, seqId));
+ }
+ }
+
+ // The key point is create a new writer for each column family, write edits then close writer.
+ String regionName = Bytes.toString(buffer.encodedRegionName);
+ for (Map.Entry<String, CellSet> cellsEntry : familyCells.entrySet()) {
+ String familyName = cellsEntry.getKey();
+ StoreFileWriter writer = createRecoveredHFileWriter(buffer.tableName, regionName,
+ familySeqIds.get(familyName), familyName, isMetaTable);
+ openingWritersNum.incrementAndGet();
+ try {
+ for (Cell cell : cellsEntry.getValue()) {
+ writer.append(cell);
+ }
+ regionEditsWrittenMap.compute(Bytes.toString(buffer.encodedRegionName),
+ (k, v) -> v == null ? buffer.entryBuffer.size() : v + buffer.entryBuffer.size());
+ splits.add(writer.getPath());
+ openingWritersNum.decrementAndGet();
+ } finally {
+ writer.close();
+ }
+ }
+ }
+
+ @Override
+ public List<Path> close() throws IOException {
+ boolean isSuccessful = true;
+ try {
+ isSuccessful &= finishWriterThreads();
+ } finally {
+ isSuccessful &= writeRemainingEntryBuffers();
+ }
+ IOUtils.closeQuietly(admin);
+ IOUtils.closeQuietly(connection);
+ return isSuccessful ? splits : null;
+ }
+
+ /**
+ * Write out the remaining RegionEntryBuffers and close the writers.
+ *
+ * @return true when there is no error.
+ */
+ private boolean writeRemainingEntryBuffers() throws IOException {
+ for (EntryBuffers.RegionEntryBuffer buffer : entryBuffers.buffers.values()) {
+ closeCompletionService.submit(() -> {
+ append(buffer);
+ return null;
+ });
+ }
+ boolean progressFailed = false;
+ try {
+ for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
+ Future<Void> future = closeCompletionService.take();
+ future.get();
+ if (!progressFailed && reporter != null && !reporter.progress()) {
+ progressFailed = true;
+ }
+ }
+ } catch (InterruptedException e) {
+ IOException iie = new InterruptedIOException();
+ iie.initCause(e);
+ throw iie;
+ } catch (ExecutionException e) {
+ throw new IOException(e.getCause());
+ } finally {
+ closeThreadPool.shutdownNow();
+ }
+ return !progressFailed;
+ }
+
+ @Override
+ public Map<String, Long> getOutputCounts() {
+ return regionEditsWrittenMap;
+ }
+
+ @Override
+ public int getNumberOfRecoveredRegions() {
+ return regionEditsWrittenMap.size();
+ }
+
+ @Override
+ int getNumOpenWriters() {
+ return openingWritersNum.get();
+ }
+
+ @Override
+ boolean keepRegionEvent(Entry entry) {
+ return false;
+ }
+
+ private StoreFileWriter createRecoveredHFileWriter(TableName tableName, String regionName,
+ long seqId, String familyName, boolean isMetaTable) throws IOException {
+ Path outputFile = WALSplitUtil
+ .getRegionRecoveredHFilePath(tableName, regionName, familyName, seqId,
+ walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.conf, rootFS);
+ checkPathValid(outputFile);
+ StoreFileWriter.Builder writerBuilder =
+ new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, rootFS)
+ .withFilePath(outputFile);
+ HFileContextBuilder hFileContextBuilder = new HFileContextBuilder();
+ if (isMetaTable) {
+ writerBuilder.withComparator(CellComparatorImpl.META_COMPARATOR);
+ } else {
+ configContextForNonMetaWriter(tableName, familyName, hFileContextBuilder, writerBuilder);
+ }
+ return writerBuilder.withFileContext(hFileContextBuilder.build()).build();
+ }
+
+ private void configContextForNonMetaWriter(TableName tableName, String familyName,
+ HFileContextBuilder hFileContextBuilder, StoreFileWriter.Builder writerBuilder)
+ throws IOException {
+ if (!tableDescCache.containsKey(tableName)) {
+ tableDescCache.put(tableName, admin.getDescriptor(tableName));
+ }
+ TableDescriptor tableDesc = tableDescCache.get(tableName);
+ ColumnFamilyDescriptor cfd = tableDesc.getColumnFamily(Bytes.toBytesBinary(familyName));
+ hFileContextBuilder.withCompression(cfd.getCompressionType()).withBlockSize(cfd.getBlocksize())
+ .withCompressTags(cfd.isCompressTags()).withDataBlockEncoding(cfd.getDataBlockEncoding());
+ writerBuilder.withBloomType(cfd.getBloomFilterType())
+ .withComparator(CellComparatorImpl.COMPARATOR);
+ }
+
+ private void checkPathValid(Path outputFile) throws IOException {
+ if (rootFS.exists(outputFile)) {
+ LOG.warn("this file {} may be left after last failed split ",
outputFile);
+ if (!rootFS.delete(outputFile, false)) {
+ LOG.warn("delete old generated HFile {} failed", outputFile);
+ }
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
index 4472f62..57db381 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
@@ -81,7 +81,7 @@ abstract class OutputSink {
/**
* Start the threads that will pump data from the entryBuffers to the output files.
*/
- synchronized void startWriterThreads() {
+ void startWriterThreads() throws IOException {
for (int i = 0; i < numThreads; i++) {
WriterThread t = new WriterThread(controller, entryBuffers, this, i);
t.start();
@@ -137,7 +137,7 @@ abstract class OutputSink {
/**
* @return a map from encoded region ID to the number of edits written out for that region.
*/
- abstract Map<byte[], Long> getOutputCounts();
+ abstract Map<String, Long> getOutputCounts();
/**
* @return number of regions we've recovered
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
index ffe805f..d338980 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
@@ -135,10 +135,10 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
}
@Override
- public Map<byte[], Long> getOutputCounts() {
- TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ public Map<String, Long> getOutputCounts() {
+ TreeMap<String, Long> ret = new TreeMap<>();
for (Map.Entry<String, RecoveredEditsWriter> entry : writers.entrySet()) {
- ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
+ ret.put(entry.getKey(), entry.getValue().editsWritten);
}
return ret;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
index ca0d8ef..8011a8b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
@@ -28,6 +28,7 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -165,7 +166,7 @@ public final class WALSplitUtil {
* RECOVERED_EDITS_DIR under the region creating it if necessary.
* @param tableName the table name
* @param encodedRegionName the encoded region name
- * @param sedId the sequence id which used to generate file name
+ * @param seqId the sequence id which used to generate file name
* @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
* @param tmpDirName of the directory used to sideline old recovered edits file
* @param conf configuration
@@ -174,7 +175,7 @@ public final class WALSplitUtil {
*/
@SuppressWarnings("deprecation")
@VisibleForTesting
- static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long sedId,
+ static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long seqId,
String fileNameBeingSplit, String tmpDirName, Configuration conf) throws IOException {
FileSystem walFS = FSUtils.getWALFileSystem(conf);
Path tableDir = FSUtils.getWALTableDir(conf, tableName);
@@ -203,7 +204,7 @@ public final class WALSplitUtil {
// Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
// Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
// region's replayRecoveredEdits will not delete it
- String fileName = formatRecoveredEditsFileName(sedId);
+ String fileName = formatRecoveredEditsFileName(seqId);
fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
return new Path(dir, fileName);
}
@@ -563,4 +564,58 @@ public final class WALSplitUtil {
return mutations;
}
+
+ /**
+ * Path to a file under recovered.hfiles directory of the region's column family: e.g.
+ * /hbase/some_table/2323432434/cf/recovered.hfiles/2332-wal. This method also ensures existence
+ * of recovered.hfiles directory under the region's column family, creating it if necessary.
+ *
+ * @param tableName the table name
+ * @param encodedRegionName the encoded region name
+ * @param familyName the column family name
+ * @param seqId the sequence id which used to generate file name
+ * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name
+ * @param conf configuration
+ * @param rootFS the root file system
+ * @return Path to file into which to dump split log edits.
+ */
+ static Path getRegionRecoveredHFilePath(TableName tableName, String encodedRegionName,
+ String familyName, long seqId, String fileNameBeingSplit, Configuration conf, FileSystem rootFS)
+ throws IOException {
+ Path rootDir = FSUtils.getRootDir(conf);
+ Path regionDir =
+ FSUtils.getRegionDirFromTableDir(FSUtils.getTableDir(rootDir, tableName), encodedRegionName);
+ Path dir = getStoreDirRecoveredHFilesDir(regionDir, familyName);
+
+ if (!rootFS.exists(dir) && !rootFS.mkdirs(dir)) {
+ LOG.warn("mkdir failed on {}, region {}, column family {}", dir,
encodedRegionName,
+ familyName);
+ }
+
+ String fileName = formatRecoveredHFileName(seqId, fileNameBeingSplit);
+ return new Path(dir, fileName);
+ }
+
+ private static String formatRecoveredHFileName(long seqId, String fileNameBeingSplit) {
+ return String.format("%019d", seqId) + "-" + fileNameBeingSplit;
+ }
+
+ public static long getSeqIdForRecoveredHFile(String fileName) {
+ return Long.parseLong(fileName.split("-")[0]);
+ }
+
+ /**
+ * @param regionDir This regions directory in the filesystem
+ * @param familyName The column family name
+ * @return The directory that holds recovered hfiles for the region's column family
+ */
+ private static Path getStoreDirRecoveredHFilesDir(final Path regionDir, String familyName) {
+ return new Path(new Path(regionDir, familyName), HConstants.RECOVERED_HFILES_DIR);
+ }
+
+ public static FileStatus[] getRecoveredHFiles(final FileSystem rootFS,
+ final Path regionDir, String familyName) throws IOException {
+ Path dir = getStoreDirRecoveredHFilesDir(regionDir, familyName);
+ return FSUtils.listStatus(rootFS, dir);
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index a435c78..8a66c17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.wal;
+import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.DEFAULT_WAL_SPLIT_TO_HFILE;
+import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile;
import java.io.EOFException;
@@ -112,7 +114,7 @@ public class WALSplitter {
@VisibleForTesting
WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
- LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) {
+ LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) {
this.conf = HBaseConfiguration.create(conf);
String codecClassName =
conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
@@ -129,16 +131,21 @@ public class WALSplitter {
// if we limit the number of writers opened for sinking recovered edits
boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
+ boolean splitToHFile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
long bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
int numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);
- if (splitWriterCreationBounded) {
+
+ if (splitToHFile) {
+ entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
+ outputSink =
+ new BoundedRecoveredHFilesOutputSink(this, controller, entryBuffers, numWriterThreads);
+ } else if (splitWriterCreationBounded) {
entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
outputSink =
new BoundedRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
} else {
entryBuffers = new EntryBuffers(controller, bufferSize);
- outputSink =
- new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
+ outputSink = new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index c909bf1..4421453 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -642,7 +642,7 @@ public abstract class AbstractTestWALReplay {
// Only throws exception if throwExceptionWhenFlushing is set true.
public static class CustomStoreFlusher extends DefaultStoreFlusher {
// Switch between throw and not throw exception in flush
- static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
+ public static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
public CustomStoreFlusher(Configuration conf, HStore store) {
super(conf, store);
@@ -1173,7 +1173,7 @@ public abstract class AbstractTestWALReplay {
wal.sync();
}
- static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
+ public static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException {
List<Put> puts = new ArrayList<>();
for (int j = 0; j < count; j++) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index 8ddd0ea..33d5477 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -1064,11 +1064,10 @@ public class TestWALSplit {
logSplitter.splitLogFile(fs.getFileStatus(logPath), null);
// Verify number of written edits per region
- Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
- for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
- LOG.info("Got " + entry.getValue() + " output edits for region " +
- Bytes.toString(entry.getKey()));
- assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
+ Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
+ for (Map.Entry<String, Long> entry : outputCounts.entrySet()) {
+ LOG.info("Got " + entry.getValue() + " output edits for region " +
entry.getKey());
+ assertEquals((long) entry.getValue(), numFakeEdits / regions.size());
}
assertEquals("Should have as many outputs as regions", regions.size(),
outputCounts.size());
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
new file mode 100644
index 0000000..e1ea036
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
@@ -0,0 +1,408 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
+import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ RegionServerTests.class, LargeTests.class })
+public class TestWALSplitToHFile {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestWALSplitToHFile.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class);
+ static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
+ private Path rootDir = null;
+ private String logName;
+ private Path oldLogDir;
+ private Path logDir;
+ private FileSystem fs;
+ private Configuration conf;
+ private WALFactory wals;
+
+ @Rule
+ public final TestName TEST_NAME = new TestName();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ Configuration conf = UTIL.getConfiguration();
+ conf.setBoolean(WAL_SPLIT_TO_HFILE, true);
+ UTIL.startMiniCluster(3);
+ Path hbaseRootDir = UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
+ LOG.info("hbase.rootdir=" + hbaseRootDir);
+ FSUtils.setRootDir(conf, hbaseRootDir);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
+ this.fs = UTIL.getDFSCluster().getFileSystem();
+ this.rootDir = FSUtils.getRootDir(this.conf);
+ this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ String serverName =
+ ServerName.valueOf(TEST_NAME.getMethodName() + "-manual", 16010, System.currentTimeMillis())
+ .toString();
+ this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
+ this.logDir = new Path(this.rootDir, logName);
+ if (UTIL.getDFSCluster().getFileSystem().exists(this.rootDir)) {
+ UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
+ }
+ this.wals = new WALFactory(conf, TEST_NAME.getMethodName());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ this.wals.close();
+ UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
+ }
+
+ /*
+ * @param p Directory to cleanup
+ */
+ private void deleteDir(final Path p) throws IOException {
+ if (this.fs.exists(p)) {
+ if (!this.fs.delete(p, true)) {
+ throw new IOException("Failed remove of " + p);
+ }
+ }
+ }
+
+ private TableDescriptor createBasic3FamilyTD(final TableName tableName) throws IOException {
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+ builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("a")).build());
+ builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("b")).build());
+ builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c")).build());
+ TableDescriptor td = builder.build();
+ UTIL.getAdmin().createTable(td);
+ return td;
+ }
+
+ private WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
+ FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
+ wal.init();
+ return wal;
+ }
+
+ /**
+ * Test writing edits into an HRegion, closing it, splitting logs, opening
+ * Region again. Verify seqids.
+ */
+ @Test
+ public void testReplayEditsWrittenViaHRegion()
+ throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
+ final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+ final TableDescriptor td = createBasic3FamilyTD(tableName);
+ final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
+ final Path basedir = FSUtils.getTableDir(this.rootDir, tableName);
+ deleteDir(basedir);
+ final byte[] rowName = tableName.getName();
+ final int countPerFamily = 10;
+
+ HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
+ HBaseTestingUtility.closeRegionAndWAL(region3);
+ // Write countPerFamily edits into the three families. Do a flush on one
+ // of the families during the load of edits so its seqid is not same as
+ // others to test we do right thing when different seqids.
+ WAL wal = createWAL(this.conf, rootDir, logName);
+ HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
+ long seqid = region.getOpenSeqNum();
+ boolean first = true;
+ for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+ addRegionEdits(rowName, cfd.getName(), countPerFamily, this.ee, region, "x");
+ if (first) {
+ // If first, so we have at least one family w/ different seqid to rest.
+ region.flush(true);
+ first = false;
+ }
+ }
+ // Now assert edits made it in.
+ final Get g = new Get(rowName);
+ Result result = region.get(g);
+ assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
+ // Now close the region (without flush), split the log, reopen the region and assert that
+ // replay of log has the correct effect, that our seqids are calculated correctly so
+ // all edits in logs are seen as 'stale'/old.
+ region.close(true);
+ wal.shutdown();
+ WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
+ WAL wal2 = createWAL(this.conf, rootDir, logName);
+ HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
+ long seqid2 = region2.getOpenSeqNum();
+ assertTrue(seqid + result.size() < seqid2);
+ final Result result1b = region2.get(g);
+ assertEquals(result.size(), result1b.size());
+
+ // Next test. Add more edits, then 'crash' this region by stealing its wal
+ // out from under it and assert that replay of the log adds the edits back
+ // correctly when region is opened again.
+ for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
+ addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
+ }
+ // Get count of edits.
+ final Result result2 = region2.get(g);
+ assertEquals(2 * result.size(), result2.size());
+ wal2.sync();
+ final Configuration newConf = HBaseConfiguration.create(this.conf);
+ User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString());
+ user.runAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(conf), conf, wals);
+ FileSystem newFS = FileSystem.get(newConf);
+ // Make a new wal for new region open.
+ WAL wal3 = createWAL(newConf, rootDir, logName);
+ HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, ri, td, null);
+ long seqid3 = region3.initialize();
+ Result result3 = region3.get(g);
+ // Assert that count of cells is same as before crash.
+ assertEquals(result2.size(), result3.size());
+
+ // I can't close wal1. Its been appropriated when we split.
+ region3.close();
+ wal3.close();
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Test that we recover correctly when there is a failure in between the
+ * flushes. i.e. Some stores got flushed but others did not.
+ * Unfortunately, there is no easy hook to flush at a store level. The way
+ * we get around this is by flushing at the region level, and then deleting
+ * the recently flushed store file for one of the Stores. This would put us
+ * back in the situation where all but that store got flushed and the region
+ * died.
+ * We restart Region again, and verify that the edits were replayed.
+ */
+ @Test
+ public void testReplayEditsAfterPartialFlush()
+ throws IOException, SecurityException, IllegalArgumentException {
+ final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+ final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
+ final Path basedir = FSUtils.getTableDir(this.rootDir, tableName);
+ deleteDir(basedir);
+ final byte[] rowName = tableName.getName();
+ final int countPerFamily = 10;
+ final TableDescriptor td = createBasic3FamilyTD(tableName);
+ HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
+ HBaseTestingUtility.closeRegionAndWAL(region3);
+ // Write countPerFamily edits into the three families. Do a flush on one
+ // of the families during the load of edits so its seqid is not same as
+ // others to test we do right thing when different seqids.
+ WAL wal = createWAL(this.conf, rootDir, logName);
+ HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
+ long seqid = region.getOpenSeqNum();
+ for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+ addRegionEdits(rowName, cfd.getName(), countPerFamily, this.ee, region, "x");
+ }
+
+ // Now assert edits made it in.
+ final Get g = new Get(rowName);
+ Result result = region.get(g);
+ assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
+
+ // Let us flush the region
+ region.flush(true);
+ region.close(true);
+ wal.shutdown();
+
+ // delete the store files in the second column family to simulate a failure
+ // in between the flushcache();
+ // we have 3 families. killing the middle one ensures that taking the maximum
+ // will make us fail.
+ int cf_count = 0;
+ for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+ cf_count++;
+ if (cf_count == 2) {
+ region.getRegionFileSystem().deleteFamily(cfd.getNameAsString());
+ }
+ }
+
+ // Let us try to split and recover
+ WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
+ WAL wal2 = createWAL(this.conf, rootDir, logName);
+ HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
+ long seqid2 = region2.getOpenSeqNum();
+ assertTrue(seqid + result.size() < seqid2);
+
+ final Result result1b = region2.get(g);
+ assertEquals(result.size(), result1b.size());
+ }
+
+ /**
+ * Test that we could recover the data correctly after aborting flush. In the
+ * test, first we abort flush after writing some data, then writing more data
+ * and flush again, at last verify the data.
+ */
+ @Test
+ public void testReplayEditsAfterAbortingFlush() throws IOException {
+ final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+ final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
+ final Path basedir = FSUtils.getTableDir(this.rootDir, tableName);
+ deleteDir(basedir);
+ final TableDescriptor td = createBasic3FamilyTD(tableName);
+ HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
+ HBaseTestingUtility.closeRegionAndWAL(region3);
+ // Write countPerFamily edits into the three families. Do a flush on one
+ // of the families during the load of edits so its seqid is not same as
+ // others to test we do right thing when different seqids.
+ WAL wal = createWAL(this.conf, rootDir, logName);
+ RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
+ Mockito.doReturn(false).when(rsServices).isAborted();
+ when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
+ when(rsServices.getConfiguration()).thenReturn(conf);
+ Configuration customConf = new Configuration(this.conf);
+ customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
+ AbstractTestWALReplay.CustomStoreFlusher.class.getName());
+ HRegion region = HRegion.openHRegion(this.rootDir, ri, td, wal, customConf, rsServices, null);
+ int writtenRowCount = 10;
+ List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
+ for (int i = 0; i < writtenRowCount; i++) {
+ Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
+ put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
+ Bytes.toBytes("val"));
+ region.put(put);
+ }
+
+ // Now assert edits made it in.
+ RegionScanner scanner = region.getScanner(new Scan());
+ assertEquals(writtenRowCount, getScannedCount(scanner));
+
+ // Let us flush the region
+ AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
+ try {
+ region.flush(true);
+ fail("Injected exception hasn't been thrown");
+ } catch (IOException e) {
+ LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
+ // simulated to abort server
+ Mockito.doReturn(true).when(rsServices).isAborted();
+ region.setClosing(false); // region normally does not accept writes after
+ // DroppedSnapshotException. We mock around it for this test.
+ }
+ // writing more data
+ int moreRow = 10;
+ for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
+ Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
+ put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
+ Bytes.toBytes("val"));
+ region.put(put);
+ }
+ writtenRowCount += moreRow;
+ // call flush again
+ AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
+ try {
+ region.flush(true);
+ } catch (IOException t) {
+ LOG.info(
+ "Expected exception when flushing region because server is stopped," + t.getMessage());
+ }
+
+ region.close(true);
+ wal.shutdown();
+
+ // Let us try to split and recover
+ WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
+ WAL wal2 = createWAL(this.conf, rootDir, logName);
+ Mockito.doReturn(false).when(rsServices).isAborted();
+ HRegion region2 = HRegion.openHRegion(this.rootDir, ri, td, wal2, this.conf, rsServices, null);
+ scanner = region2.getScanner(new Scan());
+ assertEquals(writtenRowCount, getScannedCount(scanner));
+ }
+
+ private int getScannedCount(RegionScanner scanner) throws IOException {
+ int scannedCount = 0;
+ List<Cell> results = new ArrayList<>();
+ while (true) {
+ boolean existMore = scanner.next(results);
+ if (!results.isEmpty()) {
+ scannedCount++;
+ }
+ if (!existMore) {
+ break;
+ }
+ results.clear();
+ }
+ return scannedCount;
+ }
+}