This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 4cfbf19  HBASE-25583: NoNodeException of the peer should call the 
remove peer workflow (#2970)
4cfbf19 is described below

commit 4cfbf19791c190f6736c1f4ba07b4219f00c84d0
Author: Sandeep Pal <[email protected]>
AuthorDate: Fri Feb 26 16:00:37 2021 -0800

    HBASE-25583: NoNodeException of the peer should call the remove peer 
workflow (#2970)
    
    Signed-off-by: Bharath Vissapragada <[email protected]>
    Signed-off-by: Andrew Purtell <[email protected]>
---
 .../regionserver/ReplicationSource.java            | 11 +---
 .../regionserver/ReplicationSourceManager.java     | 38 ++++++++---
 .../ReplicationSourceDummyWithNoTermination.java   |  7 +-
 .../hbase/replication/TestReplicationSource.java   | 46 +++++++++----
 ....java => TestReplicationSourceManagerBase.java} | 12 ++--
 ...va => TestReplicationSourceManagerManager.java} | 77 +---------------------
 ...tReplicationSourceWithoutReplicationZnodes.java | 67 +++++++++++--------
 7 files changed, 120 insertions(+), 138 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 969e8ca..de3b7f6 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import 
org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
@@ -784,13 +783,9 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
     }
 
     private void updateLogPosition(long lastReadPosition) {
-      try {
-        manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, 
lastReadPosition,
-          this.replicationQueueInfo.isQueueRecovered(), false);
-        lastLoggedPosition = lastReadPosition;
-      } catch (ReplicationSourceWithoutPeerException re) {
-        source.terminate("Replication peer is removed and source should 
terminate", re);
-      }
+      manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, 
lastReadPosition,
+        this.replicationQueueInfo.isQueueRecovered(), false);
+      lastLoggedPosition = lastReadPosition;
     }
 
     public void startup() {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index a8e8e76..b0e32f8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -189,12 +189,13 @@ public class ReplicationSourceManager implements 
ReplicationListener {
    * @param holdLogInZK if true then the log is retained in ZK
    */
   public synchronized void logPositionAndCleanOldLogs(Path log, String id, 
long position,
-      boolean queueRecovered, boolean holdLogInZK) throws 
ReplicationSourceWithoutPeerException {
+      boolean queueRecovered, boolean holdLogInZK) {
     String fileName = log.getName();
     this.replicationQueues.setLogPosition(id, fileName, position);
     if (holdLogInZK) {
       return;
     }
+
     cleanOldLogs(fileName, id, queueRecovered);
   }
 
@@ -205,8 +206,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
    * @param id id of the peer cluster
    * @param queueRecovered Whether this is a recovered queue
    */
-  public void cleanOldLogs(String key, String id, boolean queueRecovered)
-      throws ReplicationSourceWithoutPeerException {
+  public void cleanOldLogs(String key, String id, boolean queueRecovered) {
     String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
     if (queueRecovered) {
       Map<String, SortedSet<String>> walsForPeer = 
walsByIdRecoveredQueues.get(id);
@@ -218,7 +218,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
       }
     } else {
       synchronized (this.walsById) {
-        SortedSet<String> wals = walsById.get(id).get(logPrefix);
+        SortedSet<String> wals = getLogsWithPrefix(id, logPrefix);
         if (wals != null && !wals.first().equals(key)) {
           cleanOldLogs(wals, key, id);
         }
@@ -226,17 +226,37 @@ public class ReplicationSourceManager implements 
ReplicationListener {
     }
   }
 
-  private void cleanOldLogs(SortedSet<String> wals, String key, String id)
-      throws ReplicationSourceWithoutPeerException {
+  private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
     SortedSet<String> walSet = wals.headSet(key);
     LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
-    for (String wal : walSet) {
-      this.replicationQueues.removeLog(id, wal);
+    try {
+      for (String wal : walSet) {
+        this.replicationQueues.removeLog(id, wal);
+      }
+    } catch (ReplicationSourceWithoutPeerException rspe) {
+      // This means the source is running and replication peer have been 
removed
+      // We should call the removePeer workflow to terminate the source 
gracefully
+      LOG.warn("Replication peer " + id + " has been removed and source is 
still running", rspe);
+      String peerId = id;
+      if (peerId.contains("-")) {
+        peerId = peerId.split("-")[0];
+      }
+      peerRemoved(peerId);
     }
     walSet.clear();
   }
 
   /**
+   * Get logs with log prefix for the given wal group
+   * @param walGroupId wal group ID
+   * @param logPrefix log prefix
+   * @return logs with the given prefix
+   */
+  public SortedSet<String> getLogsWithPrefix(String walGroupId, String 
logPrefix) {
+    return walsById.get(walGroupId).get(logPrefix);
+  }
+
+  /**
    * Adds a normal source per registered peer cluster and tries to process all
    * old region server wal queues
    */
@@ -579,7 +599,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   }
 
   /**
-   * Thie method first deletes all the recovered sources for the specified
+   * This method first deletes all the recovered sources for the specified
    * id, then deletes the normal source (deleting all related data in ZK).
    * @param id The id of the peer cluster
    */
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummyWithNoTermination.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummyWithNoTermination.java
index 4a89917..1fba87f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummyWithNoTermination.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummyWithNoTermination.java
@@ -16,11 +16,14 @@
 package org.apache.hadoop.hbase.replication;
 
 public class ReplicationSourceDummyWithNoTermination extends 
ReplicationSourceDummy {
-
+  volatile boolean firstTime = true;
   @Override
   public void terminate(String reason) {
     // This is to block the zk listener to close the queues
     // to simulate the znodes getting deleted without zk listener getting 
invoked
-    throw new RuntimeException(fakeExceptionMessage);
+    if (firstTime) {
+      firstTime = false;
+      throw new RuntimeException(fakeExceptionMessage);
+    }
   }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index b0a2a8c..e7ff58f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -28,20 +28,22 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
-
 import com.google.common.collect.Lists;
-
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -62,6 +64,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
@@ -247,7 +250,6 @@ public class TestReplicationSource {
       public boolean evaluate() throws Exception {
         return future.isDone();
       }
-
     });
   }
 
@@ -277,7 +279,7 @@ public class TestReplicationSource {
   }
 
   private static final class Mocks {
-    private final ReplicationSourceManager manager = 
mock(ReplicationSourceManager.class);
+    private ReplicationSourceManager manager = 
mock(ReplicationSourceManager.class);
     private final ReplicationQueues queues = mock(ReplicationQueues.class);
     private final ReplicationPeers peers = mock(ReplicationPeers.class);
     private final MetricsSource metrics = mock(MetricsSource.class);
@@ -291,12 +293,32 @@ public class TestReplicationSource {
       when(manager.getTotalBufferUsed()).thenReturn(totalBufferUsed);
     }
 
-    // source manager throws the exception while cleaning logs
-    private void setReplicationSourceWithoutPeerException()
-      throws ReplicationSourceWithoutPeerException {
-      doThrow(new ReplicationSourceWithoutPeerException("No 
peer")).when(manager)
+    ReplicationSource 
createReplicationSourceAndManagerWithMocks(ReplicationEndpoint endpoint)
+        throws Exception {
+      ReplicationTracker tracker = mock(ReplicationTracker.class);
+      Server server = mock(Server.class);
+      FileSystem fs = mock(FileSystem.class);
+      UUID clusterId = UUID.randomUUID();
+      String peerId = "testPeerClusterZnode";
+
+      manager = Mockito.spy(new ReplicationSourceManager(
+        queues, peers, tracker, conf, server, fs, logDir, oldLogDir, 
clusterId));
+
+      doCallRealMethod().when(manager).removePeer(Mockito.anyString());
+      // Mock the failure during cleaning log with node already deleted
+      doThrow(new ReplicationSourceWithoutPeerException("Peer 
Removed")).when(queues)
+        .removeLog(anyString(), anyString());
+      doCallRealMethod().when(manager)
         .logPositionAndCleanOldLogs(Mockito.<Path>anyObject(), 
Mockito.anyString(),
           Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean());
+      final ReplicationSource source = new ReplicationSource();
+      endpoint.init(context);
+      source.init(conf, FS, manager, queues, peers, mock(Stoppable.class),
+        peerId, clusterId, endpoint, metrics);
+      manager.getSources().add(source);
+      SortedSet<String> walsWithPrefix = 
Sets.newTreeSet(Collections.singletonList("fake"));
+      doReturn(walsWithPrefix).when(manager).getLogsWithPrefix(anyString(), 
anyString());
+      return source;
     }
 
     ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint 
endpoint,
@@ -522,8 +544,7 @@ public class TestReplicationSource {
    */
   @Test
   public void testReplicationSourceTerminationWhenNoZnodeForPeerAndQueues() 
throws Exception {
-    Mocks mocks = new Mocks();
-    mocks.setReplicationSourceWithoutPeerException();
+    final Mocks mocks = new Mocks();
     // set table cfs to filter all cells out
     final TableName replicatedTable = TableName.valueOf("replicated_table");
     final Map<TableName, List<String>> cfs =
@@ -543,7 +564,7 @@ public class TestReplicationSource {
       }
     };
 
-    final ReplicationSource source = 
mocks.createReplicationSourceWithMocks(endpoint, false);
+    final ReplicationSource source = 
mocks.createReplicationSourceAndManagerWithMocks(endpoint);
     source.run();
     source.enqueueLog(log1);
 
@@ -561,10 +582,9 @@ public class TestReplicationSource {
       }
     });
 
-    // After that the source should be terminated
+    // And the source should be terminated
     Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
       @Override public boolean evaluate() {
-        // wait until reader read all cells
         return !source.isSourceActive();
       }
     });
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerBase.java
similarity index 95%
rename from 
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java
rename to 
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerBase.java
index ab4d19d..ec2facd 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerBase.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import 
org.apache.hadoop.hbase.replication.ReplicationSourceDummyWithNoTermination;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.replication.regionserver.helper.DummyServer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -51,10 +51,10 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.rules.TestName;
 
-public abstract class TestReplicationSourceBase {
+public abstract class TestReplicationSourceManagerBase {
 
   private static final Log LOG =
-    LogFactory.getLog(TestReplicationSourceBase.class);
+    LogFactory.getLog(TestReplicationSourceManagerBase.class);
 
   protected static Configuration conf;
   protected static HBaseTestingUtility utility;
@@ -75,10 +75,12 @@ public abstract class TestReplicationSourceBase {
   protected static Path logDir;
   protected static DummyServer server;
 
-  @BeforeClass public static void setUpBeforeClass() throws Exception {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
     conf = HBaseConfiguration.create();
     conf.set("replication.replicationsource.implementation",
-      ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
+      ReplicationSourceDummy.class.getCanonicalName());
     conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, 
HConstants.REPLICATION_ENABLE_DEFAULT);
     conf.setLong("replication.sleep.before.failover", 2000);
     conf.setInt("replication.source.maxretriesmultiplier", 10);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerManager.java
similarity index 88%
rename from 
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
rename to 
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerManager.java
index f0c18d3..50c96cf 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerManager.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.Sets;
-
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URLEncoder;
@@ -47,17 +46,13 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ClusterId;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
@@ -72,91 +67,25 @@ import 
org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
-import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
 import org.apache.hadoop.hbase.replication.regionserver.helper.DummyServer;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category(MediumTests.class)
-public class TestReplicationSourceManager extends TestReplicationSourceBase {
-
+public class TestReplicationSourceManagerManager extends 
TestReplicationSourceManagerBase {
   private static final Log LOG =
-    LogFactory.getLog(TestReplicationSourceManager.class);
-  private static final TableName test =
-      TableName.valueOf("test");
-  private static final String slaveId = "1";
-  private static CountDownLatch latch;
+    LogFactory.getLog(TestReplicationSourceManagerManager.class);
   private static List<String> files = new ArrayList<>();
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-
-    conf = HBaseConfiguration.create();
-    conf.set("replication.replicationsource.implementation",
-        ReplicationSourceDummy.class.getCanonicalName());
-    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
-        HConstants.REPLICATION_ENABLE_DEFAULT);
-    conf.setLong("replication.sleep.before.failover", 2000);
-    conf.setInt("replication.source.maxretriesmultiplier", 10);
-    utility = new HBaseTestingUtility(conf);
-    utility.startMiniZKCluster();
-
-    zkw = new ZooKeeperWatcher(conf, "test", null);
-    ZKUtil.createWithParents(zkw, "/hbase/replication");
-    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
-    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
-        Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
-            + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
-    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
-    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
-      ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
-    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
-    ZKUtil.setData(zkw, "/hbase/replication/state", 
ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
-
-    ZKClusterId.setClusterId(zkw, new ClusterId());
-    FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
-    fs = FileSystem.get(conf);
-    oldLogDir = new Path(utility.getDataTestDir(),
-        HConstants.HREGION_OLDLOGDIR_NAME);
-    logDir = new Path(utility.getDataTestDir(),
-        HConstants.HREGION_LOGDIR_NAME);
-    server = new DummyServer(conf, "example.hostname.com", zkw);
-    replication = new Replication(server, fs, logDir, oldLogDir);
-    manager = replication.getReplicationManager();
-
-    manager.addSource(slaveId);
-
-    htd = new HTableDescriptor(test);
-    HColumnDescriptor col = new HColumnDescriptor(f1);
-    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
-    htd.addFamily(col);
-    col = new HColumnDescriptor(f2);
-    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
-    htd.addFamily(col);
-
-    hri = new HRegionInfo(htd.getTableName(), r1, r2);
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    manager.join();
-    utility.shutdownMiniCluster();
-  }
+  private static CountDownLatch latch;
 
   @Test
   public void testLogRoll() throws Exception {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
index 095710d..c823548 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
@@ -21,25 +21,30 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import 
org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
+import 
org.apache.hadoop.hbase.replication.ReplicationSourceDummyWithNoTermination;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category(MediumTests.class)
-public class TestReplicationSourceWithoutReplicationZnodes extends 
TestReplicationSourceBase {
+public class TestReplicationSourceWithoutReplicationZnodes
+  extends TestReplicationSourceManagerBase {
+
+  @Before
+  public void removeExistingSourcesFromSourceManager() {
+    manager.getSources().clear();
+    manager.getOldSources().clear();
+  }
 
   /**
    * When the peer is removed, hbase remove the peer znodes and there is zk 
watcher
@@ -47,39 +52,47 @@ public class TestReplicationSourceWithoutReplicationZnodes 
extends TestReplicati
    * or a race condition between source deleting the log znode and zk watcher
    * terminating the source, we might get the NoNode exception. In that case, 
the right
    * thing is to terminate the replication source.
+   *
    * @throws Exception throws exception
    */
   @Test
   public void testReplicationSourceRunningWithoutPeerZnodes() throws Exception 
{
+    String replicationSourceImplName = 
conf.get("replication.replicationsource.implementation");
     MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
     KeyValue kv = new KeyValue(r1, f1, r1);
     WALEdit edit = new WALEdit();
     edit.add(kv);
+    try {
+      conf.set("replication.replicationsource.implementation",
+        ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
+      List<WALActionsListener> listeners = new ArrayList<>();
+      listeners.add(replication);
+      final WALFactory wals = new WALFactory(utility.getConfiguration(), 
listeners,
+        URLEncoder.encode("regionserver:60020", "UTF8"));
+      final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), 
hri.getTable().getNamespace());
+      manager.init();
 
-    List<WALActionsListener> listeners = new ArrayList<>();
-    listeners.add(replication);
-    final WALFactory wals = new WALFactory(utility.getConfiguration(), 
listeners,
-      URLEncoder.encode("regionserver:60020", "UTF8"));
-    final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), 
hri.getTable().getNamespace());
-    manager.init();
-
-    final long txid = wal.append(htd, hri,
-      new WALKey(hri.getEncodedNameAsBytes(), test, 
System.currentTimeMillis(), mvcc),
-      edit, true);
-    wal.sync(txid);
+      final long txid = wal.append(htd, hri,
+        new WALKey(hri.getEncodedNameAsBytes(), test, 
System.currentTimeMillis(), mvcc), edit,
+        true);
+      wal.sync(txid);
 
-    wal.rollWriter();
-    ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1");
-    ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/"+ 
server.getServerName() + "/1");
+      wal.rollWriter();
+      ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1");
+      ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/" + 
server.getServerName() + "/1");
 
-    ReplicationException exceptionThrown = null;
-    try {
-      
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
-        "1", 0, false, false);
-    } catch (ReplicationException e) {
-      exceptionThrown = e;
+      Assert.assertEquals("There should be exactly one source",
+        1, manager.getSources().size());
+      Assert.assertEquals("Replication source is not correct",
+        ReplicationSourceDummyWithNoTermination.class,
+        manager.getSources().get(0).getClass());
+      manager
+        
.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), "1", 
0, false,
+          false);
+      Assert.assertTrue("Replication source should be terminated and removed",
+        manager.getSources().isEmpty());
+    } finally {
+      conf.set("replication.replicationsource.implementation", 
replicationSourceImplName);
     }
-
-    Assert.assertTrue(exceptionThrown instanceof 
ReplicationSourceWithoutPeerException);
   }
 }

Reply via email to