This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new 4cfbf19 HBASE-25583: NoNodeException of the peer should call the
remove peer workflow (#2970)
4cfbf19 is described below
commit 4cfbf19791c190f6736c1f4ba07b4219f00c84d0
Author: Sandeep Pal <[email protected]>
AuthorDate: Fri Feb 26 16:00:37 2021 -0800
HBASE-25583: NoNodeException of the peer should call the remove peer
workflow (#2970)
Signed-off-by: Bharath Vissapragada <[email protected]>
Signed-off-by: Andrew Purtell <[email protected]>
---
.../regionserver/ReplicationSource.java | 11 +---
.../regionserver/ReplicationSourceManager.java | 38 ++++++++---
.../ReplicationSourceDummyWithNoTermination.java | 7 +-
.../hbase/replication/TestReplicationSource.java | 46 +++++++++----
....java => TestReplicationSourceManagerBase.java} | 12 ++--
...va => TestReplicationSourceManagerManager.java} | 77 +---------------------
...tReplicationSourceWithoutReplicationZnodes.java | 67 +++++++++++--------
7 files changed, 120 insertions(+), 138 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 969e8ca..de3b7f6 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import
org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
@@ -784,13 +783,9 @@ public class ReplicationSource extends Thread implements
ReplicationSourceInterf
}
private void updateLogPosition(long lastReadPosition) {
- try {
- manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode,
lastReadPosition,
- this.replicationQueueInfo.isQueueRecovered(), false);
- lastLoggedPosition = lastReadPosition;
- } catch (ReplicationSourceWithoutPeerException re) {
- source.terminate("Replication peer is removed and source should
terminate", re);
- }
+ manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode,
lastReadPosition,
+ this.replicationQueueInfo.isQueueRecovered(), false);
+ lastLoggedPosition = lastReadPosition;
}
public void startup() {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index a8e8e76..b0e32f8 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -189,12 +189,13 @@ public class ReplicationSourceManager implements
ReplicationListener {
* @param holdLogInZK if true then the log is retained in ZK
*/
public synchronized void logPositionAndCleanOldLogs(Path log, String id,
long position,
- boolean queueRecovered, boolean holdLogInZK) throws
ReplicationSourceWithoutPeerException {
+ boolean queueRecovered, boolean holdLogInZK) {
String fileName = log.getName();
this.replicationQueues.setLogPosition(id, fileName, position);
if (holdLogInZK) {
return;
}
+
cleanOldLogs(fileName, id, queueRecovered);
}
@@ -205,8 +206,7 @@ public class ReplicationSourceManager implements
ReplicationListener {
* @param id id of the peer cluster
* @param queueRecovered Whether this is a recovered queue
*/
- public void cleanOldLogs(String key, String id, boolean queueRecovered)
- throws ReplicationSourceWithoutPeerException {
+ public void cleanOldLogs(String key, String id, boolean queueRecovered) {
String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
if (queueRecovered) {
Map<String, SortedSet<String>> walsForPeer =
walsByIdRecoveredQueues.get(id);
@@ -218,7 +218,7 @@ public class ReplicationSourceManager implements
ReplicationListener {
}
} else {
synchronized (this.walsById) {
- SortedSet<String> wals = walsById.get(id).get(logPrefix);
+ SortedSet<String> wals = getLogsWithPrefix(id, logPrefix);
if (wals != null && !wals.first().equals(key)) {
cleanOldLogs(wals, key, id);
}
@@ -226,17 +226,37 @@ public class ReplicationSourceManager implements
ReplicationListener {
}
}
- private void cleanOldLogs(SortedSet<String> wals, String key, String id)
- throws ReplicationSourceWithoutPeerException {
+ private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
SortedSet<String> walSet = wals.headSet(key);
LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
- for (String wal : walSet) {
- this.replicationQueues.removeLog(id, wal);
+ try {
+ for (String wal : walSet) {
+ this.replicationQueues.removeLog(id, wal);
+ }
+ } catch (ReplicationSourceWithoutPeerException rspe) {
+ // This means the source is running and the replication peer has been
removed
+ // We should call the removePeer workflow to terminate the source
gracefully
+ LOG.warn("Replication peer " + id + " has been removed and source is
still running", rspe);
+ String peerId = id;
+ if (peerId.contains("-")) {
+ peerId = peerId.split("-")[0];
+ }
+ peerRemoved(peerId);
}
walSet.clear();
}
/**
+ * Get logs with log prefix for the given wal group
+ * @param walGroupId wal group ID
+ * @param logPrefix log prefix
+ * @return logs with the given prefix
+ */
+ public SortedSet<String> getLogsWithPrefix(String walGroupId, String
logPrefix) {
+ return walsById.get(walGroupId).get(logPrefix);
+ }
+
+ /**
* Adds a normal source per registered peer cluster and tries to process all
* old region server wal queues
*/
@@ -579,7 +599,7 @@ public class ReplicationSourceManager implements
ReplicationListener {
}
/**
- * Thie method first deletes all the recovered sources for the specified
+ * This method first deletes all the recovered sources for the specified
* id, then deletes the normal source (deleting all related data in ZK).
* @param id The id of the peer cluster
*/
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummyWithNoTermination.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummyWithNoTermination.java
index 4a89917..1fba87f 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummyWithNoTermination.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummyWithNoTermination.java
@@ -16,11 +16,14 @@
package org.apache.hadoop.hbase.replication;
public class ReplicationSourceDummyWithNoTermination extends
ReplicationSourceDummy {
-
+ volatile boolean firstTime = true;
@Override
public void terminate(String reason) {
// This blocks the zk listener from closing the queues,
// to simulate the znodes getting deleted without the zk listener getting
invoked
- throw new RuntimeException(fakeExceptionMessage);
+ if (firstTime) {
+ firstTime = false;
+ throw new RuntimeException(fakeExceptionMessage);
+ }
}
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index b0a2a8c..e7ff58f 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -28,20 +28,22 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.times;
-
import com.google.common.collect.Lists;
-
+import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.SortedSet;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@@ -62,6 +64,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
@@ -247,7 +250,6 @@ public class TestReplicationSource {
public boolean evaluate() throws Exception {
return future.isDone();
}
-
});
}
@@ -277,7 +279,7 @@ public class TestReplicationSource {
}
private static final class Mocks {
- private final ReplicationSourceManager manager =
mock(ReplicationSourceManager.class);
+ private ReplicationSourceManager manager =
mock(ReplicationSourceManager.class);
private final ReplicationQueues queues = mock(ReplicationQueues.class);
private final ReplicationPeers peers = mock(ReplicationPeers.class);
private final MetricsSource metrics = mock(MetricsSource.class);
@@ -291,12 +293,32 @@ public class TestReplicationSource {
when(manager.getTotalBufferUsed()).thenReturn(totalBufferUsed);
}
- // source manager throws the exception while cleaning logs
- private void setReplicationSourceWithoutPeerException()
- throws ReplicationSourceWithoutPeerException {
- doThrow(new ReplicationSourceWithoutPeerException("No
peer")).when(manager)
+ ReplicationSource
createReplicationSourceAndManagerWithMocks(ReplicationEndpoint endpoint)
+ throws Exception {
+ ReplicationTracker tracker = mock(ReplicationTracker.class);
+ Server server = mock(Server.class);
+ FileSystem fs = mock(FileSystem.class);
+ UUID clusterId = UUID.randomUUID();
+ String peerId = "testPeerClusterZnode";
+
+ manager = Mockito.spy(new ReplicationSourceManager(
+ queues, peers, tracker, conf, server, fs, logDir, oldLogDir,
clusterId));
+
+ doCallRealMethod().when(manager).removePeer(Mockito.anyString());
+ // Mock the failure during cleaning log with node already deleted
+ doThrow(new ReplicationSourceWithoutPeerException("Peer
Removed")).when(queues)
+ .removeLog(anyString(), anyString());
+ doCallRealMethod().when(manager)
.logPositionAndCleanOldLogs(Mockito.<Path>anyObject(),
Mockito.anyString(),
Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean());
+ final ReplicationSource source = new ReplicationSource();
+ endpoint.init(context);
+ source.init(conf, FS, manager, queues, peers, mock(Stoppable.class),
+ peerId, clusterId, endpoint, metrics);
+ manager.getSources().add(source);
+ SortedSet<String> walsWithPrefix =
Sets.newTreeSet(Collections.singletonList("fake"));
+ doReturn(walsWithPrefix).when(manager).getLogsWithPrefix(anyString(),
anyString());
+ return source;
}
ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint
endpoint,
@@ -522,8 +544,7 @@ public class TestReplicationSource {
*/
@Test
public void testReplicationSourceTerminationWhenNoZnodeForPeerAndQueues()
throws Exception {
- Mocks mocks = new Mocks();
- mocks.setReplicationSourceWithoutPeerException();
+ final Mocks mocks = new Mocks();
// set table cfs to filter all cells out
final TableName replicatedTable = TableName.valueOf("replicated_table");
final Map<TableName, List<String>> cfs =
@@ -543,7 +564,7 @@ public class TestReplicationSource {
}
};
- final ReplicationSource source =
mocks.createReplicationSourceWithMocks(endpoint, false);
+ final ReplicationSource source =
mocks.createReplicationSourceAndManagerWithMocks(endpoint);
source.run();
source.enqueueLog(log1);
@@ -561,10 +582,9 @@ public class TestReplicationSource {
}
});
- // After that the source should be terminated
+ // And the source should be terminated
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() {
- // wait until reader read all cells
return !source.isSourceActive();
}
});
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerBase.java
similarity index 95%
rename from
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java
rename to
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerBase.java
index ab4d19d..ec2facd 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerBase.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import
org.apache.hadoop.hbase.replication.ReplicationSourceDummyWithNoTermination;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.helper.DummyServer;
import org.apache.hadoop.hbase.util.Bytes;
@@ -51,10 +51,10 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;
-public abstract class TestReplicationSourceBase {
+public abstract class TestReplicationSourceManagerBase {
private static final Log LOG =
- LogFactory.getLog(TestReplicationSourceBase.class);
+ LogFactory.getLog(TestReplicationSourceManagerBase.class);
protected static Configuration conf;
protected static HBaseTestingUtility utility;
@@ -75,10 +75,12 @@ public abstract class TestReplicationSourceBase {
protected static Path logDir;
protected static DummyServer server;
- @BeforeClass public static void setUpBeforeClass() throws Exception {
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+
conf = HBaseConfiguration.create();
conf.set("replication.replicationsource.implementation",
- ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
+ ReplicationSourceDummy.class.getCanonicalName());
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
HConstants.REPLICATION_ENABLE_DEFAULT);
conf.setLong("replication.sleep.before.failover", 2000);
conf.setInt("replication.source.maxretriesmultiplier", 10);
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerManager.java
similarity index 88%
rename from
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
rename to
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerManager.java
index f0c18d3..50c96cf 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerManager.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.Sets;
-
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URLEncoder;
@@ -47,17 +46,13 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
@@ -72,91 +67,25 @@ import
org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
-import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.replication.regionserver.helper.DummyServer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
-public class TestReplicationSourceManager extends TestReplicationSourceBase {
-
+public class TestReplicationSourceManagerManager extends
TestReplicationSourceManagerBase {
private static final Log LOG =
- LogFactory.getLog(TestReplicationSourceManager.class);
- private static final TableName test =
- TableName.valueOf("test");
- private static final String slaveId = "1";
- private static CountDownLatch latch;
+ LogFactory.getLog(TestReplicationSourceManagerManager.class);
private static List<String> files = new ArrayList<>();
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
-
- conf = HBaseConfiguration.create();
- conf.set("replication.replicationsource.implementation",
- ReplicationSourceDummy.class.getCanonicalName());
- conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
- HConstants.REPLICATION_ENABLE_DEFAULT);
- conf.setLong("replication.sleep.before.failover", 2000);
- conf.setInt("replication.source.maxretriesmultiplier", 10);
- utility = new HBaseTestingUtility(conf);
- utility.startMiniZKCluster();
-
- zkw = new ZooKeeperWatcher(conf, "test", null);
- ZKUtil.createWithParents(zkw, "/hbase/replication");
- ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
- ZKUtil.setData(zkw, "/hbase/replication/peers/1",
- Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
- + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
- ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
- ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
- ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
- ZKUtil.createWithParents(zkw, "/hbase/replication/state");
- ZKUtil.setData(zkw, "/hbase/replication/state",
ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
-
- ZKClusterId.setClusterId(zkw, new ClusterId());
- FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
- fs = FileSystem.get(conf);
- oldLogDir = new Path(utility.getDataTestDir(),
- HConstants.HREGION_OLDLOGDIR_NAME);
- logDir = new Path(utility.getDataTestDir(),
- HConstants.HREGION_LOGDIR_NAME);
- server = new DummyServer(conf, "example.hostname.com", zkw);
- replication = new Replication(server, fs, logDir, oldLogDir);
- manager = replication.getReplicationManager();
-
- manager.addSource(slaveId);
-
- htd = new HTableDescriptor(test);
- HColumnDescriptor col = new HColumnDescriptor(f1);
- col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
- htd.addFamily(col);
- col = new HColumnDescriptor(f2);
- col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
- htd.addFamily(col);
-
- hri = new HRegionInfo(htd.getTableName(), r1, r2);
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- manager.join();
- utility.shutdownMiniCluster();
- }
+ private static CountDownLatch latch;
@Test
public void testLogRoll() throws Exception {
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
index 095710d..c823548 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
@@ -21,25 +21,30 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
-
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import
org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
+import
org.apache.hadoop.hbase.replication.ReplicationSourceDummyWithNoTermination;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
-public class TestReplicationSourceWithoutReplicationZnodes extends
TestReplicationSourceBase {
+public class TestReplicationSourceWithoutReplicationZnodes
+ extends TestReplicationSourceManagerBase {
+
+ @Before
+ public void removeExistingSourcesFromSourceManager() {
+ manager.getSources().clear();
+ manager.getOldSources().clear();
+ }
/**
* When the peer is removed, HBase removes the peer znodes and there is a zk
watcher
@@ -47,39 +52,47 @@ public class TestReplicationSourceWithoutReplicationZnodes
extends TestReplicati
* or a race condition between source deleting the log znode and zk watcher
* terminating the source, we might get the NoNode exception. In that case,
the right
* thing is to terminate the replication source.
+ *
* @throws Exception throws exception
*/
@Test
public void testReplicationSourceRunningWithoutPeerZnodes() throws Exception
{
+ String replicationSourceImplName =
conf.get("replication.replicationsource.implementation");
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
KeyValue kv = new KeyValue(r1, f1, r1);
WALEdit edit = new WALEdit();
edit.add(kv);
+ try {
+ conf.set("replication.replicationsource.implementation",
+ ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
+ List<WALActionsListener> listeners = new ArrayList<>();
+ listeners.add(replication);
+ final WALFactory wals = new WALFactory(utility.getConfiguration(),
listeners,
+ URLEncoder.encode("regionserver:60020", "UTF8"));
+ final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(),
hri.getTable().getNamespace());
+ manager.init();
- List<WALActionsListener> listeners = new ArrayList<>();
- listeners.add(replication);
- final WALFactory wals = new WALFactory(utility.getConfiguration(),
listeners,
- URLEncoder.encode("regionserver:60020", "UTF8"));
- final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(),
hri.getTable().getNamespace());
- manager.init();
-
- final long txid = wal.append(htd, hri,
- new WALKey(hri.getEncodedNameAsBytes(), test,
System.currentTimeMillis(), mvcc),
- edit, true);
- wal.sync(txid);
+ final long txid = wal.append(htd, hri,
+ new WALKey(hri.getEncodedNameAsBytes(), test,
System.currentTimeMillis(), mvcc), edit,
+ true);
+ wal.sync(txid);
- wal.rollWriter();
- ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1");
- ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/"+
server.getServerName() + "/1");
+ wal.rollWriter();
+ ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1");
+ ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/" +
server.getServerName() + "/1");
- ReplicationException exceptionThrown = null;
- try {
-
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
- "1", 0, false, false);
- } catch (ReplicationException e) {
- exceptionThrown = e;
+ Assert.assertEquals("There should be exactly one source",
+ 1, manager.getSources().size());
+ Assert.assertEquals("Replication source is not correct",
+ ReplicationSourceDummyWithNoTermination.class,
+ manager.getSources().get(0).getClass());
+ manager
+
.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), "1",
0, false,
+ false);
+ Assert.assertTrue("Replication source should be terminated and removed",
+ manager.getSources().isEmpty());
+ } finally {
+ conf.set("replication.replicationsource.implementation",
replicationSourceImplName);
}
-
- Assert.assertTrue(exceptionThrown instanceof
ReplicationSourceWithoutPeerException);
}
}