Repository: hbase
Updated Branches:
  refs/heads/branch-1 7e5ce6b0a -> 9df8d4c1c


HBASE-15252 Data loss when replaying wal if HDFS timeout


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9df8d4c1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9df8d4c1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9df8d4c1

Branch: refs/heads/branch-1
Commit: 9df8d4c1ceef30fef9c896a44b53603b1f152f0f
Parents: 7e5ce6b
Author: zhangduo <zhang...@apache.org>
Authored: Fri Feb 12 08:17:10 2016 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Fri Feb 12 16:16:49 2016 +0800

----------------------------------------------------------------------
 .../regionserver/wal/ProtobufLogReader.java     |   3 +-
 .../hbase/regionserver/wal/TestWALReplay.java   | 113 ++++++++++++++++++-
 2 files changed, 112 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9df8d4c1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index dc5c9cc..bb25aa1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * A Protobuf based WAL has the following structure:
@@ -332,7 +333,7 @@ public class ProtobufLogReader extends ReaderBase {
           }
          ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size),
            (int)size);
-        } catch (IOException ipbe) {
+        } catch (InvalidProtocolBufferException ipbe) {
          throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
            originalPosition + ", currentPosition=" + this.inputStream.getPos() +
            ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
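
The one-line change above is the whole fix. The previous catch clause matched any IOException, so a transport-level failure such as an HDFS read timeout raised mid-parse was rethrown as EOFException and treated as a clean end of log; replay stopped quietly and every edit after the failure point was lost. Narrowing the catch to InvalidProtocolBufferException keeps the EOF handling for genuinely truncated trailing records while letting all other IOExceptions propagate and abort the replay. A minimal sketch of the resulting control flow, simplified from the hunk above rather than copied verbatim from the HBase source:

    try {
      // Parse the next WAL entry, bounded to `size` bytes of the stream.
      ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size), (int) size);
    } catch (InvalidProtocolBufferException ipbe) {
      // Only a genuine protobuf parse failure is treated as a (possibly
      // truncated) trailing record, i.e. end of log.
      throw (EOFException) new EOFException("Invalid PB, EOF?").initCause(ipbe);
    }
    // Any other IOException (e.g. an HDFS timeout) now propagates to the
    // caller and fails the replay instead of masquerading as end-of-file.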

http://git-wip-us.apache.org/repos/asf/hbase/blob/9df8d4c1/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index eea3cc8..f9d7da2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -22,9 +22,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import java.io.FilterInputStream;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -37,6 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -91,6 +98,7 @@ import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hdfs.DFSInputStream;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -100,6 +108,8 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /**
  * Test replay of edits out of a WAL split.
@@ -501,7 +511,7 @@ public class TestWALReplay {
     boolean first = true;
     for (HColumnDescriptor hcd: htd.getFamilies()) {
       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
-      if (first ) {
+      if (first) {
         // If first, so we have at least one family w/ different seqid to rest.
         region.flush(true);
         first = false;
@@ -824,9 +834,9 @@ public class TestWALReplay {
     final Configuration newConf = HBaseConfiguration.create(this.conf);
     User user = HBaseTestingUtility.getDifferentUser(newConf,
       ".replay.wal.secondtime");
-    user.runAs(new PrivilegedExceptionAction() {
+    user.runAs(new PrivilegedExceptionAction<Void>() {
       @Override
-      public Object run() throws Exception {
+      public Void run() throws Exception {
         runWALSplit(newConf);
         FileSystem newFS = FileSystem.get(newConf);
         // 100k seems to make for about 4 flushes during HRegion#initialize.
@@ -932,6 +942,103 @@ public class TestWALReplay {
         lastestSeqNumber, editCount);
   }
 
+  /**
+   * testcase for https://issues.apache.org/jira/browse/HBASE-15252
+   */
+  @Test
+  public void testDatalossWhenInputError() throws IOException, InstantiationException,
+      IllegalAccessException {
+    final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
+    deleteDir(basedir);
+    final byte[] rowName = tableName.getName();
+    final int countPerFamily = 10;
+    final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
+    HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
+    Path regionDir = region1.getRegionFileSystem().getRegionDir();
+    HBaseTestingUtility.closeRegionAndWAL(region1);
+
+    WAL wal = createWAL(this.conf);
+    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
+    for (HColumnDescriptor hcd : htd.getFamilies()) {
+      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
+    }
+    // Now assert edits made it in.
+    final Get g = new Get(rowName);
+    Result result = region.get(g);
+    assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
+    // Now close the region (without flush), split the log, reopen the region and assert that
+    // replay of log has the correct effect.
+    region.close(true);
+    wal.shutdown();
+
+    runWALSplit(this.conf);
+
+    // here we let the DFSInputStream throw an IOException just after the WALHeader.
+    Path editFile = WALSplitter.getSplitEditFilesSorted(this.fs, regionDir).first();
+    FSDataInputStream stream = fs.open(editFile);
+    stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
+    Class<? extends DefaultWALProvider.Reader> logReaderClass =
+        conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
+          DefaultWALProvider.Reader.class);
+    DefaultWALProvider.Reader reader = logReaderClass.newInstance();
+    reader.init(this.fs, editFile, conf, stream);
+    final long headerLength = stream.getPos();
+    reader.close();
+    FileSystem spyFs = spy(this.fs);
+    doAnswer(new Answer<FSDataInputStream>() {
+
+      @Override
+      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
+        FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
+        Field field = FilterInputStream.class.getDeclaredField("in");
+        field.setAccessible(true);
+        final DFSInputStream in = (DFSInputStream) field.get(stream);
+        DFSInputStream spyIn = spy(in);
+        doAnswer(new Answer<Integer>() {
+
+          private long pos;
+
+          @Override
+          public Integer answer(InvocationOnMock invocation) throws Throwable {
+            if (pos >= headerLength) {
+              throw new IOException("read over limit");
+            }
+            int b = (Integer) invocation.callRealMethod();
+            if (b > 0) {
+              pos += b;
+            }
+            return b;
+          }
+        }).when(spyIn).read(any(byte[].class), any(int.class), any(int.class));
+        doAnswer(new Answer<Void>() {
+
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            invocation.callRealMethod();
+            in.close();
+            return null;
+          }
+        }).when(spyIn).close();
+        field.set(stream, spyIn);
+        return stream;
+      }
+    }).when(spyFs).open(eq(editFile));
+
+    WAL wal2 = createWAL(this.conf);
+    HRegion region2;
+    try {
+      // log replay should fail due to the IOException, otherwise we may lose data.
+      region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2);
+      assertEquals(result.size(), region2.get(g).size());
+    } catch (IOException e) {
+      assertEquals("read over limit", e.getMessage());
+    }
+    region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2);
+    assertEquals(result.size(), region2.get(g).size());
+  }
+
   static class MockWAL extends FSHLog {
     boolean doCompleteCacheFlush = false;
 

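The new test injects the fault by spying on FileSystem.open, digging the DFSInputStream out of the returned FSDataInputStream through FilterInputStream's private "in" field, and substituting a Mockito spy whose read(byte[], int, int) throws once the reader has consumed the WAL header. The same failure mode can be modeled without Mockito by a plain wrapper stream; the sketch below is illustrative only (the class name and the fixed limit are not from HBase, and single-byte read() calls are not intercepted):

    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    /** Fails any bulk read past `limit` bytes, simulating an HDFS timeout mid-file. */
    class FailAfterLimitInputStream extends FilterInputStream {
      private final long limit;
      private long pos;

      FailAfterLimitInputStream(InputStream in, long limit) {
        super(in);
        this.limit = limit;
      }

      @Override
      public int read(byte[] b, int off, int len) throws IOException {
        if (pos >= limit) {
          // Same message the test asserts on.
          throw new IOException("read over limit");
        }
        int n = super.read(b, off, len);
        if (n > 0) {
          pos += n;
        }
        return n;
      }
    }

With the old catch-all in ProtobufLogReader, a failure like this during replay was swallowed as EOF and the edits after the header were lost; with the fix, HRegion.openHRegion surfaces the IOException, which is exactly what the try/catch at the end of the test expects.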