Repository: hbase
Updated Branches:
  refs/heads/master f9b19c6e3 -> 0c4fbcc32


HBASE-12091 Optionally ignore edits for dropped tables for replication.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0c4fbcc3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0c4fbcc3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0c4fbcc3

Branch: refs/heads/master
Commit: 0c4fbcc32973e9fb3840cb2b5e397155044c2b0c
Parents: f9b19c6
Author: Lars Hofhansl <la...@apache.org>
Authored: Tue Nov 14 17:08:14 2017 -0800
Committer: Lars Hofhansl <la...@apache.org>
Committed: Tue Nov 14 17:08:14 2017 -0800

----------------------------------------------------------------------
 .../RetriesExhaustedWithDetailsException.java   |   9 +
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |  15 +
 .../org/apache/hadoop/hbase/HConstants.java     |   5 +
 .../hbase/protobuf/ReplicationProtbufUtil.java  |   2 +-
 .../hbase/replication/ReplicationEndpoint.java  |   6 +
 .../HBaseInterClusterReplicationEndpoint.java   |  63 +++-
 .../regionserver/ReplicationSink.java           |   8 +
 .../regionserver/ReplicationSourceManager.java  |   2 +-
 .../TestReplicationDroppedTables.java           | 292 +++++++++++++++++++
 9 files changed, 397 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
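
The new behavior is opt-in: edits are only skipped when hbase.replication.drop.on.deleted.table is set to true on the source cluster and the table in question no longer exists on either side. A minimal sketch of enabling the flag programmatically, mirroring how the new test configures it (creating a fresh Configuration in a standalone class is just for illustration; in practice the property would normally go into hbase-site.xml on the source cluster):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HConstants;

  public class EnableDropOnDeletedTableSketch {
    public static void main(String[] args) {
      // Enable dropping of edits for tables that have been deleted on both the
      // source and the peer. The default remains false (replication stalls as before).
      Configuration conf = HBaseConfiguration.create();
      conf.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
      System.out.println(conf.get(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY));
    }
  }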


http://git-wip-us.apache.org/repos/asf/hbase/blob/0c4fbcc3/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
index ff414be..cb00675 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.Collection;
@@ -49,6 +50,14 @@ extends RetriesExhaustedException {
   List<Row> actions;
   List<String> hostnameAndPort;
 
+  public RetriesExhaustedWithDetailsException(final String msg) {
+    super(msg);
+  }
+
+  public RetriesExhaustedWithDetailsException(final String msg, final IOException e) {
+    super(msg, e);
+  }
+
   public RetriesExhaustedWithDetailsException(List<Throwable> exceptions,
                                               List<Row> actions,
                                               List<String> hostnameAndPort) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/0c4fbcc3/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index f8195f1..d86fc62 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -254,6 +254,21 @@ public final class ProtobufUtil {
   }
 
   /**
+   * Return the IOException thrown by the remote server, which is wrapped in the
+   * ServiceException as its cause. RemoteExceptions are left untouched.
+   *
+   * @param e ServiceException that wraps the IOException thrown by the server
+   * @return the IOException that was wrapped in the ServiceException
+   */
+  public static IOException getServiceException(org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException e) {
+    Throwable t = e.getCause();
+    if (ExceptionUtil.isInterrupt(t)) {
+      return ExceptionUtil.asInterrupt(t);
+    }
+    return t instanceof IOException ? (IOException) t : new HBaseIOException(t);
+  }
+
+  /**
   * Like {@link #getRemoteException(ServiceException)} but more generic, able to handle more than
    * just {@link ServiceException}. Prefer this method to
    * {@link #getRemoteException(ServiceException)} because trying to

http://git-wip-us.apache.org/repos/asf/hbase/blob/0c4fbcc3/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index cc9fc57..834e5bb 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1200,6 +1200,11 @@ public final class HConstants {
   public static final String REPLICATION_SOURCE_MAXTHREADS_KEY =
       "hbase.replication.source.maxthreads";
 
+  /** Drop edits for tables that have been deleted from the replication source and target */
+  public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
+      "hbase.replication.drop.on.deleted.table";
+
+  /** Maximum number of threads used by the replication source for shipping edits to the sinks */
   public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10;
 
   /** Configuration key for SplitLog manager timeout */

http://git-wip-us.apache.org/repos/asf/hbase/blob/0c4fbcc3/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index dbf7b5e..af9690a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -69,7 +69,7 @@ public class ReplicationProtbufUtil {
     try {
       admin.replicateWALEntry(controller, p.getFirst());
     } catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException e) {
-      throw ProtobufUtil.handleRemoteException(e);
+      throw ProtobufUtil.getServiceException(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/0c4fbcc3/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index ffdba34..401da4c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -53,6 +53,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
 
   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
   class Context {
+    private final Configuration localConf;
     private final Configuration conf;
     private final FileSystem fs;
     private final TableDescriptors tableDescriptors;
@@ -64,6 +65,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
 
     @InterfaceAudience.Private
     public Context(
+        final Configuration localConf,
         final Configuration conf,
         final FileSystem fs,
         final String peerId,
@@ -72,6 +74,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
         final MetricsSource metrics,
         final TableDescriptors tableDescriptors,
         final Abortable abortable) {
+      this.localConf = localConf;
       this.conf = conf;
       this.fs = fs;
       this.clusterId = clusterId;
@@ -84,6 +87,9 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
     public Configuration getConfiguration() {
       return conf;
     }
+    public Configuration getLocalConfiguration() {
+      return localConf;
+    }
     public FileSystem getFilesystem() {
       return fs;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0c4fbcc3/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index a210aae..c1ed644 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -36,6 +36,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
@@ -45,9 +47,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
@@ -79,6 +83,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
 
   private ClusterConnection conn;
+  private Configuration localConf;
   private Configuration conf;
   // How long should we sleep for each retry
   private long sleepForRetries;
@@ -102,11 +107,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private Path hfileArchiveDir;
   private boolean replicationBulkLoadDataEnabled;
   private Abortable abortable;
+  private boolean dropOnDeletedTables;
 
   @Override
   public void init(Context context) throws IOException {
     super.init(context);
     this.conf = HBaseConfiguration.create(ctx.getConfiguration());
+    this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
     decorateConf();
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
     this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
@@ -139,6 +146,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     // conservative for now.
     this.replicationRpcLimit = (int)(0.95 * (double)conf.getLong(RpcServer.MAX_REQUEST_SIZE,
       RpcServer.DEFAULT_MAX_REQUEST_SIZE));
+    this.dropOnDeletedTables =
+        this.conf.getBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
 
     this.replicationBulkLoadDataEnabled =
         conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
@@ -225,6 +234,37 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     return entryLists;
   }
 
+  private TableName parseTable(String msg) {
+    // ... TableNotFoundException: '<table>'\n...
+    Pattern p = Pattern.compile("TableNotFoundException: \\'([\\S]*)\\'");
+    Matcher m = p.matcher(msg);
+    if (m.find()) {
+      String table = m.group(1);
+      try {
+        // double check that table is a valid table name
+        TableName.valueOf(TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(table)));
+        return TableName.valueOf(table);
+      } catch (IllegalArgumentException ignore) {
+      }
+    }
+    return null;
+  }
+
+  // Filter a set of batches by TableName
+  private List<List<Entry>> filterBatches(final List<List<Entry>> oldEntryList, TableName table) {
+    List<List<Entry>> entryLists = new ArrayList<>();
+    for (List<Entry> entries : oldEntryList) {
+      ArrayList<Entry> thisList = new ArrayList<Entry>(entries.size());
+      entryLists.add(thisList);
+      for (Entry e : entries) {
+        if (!e.getKey().getTablename().equals(table)) {
+          thisList.add(e);
+        }
+      }
+    }
+    return entryLists;
+  }
+
   private void reconnectToPeerCluster() {
     ClusterConnection connection = null;
     try {
@@ -325,10 +365,27 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
           ioe = ((RemoteException) ioe).unwrapRemoteException();
           LOG.warn("Can't replicate because of an error on the remote cluster: 
", ioe);
           if (ioe instanceof TableNotFoundException) {
-            if (sleepForRetries("A table is missing in the peer cluster. "
-                + "Replication cannot proceed without losing data.", 
sleepMultiplier)) {
-              sleepMultiplier++;
+            if (dropOnDeletedTables) {
+              // this is a bit fragile, but cannot change how TNFE is serialized
+              // at least check whether the table name is legal
+              TableName table = parseTable(ioe.getMessage());
+              if (table != null) {
+                try (Connection localConn =
+                    ConnectionFactory.createConnection(ctx.getLocalConfiguration())) {
+                  if (!localConn.getAdmin().tableExists(table)) {
+                    // Would potentially be better to retry in one of the outer loops
+                    // and add a table filter there; but that would break the encapsulation,
+                    // so we're doing the filtering here.
+                    LOG.info("Missing table detected at sink, local table also does not exist, filtering edits for '"+table+"'");
+                    batches = filterBatches(batches, table);
+                    continue;
+                  }
+                } catch (IOException iox) {
+                  LOG.warn("Exception checking for local table: ", iox);
+                }
+              }
             }
+            // fall through and sleep below
           } else {
             LOG.warn("Peer encountered RemoteException, rechecking all sinks: 
", ioe);
             replicationSinkMgr.chooseSinks();
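
Because the remote TableNotFoundException only carries the offending table name inside its message text, HBaseInterClusterReplicationEndpoint has to recover it with the regular expression above before it can check whether the table is also missing locally. A standalone sketch of that parsing step, using the same pattern as the patch (the sample message string is an assumption for illustration):

  import java.util.regex.Matcher;
  import java.util.regex.Pattern;

  public class ParseTableNotFoundSketch {
    public static void main(String[] args) {
      // Message shaped like what a peer region server reports for a missing table.
      String msg = "org.apache.hadoop.hbase.TableNotFoundException: 'NS:test_dropped'";
      Pattern p = Pattern.compile("TableNotFoundException: \\'([\\S]*)\\'");
      Matcher m = p.matcher(msg);
      if (m.find()) {
        System.out.println(m.group(1)); // prints NS:test_dropped
      }
    }
  }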

http://git-wip-us.apache.org/repos/asf/hbase/blob/0c4fbcc3/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 05b1f21..2f9f9c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -42,12 +42,14 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
@@ -372,6 +374,12 @@ public class ReplicationSink {
       for (List<Row> rows : allRows) {
         table.batch(rows, null);
       }
+    } catch (RetriesExhaustedWithDetailsException rewde) {
+      for (Throwable ex : rewde.getCauses()) {
+        if (ex instanceof TableNotFoundException) {
+          throw new TableNotFoundException("'"+tableName+"'");
+        }
+      }
     } catch (InterruptedException ix) {
       throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
     } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/0c4fbcc3/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 609274f..45d7d94 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -507,7 +507,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       replicationEndpoint, walFileLengthProvider, metrics);
 
     // init replication endpoint
-    replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
+    replicationEndpoint.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(),
       fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server));
 
     return src;

http://git-wip-us.apache.org/repos/asf/hbase/blob/0c4fbcc3/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
new file mode 100644
index 0000000..df9cff2
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
@@ -0,0 +1,292 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Category(LargeTests.class)
+public class TestReplicationDroppedTables extends TestReplicationBase {
+  private static final Log LOG = LogFactory.getLog(TestReplicationDroppedTables.class);
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Starting and stopping replication can make us miss new logs,
+    // rolling like this makes sure the most recent one gets added to the queue
+    for ( JVMClusterUtil.RegionServerThread r :
+        utility1.getHBaseCluster().getRegionServerThreads()) {
+      utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
+    }
+    int rowCount = utility1.countRows(tableName);
+    utility1.deleteTableData(tableName);
+    // truncating the table will send one Delete per row to the slave cluster
+    // in an async fashion, which is why we cannot just call deleteTableData on
+    // utility2 since late writes could make it to the slave in some way.
+    // Instead, we truncate the first table and wait for all the Deletes to
+    // make it to the slave.
+    Scan scan = new Scan();
+    int lastCount = 0;
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for truncate");
+      }
+      ResultScanner scanner = htable2.getScanner(scan);
+      Result[] res = scanner.next(rowCount);
+      scanner.close();
+      if (res.length != 0) {
+        if (res.length < lastCount) {
+          i--; // Don't increment timeout if we make progress
+        }
+        lastCount = res.length;
+        LOG.info("Still got " + res.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  @Test(timeout = 600000)
+  public void testEditsStuckBehindDroppedTable() throws Exception {
+    // Sanity check
+    // Make sure by default edits for dropped tables stall the replication queue, even when the
+    // table(s) in question have been deleted on both ends.
+    testEditsBehindDroppedTable(false, "test_dropped");
+  }
+
+  @Test(timeout = 600000)
+  public void testEditsDroppedWithDroppedTable() throws Exception {
+    // Make sure that, when the flag is enabled, edits for dropped tables are themselves
+    // dropped when the table(s) in question have been deleted on both ends.
+    testEditsBehindDroppedTable(true, "test_dropped");
+  }
+
+  @Test(timeout = 600000)
+  public void testEditsDroppedWithDroppedTableNS() throws Exception {
+    // also try with a namespace
+    Connection connection1 = ConnectionFactory.createConnection(conf1);
+    try (Admin admin1 = connection1.getAdmin()) {
+      admin1.createNamespace(NamespaceDescriptor.create("NS").build());
+    }
+    Connection connection2 = ConnectionFactory.createConnection(conf2);
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.createNamespace(NamespaceDescriptor.create("NS").build());
+    }
+    testEditsBehindDroppedTable(true, "NS:test_dropped");
+  }
+
+  private void testEditsBehindDroppedTable(boolean allowProceeding, String tName) throws Exception {
+    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
+    conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
+
+    // make sure we have a single region server only, so that all
+    // edits for all tables go there
+    utility1.shutdownMiniHBaseCluster();
+    utility1.startMiniHBaseCluster(1, 1);
+
+    TableName tablename = TableName.valueOf(tName);
+    byte[] familyname = Bytes.toBytes("fam");
+    byte[] row = Bytes.toBytes("row");
+
+    HTableDescriptor table = new HTableDescriptor(tablename);
+    HColumnDescriptor fam = new HColumnDescriptor(familyname);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    table.addFamily(fam);
+
+    Connection connection1 = ConnectionFactory.createConnection(conf1);
+    Connection connection2 = ConnectionFactory.createConnection(conf2);
+    try (Admin admin1 = connection1.getAdmin()) {
+      admin1.createTable(table);
+    }
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.createTable(table);
+    }
+    utility1.waitUntilAllRegionsAssigned(tablename);
+    utility2.waitUntilAllRegionsAssigned(tablename);
+
+    Table lHtable1 = utility1.getConnection().getTable(tablename);
+
+    // now suspend replication
+    admin.disablePeer("2");
+
+    // put some data (lead with 0 so the edit gets sorted before the other table's edits
+    //   in the replication batch)
+    // write a bunch of edits, making sure we fill a batch
+    byte[] rowkey = Bytes.toBytes(0+" put on table to be dropped");
+    Put put = new Put(rowkey);
+    put.addColumn(familyname, row, row);
+    lHtable1.put(put);
+
+    rowkey = Bytes.toBytes("normal put");
+    put = new Put(rowkey);
+    put.addColumn(famName, row, row);
+    htable1.put(put);
+
+    try (Admin admin1 = connection1.getAdmin()) {
+      admin1.disableTable(tablename);
+      admin1.deleteTable(tablename);
+    }
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.disableTable(tablename);
+      admin2.deleteTable(tablename);
+    }
+
+    admin.enablePeer("2");
+    if (allowProceeding) {
+      // in this case we'd expect the key to make it over
+      verifyReplicationProceeded(rowkey);
+    } else {
+      verifyReplicationStuck(rowkey);
+    }
+    // just to be safe
+    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
+  }
+
+  @Test(timeout = 600000)
+  public void testEditsBehindDroppedTableTiming() throws Exception {
+    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
+    conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
+
+    // make sure we have a single region server only, so that all
+    // edits for all tables go there
+    utility1.shutdownMiniHBaseCluster();
+    utility1.startMiniHBaseCluster(1, 1);
+
+    TableName tablename = TableName.valueOf("testdroppedtimed");
+    byte[] familyname = Bytes.toBytes("fam");
+    byte[] row = Bytes.toBytes("row");
+
+    HTableDescriptor table = new HTableDescriptor(tablename);
+    HColumnDescriptor fam = new HColumnDescriptor(familyname);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    table.addFamily(fam);
+
+    Connection connection1 = ConnectionFactory.createConnection(conf1);
+    Connection connection2 = ConnectionFactory.createConnection(conf2);
+    try (Admin admin1 = connection1.getAdmin()) {
+      admin1.createTable(table);
+    }
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.createTable(table);
+    }
+    utility1.waitUntilAllRegionsAssigned(tablename);
+    utility2.waitUntilAllRegionsAssigned(tablename);
+
+    Table lHtable1 = utility1.getConnection().getTable(tablename);
+
+    // now suspend replication
+    admin.disablePeer("2");
+
+    // put some data (lead with 0 so the edit gets sorted before the other table's edits
+    //   in the replication batch)
+    // write a bunch of edits, making sure we fill a batch
+    byte[] rowkey = Bytes.toBytes(0+" put on table to be dropped");
+    Put put = new Put(rowkey);
+    put.addColumn(familyname, row, row);
+    lHtable1.put(put);
+
+    rowkey = Bytes.toBytes("normal put");
+    put = new Put(rowkey);
+    put.addColumn(famName, row, row);
+    htable1.put(put);
+
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.disableTable(tablename);
+      admin2.deleteTable(tablename);
+    }
+
+    admin.enablePeer("2");
+    // edit should still be stuck
+
+    try (Admin admin1 = connection1.getAdmin()) {
+      // the source table still exists, replication should be stalled
+      verifyReplicationStuck(rowkey);
+
+      admin1.disableTable(tablename);
+      // still stuck, source table still exists
+      verifyReplicationStuck(rowkey);
+
+      admin1.deleteTable(tablename);
+      // now the source table is gone, replication should proceed and the
+      // offending edits should be dropped
+      verifyReplicationProceeded(rowkey);
+    }
+    // just to be safe
+    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
+  }
+
+  private void verifyReplicationProceeded(byte[] rowkey) throws Exception {
+    Get get = new Get(rowkey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(res.getRow(), rowkey);
+        break;
+      }
+    }
+  }
+
+  private void verifyReplicationStuck(byte[] rowkey) throws Exception {
+    Get get = new Get(rowkey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        fail("Edit should have been stuck behind dropped tables");
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+  }
+}
