Repository: hbase
Updated Branches:
  refs/heads/master 067388bfd -> eb906e20e


HBASE-20908 Infinite loop on regionserver if region replicas are reduced

Signed-off-by: tedyu <yuzhih...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/eb906e20
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/eb906e20
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/eb906e20

Branch: refs/heads/master
Commit: eb906e20eeee76e3544ccd403f1d3a264c82a1e9
Parents: 067388b
Author: Ankit Singhal <ankitsingha...@gmail.com>
Authored: Thu Jul 19 14:58:59 2018 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Fri Jul 20 13:05:11 2018 -0700

----------------------------------------------------------------------
 .../RegionReplicaReplicationEndpoint.java       | 34 ++++++++++---
 .../TestRegionReplicaReplicationEndpoint.java   | 53 ++++++++++++++++----
 2 files changed, 68 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/eb906e20/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index cb755fe..dc83eb7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -31,7 +31,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
@@ -70,8 +69,10 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
 
@@ -276,7 +277,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
         EntryBuffers entryBuffers, ClusterConnection connection, 
ExecutorService pool,
         int numWriters, int operationTimeout) {
       super(controller, entryBuffers, numWriters);
-      this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, 
operationTimeout);
+      this.sinkWriter =
+          new RegionReplicaSinkWriter(this, connection, pool, 
operationTimeout, tableDescriptors);
       this.tableDescriptors = tableDescriptors;
 
       // A cache for the table "memstore replication enabled" flag.
@@ -390,9 +392,10 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     int operationTimeout;
     ExecutorService pool;
     Cache<TableName, Boolean> disabledAndDroppedTables;
+    TableDescriptors tableDescriptors;
 
     public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, 
ClusterConnection connection,
-        ExecutorService pool, int operationTimeout) {
+        ExecutorService pool, int operationTimeout, TableDescriptors 
tableDescriptors) {
       this.sink = sink;
       this.connection = connection;
       this.operationTimeout = operationTimeout;
@@ -400,6 +403,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
         = RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
       this.rpcControllerFactory = 
RpcControllerFactory.instantiate(connection.getConfiguration());
       this.pool = pool;
+      this.tableDescriptors = tableDescriptors;
 
       int nonExistentTableCacheExpiryMs = connection.getConfiguration()
         
.getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs",
 5000);
@@ -506,13 +510,14 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
       }
 
       boolean tasksCancelled = false;
