This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new bbce67d  fixes #1988: Modify Coordinator to wait for the lock like the Manager does
bbce67d is described below

commit bbce67d7768b6352f2085c22f9f2deff775f2598
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Apr 5 15:17:42 2021 +0000

    fixes #1988: Modify Coordinator to wait for the lock like the Manager does
---
 .../server/compaction/ExternalCompactionUtil.java  |  2 +
 .../coordinator/CompactionCoordinator.java         | 31 ++++++++++-----
 .../coordinator/CoordinatorLockWatcher.java        | 45 +++++++++++++++++++++-
 3 files changed, 66 insertions(+), 12 deletions(-)

diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
index 2f998c4..6ec94ac 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
@@ -93,6 +93,8 @@ public class ExternalCompactionUtil {
           List<String> compactors =
               context.getZooReaderWriter().getChildren(compactorQueuesPath + 
"/" + queue);
           for (String compactor : compactors) {
+            // compactor is the address, we are checking to see if there is a 
child node which
+            // represents the compactor's lock as a check that it's alive.
             List<String> children = context.getZooReaderWriter()
                 .getChildren(compactorQueuesPath + "/" + queue + "/" + 
compactor);
             if (!children.isEmpty()) {
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 895d131..fcb6461 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -18,6 +18,8 @@
  */
 package org.apache.accumulo.coordinator;
 
+import static 
org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -135,11 +137,10 @@ public class CompactionCoordinator extends AbstractServer
    *
    * @param clientAddress
    *          address of this Compactor
-   * @return true if lock was acquired, else false
    * @throws KeeperException
    * @throws InterruptedException
    */
-  protected boolean getCoordinatorLock(HostAndPort clientAddress)
+  protected void getCoordinatorLock(HostAndPort clientAddress)
       throws KeeperException, InterruptedException {
     LOG.info("trying to get coordinator lock");
 
@@ -147,11 +148,23 @@ public class CompactionCoordinator extends AbstractServer
     final String lockPath = getContext().getZooKeeperRoot() + 
Constants.ZCOORDINATOR_LOCK;
     final UUID zooLockUUID = UUID.randomUUID();
 
-    CoordinatorLockWatcher coordinatorLockWatcher = new 
CoordinatorLockWatcher();
-    coordinatorLock = new ZooLock(getContext().getSiteConfiguration(), 
lockPath, zooLockUUID);
-    // TODO may want to wait like manager code when lock not acquired, this 
allows starting multiple
-    // coordinators.
-    return coordinatorLock.tryLock(coordinatorLockWatcher, 
coordinatorClientAddress.getBytes());
+    while (true) {
+
+      CoordinatorLockWatcher coordinatorLockWatcher = new 
CoordinatorLockWatcher();
+      coordinatorLock = new ZooLock(getContext().getSiteConfiguration(), 
lockPath, zooLockUUID);
+      coordinatorLock.lock(coordinatorLockWatcher, 
coordinatorClientAddress.getBytes());
+
+      coordinatorLockWatcher.waitForChange();
+      if (coordinatorLockWatcher.isAcquiredLock()) {
+        break;
+      }
+      if (!coordinatorLockWatcher.isFailedToAcquireLock()) {
+        throw new IllegalStateException("manager lock in unknown state");
+      }
+      coordinatorLock.tryToCancelAsyncLockOrUnlock();
+
+      sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
+    }
   }
 
   /**
@@ -193,9 +206,7 @@ public class CompactionCoordinator extends AbstractServer
     final HostAndPort clientAddress = coordinatorAddress.address;
 
     try {
-      if (!getCoordinatorLock(clientAddress)) {
-        throw new RuntimeException("Unable to get Coordinator lock.");
-      }
+      getCoordinatorLock(clientAddress);
     } catch (KeeperException | InterruptedException e) {
       throw new IllegalStateException("Exception getting Coordinator lock", e);
     }
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java
index 30ebb2e..e4f3e0c 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.coordinator;
 import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.apache.zookeeper.KeeperException.NoAuthException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,6 +29,9 @@ public class CoordinatorLockWatcher implements 
ZooLock.AccumuloLockWatcher {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CoordinatorLockWatcher.class);
 
+  private volatile boolean acquiredLock = false;
+  private volatile boolean failedToAcquireLock = false;
+
   @Override
   public void lostLock(LockLossReason reason) {
     Halt.halt("Coordinator lock in zookeeper lost (reason = " + reason + "), 
exiting!", -1);
@@ -42,12 +46,49 @@ public class CoordinatorLockWatcher implements 
ZooLock.AccumuloLockWatcher {
 
   @Override
   public synchronized void acquiredLock() {
-    // This is overridden by the LockWatcherWrapper in ZooLock.tryLock()
+    LOG.debug("Acquired Coordinator lock");
+
+    if (acquiredLock || failedToAcquireLock) {
+      Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + 
failedToAcquireLock, -1);
+    }
+
+    acquiredLock = true;
+    notifyAll();
   }
 
   @Override
   public synchronized void failedToAcquireLock(Exception e) {
-    // This is overridden by the LockWatcherWrapper in ZooLock.tryLock()
+    LOG.warn("Failed to get Coordinator lock", e);
+
+    if (e instanceof NoAuthException) {
+      String msg = "Failed to acquire Coordinator lock due to incorrect 
ZooKeeper authentication.";
+      LOG.error("{} Ensure instance.secret is consistent across Accumulo 
configuration", msg, e);
+      Halt.halt(msg, -1);
+    }
+
+    if (acquiredLock) {
+      Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + 
failedToAcquireLock, -1);
+    }
+
+    failedToAcquireLock = true;
+    notifyAll();
+  }
+
+  public synchronized void waitForChange() {
+    while (!acquiredLock && !failedToAcquireLock) {
+      try {
+        LOG.info("Coordinator lock held by someone else, waiting for a change 
in state");
+        wait();
+      } catch (InterruptedException e) {}
+    }
+  }
+
+  public boolean isAcquiredLock() {
+    return acquiredLock;
+  }
+
+  public boolean isFailedToAcquireLock() {
+    return failedToAcquireLock;
   }
 
 }

Reply via email to