This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-4.15 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 0c3600d6c5103a9b90f2939bf3f65a69314225c8 Author: Andrey Yegorov <[email protected]> AuthorDate: Tue Jul 12 07:57:19 2022 -0700 Shut down ReplicationWorker and Auditor on non-recoverable ZK error (#3374) (cherry picked from commit c3706e9c2508ba9042afc0e3c19a92c30bc2b32d) --- .../org/apache/bookkeeper/bookie/BookieShell.java | 3 +- .../apache/bookkeeper/client/BookKeeperAdmin.java | 2 +- .../bookkeeper/meta/FlatLedgerManagerFactory.java | 4 +- .../bookkeeper/meta/LedgerManagerFactory.java | 3 +- .../LegacyHierarchicalLedgerManagerFactory.java | 4 +- .../bookkeeper/meta/MSLedgerManagerFactory.java | 3 +- .../meta/ZkLedgerUnderreplicationManager.java | 68 +++++++++--------- .../org/apache/bookkeeper/replication/Auditor.java | 50 +++++++++++-- .../replication/ReplicationException.java | 25 +++++++ .../bookkeeper/replication/ReplicationWorker.java | 14 ++-- .../autorecovery/ListUnderReplicatedCommand.java | 3 +- .../QueryAutoRecoveryStatusCommand.java | 3 +- .../cli/commands/autorecovery/ToggleCommand.java | 3 +- .../bookie/ForceAuditorChecksCmdTest.java | 5 +- .../replication/AuditorPeriodicCheckTest.java | 3 +- .../replication/TestReplicationWorker.java | 82 ++++++++++++++++++---- .../metadata/etcd/EtcdLedgerManagerFactory.java | 3 +- .../ListUnderReplicatedCommandTest.java | 8 +-- 18 files changed, 201 insertions(+), 85 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java index edf9194f16..0eeb0dc601 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java @@ -105,7 +105,6 @@ import org.apache.commons.configuration.CompositeConfiguration; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1896,7 +1895,7 @@ public class BookieShell implements Tool { underreplicationManager.setReplicasCheckCTime(time); } } - } catch (InterruptedException | KeeperException | ReplicationException e) { + } catch (InterruptedException | ReplicationException e) { LOG.error("Exception while trying to reset last run time ", e); return -1; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java index 54cb566a30..279e745101 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java @@ -1471,7 +1471,7 @@ public class BookKeeperAdmin implements AutoCloseable { } private LedgerUnderreplicationManager getUnderreplicationManager() - throws CompatibilityException, KeeperException, InterruptedException { + throws CompatibilityException, UnavailableException, InterruptedException { if (underreplicationManager == null) { underreplicationManager = mFactory.newLedgerUnderreplicationManager(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java index e613082d64..95269d3c58 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java @@ -28,7 +28,6 @@ import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; import org.apache.bookkeeper.replication.ReplicationException; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.util.ZkUtils; -import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.ACL; /** @@ -86,7 +85,8 @@ public class FlatLedgerManagerFactory extends AbstractZkLedgerManagerFactory { @Override public LedgerUnderreplicationManager newLedgerUnderreplicationManager() - throws KeeperException, InterruptedException, ReplicationException.CompatibilityException { + throws ReplicationException.UnavailableException, InterruptedException, + ReplicationException.CompatibilityException { return new ZkLedgerUnderreplicationManager(conf, zk); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java index d213235d6e..3d2355f87a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java @@ -84,7 +84,8 @@ public interface LedgerManagerFactory extends AutoCloseable { * @see LedgerUnderreplicationManager */ LedgerUnderreplicationManager newLedgerUnderreplicationManager() - throws KeeperException, InterruptedException, ReplicationException.CompatibilityException; + throws ReplicationException.UnavailableException, + InterruptedException, ReplicationException.CompatibilityException; /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java index a218ef3eb9..03f8828134 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java @@ -28,7 +28,6 @@ import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; import org.apache.bookkeeper.replication.ReplicationException; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.util.ZkUtils; -import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.ACL; /** @@ -93,7 +92,8 @@ public class LegacyHierarchicalLedgerManagerFactory extends AbstractZkLedgerMana @Override public LedgerUnderreplicationManager newLedgerUnderreplicationManager() - throws KeeperException, InterruptedException, ReplicationException.CompatibilityException{ + throws ReplicationException.UnavailableException, InterruptedException, + ReplicationException.CompatibilityException{ return new ZkLedgerUnderreplicationManager(conf, zk); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java index 3ad303aa28..e8dfdb08f3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java @@ -671,7 +671,8 @@ public class MSLedgerManagerFactory extends AbstractZkLedgerManagerFactory { } @Override - public LedgerUnderreplicationManager newLedgerUnderreplicationManager() throws KeeperException, + public LedgerUnderreplicationManager newLedgerUnderreplicationManager() + throws ReplicationException.UnavailableException, InterruptedException, ReplicationException.CompatibilityException { // TODO: currently just use zk ledger underreplication manager return new ZkLedgerUnderreplicationManager(conf, zk); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java index c92d2d90d8..3ac346b1cc 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java @@ -128,7 +128,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa private final SubTreeCache subTreeCache; public ZkLedgerUnderreplicationManager(AbstractConfiguration conf, ZooKeeper zkc) - throws KeeperException, InterruptedException, ReplicationException.CompatibilityException { + throws UnavailableException, InterruptedException, ReplicationException.CompatibilityException { this.conf = conf; rootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf); basePath = getBasePath(rootPath); @@ -149,7 +149,11 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa } }); - checkLayout(); + try { + checkLayout(); + } catch (KeeperException ke) { + throw ReplicationException.fromKeeperException("", ke); + } } public static String getBasePath(String rootPath) { @@ -284,7 +288,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa underreplicatedLedger.setReplicaList(replicaList); return underreplicatedLedger; } catch (KeeperException ke) { - throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie); @@ -424,7 +428,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa // znode in place, so the ledger is checked. } catch (KeeperException ke) { LOG.error("Error deleting underreplicated ledger znode", ke); - throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); @@ -578,7 +582,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa try { return getLedgerToRereplicateFromHierarchy(urLedgerPath, 0); } catch (KeeperException ke) { - throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie); @@ -608,7 +612,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa // nothing found, wait for a watcher to trigger changedLatch.await(); } catch (KeeperException ke) { - throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie); @@ -639,7 +643,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa // this is ok } catch (KeeperException ke) { LOG.error("Error deleting underreplicated ledger lock", ke); - throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie); @@ -660,7 +664,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa // this is ok } catch (KeeperException ke) { LOG.error("Error deleting underreplicated ledger lock", ke); - throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie); @@ -684,8 +688,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa "AutoRecovery is already disabled!", ke); } catch (KeeperException ke) { LOG.error("Exception while stopping auto ledger re-replication", ke); - throw new ReplicationException.UnavailableException( - "Exception while stopping auto ledger re-replication", ke); + throw ReplicationException.fromKeeperException("Exception while stopping auto ledger re-replication", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException( @@ -708,8 +711,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa "AutoRecovery is already enabled!", ke); } catch (KeeperException ke) { LOG.error("Exception while resuming ledger replication", ke); - throw new ReplicationException.UnavailableException( - "Exception while resuming auto ledger re-replication", ke); + throw ReplicationException.fromKeeperException("Exception while resuming auto ledger re-replication", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException( @@ -729,8 +731,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa } catch (KeeperException ke) { LOG.error("Error while checking the state of " + "ledger re-replication", ke); - throw new ReplicationException.UnavailableException( - "Error contacting zookeeper", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException( @@ -765,8 +766,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa } catch (KeeperException ke) { LOG.error("Error while checking the state of " + "ledger re-replication", ke); - throw new ReplicationException.UnavailableException( - "Error contacting zookeeper", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException( @@ -791,10 +791,14 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa */ public static String acquireUnderreplicatedLedgerLock(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId, List<ACL> zkAcls) - throws KeeperException, InterruptedException { - final String lockPath = getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId); - ZkUtils.createFullPathOptimistic(zkc, lockPath, LOCK_DATA, zkAcls, CreateMode.EPHEMERAL); - return lockPath; + throws UnavailableException, InterruptedException { + try { + final String lockPath = getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId); + ZkUtils.createFullPathOptimistic(zkc, lockPath, LOCK_DATA, zkAcls, CreateMode.EPHEMERAL); + return lockPath; + } catch (KeeperException ke) { + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); + } } @Override @@ -824,7 +828,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa return false; } catch (KeeperException ke) { LOG.error("Error while initializing LostBookieRecoveryDelay", ke); - throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); @@ -845,7 +849,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa } } catch (KeeperException ke) { LOG.error("Error while setting LostBookieRecoveryDelay ", ke); - throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); @@ -860,7 +864,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa return Integer.parseInt(new String(data, UTF_8)); } catch (KeeperException ke) { LOG.error("Error while getting LostBookieRecoveryDelay ", ke); - throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); @@ -882,7 +886,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa zkc.addWatch(urLedgerPath, w, AddWatchMode.PERSISTENT_RECURSIVE); } catch (KeeperException ke) { LOG.error("Error while checking the state of underReplicated ledgers", ke); - throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); @@ -907,7 +911,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa } } catch (KeeperException ke) { LOG.error("Error while checking the state of lostBookieRecoveryDelay", ke); - throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); @@ -928,7 +932,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa // this is ok. } catch (KeeperException e) { LOG.error("Error while getting ReplicationWorkerId rereplicating Ledger", e); - throw new ReplicationException.UnavailableException( + throw ReplicationException.fromKeeperException( "Error while getting ReplicationWorkerId rereplicating Ledger", e); } catch (InterruptedException e) { LOG.error("Got interrupted while getting ReplicationWorkerId rereplicating Ledger", e); @@ -957,7 +961,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa zkc.create(checkAllLedgersCtimeZnode, checkAllLedgersFormatByteArray, zkAcls, CreateMode.PERSISTENT); } } catch (KeeperException ke) { - throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); @@ -978,7 +982,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa LOG.warn("checkAllLedgersCtimeZnode is not yet available"); return -1; } catch (KeeperException ke) { - throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); @@ -1004,7 +1008,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa CreateMode.PERSISTENT); } } catch (KeeperException ke) { - throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); @@ -1025,7 +1029,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa LOG.warn("placementPolicyCheckCtimeZnode is not yet available"); return -1; } catch (KeeperException ke) { - throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); @@ -1050,7 +1054,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa LOG.debug("setReplicasCheckCTime completed successfully"); } } catch (KeeperException ke) { - throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); @@ -1070,7 +1074,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa LOG.warn("replicasCheckCtimeZnode is not yet available"); return -1; } catch (KeeperException ke) { - throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java index 8404e97e21..428f763ea7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java @@ -116,7 +116,6 @@ import org.apache.bookkeeper.versioning.Versioned; import org.apache.commons.collections4.CollectionUtils; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.AsyncCallback.VoidCallback; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -527,9 +526,6 @@ public class Auditor implements AutoCloseable { } catch (CompatibilityException ce) { throw new UnavailableException( "CompatibilityException while initializing Auditor", ce); - } catch (KeeperException ioe) { - throw new UnavailableException( - "Exception while initializing Auditor", ioe); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new UnavailableException( @@ -694,6 +690,9 @@ public class Auditor implements AutoCloseable { } catch (InterruptedException ie) { Thread.currentThread().interrupt(); LOG.error("Interrupted while for LedgersReplication to be enabled ", ie); + } catch (ReplicationException.NonRecoverableReplicationException nre) { + LOG.error("Non Recoverable Exception while reading from ZK", nre); + submitShutdownTask(); } catch (UnavailableException ue) { LOG.error("Exception while reading from ZK", ue); } finally { @@ -771,6 +770,10 @@ public class Auditor implements AutoCloseable { long initialDelay; try { checkAllLedgersLastExecutedCTime = ledgerUnderreplicationManager.getCheckAllLedgersCTime(); + } catch (ReplicationException.NonRecoverableReplicationException nre) { + LOG.error("Non Recoverable Exception while reading from ZK", nre); + submitShutdownTask(); + return; } catch (UnavailableException ue) { LOG.error("Got UnavailableException while trying to get checkAllLedgersCTime", ue); checkAllLedgersLastExecutedCTime = -1; @@ -810,8 +813,6 @@ public class Auditor implements AutoCloseable { LOG.info("Completed checkAllLedgers in {} milliSeconds", checkAllLedgersDuration); checkAllLedgersTime.registerSuccessfulEvent(checkAllLedgersDuration, TimeUnit.MILLISECONDS); checkSuccess = true; - } catch (KeeperException ke) { - LOG.error("Exception while running periodic check", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); LOG.error("Interrupted while running periodic check", ie); @@ -819,6 +820,9 @@ public class Auditor implements AutoCloseable { LOG.error("Exception running periodic check", bke); } catch (IOException ioe) { LOG.error("I/O exception running periodic check", ioe); + } catch (ReplicationException.NonRecoverableReplicationException nre) { + LOG.error("Non Recoverable Exception while reading from ZK", nre); + submitShutdownTask(); } catch (ReplicationException.UnavailableException ue) { LOG.error("Underreplication manager unavailable running periodic check", ue); } finally { @@ -846,6 +850,10 @@ public class Auditor implements AutoCloseable { long initialDelay; try { placementPolicyCheckLastExecutedCTime = ledgerUnderreplicationManager.getPlacementPolicyCheckCTime(); + } catch (ReplicationException.NonRecoverableReplicationException nre) { + LOG.error("Non Recoverable Exception while reading from ZK", nre); + submitShutdownTask(); + return; } catch (UnavailableException ue) { LOG.error("Got UnavailableException while trying to get placementPolicyCheckCTime", ue); placementPolicyCheckLastExecutedCTime = -1; @@ -969,6 +977,10 @@ public class Auditor implements AutoCloseable { long initialDelay; try { replicasCheckLastExecutedCTime = ledgerUnderreplicationManager.getReplicasCheckCTime(); + } catch (ReplicationException.NonRecoverableReplicationException nre) { + LOG.error("Non Recoverable Exception while reading from ZK", nre); + submitShutdownTask(); + return; } catch (UnavailableException ue) { LOG.error("Got UnavailableException while trying to get replicasCheckCTime", ue); replicasCheckLastExecutedCTime = -1; @@ -1070,6 +1082,9 @@ public class Auditor implements AutoCloseable { try { Auditor.this.ledgerUnderreplicationManager .notifyLostBookieRecoveryDelayChanged(LostBookieRecoveryDelayChangedCb.this); + } catch (ReplicationException.NonRecoverableReplicationException nre) { + LOG.error("Non Recoverable Exception while reading from ZK", nre); + submitShutdownTask(); } catch (UnavailableException ae) { LOG.error("Exception while registering for a LostBookieRecoveryDelay notification", ae); } @@ -1134,6 +1149,10 @@ public class Auditor implements AutoCloseable { throws BKAuditException, InterruptedException, BKException { try { waitIfLedgerReplicationDisabled(); + } catch (ReplicationException.NonRecoverableReplicationException nre) { + LOG.error("Non Recoverable Exception while reading from ZK", nre); + submitShutdownTask(); + return; } catch (UnavailableException ue) { LOG.error("Underreplication unavailable, skipping audit." + "Will retry after a period"); @@ -1295,7 +1314,7 @@ public class Auditor implements AutoCloseable { * List all the ledgers and check them individually. This should not * be run very often. */ - void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException { + void checkAllLedgers() throws BKException, IOException, InterruptedException { final BookKeeper localClient = getBookKeeper(conf); final BookKeeperAdmin localAdmin = getBookKeeperAdmin(localClient); try { @@ -1310,6 +1329,10 @@ public class Auditor implements AutoCloseable { FutureUtils.complete(processFuture, null); return; } + } catch (ReplicationException.NonRecoverableReplicationException nre) { + LOG.error("Non Recoverable Exception while reading from ZK", nre); + submitShutdownTask(); + return; } catch (UnavailableException ue) { LOG.error("Underreplication manager unavailable running periodic check", ue); FutureUtils.complete(processFuture, null); @@ -1367,6 +1390,9 @@ public class Auditor implements AutoCloseable { FutureUtils.result(processFuture, BKException.HANDLER); try { ledgerUnderreplicationManager.setCheckAllLedgersCTime(System.currentTimeMillis()); + } catch (ReplicationException.NonRecoverableReplicationException nre) { + LOG.error("Non Recoverable Exception while reading from ZK", nre); + submitShutdownTask(); } catch (UnavailableException ue) { LOG.error("Got exception while trying to set checkAllLedgersCTime", ue); } @@ -1490,6 +1516,9 @@ public class Auditor implements AutoCloseable { } try { ledgerUnderreplicationManager.setPlacementPolicyCheckCTime(System.currentTimeMillis()); + } catch (ReplicationException.NonRecoverableReplicationException nre) { + LOG.error("Non Recoverable Exception while reading from ZK", nre); + submitShutdownTask(); } catch (UnavailableException ue) { LOG.error("Got exception while trying to set PlacementPolicyCheckCTime", ue); } @@ -1953,6 +1982,9 @@ public class Auditor implements AutoCloseable { } try { ledgerUnderreplicationManager.setReplicasCheckCTime(System.currentTimeMillis()); + } catch (ReplicationException.NonRecoverableReplicationException nre) { + LOG.error("Non Recoverable Exception while reading from ZK", nre); + submitShutdownTask(); } catch (UnavailableException ue) { LOG.error("Got exception while trying to set ReplicasCheckCTime", ue); } @@ -2054,6 +2086,10 @@ public class Auditor implements AutoCloseable { ledgerInRange); mcbForThisLedgerRange.processResult(BKException.Code.OK, null, null); return true; + } catch (ReplicationException.NonRecoverableReplicationException nre) { + LOG.error("Non Recoverable Exception while reading from ZK", nre); + submitShutdownTask(); + return true; } catch (UnavailableException une) { LOG.error("Got exception while trying to check if ledger: {} is underreplicated", ledgerInRange, une); mcbForThisLedgerRange.processResult(BKException.getExceptionCode(une), null, null); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java index 733f63bde8..34479fbf11 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java @@ -19,12 +19,22 @@ package org.apache.bookkeeper.replication; import java.util.function.Function; +import org.apache.zookeeper.KeeperException; /** * Exceptions for use within the replication service. */ public abstract class ReplicationException extends Exception { + public static UnavailableException fromKeeperException(String message, KeeperException ke) { + if (ke instanceof KeeperException.ConnectionLossException + || ke instanceof KeeperException.SessionExpiredException) { + return new NonRecoverableReplicationException(message, ke); + } + return new UnavailableException(message, ke); + } + + public static final Function<Throwable, ReplicationException> EXCEPTION_HANDLER = cause -> { if (cause instanceof ReplicationException) { return (ReplicationException) cause; @@ -56,6 +66,21 @@ public abstract class ReplicationException extends Exception { } } + /** + * The replication service encountered an error that requires service restart. + */ + public static class NonRecoverableReplicationException extends UnavailableException { + private static final long serialVersionUID = 31872211L; + + public NonRecoverableReplicationException(String message, Throwable cause) { + super(message, cause); + } + + public NonRecoverableReplicationException(String message) { + super(message); + } + } + /** * Compatibility error. This version of the code, doesn't know how to * deal with the metadata it has found. diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java index 721380a656..c4a0e430d6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java @@ -69,7 +69,6 @@ import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.annotations.StatsDoc; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -142,7 +141,7 @@ public class ReplicationWorker implements Runnable { * - configurations */ public ReplicationWorker(final ServerConfiguration conf) - throws CompatibilityException, KeeperException, + throws CompatibilityException, UnavailableException, InterruptedException, IOException { this(conf, NullStatsLogger.INSTANCE); } @@ -159,8 +158,7 @@ public class ReplicationWorker implements Runnable { */ public ReplicationWorker(final ServerConfiguration conf, StatsLogger statsLogger) - throws CompatibilityException, KeeperException, - + throws CompatibilityException, UnavailableException, InterruptedException, IOException { this(conf, Auditor.createBookKeeperClient(conf), true, statsLogger); } @@ -169,8 +167,7 @@ public class ReplicationWorker implements Runnable { BookKeeper bkc, boolean ownBkc, StatsLogger statsLogger) - throws CompatibilityException, KeeperException, - InterruptedException, IOException { + throws CompatibilityException, InterruptedException, UnavailableException { this.conf = conf; this.bkc = bkc; this.ownBkc = ownBkc; @@ -243,6 +240,11 @@ public class ReplicationWorker implements Runnable { } catch (BKException e) { LOG.error("BKException while replicating fragments", e); waitBackOffTime(rwRereplicateBackoffMs); + } catch (ReplicationException.NonRecoverableReplicationException nre) { + LOG.error("NonRecoverableReplicationException " + + "while replicating fragments", nre); + shutdown(); + return; } catch (UnavailableException e) { LOG.error("UnavailableException " + "while replicating fragments", e); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommand.java index 7979d1d1b5..09077cef70 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommand.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommand.java @@ -39,7 +39,6 @@ import org.apache.bookkeeper.tools.framework.CliFlags; import org.apache.bookkeeper.tools.framework.CliSpec; import org.apache.bookkeeper.util.LedgerIdFormatter; import org.apache.commons.lang.StringUtils; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -141,7 +140,7 @@ public class ListUnderReplicatedCommand extends BookieCommand<ListUnderReplicate LedgerUnderreplicationManager underreplicationManager; try { underreplicationManager = mFactory.newLedgerUnderreplicationManager(); - } catch (KeeperException | ReplicationException.CompatibilityException e) { + } catch (ReplicationException e) { throw new UncheckedExecutionException("Failed to new ledger underreplicated manager", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java index 0f86a2d2ec..b2b5e01e40 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java @@ -34,7 +34,6 @@ import org.apache.bookkeeper.replication.ReplicationException; import org.apache.bookkeeper.tools.cli.helpers.BookieCommand; import org.apache.bookkeeper.tools.framework.CliFlags; import org.apache.bookkeeper.tools.framework.CliSpec; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,7 +98,7 @@ public class QueryAutoRecoveryStatusCommand List<LedgerRecoverInfo> ledgerList = new LinkedList<>(); try { underreplicationManager = mFactory.newLedgerUnderreplicationManager(); - } catch (KeeperException | ReplicationException.CompatibilityException e) { + } catch (ReplicationException e) { throw new UncheckedExecutionException("Failed to new ledger underreplicated manager", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ToggleCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ToggleCommand.java index b43e73fc62..99185a8a97 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ToggleCommand.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ToggleCommand.java @@ -32,7 +32,6 @@ import org.apache.bookkeeper.replication.ReplicationException; import org.apache.bookkeeper.tools.cli.helpers.BookieCommand; import org.apache.bookkeeper.tools.framework.CliFlags; import org.apache.bookkeeper.tools.framework.CliSpec; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,7 +112,7 @@ public class ToggleCommand extends BookieCommand<ToggleCommand.AutoRecoveryFlags } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new UncheckedExecutionException(e); - } catch (KeeperException | ReplicationException e) { + } catch (ReplicationException e) { throw new UncheckedExecutionException(e); } return null; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java index ad96129914..18785a58b4 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java @@ -25,7 +25,6 @@ import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; import org.apache.bookkeeper.replication.ReplicationException; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; -import org.apache.zookeeper.KeeperException; import org.junit.Assert; import org.junit.Test; @@ -62,7 +61,7 @@ public class ForceAuditorChecksCmdTest extends BookKeeperClusterTestCase { urM.setCheckAllLedgersCTime(curTime); urM.setPlacementPolicyCheckCTime(curTime); urM.setReplicasCheckCTime(curTime); - } catch (InterruptedException | KeeperException | ReplicationException e) { + } catch (InterruptedException | ReplicationException e) { throw new UncheckedExecutionException(e); } return null; @@ -87,7 +86,7 @@ public class ForceAuditorChecksCmdTest extends BookKeeperClusterTestCase { if (replicasCheckCTime > (curTime - (20 * 24 * 60 * 60 * 1000))) { Assert.fail("The replicasCheckCTime should have been reset to atleast 20 days old"); } - } catch (InterruptedException | KeeperException | ReplicationException e) { + } catch (InterruptedException | ReplicationException e) { throw new UncheckedExecutionException(e); } return null; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java index 18ace93533..635ef87ab5 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java @@ -81,7 +81,6 @@ import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.bookkeeper.test.TestStatsProvider; import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger; import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger; -import org.apache.zookeeper.KeeperException; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -730,7 +729,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase { super(bookieIdentifier, conf, statsLogger); } - void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException { + void checkAllLedgers() throws BKException, IOException, InterruptedException { super.checkAllLedgers(); latchRef.get().countDown(); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java index bb302350a1..a69b9cbd49 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java @@ -34,11 +34,10 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.TimerTask; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; - import lombok.Cleanup; - import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.ClientUtil; @@ -67,6 +66,7 @@ import org.apache.bookkeeper.util.BookKeeperConstants; import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; @@ -103,13 +103,16 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase { } TestReplicationWorker(String ledgerManagerFactory) { - super(3); + super(3, 300); LOG.info("Running test case using ledger manager : " + ledgerManagerFactory); // set ledger manager name baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactory); baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory); baseConf.setRereplicationEntryBatchSize(3); + baseConf.setZkTimeout(7000); + baseConf.setZkRetryBackoffMaxMs(500); + baseConf.setZkRetryBackoffStartMs(10); } @Override @@ -594,9 +597,11 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase { int rw1UnableToReadEntriesForReplication = rw1.unableToReadEntriesForReplication.get(lh.getId()).size(); int rw2UnableToReadEntriesForReplication = rw2.unableToReadEntriesForReplication.get(lh.getId()).size(); assertTrue( - "unableToReadEntriesForReplication in RW1: " + rw1UnableToReadEntriesForReplication + " in RW2: " + "unableToReadEntriesForReplication in RW1: " + rw1UnableToReadEntriesForReplication + + " in RW2: " + rw2UnableToReadEntriesForReplication, - (rw1UnableToReadEntriesForReplication == 0) || (rw2UnableToReadEntriesForReplication == 0)); + (rw1UnableToReadEntriesForReplication == 0) + || (rw2UnableToReadEntriesForReplication == 0)); } finally { rw1.shutdown(); rw2.shutdown(); @@ -609,7 +614,8 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase { public InjectedReplicationWorker(ServerConfiguration conf, StatsLogger statsLogger, CopyOnWriteArrayList<Long> delayReplicationPeriods) - throws CompatibilityException, KeeperException, InterruptedException, IOException { + throws CompatibilityException, ReplicationException.UnavailableException, + InterruptedException, IOException { super(conf, statsLogger); this.delayReplicationPeriods = delayReplicationPeriods; } @@ -829,18 +835,63 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase { assertTrue("Replication worker should be running", rw.isRunning()); stopZKCluster(); - // Wait for disconnection to be picked up + // ZK is down for shorter period than reconnect timeout + Thread.sleep(1000); + startZKCluster(); + + assertTrue("Replication worker should not shutdown", rw.isRunning()); + } + } + + /** + * Test that the replication worker shuts down on non-recoverable ZK connection loss. + */ + @Test + public void testRWZKConnectionLostOnNonRecoverableZkError() throws Exception { + for (int j = 0; j < 3; j++) { + LedgerHandle lh = bkc.createLedger(1, 1, 1, + BookKeeper.DigestType.CRC32, TESTPASSWD, + null); + final long createdLedgerId = lh.getId(); for (int i = 0; i < 10; i++) { - if (!zk.getState().isConnected()) { - break; - } - Thread.sleep(1000); + lh.addEntry(data); } - assertFalse(zk.getState().isConnected()); - startZKCluster(); + lh.close(); + } + + killBookie(2); + killBookie(1); + startNewBookie(); + startNewBookie(); + + servers.get(0).getConfiguration().setRwRereplicateBackoffMs(100); + servers.get(0).startAutoRecovery(); + + Auditor auditor = getAuditor(10, TimeUnit.SECONDS); + ReplicationWorker rw = servers.get(0).getReplicationWorker(); + + ZkLedgerUnderreplicationManager ledgerUnderreplicationManager = + (ZkLedgerUnderreplicationManager) FieldUtils.readField(auditor, + "ledgerUnderreplicationManager", true); - assertTrue("Replication worker should still be running", rw.isRunning()); + ZooKeeper zkc = (ZooKeeper) FieldUtils.readField(ledgerUnderreplicationManager, "zkc", true); + auditor.submitAuditTask().get(); + + assertTrue(zkc.getState().isConnected()); + zkc.close(); + assertFalse(zkc.getState().isConnected()); + + auditor.submitAuditTask(); + rw.run(); + + for (int i = 0; i < 10; i++) { + if (!rw.isRunning() && !auditor.isRunning()) { + break; + } + Thread.sleep(1000); } + assertFalse("Replication worker should NOT be running", rw.isRunning()); + assertFalse("Auditor should NOT be running", auditor.isRunning()); } private void killAllBookies(LedgerHandle lh, BookieId excludeBK) @@ -975,7 +1026,8 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase { */ BookKeeper bkWithMockZK = new BookKeeper(baseClientConf, zkFaultInjectionWrapper); long ledgerId = 567L; - LedgerHandle lh = bkWithMockZK.createLedgerAdv(ledgerId, 2, 2, 2, BookKeeper.DigestType.CRC32, TESTPASSWD, + LedgerHandle lh = bkWithMockZK.createLedgerAdv(ledgerId, 2, 2, 2, + BookKeeper.DigestType.CRC32, TESTPASSWD, null); for (int i = 0; i < 10; i++) { lh.addEntry(i, data); diff --git a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java index 4d83f73bf0..11eef381c1 100644 --- a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java +++ b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java @@ -30,6 +30,7 @@ import org.apache.bookkeeper.meta.LedgerIdGenerator; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.meta.LedgerManagerFactory; import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; +import org.apache.bookkeeper.replication.ReplicationException; import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException; import org.apache.commons.configuration.ConfigurationException; import org.apache.zookeeper.KeeperException; @@ -88,7 +89,7 @@ class EtcdLedgerManagerFactory implements LedgerManagerFactory { @Override public LedgerUnderreplicationManager newLedgerUnderreplicationManager() - throws KeeperException, InterruptedException, CompatibilityException { + throws ReplicationException.UnavailableException, InterruptedException, CompatibilityException { throw new UnsupportedOperationException(); } diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommandTest.java index b4d86828a7..b0453980bc 100644 --- a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommandTest.java +++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommandTest.java @@ -72,7 +72,7 @@ public class ListUnderReplicatedCommandTest extends BookieCommandTestBase { @Test public void testWithoutArgs() - throws InterruptedException, ReplicationException.CompatibilityException, KeeperException { + throws InterruptedException, ReplicationException { testCommand(""); verify(factory, times(1)).newLedgerUnderreplicationManager(); verify(underreplicationManager, times(1)).listLedgersToRereplicate(any()); @@ -82,7 +82,7 @@ public class ListUnderReplicatedCommandTest extends BookieCommandTestBase { @Test public void testMissingReplica() - throws InterruptedException, ReplicationException.CompatibilityException, KeeperException { + throws InterruptedException, ReplicationException { testCommand("-mr", ""); verify(factory, times(1)).newLedgerUnderreplicationManager(); verify(underreplicationManager, times(1)).listLedgersToRereplicate(any()); @@ -92,7 +92,7 @@ public class ListUnderReplicatedCommandTest extends BookieCommandTestBase { @Test public void testExcludingMissingReplica() - throws InterruptedException, ReplicationException.CompatibilityException, KeeperException { + throws InterruptedException, ReplicationException { testCommand("-emr", ""); verify(factory, times(1)).newLedgerUnderreplicationManager(); verify(underreplicationManager, times(1)).listLedgersToRereplicate(any()); @@ -102,7 +102,7 @@ public class ListUnderReplicatedCommandTest extends BookieCommandTestBase { @Test public void testPrintMissingReplica() - throws InterruptedException, ReplicationException.CompatibilityException, KeeperException { + throws InterruptedException, ReplicationException { ArrayList<String> list = new ArrayList<>(); list.add("replica");
