Repository: hbase
Updated Branches:
refs/heads/0.98 5716dda86 -> 7fe910778
HBASE-15252 Data loss when replaying wal if HDFS timeout
Conflicts:
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7fe91077
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7fe91077
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7fe91077
Branch: refs/heads/0.98
Commit: 7fe91077826374a5b360f7362f1df5db3a48b985
Parents: 5716dda
Author: zhangduo <[email protected]>
Authored: Fri Feb 12 08:17:10 2016 +0800
Committer: zhangduo <[email protected]>
Committed: Fri Feb 12 16:46:52 2016 +0800
----------------------------------------------------------------------
.../regionserver/wal/ProtobufLogReader.java | 3 +-
.../hbase/regionserver/wal/TestWALReplay.java | 117 ++++++++++++++++++-
2 files changed, 115 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/7fe91077/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index 8c2da41..a1377cf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.InvalidProtocolBufferException;
/**
* A Protobuf based WAL has the following structure:
@@ -293,7 +294,7 @@ public class ProtobufLogReader extends ReaderBase {
}
ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size),
(int)size);
- } catch (IOException ipbe) {
+ } catch (InvalidProtocolBufferException ipbe) {
throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
originalPosition + ", currentPosition=" + this.inputStream.getPos() +
", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
http://git-wip-us.apache.org/repos/asf/hbase/blob/7fe91077/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index b3e7bba..f8d88b5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -21,8 +21,14 @@ package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import java.io.FilterInputStream;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
@@ -34,6 +40,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -46,7 +53,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -71,12 +77,14 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hdfs.DFSInputStream;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -84,6 +92,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
/**
* Test replay of edits out of a WAL split.
@@ -477,7 +487,7 @@ public class TestWALReplay {
boolean first = true;
for (HColumnDescriptor hcd: htd.getFamilies()) {
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
- if (first ) {
+ if (first) {
// If first, so we have at least one family w/ different seqid to rest.
region.flushcache();
first = false;
@@ -805,8 +815,9 @@ public class TestWALReplay {
final Configuration newConf = HBaseConfiguration.create(this.conf);
User user = HBaseTestingUtility.getDifferentUser(newConf,
".replay.wal.secondtime");
- user.runAs(new PrivilegedExceptionAction() {
- public Object run() throws Exception {
+ user.runAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
runWALSplit(newConf);
FileSystem newFS = FileSystem.get(newConf);
// 100k seems to make for about 4 flushes during HRegion#initialize.
@@ -896,6 +907,104 @@ public class TestWALReplay {
lastestSeqNumber, editCount);
}
+  /**
+   * testcase for https://issues.apache.org/jira/browse/HBASE-15252
+   */
+  @Test
+  public void testDatalossWhenInputError() throws IOException, InstantiationException,
+      IllegalAccessException {
+    final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
+    deleteDir(basedir);
+    final byte[] rowName = tableName.getName();
+    final int countPerFamily = 10;
+    final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
+    HRegion region1 = HRegion.createHRegion(hri,
+        hbaseRootDir, this.conf, htd);
+    Path regionDir = region1.getRegionFileSystem().getRegionDir();
+    HRegion.closeHRegion(region1);
+
+    HLog wal = createWAL(this.conf);
+    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
+    for (HColumnDescriptor hcd : htd.getFamilies()) {
+      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
+    }
+    // Now assert edits made it in.
+    final Get g = new Get(rowName);
+    Result result = region.get(g);
+    assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
+    // Now close the region (without flush), split the log, reopen the region and assert that
+    // replay of log has the correct effect.
+    region.close(true);
+    wal.close();
+
+    runWALSplit(this.conf);
+
+    // here we let the DFSInputStream throw an IOException just after the WALHeader.
+    Path editFile = HLogUtil.getSplitEditFilesSorted(this.fs, regionDir).first();
+    FSDataInputStream stream = fs.open(editFile);
+    stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
+    Class<? extends HLog.Reader> logReaderClass =
+        conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
+          HLog.Reader.class);
+    HLog.Reader reader = logReaderClass.newInstance();
+    reader.init(this.fs, editFile, conf, stream);
+    final long headerLength = stream.getPos();
+    reader.close();
+    FileSystem spyFs = spy(this.fs);
+    doAnswer(new Answer<FSDataInputStream>() {
+
+      @Override
+      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
+        FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
+        Field field = FilterInputStream.class.getDeclaredField("in");
+        field.setAccessible(true);
+        final DFSInputStream in = (DFSInputStream) field.get(stream);
+        DFSInputStream spyIn = spy(in);
+        doAnswer(new Answer<Integer>() {
+
+          private long pos;
+
+          @Override
+          public Integer answer(InvocationOnMock invocation) throws Throwable {
+            if (pos >= headerLength) {
+              throw new IOException("read over limit");
+            }
+            int b = (Integer) invocation.callRealMethod();
+            if (b > 0) {
+              pos += b;
+            }
+            return b;
+          }
+        }).when(spyIn).read(any(byte[].class), any(int.class), any(int.class));
+        doAnswer(new Answer<Void>() {
+
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            invocation.callRealMethod();
+            in.close();
+            return null;
+          }
+        }).when(spyIn).close();
+        field.set(stream, spyIn);
+        return stream;
+      }
+    }).when(spyFs).open(eq(editFile));
+
+    HLog wal2 = createWAL(this.conf);
+    HRegion region2;
+    try {
+      // log replay should fail due to the IOException, otherwise we may lose data.
+      region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2);
+      assertEquals(result.size(), region2.get(g).size());
+    } catch (IOException e) {
+      assertEquals("read over limit", e.getMessage());
+    }
+    region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2);
+    assertEquals(result.size(), region2.get(g).size());
+  }
+
static class MockHLog extends FSHLog {
boolean doCompleteCacheFlush = false;