joshelser commented on a change in pull request #1722:
URL: https://github.com/apache/hbase/pull/1722#discussion_r426737494



##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEditsDroppedWithDroppedTable.java
##########
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ LargeTests.class })
+public class TestReplicationEditsDroppedWithDroppedTable {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationEditsDroppedWithDroppedTable.class);
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestReplicationEditsDroppedWithDroppedTable.class);
+
+  private static Configuration conf1 = HBaseConfiguration.create();
+  private static Configuration conf2 = HBaseConfiguration.create();
+
+  protected static HBaseTestingUtility utility1;
+  protected static HBaseTestingUtility utility2;
+
+  private static Admin admin1;
+  private static Admin admin2;
+
+  private static final String namespace = "NS";
+  private static final TableName NORMAL_TABLE = TableName.valueOf("normal-table");
+  private static final TableName DROPPED_TABLE = TableName.valueOf("dropped-table");
+  private static final TableName DROPPED_NS_TABLE = TableName.valueOf("NS:dropped-table");
+  private static final byte[] ROW = Bytes.toBytes("row");
+  private static final byte[] FAMILY = Bytes.toBytes("f");
+  private static final byte[] QUALIFIER = Bytes.toBytes("q");
+  private static final byte[] VALUE = Bytes.toBytes("value");
+
+  private static final String PEER_ID = "1";
+  private static final long SLEEP_TIME = 1000;
+  private static final int NB_RETRIES = 10;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Set true to filter replication edits for dropped tables
+    conf1.setBoolean(HBaseInterClusterReplicationEndpoint.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
+    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+    conf1.setInt("replication.source.nb.capacity", 1);
+    utility1 = new HBaseTestingUtility(conf1);
+    utility1.startMiniZKCluster();
+    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+    conf1 = utility1.getConfiguration();
+
+    conf2 = HBaseConfiguration.create(conf1);
+    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+    utility2 = new HBaseTestingUtility(conf2);
+    utility2.setZkCluster(miniZK);
+
+    utility1.startMiniCluster(1);
+    utility2.startMiniCluster(1);
+
+    admin1 = utility1.getAdmin();
+    admin2 = utility2.getAdmin();
+
+    NamespaceDescriptor nsDesc = NamespaceDescriptor.create(namespace).build();
+    admin1.createNamespace(nsDesc);
+    admin2.createNamespace(nsDesc);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    utility2.shutdownMiniCluster();
+    utility1.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    // Roll log
+    for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
+        .getRegionServerThreads()) {
+      utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
+    }
+    // add peer
+    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
+        .setClusterKey(utility2.getClusterKey())
+        .setReplicateAllUserTables(true).build();
+    admin1.addReplicationPeer(PEER_ID, rpc);
+    // create table
+    createTable(NORMAL_TABLE);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    // Remove peer
+    admin1.removeReplicationPeer(PEER_ID);
+    Thread.sleep(SLEEP_TIME);
+    Thread.sleep(SLEEP_TIME);

Review comment:
       Two sleeps intentional? If so, make it `SLEEP_TIME * 2`?
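
       If the double wait is deliberate, a single call would state the
       intent directly, e.g.:

           Thread.sleep(SLEEP_TIME * 2); // wait for the peer removal to propagate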

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
##########
@@ -168,7 +183,9 @@ public void init(Context context) throws IOException {
     this.replicationRpcLimit = (int)(0.95 * conf.getLong(RpcServer.MAX_REQUEST_SIZE,
       RpcServer.DEFAULT_MAX_REQUEST_SIZE));
     this.dropOnDeletedTables =
-        this.conf.getBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
+        this.conf.getBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
+    this.dropOnDeletedColumnFamilies = this.conf
+        .getBoolean(REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY, false);

Review comment:
       Do you think we need two separate configuration properties? If someone 
is OK dropping data in this scenario, don't you think that they would want to 
default into dropping data for all cases?
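
       For instance (a sketch, not the patch's code), the column-family flag
       could default to the table-level flag so that a single opt-in covers
       both cases:

           this.dropOnDeletedTables =
               this.conf.getBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
           // Hypothetical fallback: inherit the table-level choice unless the
           // column-family key is set explicitly.
           this.dropOnDeletedColumnFamilies = this.conf.getBoolean(
               REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY, this.dropOnDeletedTables);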

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEditsDroppedWithDeletedTableCFs.java
##########
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
+import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
+import static org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ LargeTests.class })
+public class TestReplicationEditsDroppedWithDeletedTableCFs {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationEditsDroppedWithDeletedTableCFs.class);
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestReplicationEditsDroppedWithDeletedTableCFs.class);
+
+  private static Configuration conf1 = HBaseConfiguration.create();
+  private static Configuration conf2 = HBaseConfiguration.create();
+
+  protected static HBaseTestingUtility utility1;
+  protected static HBaseTestingUtility utility2;
+
+  private static Admin admin1;
+  private static Admin admin2;
+
+  private static final TableName TABLE = TableName.valueOf("table");
+  private static final byte[] NORMAL_CF = Bytes.toBytes("normal_cf");
+  private static final byte[] DROPPED_CF = Bytes.toBytes("dropped_cf");
+
+  private static final byte[] ROW = Bytes.toBytes("row");
+  private static final byte[] QUALIFIER = Bytes.toBytes("q");
+  private static final byte[] VALUE = Bytes.toBytes("value");
+
+  private static final String PEER_ID = "1";
+  private static final long SLEEP_TIME = 1000;
+  private static final int NB_RETRIES = 10;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Set true to filter replication edits for deleted column families
+    conf1.setBoolean(REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY, true);
+    conf1.set(ZOOKEEPER_ZNODE_PARENT, "/1");
+    conf1.setInt("replication.source.nb.capacity", 1);
+    utility1 = new HBaseTestingUtility(conf1);
+    utility1.startMiniZKCluster();
+    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+    conf1 = utility1.getConfiguration();
+
+    conf2 = HBaseConfiguration.create(conf1);
+    conf2.set(ZOOKEEPER_ZNODE_PARENT, "/2");
+    utility2 = new HBaseTestingUtility(conf2);
+    utility2.setZkCluster(miniZK);
+
+    utility1.startMiniCluster(1);
+    utility2.startMiniCluster(1);
+
+    admin1 = utility1.getAdmin();
+    admin2 = utility2.getAdmin();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    utility2.shutdownMiniCluster();
+    utility1.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    // Roll log
+    for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
+        .getRegionServerThreads()) {
+      utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
+    }
+    // add peer
+    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
+        .setClusterKey(utility2.getClusterKey())
+        .setReplicateAllUserTables(true).build();
+    admin1.addReplicationPeer(PEER_ID, rpc);
+    // create table
+    createTable();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    // Remove peer
+    admin1.removeReplicationPeer(PEER_ID);
+    Thread.sleep(SLEEP_TIME);
+//    utility1.getMiniHBaseCluster().getMaster().getReplicationZKNodeCleanerChore().choreForTesting();
+    Thread.sleep(SLEEP_TIME);

Review comment:
       Drop this commented-out line?

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEditsDroppedWithDeletedTableCFs.java
##########
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
+import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
+import static org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ LargeTests.class })
+public class TestReplicationEditsDroppedWithDeletedTableCFs {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationEditsDroppedWithDeletedTableCFs.class);
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestReplicationEditsDroppedWithDeletedTableCFs.class);
+
+  private static Configuration conf1 = HBaseConfiguration.create();
+  private static Configuration conf2 = HBaseConfiguration.create();
+
+  protected static HBaseTestingUtility utility1;
+  protected static HBaseTestingUtility utility2;
+
+  private static Admin admin1;
+  private static Admin admin2;
+
+  private static final TableName TABLE = TableName.valueOf("table");
+  private static final byte[] NORMAL_CF = Bytes.toBytes("normal_cf");
+  private static final byte[] DROPPED_CF = Bytes.toBytes("dropped_cf");
+
+  private static final byte[] ROW = Bytes.toBytes("row");
+  private static final byte[] QUALIFIER = Bytes.toBytes("q");
+  private static final byte[] VALUE = Bytes.toBytes("value");
+
+  private static final String PEER_ID = "1";
+  private static final long SLEEP_TIME = 1000;
+  private static final int NB_RETRIES = 10;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Set true to filter replication edits for deleted column families
+    conf1.setBoolean(REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY, true);
+    conf1.set(ZOOKEEPER_ZNODE_PARENT, "/1");
+    conf1.setInt("replication.source.nb.capacity", 1);
+    utility1 = new HBaseTestingUtility(conf1);
+    utility1.startMiniZKCluster();
+    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+    conf1 = utility1.getConfiguration();
+
+    conf2 = HBaseConfiguration.create(conf1);
+    conf2.set(ZOOKEEPER_ZNODE_PARENT, "/2");
+    utility2 = new HBaseTestingUtility(conf2);
+    utility2.setZkCluster(miniZK);
+
+    utility1.startMiniCluster(1);
+    utility2.startMiniCluster(1);
+
+    admin1 = utility1.getAdmin();
+    admin2 = utility2.getAdmin();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    utility2.shutdownMiniCluster();
+    utility1.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    // Roll log
+    for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
+        .getRegionServerThreads()) {
+      utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
+    }
+    // add peer
+    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
+        .setClusterKey(utility2.getClusterKey())
+        .setReplicateAllUserTables(true).build();
+    admin1.addReplicationPeer(PEER_ID, rpc);
+    // create table
+    createTable();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    // Remove peer
+    admin1.removeReplicationPeer(PEER_ID);
+    Thread.sleep(SLEEP_TIME);
+//    utility1.getMiniHBaseCluster().getMaster().getReplicationZKNodeCleanerChore().choreForTesting();
+    Thread.sleep(SLEEP_TIME);
+    // Drop table
+    admin1.disableTable(TABLE);
+    admin1.deleteTable(TABLE);
+    admin2.disableTable(TABLE);
+    admin2.deleteTable(TABLE);
+  }
+
+  private void createTable() throws Exception {
+    TableDescriptor desc = createTableDescriptor(NORMAL_CF, DROPPED_CF);
+    admin1.createTable(desc);
+    admin2.createTable(desc);
+    utility1.waitUntilAllRegionsAssigned(desc.getTableName());
+    utility2.waitUntilAllRegionsAssigned(desc.getTableName());
+  }
+
+  @Test
+  public void testEditsDroppedWithDeleteCF() throws Exception {
+    admin1.disableReplicationPeer(PEER_ID);
+
+    try (Table table = utility1.getConnection().getTable(TABLE)) {
+      Put put = new Put(ROW);
+      put.addColumn(DROPPED_CF, QUALIFIER, VALUE);
+      table.put(put);
+    }
+
+    deleteCf(admin1);
+
+    admin1.enableReplicationPeer(PEER_ID);
+
+    verifyReplicationProceeded();
+  }
+
+  @Test
+  public void testEditsBehindDeleteCFTiming() throws Exception {
+    admin1.disableReplicationPeer(PEER_ID);
+
+    try (Table table = utility1.getConnection().getTable(TABLE)) {
+      Put put = new Put(ROW);
+      put.addColumn(DROPPED_CF, QUALIFIER, VALUE);
+      table.put(put);
+    }
+
+    // Only delete cf from peer cluster
+    deleteCf(admin2);
+
+    admin1.enableReplicationPeer(PEER_ID);
+
+    // the source table's cf still exists, replication should be stalled
+    verifyReplicationStuck();
+    deleteCf(admin1);
+    // now the source table's cf is gone, replication should proceed, the
+    // offending edits be dropped
+    verifyReplicationProceeded();
+  }
+
+  private void verifyReplicationProceeded() throws Exception {
+    try (Table table = utility1.getConnection().getTable(TABLE)) {
+      Put put = new Put(ROW);
+      put.addColumn(NORMAL_CF, QUALIFIER, VALUE);
+      table.put(put);
+    }
+    try (Table peerTable = utility2.getConnection().getTable(TABLE)) {
+      for (int i = 0; i < NB_RETRIES; i++) {
+        if (i == NB_RETRIES - 1) {
+          fail("Waited too much time for put replication");
+        }
+        Result result = peerTable.get(new Get(ROW).addColumn(NORMAL_CF, QUALIFIER));
+        if (result == null || result.isEmpty()) {
+          LOG.info("Row not available in peer cluster");
+          Thread.sleep(SLEEP_TIME);
+        } else {
+          assertArrayEquals(VALUE, result.getValue(NORMAL_CF, QUALIFIER));
+          break;
+        }
+      }
+    }

Review comment:
       HBaseTestingUtility has some nice `waitFor()` methods which simplify 
this kind of logic
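
       For instance, a rough sketch of the loop above (assuming the
       `Waiter.Predicate` lambda form of `waitFor`):

           // Poll until the row is visible in the peer cluster, failing after
           // roughly NB_RETRIES * SLEEP_TIME milliseconds.
           try (Table peerTable = utility2.getConnection().getTable(TABLE)) {
             utility2.waitFor(NB_RETRIES * SLEEP_TIME, () -> {
               Result r = peerTable.get(new Get(ROW).addColumn(NORMAL_CF, QUALIFIER));
               return r != null && !r.isEmpty()
                   && Bytes.equals(VALUE, r.getValue(NORMAL_CF, QUALIFIER));
             });
           }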

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
##########
@@ -279,28 +296,146 @@ private int getEstimatedEntrySize(Entry e) {
     }
   }
 
-  private TableName parseTable(String msg) {
-    // ... TableNotFoundException: '<table>'/n...
-    Pattern p = Pattern.compile("TableNotFoundException: '([\\S]*)'");
-    Matcher m = p.matcher(msg);
-    if (m.find()) {
-      String table = m.group(1);
-      try {
-        // double check that table is a valid table name
-        TableName.valueOf(TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(table)));
-        return TableName.valueOf(table);
-      } catch (IllegalArgumentException ignore) {
+  /**
+   * Check if there's a {@link TableNotFoundException} in the cause chain of the stacktrace.
+   */
+  @VisibleForTesting
+  public static boolean isTableNotFoundException(Throwable io) {
+    if (io instanceof RemoteException) {
+      io = ((RemoteException) io).unwrapRemoteException();
+    }
+    if (io != null && io.getMessage().contains("TableNotFoundException")) {
+      return true;
+    }
+    for (; io != null; io = io.getCause()) {
+      if (io instanceof TableNotFoundException) {
+        return true;
       }
     }
-    return null;
+    return false;
   }
 
-  // Filter a set of batches by TableName
-  private List<List<Entry>> filterBatches(final List<List<Entry>> oldEntryList, TableName table) {
-    return oldEntryList
-        .stream().map(entries -> entries.stream()
-            .filter(e -> !e.getKey().getTableName().equals(table)).collect(Collectors.toList()))
-        .collect(Collectors.toList());
+  /**
+   * Check if there's a {@link NoSuchColumnFamilyException} in the cause chain of the stacktrace.
+   */
+  @VisibleForTesting
+  public static boolean isNoSuchColumnFamilyException(Throwable io) {
+    if (io instanceof RemoteException) {
+      io = ((RemoteException) io).unwrapRemoteException();
+    }
+    if (io != null && io.getMessage().contains("NoSuchColumnFamilyException")) 
{
+      return true;
+    }
+    for (; io != null; io = io.getCause()) {
+      if (io instanceof NoSuchColumnFamilyException) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @VisibleForTesting
+  List<List<Entry>> filterNotExistTableEdits(final List<List<Entry>> oldEntryList) {
+    List<List<Entry>> entryList = new ArrayList<>();
+    Map<TableName, Boolean> existMap = new HashMap<>();
+    try (Connection localConn = ConnectionFactory.createConnection(ctx.getLocalConfiguration());
+         Admin localAdmin = localConn.getAdmin()) {
+      for (List<Entry> oldEntries : oldEntryList) {
+        List<Entry> entries = new ArrayList<>();
+        for (Entry e : oldEntries) {
+          TableName tableName = e.getKey().getTableName();
+          boolean exist = true;
+          if (existMap.containsKey(tableName)) {
+            exist = existMap.get(tableName);
+          } else {
+            try {
+              exist = localAdmin.tableExists(tableName);
+              existMap.put(tableName, exist);
+            } catch (IOException iox) {
+              LOG.warn("Exception checking for local table " + tableName, iox);

Review comment:
       Should we assume the table `exist`s when we can't check for it?
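
       Defaulting `exist` to true on failure is the conservative choice; a
       sketch of making that explicit in the catch block:

           } catch (IOException iox) {
             // When the local existence check itself fails, assume the table
             // exists so its edits are retried rather than dropped.
             LOG.warn("Exception checking for local table " + tableName
                 + "; assuming it exists", iox);
             exist = true;
           }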




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

