reddycharan commented on a change in pull request #2206:
URL: https://github.com/apache/bookkeeper/pull/2206#discussion_r438580733
##########
File path:
bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
##########
@@ -857,4 +874,205 @@ private void verifyRecoveredLedgers(LedgerHandle lh, long
startEntryId,
}
}
+ class MockZooKeeperClient extends ZooKeeperClient {
+ private final String connectString;
+ private final int sessionTimeoutMs;
+ private final ZooKeeperWatcherBase watcherManager;
+ private volatile String pathOfSetDataToFail;
+ private volatile String pathOfDeleteToFail;
+ private AtomicInteger numOfTimesSetDataFailed = new AtomicInteger();
+ private AtomicInteger numOfTimesDeleteFailed = new AtomicInteger();
+
+ MockZooKeeperClient(String connectString, int sessionTimeoutMs,
ZooKeeperWatcherBase watcher)
+ throws IOException {
+ /*
+ * in OperationalRetryPolicy maxRetries is set to 0. So it wont
+ * retry incase of any error/exception.
+ */
+ super(connectString, sessionTimeoutMs, watcher,
+ new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs,
sessionTimeoutMs, Integer.MAX_VALUE),
+ new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs,
sessionTimeoutMs, 0),
+ NullStatsLogger.INSTANCE, 1, 0);
+ this.connectString = connectString;
+ this.sessionTimeoutMs = sessionTimeoutMs;
+ this.watcherManager = watcher;
+ }
+
+ @Override
+ protected ZooKeeper createZooKeeper() throws IOException {
+ return new MockZooKeeper(this.connectString,
this.sessionTimeoutMs, this.watcherManager, false);
+ }
+
+ private void setPathOfSetDataToFail(String pathOfSetDataToFail) {
+ this.pathOfSetDataToFail = pathOfSetDataToFail;
+ }
+
+ private void setPathOfDeleteToFail(String pathOfDeleteToFail) {
+ this.pathOfDeleteToFail = pathOfDeleteToFail;
+ }
+
+ private int getNumOfTimesSetDataFailed() {
+ return numOfTimesSetDataFailed.get();
+ }
+
+ private int getNumOfTimesDeleteFailed() {
+ return numOfTimesDeleteFailed.get();
+ }
+
+ class MockZooKeeper extends ZooKeeper {
+ public MockZooKeeper(String connectString, int sessionTimeout,
Watcher watcher, boolean canBeReadOnly)
+ throws IOException {
+ super(connectString, sessionTimeout, watcher, canBeReadOnly);
+ }
+
+ @Override
+ public void setData(final String path, final byte[] data, final
int version, final StatCallback cb,
+ final Object context) {
+ if ((pathOfSetDataToFail != null) &&
(pathOfSetDataToFail.equals(path))) {
+ /*
+ * if pathOfSetDataToFail matches with the path of the
node,
+ * then callback with CONNECTIONLOSS error.
+ */
+ LOG.error("setData of MockZooKeeper, is failing with
CONNECTIONLOSS for path: {}", path);
+ numOfTimesSetDataFailed.incrementAndGet();
+
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, context,
null);
+ } else {
+ super.setData(path, data, version, cb, context);
+ }
+ }
+
+ @Override
+ public void delete(final String path, final int version) throws
KeeperException, InterruptedException {
+ if ((pathOfDeleteToFail != null) &&
(pathOfDeleteToFail.equals(path))) {
+ /*
+ * if pathOfDeleteToFail matches with the path of the node,
+ * then throw CONNECTIONLOSS exception.
+ */
+ LOG.error("delete of MockZooKeeper, is failing with
CONNECTIONLOSS for path: {}", path);
+ numOfTimesDeleteFailed.incrementAndGet();
+ throw new KeeperException.ConnectionLossException();
+ } else {
+ super.delete(path, version);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testRWShutDownInTheCaseOfZKOperationFailures() throws
Exception {
+ /*
+ * create MockZooKeeperClient instance and wait for it to be connected.
+ */
+ int zkSessionTimeOut = 10000;
+ ZooKeeperWatcherBase zooKeeperWatcherBase = new
ZooKeeperWatcherBase(zkSessionTimeOut,
+ NullStatsLogger.INSTANCE);
+ MockZooKeeperClient zkFaultInjectionWrapper = new
MockZooKeeperClient(zkUtil.getZooKeeperConnectString(),
+ zkSessionTimeOut, zooKeeperWatcherBase);
+ zkFaultInjectionWrapper.waitForConnection();
+ assertEquals("zkFaultInjectionWrapper should be in connected state",
States.CONNECTED,
+ zkFaultInjectionWrapper.getState());
+ long oldZkInstanceSessionId = zkFaultInjectionWrapper.getSessionId();
+
+ /*
+ * create ledger and add entries.
+ */
+ BookKeeper bkWithMockZK = new BookKeeper(baseClientConf,
zkFaultInjectionWrapper);
+ long ledgerId = 567L;
+ LedgerHandle lh = bkWithMockZK.createLedgerAdv(ledgerId, 2, 2, 2,
BookKeeper.DigestType.CRC32, TESTPASSWD,
+ null);
+ for (int i = 0; i < 10; i++) {
+ lh.addEntry(i, data);
+ }
+ lh.close();
+
+ /*
+ * trigger Expired event so that MockZooKeeperClient would run
+ * 'clientCreator' and create new zk handle. In this case it would
+ * create MockZooKeeper instance.
+ */
+ zooKeeperWatcherBase.process(new WatchedEvent(EventType.None,
KeeperState.Expired, ""));
+ zkFaultInjectionWrapper.waitForConnection();
+ for (int i = 0; i < 10; i++) {
+ if (zkFaultInjectionWrapper.getState() == States.CONNECTED) {
+ break;
+ }
+ Thread.sleep(200);
+ }
+ assertEquals("zkFaultInjectionWrapper should be in connected state",
States.CONNECTED,
+ zkFaultInjectionWrapper.getState());
+ assertNotEquals("Session Id of old and new ZK instance should be
different", oldZkInstanceSessionId,
+ zkFaultInjectionWrapper.getSessionId());
+
+ /*
+ * Kill a Bookie, so that ledger becomes underreplicated. Since totally
+ * 3 bookies are available and the ensemblesize of the current ledger
is
+ * 2, we should be able to replicate to the other bookie.
+ */
+ BookieSocketAddress replicaToKill =
LedgerHandleAdapter.getLedgerMetadata(lh).getEnsembles().get(0L).get(0);
+ LOG.info("Killing Bookie", replicaToKill);
+ killBookie(replicaToKill);
+
+ /*
+ * Start RW.
+ */
+ ReplicationWorker rw = new ReplicationWorker(baseConf, bkWithMockZK,
false, NullStatsLogger.INSTANCE);
Review comment:
@eolivelli fixed it
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]