This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new bbce67d fixes #1988: Modify Coordinator to wait for the lock like the Manager does bbce67d is described below commit bbce67d7768b6352f2085c22f9f2deff775f2598 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Apr 5 15:17:42 2021 +0000 fixes #1988: Modify Coordinator to wait for the lock like the Manager does --- .../server/compaction/ExternalCompactionUtil.java | 2 + .../coordinator/CompactionCoordinator.java | 31 ++++++++++----- .../coordinator/CoordinatorLockWatcher.java | 45 +++++++++++++++++++++- 3 files changed, 66 insertions(+), 12 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java index 2f998c4..6ec94ac 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java @@ -93,6 +93,8 @@ public class ExternalCompactionUtil { List<String> compactors = context.getZooReaderWriter().getChildren(compactorQueuesPath + "/" + queue); for (String compactor : compactors) { + // compactor is the address, we are checking to see if there is a child node which + // represents the compactor's lock as a check that it's alive. 
List<String> children = context.getZooReaderWriter() .getChildren(compactorQueuesPath + "/" + queue + "/" + compactor); if (!children.isEmpty()) { diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index 895d131..fcb6461 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.coordinator; +import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; + import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; @@ -135,11 +137,10 @@ public class CompactionCoordinator extends AbstractServer * * @param clientAddress * address of this Compactor - * @return true if lock was acquired, else false * @throws KeeperException * @throws InterruptedException */ - protected boolean getCoordinatorLock(HostAndPort clientAddress) + protected void getCoordinatorLock(HostAndPort clientAddress) throws KeeperException, InterruptedException { LOG.info("trying to get coordinator lock"); @@ -147,11 +148,23 @@ public class CompactionCoordinator extends AbstractServer final String lockPath = getContext().getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK; final UUID zooLockUUID = UUID.randomUUID(); - CoordinatorLockWatcher coordinatorLockWatcher = new CoordinatorLockWatcher(); - coordinatorLock = new ZooLock(getContext().getSiteConfiguration(), lockPath, zooLockUUID); - // TODO may want to wait like manager code when lock not acquired, this allows starting multiple - // coordinators. 
- return coordinatorLock.tryLock(coordinatorLockWatcher, coordinatorClientAddress.getBytes()); + while (true) { + + CoordinatorLockWatcher coordinatorLockWatcher = new CoordinatorLockWatcher(); + coordinatorLock = new ZooLock(getContext().getSiteConfiguration(), lockPath, zooLockUUID); + coordinatorLock.lock(coordinatorLockWatcher, coordinatorClientAddress.getBytes()); + + coordinatorLockWatcher.waitForChange(); + if (coordinatorLockWatcher.isAcquiredLock()) { + break; + } + if (!coordinatorLockWatcher.isFailedToAcquireLock()) { + throw new IllegalStateException("manager lock in unknown state"); + } + coordinatorLock.tryToCancelAsyncLockOrUnlock(); + + sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); + } } /** @@ -193,9 +206,7 @@ public class CompactionCoordinator extends AbstractServer final HostAndPort clientAddress = coordinatorAddress.address; try { - if (!getCoordinatorLock(clientAddress)) { - throw new RuntimeException("Unable to get Coordinator lock."); - } + getCoordinatorLock(clientAddress); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException("Exception getting Coordinator lock", e); } diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java index 30ebb2e..e4f3e0c 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java @@ -21,6 +21,7 @@ package org.apache.accumulo.coordinator; import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.fate.zookeeper.ZooLock; import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; +import org.apache.zookeeper.KeeperException.NoAuthException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,6 +29,9 @@ public class 
CoordinatorLockWatcher implements ZooLock.AccumuloLockWatcher { private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLockWatcher.class); + private volatile boolean acquiredLock = false; + private volatile boolean failedToAcquireLock = false; + @Override public void lostLock(LockLossReason reason) { Halt.halt("Coordinator lock in zookeeper lost (reason = " + reason + "), exiting!", -1); @@ -42,12 +46,49 @@ public class CoordinatorLockWatcher implements ZooLock.AccumuloLockWatcher { @Override public synchronized void acquiredLock() { - // This is overridden by the LockWatcherWrapper in ZooLock.tryLock() + LOG.debug("Acquired Coordinator lock"); + + if (acquiredLock || failedToAcquireLock) { + Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); + } + + acquiredLock = true; + notifyAll(); } @Override public synchronized void failedToAcquireLock(Exception e) { - // This is overridden by the LockWatcherWrapper in ZooLock.tryLock() + LOG.warn("Failed to get Coordinator lock", e); + + if (e instanceof NoAuthException) { + String msg = "Failed to acquire Coordinator lock due to incorrect ZooKeeper authentication."; + LOG.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); + Halt.halt(msg, -1); + } + + if (acquiredLock) { + Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock, -1); + } + + failedToAcquireLock = true; + notifyAll(); + } + + public synchronized void waitForChange() { + while (!acquiredLock && !failedToAcquireLock) { + try { + LOG.info("Coordinator lock held by someone else, waiting for a change in state"); + wait(); + } catch (InterruptedException e) {} + } + } + + public boolean isAcquiredLock() { + return acquiredLock; + } + + public boolean isFailedToAcquireLock() { + return failedToAcquireLock; } }