-      for (Future<ReplicateWALEntryResponse> task : tasks) {
+      for (int replicaId = 0; replicaId < tasks.size(); replicaId++) {
         try {
-          task.get();
+          tasks.get(replicaId).get();
         } catch (InterruptedException e) {
           throw new InterruptedIOException(e.getMessage());
         } catch (ExecutionException e) {
           Throwable cause = e.getCause();
+          boolean canBeSkipped = false;
           if (cause instanceof IOException) {
             // The table can be disabled or dropped at this time. For disabled 
tables, we have no
             // cheap mechanism to detect this case because meta does not 
contain this information.
@@ -520,21 +525,34 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
             // RPC. So instead we start the replay RPC with retries and check 
whether the table is
             // dropped or disabled which might cause SocketTimeoutException, or
             // RetriesExhaustedException or similar if we get IOE.
-            if (cause instanceof TableNotFoundException || 
connection.isTableDisabled(tableName)) {
+            if (cause instanceof TableNotFoundException
+                || connection.isTableDisabled(tableName)) {
+              disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to 
cache for later.
+              canBeSkipped = true;
+            } else if (tableDescriptors != null) {
+              TableDescriptor tableDescriptor = 
tableDescriptors.get(tableName);
+              if (tableDescriptor != null
+                  //(replicaId + 1) as no task is added for primary replica 
for replication
+                  && tableDescriptor.getRegionReplication() <= (replicaId + 
1)) {
+                canBeSkipped = true;
+              }
+            }
+            if (canBeSkipped) {
               if (LOG.isTraceEnabled()) {
                 LOG.trace("Skipping " + entries.size() + " entries in table " 
+ tableName
-                  + " because received exception for dropped or disabled 
table", cause);
+                    + " because received exception for dropped or disabled 
table",
+                  cause);
                 for (Entry entry : entries) {
                   LOG.trace("Skipping : " + entry);
                 }
               }
-              disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to 
cache for later.
               if (!tasksCancelled) {
                 sink.getSkippedEditsCounter().addAndGet(entries.size());
                 tasksCancelled = true; // so that we do not add to skipped 
counter again
               }
               continue;
             }
+
             // otherwise rethrow
             throw (IOException)cause;
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/eb906e20/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
index 61a1fbf..04db81a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
@@ -29,6 +29,11 @@ import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Cell.Type;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -50,6 +55,8 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -263,7 +270,7 @@ public class TestRegionReplicaReplicationEndpoint {
     for (int i = 1; i < regionReplication; i++) {
       final Region region = regions[i];
       // wait until all the data is replicated to all secondary regions
-      Waiter.waitFor(HTU.getConfiguration(), 90000, new 
Waiter.Predicate<Exception>() {
+      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new 
Waiter.Predicate<Exception>() {
         @Override
         public boolean evaluate() throws Exception {
           LOG.info("verifying replication for region replica:" + 
region.getRegionInfo());
@@ -342,7 +349,6 @@ public class TestRegionReplicaReplicationEndpoint {
 
     Connection connection = 
ConnectionFactory.createConnection(HTU.getConfiguration());
     Table table = connection.getTable(tableName);
-
     try {
       // load the data to the table
 
@@ -364,26 +370,35 @@ public class TestRegionReplicaReplicationEndpoint {
 
   @Test
   public void testRegionReplicaReplicationIgnoresDisabledTables() throws 
Exception {
-    testRegionReplicaReplicationIgnoresDisabledTables(false);
+    testRegionReplicaReplicationIgnores(false, false);
   }
 
   @Test
   public void testRegionReplicaReplicationIgnoresDroppedTables() throws 
Exception {
-    testRegionReplicaReplicationIgnoresDisabledTables(true);
+    testRegionReplicaReplicationIgnores(true, false);
   }
 
-  public void testRegionReplicaReplicationIgnoresDisabledTables(boolean 
dropTable)
+  @Test
+  public void testRegionReplicaReplicationIgnoresNonReplicatedTables() throws 
Exception {
+    testRegionReplicaReplicationIgnores(false, true);
+  }
+
+  public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean 
disableReplication)
       throws Exception {
+
     // tests having edits from a disabled or dropped table is handled 
correctly by skipping those
     // entries and further edits after the edits from dropped/disabled table 
can be replicated
     // without problems.
-    final TableName tableName = TableName.valueOf(name.getMethodName() + 
dropTable);
+    final TableName tableName = TableName.valueOf(
+      name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + 
disableReplication);
     HTableDescriptor htd = HTU.createTableDescriptor(tableName);
     int regionReplication = 3;
     htd.setRegionReplication(regionReplication);
     HTU.deleteTableIfAny(tableName);
+
     HTU.getAdmin().createTable(htd);
-    TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" 
: "disabledTable");
+    TableName toBeDisabledTable = TableName.valueOf(
+      dropTable ? "droppedTable" : (disableReplication ? "disableReplication" 
: "disabledTable"));
     HTU.deleteTableIfAny(toBeDisabledTable);
     htd = HTU.createTableDescriptor(toBeDisabledTable.toString());
     htd.setRegionReplication(regionReplication);
@@ -405,28 +420,44 @@ public class TestRegionReplicaReplicationEndpoint {
     RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
         mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
     when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
+    FSTableDescriptors fstd = new FSTableDescriptors(HTU.getConfiguration(),
+        FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath());
     RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
         new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
-          (ClusterConnection) connection,
-          Executors.newSingleThreadExecutor(), Integer.MAX_VALUE);
+            (ClusterConnection) connection, 
Executors.newSingleThreadExecutor(), Integer.MAX_VALUE,
+            fstd);
     RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
     HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
     byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();
 
+    Cell cell = 
CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
+        
.setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
     Entry entry = new Entry(
       new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
-      new WALEdit());
+        new WALEdit()
+            .add(cell));
 
     HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
     if (dropTable) {
       HTU.getAdmin().deleteTable(toBeDisabledTable);
+    } else if (disableReplication) {
+      htd.setRegionReplication(regionReplication - 2);
+      HTU.getAdmin().modifyTable(toBeDisabledTable, htd);
+      HTU.getAdmin().enableTable(toBeDisabledTable);
     }
-
     sinkWriter.append(toBeDisabledTable, encodedRegionName,
       HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry));
 
     assertEquals(2, skippedEdits.get());
 
+    if (disableReplication) {
+      // enable replication again so that we can verify replication
+      HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
+      htd.setRegionReplication(regionReplication);
+      HTU.getAdmin().modifyTable(toBeDisabledTable, htd);
+      HTU.getAdmin().enableTable(toBeDisabledTable);
+    }
+
     try {
       // load some data to the to-be-dropped table
 

Reply via email to