This is an automated email from the ASF dual-hosted git repository.

anoopsamjohn pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new a72c62b  HBASE-24984 WAL corruption due to early DBBs re-use when Durability.ASYNC_WAL is used with multi operation (#3512)
a72c62b is described below

commit a72c62b022535e26ed9a1578795b587a1ee0d782
Author: gkanade <[email protected]>
AuthorDate: Wed Jul 21 20:00:25 2021 -0700

    HBASE-24984 WAL corruption due to early DBBs re-use when Durability.ASYNC_WAL is used with multi operation (#3512)
    
    Signed-off-by: Anoop <[email protected]>
    Signed-off-by: zhangduo <[email protected]>
    Signed-off-by: Andrew Purtell <[email protected]>
    Signed-off-by: Huaxiang Sun <[email protected]>
---
 .../org/apache/hadoop/hbase/ipc/ServerCall.java    |  41 ++++---
 ...ruptionWithMultiPutDueToDanglingByteBuffer.java | 121 +++++++++++++++++++++
 ...ithMultiPutDueToDanglingByteBufferTestBase.java | 104 ++++++++++++++++++
 3 files changed, 245 insertions(+), 21 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index b53c770..9a4c11a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -93,11 +93,13 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
   private long exceptionSize = 0;
   private final boolean retryImmediatelySupported;
 
-  // This is a dirty hack to address HBASE-22539. The lowest bit is for normal rpc cleanup, and the
-  // second bit is for WAL reference. We can only call release if both of them are zero. The reason
-  // why we can not use a general reference counting is that, we may call cleanup multiple times in
-  // the current implementation. We should fix this in the future.
-  private final AtomicInteger reference = new AtomicInteger(0b01);
+  // This is a dirty hack to address HBASE-22539. The highest bit is for rpc ref and cleanup, and
+  // the rest of the bits are for WAL reference count. We can only call release if all of them are
+  // zero. The reason why we can not use a general reference counting is that, we may call cleanup
+  // multiple times in the current implementation. We should fix this in the future.
+  // The refCount here will start as 0x80000000 and increment with every WAL reference and decrement
+  // from WAL side on release
+  // Whichever of cleanup() (clearing the highest bit) or releaseByWAL() (dropping the last WAL
+  // reference) brings the value to exactly zero is the path that invokes reqCleanup.
+  private final AtomicInteger reference = new AtomicInteger(0x80000000);
 
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"NP_NULL_ON_SOME_PATH",
       justification = "Can't figure why this complaint is happening... see 
below")
@@ -149,13 +151,14 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
     cleanup();
   }
 
-  private void release(int mask) {
+  @Override
+  public void cleanup() {
     for (;;) {
       int ref = reference.get();
-      if ((ref & mask) == 0) {
+      if ((ref & 0x80000000) == 0) {
         return;
       }
-      int nextRef = ref & (~mask);
+      int nextRef = ref & 0x7fffffff;
       if (reference.compareAndSet(ref, nextRef)) {
         if (nextRef == 0) {
           if (this.reqCleanup != null) {
@@ -167,23 +170,19 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
     }
   }
 
-  @Override
-  public void cleanup() {
-    release(0b01);
-  }
-
+  /**
+   * Takes one more WAL reference on this call by incrementing {@code reference}. The WAL count
+   * lives in the bits below the highest (RPC) bit, so every retain is counted and is expected to
+   * be balanced by exactly one {@link #releaseByWAL()}.
+   */
   public void retainByWAL() {
-    for (;;) {
-      int ref = reference.get();
-      int nextRef = ref | 0b10;
-      if (reference.compareAndSet(ref, nextRef)) {
-        return;
-      }
-    }
+    reference.incrementAndGet();
   }
 
+  /**
+   * Drops one WAL reference taken by {@link #retainByWAL()}. If this leaves {@code reference} at
+   * zero, i.e. {@link #cleanup()} has already cleared the RPC bit and no other WAL reference is
+   * outstanding, the request cleanup ({@code reqCleanup}) is run here rather than in cleanup().
+   */
   public void releaseByWAL() {
-    release(0b10);
+    // Here this method of decrementAndGet for releasing WAL reference count will work in both
+    // cases - i.e. highest bit (cleanup) 1 or 0. We will be decrementing a negative or positive
+    // value respectively in these 2 cases, but the logic will work the same way
+    if (reference.decrementAndGet() == 0) {
+      if (this.reqCleanup != null) {
+        this.reqCleanup.run();
+      }
+    }
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogCorruptionWithMultiPutDueToDanglingByteBuffer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogCorruptionWithMultiPutDueToDanglingByteBuffer.java
new file mode 100644
index 0000000..407d5a2
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogCorruptionWithMultiPutDueToDanglingByteBuffer.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.ipc.RpcServerFactory;
+import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Runs {@link WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase} against an {@link FSHLog}
+ * subclass ({@link PauseWAL}) whose append hook can be paused, so the test can keep issuing
+ * {@code Durability.ASYNC_WAL} multi puts while earlier WAL appends are still pending.
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestFSHLogCorruptionWithMultiPutDueToDanglingByteBuffer
+  extends WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule
+    .forClass(TestFSHLogCorruptionWithMultiPutDueToDanglingByteBuffer.class);
+
+  /**
+   * {@link FSHLog} whose {@code atHeadOfRingBufferEventHandlerAppend()} hook coordinates with the
+   * test through the base class's {@code ARRIVE} / {@code RESUME} latches.
+   */
+  public static final class PauseWAL extends FSHLog {
+
+    // Number of appends observed by the hook below while ARRIVE is set; only touched there.
+    private int testTableWalAppendsCount = 0;
+
+    public PauseWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
+      Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
+      String prefix, String suffix) throws IOException {
+      super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
+    }
+
+    /**
+     * Test hook, presumably invoked by FSHLog at the head of each ring-buffer append. While the
+     * test has armed {@code ARRIVE}, the first append is delayed by one second and the second
+     * one counts down {@code ARRIVE} and then blocks until the test counts down {@code RESUME}.
+     */
+    @Override
+    protected void atHeadOfRingBufferEventHandlerAppend() {
+      // Let the 1st Append go through. The write thread will wait for this to go through before
+      // calling further put()
+      if (ARRIVE != null) { // Means appends as part of puts in testcase
+        // Sleep for a second so that RS handler thread put all the mini batch WAL appends to ring
+        // buffer.
+        if (testTableWalAppendsCount == 0) {
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            // Don't swallow the interrupt: restore it so the WAL thread can still observe it.
+            Thread.currentThread().interrupt();
+          }
+        }
+        // Let the first minibatch write go through. When 2nd one comes, notify the waiting test
+        // case for doing further batch puts and make this WAL append thread to pause
+        if (testTableWalAppendsCount == 1) {
+          ARRIVE.countDown();
+          try {
+            RESUME.await();
+          } catch (InterruptedException e) {
+            // Restore the interrupt status instead of silently dropping it.
+            Thread.currentThread().interrupt();
+          }
+        }
+        testTableWalAppendsCount++;
+      }
+    }
+  }
+
+  /** WAL provider that creates {@link PauseWAL} instances. */
+  public static final class PauseWALProvider extends AbstractFSWALProvider<PauseWAL> {
+
+    @Override
+    protected PauseWAL createWAL() throws IOException {
+      return new PauseWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
+        getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
+        conf, listeners, true, logPrefix,
+        META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
+    }
+
+    @Override
+    protected void doInit(Configuration conf) throws IOException {
+      // No additional initialization needed.
+    }
+  }
+
+  /**
+   * Installs {@link PauseWALProvider}, forces one mutation per mini batch, uses
+   * {@link SimpleRpcServer} and a tiny {@link ByteBuffAllocator} pool (presumably so a prematurely
+   * released buffer is reused quickly and the corruption becomes visible), then starts a
+   * single-server mini cluster with the test table.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setClass(WALFactory.WAL_PROVIDER, PauseWALProvider.class, WALProvider.class);
+    conf.setInt(HRegion.HBASE_REGIONSERVER_MINIBATCH_SIZE, 1);
+    conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName());
+    conf.setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, 1);
+    conf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 1024);
+    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 500);
+    UTIL.startMiniCluster(1);
+    UTIL.createTable(TABLE_NAME, CF);
+    UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+}
+
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase.java
new file mode 100644
index 0000000..50da7a5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base for HBASE-24984 tests: writes rows with {@code Durability.ASYNC_WAL} in multi-put batches
+ * while a subclass-installed WAL pauses its append path, then aborts the only region server,
+ * starts a new one and verifies every row is still readable with its expected value.
+ * <p>
+ * Subclasses install a WAL whose append hook uses {@link #ARRIVE} / {@link #RESUME}, start the
+ * mini cluster and create {@link #TABLE_NAME} with family {@link #CF}.
+ */
+public abstract class WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase {
+
+  private static final Logger LOG = LoggerFactory
+    .getLogger(WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase.class);
+
+  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  // Counted down by the subclass's WAL append hook when it reaches the append it pauses on; reset
+  // to null by the test afterwards. Volatile because the hook runs on the WAL thread.
+  protected static volatile CountDownLatch ARRIVE;
+
+  // Counted down by the test to let the paused WAL append continue.
+  protected static volatile CountDownLatch RESUME;
+
+  protected static final TableName TABLE_NAME = TableName
+    .valueOf("WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase");
+
+  protected static final byte[] CF = Bytes.toBytes("cf");
+
+  protected static final byte[] CQ = Bytes.toBytes("cq");
+
+  /** Builds a fixed-width key such as {@code row-00000007} from a prefix and an index. */
+  private static byte[] getBytes(String prefix, int index) {
+    return Bytes.toBytes(String.format("%s-%08d", prefix, index));
+  }
+
+  @Test
+  public void test() throws Exception {
+    LOG.info("Stop WAL appending...");
+    // Publish RESUME before ARRIVE: the WAL hook only reads RESUME after seeing a non-null ARRIVE,
+    // so with volatile fields it can never observe a null or stale RESUME.
+    RESUME = new CountDownLatch(1);
+    ARRIVE = new CountDownLatch(1);
+    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
+      int rowCount = 100;
+      int batchSize = 5;
+      LOG.info("Put totally {} rows in batches of {} with {}...", rowCount, batchSize,
+        Durability.ASYNC_WAL);
+      List<Put> puts = new ArrayList<>(batchSize);
+      try {
+        for (int i = 1; i <= rowCount; i++) {
+          Put p = new Put(getBytes("row", i)).addColumn(CF, CQ, getBytes("value", i))
+            .setDurability(Durability.ASYNC_WAL);
+          puts.add(p);
+          if (i % batchSize == 0) {
+            table.put(puts);
+            LOG.info("Wrote batch of {} rows from row {}", batchSize,
+              Bytes.toString(puts.get(0).getRow()));
+            puts.clear();
+            // Wait for few of the minibatches in 1st batch of puts to go through the WAL write.
+            // The WAL write will pause then
+            CountDownLatch arrive = ARRIVE;
+            if (arrive != null) {
+              arrive.await();
+              ARRIVE = null;
+            }
+          }
+        }
+      } finally {
+        // Always unblock the paused WAL append, even if a put or the wait above failed; otherwise
+        // the WAL thread would presumably stay parked and cluster shutdown could hang.
+        LOG.info("Resume WAL appending...");
+        RESUME.countDown();
+      }
+      LOG.info("Put a single row to force a WAL sync...");
+      table.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("value")));
+      LOG.info("Abort the only region server");
+      UTIL.getMiniHBaseCluster().abortRegionServer(0);
+      LOG.info("Start a new region server");
+      UTIL.getMiniHBaseCluster().startRegionServerAndWait(30000);
+      UTIL.waitTableAvailable(TABLE_NAME);
+      LOG.info("Check if all rows are still valid");
+      for (int i = 1; i <= rowCount; i++) {
+        Result result = table.get(new Get(getBytes("row", i)));
+        assertEquals("Unexpected value for row " + i, Bytes.toString(getBytes("value", i)),
+          Bytes.toString(result.getValue(CF, CQ)));
+      }
+      Result result = table.get(new Get(Bytes.toBytes("row")));
+      assertEquals("value", Bytes.toString(result.getValue(CF, CQ)));
+    }
+  }
+}

Reply via email to