This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 268bcce  HBASE-22208 Create access checker and expose it in RS
268bcce is described below

commit 268bcce76f861ced708ebb683e236c1e73b3c1ad
Author: meiyi <[email protected]>
AuthorDate: Thu Apr 18 17:47:45 2019 +0800

    HBASE-22208 Create access checker and expose it in RS
    
    Signed-off-by: Guanghao Zhang <[email protected]>
---
 .../hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java |   5 +-
 .../hadoop/hbase/rsgroup/TestRSGroupsWithACL.java  |   4 -
 .../hadoop/hbase/master/MasterRpcServices.java     |   6 +-
 .../apache/hadoop/hbase/master/MasterServices.java |  12 +++
 .../hadoop/hbase/regionserver/HRegionServer.java   |  12 +++
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  36 ++++++--
 .../hbase/regionserver/RegionServerServices.java   |  12 +++
 .../hbase/security/access/AccessChecker.java       |  68 +-------------
 .../hbase/security/access/AccessController.java    |  42 +++++----
 .../hadoop/hbase/security/access/AuthManager.java  |  84 +----------------
 .../hbase/security/access/NoopAccessChecker.java   | 100 +++++++++++++++++++++
 .../hadoop/hbase/MockRegionServerServices.java     |  12 +++
 .../hbase/master/MockNoopMasterServices.java       |  12 +++
 .../hadoop/hbase/master/MockRegionServer.java      |  12 +++
 .../security/access/TestAccessController.java      |   2 -
 .../security/access/TestAccessController3.java     |   4 +-
 .../security/access/TestTablePermissions.java      |   2 +-
 .../security/access/TestZKPermissionWatcher.java   |  20 +++--
 18 files changed, 252 insertions(+), 193 deletions(-)

diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
index 3d1f780..090ac6e 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
@@ -82,7 +82,6 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
 import org.apache.hadoop.hbase.security.access.Permission.Action;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -120,8 +119,7 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
     if (!RSGroupableBalancer.class.isAssignableFrom(clazz)) {
       throw new IOException("Configured balancer does not support RegionServer groups.");
     }
-    ZKWatcher zk = ((HasMasterServices)env).getMasterServices().getZooKeeper();
-    accessChecker = new AccessChecker(env.getConfiguration(), zk);
+    accessChecker = ((HasMasterServices) env).getMasterServices().getAccessChecker();
 
     // set the user-provider.
     this.userProvider = UserProvider.instantiate(env.getConfiguration());
@@ -129,7 +127,6 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
 
   @Override
   public void stop(CoprocessorEnvironment env) {
-    accessChecker.stop();
   }
 
   @Override
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsWithACL.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsWithACL.java
index 4b23f18..0278e3c 100644
--- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsWithACL.java
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsWithACL.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.rsgroup;
 
 import static org.apache.hadoop.hbase.AuthUtil.toGroupEntry;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.apache.hadoop.conf.Configuration;
@@ -34,7 +33,6 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.AccessControlClient;
-import org.apache.hadoop.hbase.security.access.AuthManager;
 import org.apache.hadoop.hbase.security.access.Permission;
 import org.apache.hadoop.hbase.security.access.PermissionStorage;
 import org.apache.hadoop.hbase.security.access.SecureTestUtil;
@@ -203,8 +201,6 @@ public class TestRSGroupsWithACL extends SecureTestUtil{
   public static void tearDownAfterClass() throws Exception {
     cleanUp();
     TEST_UTIL.shutdownMiniCluster();
-    int total = AuthManager.getTotalRefCount();
-    assertTrue("Unexpected reference count: " + total, total == 0);
   }
 
   private static void configureRSGroupAdminEndpoint(Configuration conf) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 222a686..cd017d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -916,7 +916,7 @@ public class MasterRpcServices extends RSRpcServices
           + desc.getSignature()));
       }
       LOG.info(master.getClientIdAuditPrefix() + " procedure request for: " + desc.getSignature());
-      mpm.checkPermissions(desc, accessChecker, RpcServer.getRequestUser().orElse(null));
+      mpm.checkPermissions(desc, getAccessChecker(), RpcServer.getRequestUser().orElse(null));
       mpm.execProcedure(desc);
       // send back the max amount of time the client should wait for the procedure
       // to complete
@@ -2816,10 +2816,10 @@ public class MasterRpcServices extends RSRpcServices
         caller = new InputUser(userName, groups.toArray(new String[groups.size()]));
       }
       List<Boolean> hasUserPermissions = new ArrayList<>();
-      if (accessChecker != null) {
+      if (getAccessChecker() != null) {
         for (Permission permission : permissions) {
           boolean hasUserPermission =
-              accessChecker.hasUserPermission(caller, "hasUserPermissions", permission);
+              getAccessChecker().hasUserPermission(caller, "hasUserPermissions", permission);
           hasUserPermissions.add(hasUserPermission);
         }
       } else {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 12c78ac..d5aa4fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -51,6 +51,8 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -517,4 +519,14 @@ public interface MasterServices extends Server {
   default SplitWALManager getSplitWALManager(){
     return null;
   }
+
+  /**
+   * @return the {@link AccessChecker}
+   */
+  AccessChecker getAccessChecker();
+
+  /**
+   * @return the {@link ZKPermissionWatcher}
+   */
+  ZKPermissionWatcher getZKPermissionWatcher();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 0b853b9..86f33c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -145,6 +145,8 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Addressing;
@@ -3660,6 +3662,16 @@ public class HRegionServer extends HasThread implements
     return Optional.ofNullable(this.mobFileCache);
   }
 
+  @Override
+  public AccessChecker getAccessChecker() {
+    return rpcServices.getAccessChecker();
+  }
+
+  @Override
+  public ZKPermissionWatcher getZKPermissionWatcher() {
+    return rpcServices.getZkPermissionWatcher();
+  }
+
   /**
    * @return : Returns the ConfigurationManager object for testing purposes.
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 801ff91..688c03d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -131,7 +131,9 @@ import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClient
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.NoopAccessChecker;
 import org.apache.hadoop.hbase.security.access.Permission;
+import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.DNS;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -144,6 +146,7 @@ import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -344,12 +347,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
   final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);
 
-  // We want to vet all accesses at the point of entry itself; limiting scope of access checker
-  // instance to only this class to prevent its use from spreading deeper into implementation.
-  // Initialized in start() since AccessChecker needs ZKWatcher which is created by HRegionServer
-  // after RSRpcServices constructor and before start() is called.
-  // Initialized only if authorization is enabled, else remains null.
-  protected AccessChecker accessChecker;
+  private AccessChecker accessChecker;
+  private ZKPermissionWatcher zkPermissionWatcher;
 
   /**
    * Services launched in RSRpcServices. By default they are on but you can use the below
@@ -1482,15 +1481,26 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
   void start(ZKWatcher zkWatcher) {
     if (AccessChecker.isAuthorizationSupported(getConfiguration())) {
-      accessChecker = new AccessChecker(getConfiguration(), zkWatcher);
+      accessChecker = new AccessChecker(getConfiguration());
+    } else {
+      accessChecker = new NoopAccessChecker(getConfiguration());
+    }
+    if (!getConfiguration().getBoolean("hbase.testing.nocluster", false) && zkWatcher != null) {
+      zkPermissionWatcher =
+          new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), getConfiguration());
+      try {
+        zkPermissionWatcher.start();
+      } catch (KeeperException e) {
+        LOG.error("ZooKeeper permission watcher initialization failed", e);
+      }
     }
     this.scannerIdGenerator = new ScannerIdGenerator(this.regionServer.serverName);
     rpcServer.start();
   }
 
   void stop() {
-    if (accessChecker != null) {
-      accessChecker.stop();
+    if (zkPermissionWatcher != null) {
+      zkPermissionWatcher.close();
     }
     closeAllScanners();
     rpcServer.stop();
@@ -3777,4 +3787,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   public RpcScheduler getRpcScheduler() {
     return rpcServer.getScheduler();
   }
+
+  protected AccessChecker getAccessChecker() {
+    return accessChecker;
+  }
+
+  protected ZKPermissionWatcher getZkPermissionWatcher() {
+    return zkPermissionWatcher;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index 17f318b..04a1bc6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionSizeStore;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -304,4 +306,14 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
    * @return The cache for mob files.
    */
   Optional<MobFileCache> getMobFileCache();
+
+  /**
+   * @return the {@link AccessChecker}
+   */
+  AccessChecker getAccessChecker();
+
+  /**
+   * @return {@link ZKPermissionWatcher}
+   */
+  ZKPermissionWatcher getZKPermissionWatcher();
 }
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java
index 4632874..c04a3f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.access.Permission.Action;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.HadoopKerberosName;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -53,24 +52,15 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
 
 @InterfaceAudience.Private
-public final class AccessChecker {
+public class AccessChecker {
   private static final Logger LOG = LoggerFactory.getLogger(AccessChecker.class);
   private static final Logger AUDITLOG =
       LoggerFactory.getLogger("SecurityLogger." + AccessChecker.class.getName());
-  // TODO: we should move to a design where we don't even instantiate an AccessChecker if
-  // authorization is not enabled (like in RSRpcServices), instead of always instantiating one and
-  // calling requireXXX() only to do nothing (since authorizationEnabled will be false).
-  private AuthManager authManager;
+  private final AuthManager authManager;
 
   /** Group service to retrieve the user group information */
   private static Groups groupService;
 
-  /**
-   * if we are active, usually false, only true if "hbase.security.authorization"
-   * has been set to true in site configuration.see HBASE-19483.
-   */
-  private boolean authorizationEnabled;
-
   public static boolean isAuthorizationSupported(Configuration conf) {
     return conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, false);
   }
@@ -79,30 +69,12 @@ public final class AccessChecker {
    * Constructor with existing configuration
    *
    * @param conf Existing configuration to use
-   * @param zkw reference to the {@link ZKWatcher}
    */
-  public AccessChecker(final Configuration conf, final ZKWatcher zkw)
-      throws RuntimeException {
-    if (zkw != null) {
-      try {
-        this.authManager = AuthManager.getOrCreate(zkw, conf);
-      } catch (IOException ioe) {
-        throw new RuntimeException("Error obtaining AccessChecker", ioe);
-      }
-    } else {
-      throw new NullPointerException("Error obtaining AccessChecker, zk found null.");
-    }
-    authorizationEnabled = isAuthorizationSupported(conf);
+  public AccessChecker(final Configuration conf) {
+    this.authManager = new AuthManager(conf);
     initGroupService(conf);
   }
 
-  /**
-   * Releases {@link AuthManager}'s reference.
-   */
-  public void stop() {
-    AuthManager.release(authManager);
-  }
-
   public AuthManager getAuthManager() {
     return authManager;
   }
@@ -119,9 +91,6 @@ public final class AccessChecker {
    */
   public void requireAccess(User user, String request, TableName tableName,
       Action... permissions) throws IOException {
-    if (!authorizationEnabled) {
-      return;
-    }
     AuthResult result = null;
 
     for (Action permission : permissions) {
@@ -170,9 +139,6 @@ public final class AccessChecker {
   public void requireGlobalPermission(User user, String request,
       Action perm, TableName tableName,
       Map<byte[], ? extends Collection<byte[]>> familyMap, String filterUser) throws IOException {
-    if (!authorizationEnabled) {
-      return;
-    }
     AuthResult result;
     if (authManager.authorizeUserGlobal(user, perm)) {
       result = AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap);
@@ -201,9 +167,6 @@ public final class AccessChecker {
    */
   public void requireGlobalPermission(User user, String request, Action perm,
       String namespace) throws IOException {
-    if (!authorizationEnabled) {
-      return;
-    }
     AuthResult authResult;
     if (authManager.authorizeUserGlobal(user, perm)) {
       authResult = AuthResult.allow(request, "Global check allowed", user, perm, null);
@@ -229,9 +192,6 @@ public final class AccessChecker {
    */
   public void requireNamespacePermission(User user, String request, String namespace,
       String filterUser, Action... permissions) throws IOException {
-    if (!authorizationEnabled) {
-      return;
-    }
     AuthResult result = null;
 
     for (Action permission : permissions) {
@@ -264,9 +224,6 @@ public final class AccessChecker {
   public void requireNamespacePermission(User user, String request, String namespace,
       TableName tableName, Map<byte[], ? extends Collection<byte[]>> familyMap,
       Action... permissions) throws IOException {
-    if (!authorizationEnabled) {
-      return;
-    }
     AuthResult result = null;
 
     for (Action permission : permissions) {
@@ -303,9 +260,6 @@ public final class AccessChecker {
    */
   public void requirePermission(User user, String request, TableName tableName, byte[] family,
       byte[] qualifier, String filterUser, Action... permissions) throws IOException {
-    if (!authorizationEnabled) {
-      return;
-    }
     AuthResult result = null;
 
     for (Action permission : permissions) {
@@ -341,9 +295,6 @@ public final class AccessChecker {
   public void requireTablePermission(User user, String request,
       TableName tableName,byte[] family, byte[] qualifier,
       Action... permissions) throws IOException {
-    if (!authorizationEnabled) {
-      return;
-    }
     AuthResult result = null;
 
     for (Action permission : permissions) {
@@ -374,10 +325,6 @@ public final class AccessChecker {
    */
   public void performOnSuperuser(String request, User caller, String userToBeChecked)
       throws IOException {
-    if (!authorizationEnabled) {
-      return;
-    }
-
     List<String> userGroups = new ArrayList<>();
     userGroups.add(userToBeChecked);
     if (!AuthUtil.isGroupPrincipal(userToBeChecked)) {
@@ -541,9 +488,6 @@ public final class AccessChecker {
    * @return True if the user has the specific permission
    */
   public boolean hasUserPermission(User user, String request, Permission permission) {
-    if (!authorizationEnabled) {
-      return true;
-    }
     if (permission instanceof TablePermission) {
       TablePermission tPerm = (TablePermission) permission;
       for (Permission.Action action : permission.getActions()) {
@@ -609,10 +553,6 @@ public final class AccessChecker {
    */
   public AuthResult permissionGranted(String request, User user, Action permRequest,
       TableName tableName, Map<byte[], ? extends Collection<?>> families) {
-    if (!authorizationEnabled) {
-      return AuthResult.allow(request, "All users allowed because authorization is disabled", user,
-        permRequest, tableName, families);
-    }
     // 1. All users need read access to hbase:meta table.
     // this is a very common operation, so deal with it quickly.
     if (TableName.META_TABLE_NAME.equals(tableName)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 0949207..9dcfc8c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -97,6 +97,7 @@ import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
@@ -109,6 +110,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -130,7 +132,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
 import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -191,6 +192,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
   private static final byte[] TRUE = Bytes.toBytes(true);
 
   private AccessChecker accessChecker;
+  private ZKPermissionWatcher zkPermissionWatcher;
 
   /** flags if we are running on a region of the _acl_ table */
   private boolean aclRegion = false;
@@ -252,7 +254,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
       byte[] entry = t.getKey();
       ListMultimap<String, UserPermission> perms = t.getValue();
       byte[] serialized = PermissionStorage.writePermissionsAsBytes(perms, conf);
-      getAuthManager().getZKPermissionWatcher().writeToZookeeper(entry, serialized);
+      zkPermissionWatcher.writeToZookeeper(entry, serialized);
     }
     initialized = true;
   }
@@ -273,7 +275,6 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
         }
       }
     }
-    ZKPermissionWatcher zkw = getAuthManager().getZKPermissionWatcher();
     Configuration conf = regionEnv.getConfiguration();
     byte [] currentEntry = null;
     // TODO: Here we are already on the ACL region. (And it is single
@@ -289,7 +290,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
         ListMultimap<String, UserPermission> perms =
             PermissionStorage.getPermissions(conf, entry, t, null, null, null, false);
         byte[] serialized = PermissionStorage.writePermissionsAsBytes(perms, conf);
-        zkw.writeToZookeeper(entry, serialized);
+        zkPermissionWatcher.writeToZookeeper(entry, serialized);
       }
     } catch(IOException ex) {
           LOG.error("Failed updating permissions mirror for '" +
@@ -685,39 +686,48 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
           + " accordingly.");
     }
 
-    ZKWatcher zk = null;
     if (env instanceof MasterCoprocessorEnvironment) {
       // if running on HMaster
-      MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment)env;
+      MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
       if (mEnv instanceof HasMasterServices) {
-        zk = ((HasMasterServices)mEnv).getMasterServices().getZooKeeper();
+        MasterServices masterServices = ((HasMasterServices) mEnv).getMasterServices();
+        zkPermissionWatcher = masterServices.getZKPermissionWatcher();
+        accessChecker = masterServices.getAccessChecker();
       }
     } else if (env instanceof RegionServerCoprocessorEnvironment) {
-      RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment)env;
+      RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
       if (rsEnv instanceof HasRegionServerServices) {
-        zk = ((HasRegionServerServices)rsEnv).getRegionServerServices().getZooKeeper();
+        RegionServerServices rsServices =
+            ((HasRegionServerServices) rsEnv).getRegionServerServices();
+        zkPermissionWatcher = rsServices.getZKPermissionWatcher();
+        accessChecker = rsServices.getAccessChecker();
       }
     } else if (env instanceof RegionCoprocessorEnvironment) {
       // if running at region
       regionEnv = (RegionCoprocessorEnvironment) env;
       conf.addBytesMap(regionEnv.getRegion().getTableDescriptor().getValues());
       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
-          AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
+        AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
       if (regionEnv instanceof HasRegionServerServices) {
-        zk = ((HasRegionServerServices)regionEnv).getRegionServerServices().getZooKeeper();
+        RegionServerServices rsServices =
+            ((HasRegionServerServices) regionEnv).getRegionServerServices();
+        zkPermissionWatcher = rsServices.getZKPermissionWatcher();
+        accessChecker = rsServices.getAccessChecker();
       }
     }
 
+    if (zkPermissionWatcher == null) {
+      throw new NullPointerException("ZKPermissionWatcher is null");
+    } else if (accessChecker == null) {
+      throw new NullPointerException("AccessChecker is null");
+    }
     // set the user-provider.
     this.userProvider = UserProvider.instantiate(env.getConfiguration());
-    // Throws RuntimeException if fails to load AuthManager so that coprocessor is unloaded.
-    accessChecker = new AccessChecker(env.getConfiguration(), zk);
     tableAcls = new MapMaker().weakValues().makeMap();
   }
 
   @Override
   public void stop(CoprocessorEnvironment env) {
-    accessChecker.stop();
   }
 
   /*********************************** Observer/Service Getters ***********************************/
@@ -837,7 +847,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
         return null;
       }
     });
-    getAuthManager().getZKPermissionWatcher().deleteTableACLNode(tableName);
+    zkPermissionWatcher.deleteTableACLNode(tableName);
   }
 
   @Override
@@ -1147,7 +1157,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
         return null;
       }
     });
-    getAuthManager().getZKPermissionWatcher().deleteNamespaceACLNode(namespace);
+    zkPermissionWatcher.deleteNamespaceACLNode(namespace);
     LOG.info(namespace + " entry deleted in " + PermissionStorage.ACL_TABLE_NAME + " table.");
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AuthManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AuthManager.java
index e374a1b..5262036 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AuthManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AuthManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hbase.security.access;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -33,17 +32,13 @@ import org.apache.hadoop.hbase.AuthUtil;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
 
 /**
@@ -63,7 +58,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
  * </p>
  */
 @InterfaceAudience.Private
-public final class AuthManager implements Closeable {
+public final class AuthManager {
 
   /**
    * Cache of permissions, it is thread safe.
@@ -114,28 +109,10 @@ public final class AuthManager implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(AuthManager.class);
 
   private Configuration conf;
-  private ZKPermissionWatcher zkperms;
   private final AtomicLong mtime = new AtomicLong(0L);
 
-  private AuthManager(ZKWatcher watcher, Configuration conf)
-      throws IOException {
+  AuthManager(Configuration conf) {
     this.conf = conf;
-
-    this.zkperms = new ZKPermissionWatcher(watcher, this, conf);
-    try {
-      this.zkperms.start();
-    } catch (KeeperException ke) {
-      LOG.error("ZooKeeper initialization failed", ke);
-    }
-  }
-
-  @Override
-  public void close() {
-    this.zkperms.close();
-  }
-
-  public ZKPermissionWatcher getZKPermissionWatcher() {
-    return this.zkperms;
   }
 
   /**
@@ -515,61 +492,4 @@ public final class AuthManager implements Closeable {
   public long getMTime() {
     return mtime.get();
   }
-
-  private static Map<ZKWatcher, AuthManager> managerMap = new HashMap<>();
-
-  private static Map<AuthManager, Integer> refCount = new HashMap<>();
-
-  /**
-   * Returns a AuthManager from the cache. If not cached, constructs a new one.
-   * Returned instance should be released back by calling {@link #release(AuthManager)}.
-   * @param watcher zk watcher
-   * @param conf configuration
-   * @return an AuthManager
-   * @throws IOException zookeeper initialization failed
-   */
-  public synchronized static AuthManager getOrCreate(
-      ZKWatcher watcher, Configuration conf) throws IOException {
-    AuthManager instance = managerMap.get(watcher);
-    if (instance == null) {
-      instance = new AuthManager(watcher, conf);
-      managerMap.put(watcher, instance);
-    }
-    int ref = refCount.get(instance) == null ? 0 : refCount.get(instance);
-    refCount.put(instance, ref + 1);
-    return instance;
-  }
-
-  @VisibleForTesting
-  public static int getTotalRefCount() {
-    int total = 0;
-    for (int count : refCount.values()) {
-      total += count;
-    }
-    return total;
-  }
-
-  /**
-   * Releases the resources for the given AuthManager if the reference count 
is down to 0.
-   * @param instance AuthManager to be released
-   */
-  public synchronized static void release(AuthManager instance) {
-    if (refCount.get(instance) == null || refCount.get(instance) < 1) {
-      String msg = "Something wrong with the AuthManager reference counting: " 
+ instance
-          + " whose count is " + refCount.get(instance);
-      LOG.error(HBaseMarkers.FATAL, msg);
-      instance.close();
-      managerMap.remove(instance.getZKPermissionWatcher().getWatcher());
-      instance.getZKPermissionWatcher().getWatcher().abort(msg, null);
-    } else {
-      int ref = refCount.get(instance);
-      --ref;
-      refCount.put(instance, ref);
-      if (ref == 0) {
-        instance.close();
-        managerMap.remove(instance.getZKPermissionWatcher().getWatcher());
-        refCount.remove(instance);
-      }
-    }
-  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/NoopAccessChecker.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/NoopAccessChecker.java
new file mode 100644
index 0000000..95927c0
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/NoopAccessChecker.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.access;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * NoopAccessChecker is returned when hbase.security.authorization is not 
enabled.
+ * Always allow authorization if any user require any permission.
+ */
+@InterfaceAudience.Private
+public final class NoopAccessChecker extends AccessChecker {
+
+  public NoopAccessChecker(Configuration conf) throws RuntimeException {
+    super(conf);
+  }
+
+  @Override
+  public void requireAccess(User user, String request, TableName tableName, 
Action... permissions) {
+  }
+
+  @Override
+  public void requirePermission(User user, String request, String filterUser, 
Action perm) {
+    requireGlobalPermission(user, request, perm, null, null, filterUser);
+  }
+
+  @Override
+  public void requireGlobalPermission(User user, String request, Action perm, 
TableName tableName,
+      Map<byte[], ? extends Collection<byte[]>> familyMap, String filterUser) {
+  }
+
+  @Override
+  public void requireGlobalPermission(User user, String request, Action perm, 
String namespace) {
+  }
+
+  @Override
+  public void requireNamespacePermission(User user, String request, String 
namespace,
+      String filterUser, Action... permissions) {
+  }
+
+  @Override
+  public void requireNamespacePermission(User user, String request, String 
namespace,
+      TableName tableName, Map<byte[], ? extends Collection<byte[]>> familyMap,
+      Action... permissions) {
+  }
+
+  @Override
+  public void requirePermission(User user, String request, TableName 
tableName, byte[] family,
+      byte[] qualifier, String filterUser, Action... permissions) {
+  }
+
+  @Override
+  public void requireTablePermission(User user, String request, TableName 
tableName, byte[] family,
+      byte[] qualifier, Action... permissions) {
+  }
+
+  @Override
+  public void performOnSuperuser(String request, User caller, String 
userToBeChecked) {
+  }
+
+  @Override
+  public void checkLockPermissions(User user, String namespace, TableName 
tableName,
+      RegionInfo[] regionInfos, String reason) {
+  }
+
+  @Override
+  public boolean hasUserPermission(User user, String request, Permission 
permission) {
+    return true;
+  }
+
+  @Override
+  public AuthResult permissionGranted(String request, User user, Action 
permRequest,
+      TableName tableName, Map<byte[], ? extends Collection<?>> families) {
+    return AuthResult.allow(request, "All users allowed because authorization 
is disabled", user,
+      permRequest, tableName, families);
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 0e4f241..f5e2793 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -56,6 +56,8 @@ import 
org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
 import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -368,4 +370,14 @@ public class MockRegionServerServices implements 
RegionServerServices {
   public Optional<MobFileCache> getMobFileCache() {
     return Optional.empty();
   }
+
+  @Override
+  public AccessChecker getAccessChecker() {
+    return null;
+  }
+
+  @Override
+  public ZKPermissionWatcher getZKPermissionWatcher() {
+    return null;
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 9c55f57..35d53c5 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -54,6 +54,8 @@ import 
org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 
 public class MockNoopMasterServices implements MasterServices {
@@ -473,4 +475,14 @@ public class MockNoopMasterServices implements 
MasterServices {
   public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() {
     return null;
   }
+
+  @Override
+  public AccessChecker getAccessChecker() {
+    return null;
+  }
+
+  @Override
+  public ZKPermissionWatcher getZKPermissionWatcher() {
+    return null;
+  }
 }
\ No newline at end of file
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index a930d7f..2afb456 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -69,6 +69,8 @@ import 
org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
 import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -721,4 +723,14 @@ class MockRegionServer implements 
AdminProtos.AdminService.BlockingInterface,
   public Optional<MobFileCache> getMobFileCache() {
     return Optional.empty();
   }
+
+  @Override
+  public AccessChecker getAccessChecker() {
+    return null;
+  }
+
+  @Override
+  public ZKPermissionWatcher getZKPermissionWatcher() {
+    return null;
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
index a87a838..523b82f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
@@ -274,8 +274,6 @@ public class TestAccessController extends SecureTestUtil {
   public static void tearDownAfterClass() throws Exception {
     cleanUp();
     TEST_UTIL.shutdownMiniCluster();
-    int total = AuthManager.getTotalRefCount();
-    assertTrue("Unexpected reference count: " + total, total == 0);
   }
 
   private static void setUpTableAndUserPermissions() throws Exception {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController3.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController3.java
index 864928c..d4ae32f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController3.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController3.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.security.access;
 
 import static org.apache.hadoop.hbase.AuthUtil.toGroupEntry;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Coprocessor;
@@ -202,7 +202,7 @@ public class TestAccessController3 extends SecureTestUtil {
     }
     cleanUp();
     TEST_UTIL.shutdownMiniCluster();
-    assertTrue("region server should have aborted due to 
FaultyAccessController", rs.isAborted());
+    assertFalse("region server should have aborted due to 
FaultyAccessController", rs.isAborted());
   }
 
   private static void setUpTableAndUserPermissions() throws Exception {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
index 962db9f..14ea0b3 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
@@ -451,7 +451,7 @@ public class TestTablePermissions {
      * test a race condition causing AuthManager to sometimes fail global 
permissions checks
      * when the global cache is being updated
      */
-    AuthManager authManager = AuthManager.getOrCreate(ZKW, conf);
+    AuthManager authManager = new AuthManager(conf);
     // currently running user is the system user and should have global admin 
perms
     User currentUser = User.getCurrent();
     assertTrue(authManager.authorizeUserGlobal(currentUser, 
Permission.Action.ADMIN));
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionWatcher.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionWatcher.java
index 3c182d8..67c2612 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionWatcher.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionWatcher.java
@@ -58,6 +58,8 @@ public class TestZKPermissionWatcher {
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
   private static AuthManager AUTH_A;
   private static AuthManager AUTH_B;
+  private static ZKPermissionWatcher WATCHER_A;
+  private static ZKPermissionWatcher WATCHER_B;
   private final static Abortable ABORTABLE = new Abortable() {
     private final AtomicBoolean abort = new AtomicBoolean(false);
 
@@ -84,14 +86,20 @@ public class TestZKPermissionWatcher {
 
     // start minicluster
     UTIL.startMiniCluster();
-    AUTH_A = AuthManager.getOrCreate(new ZKWatcher(conf,
-      "TestZKPermissionsWatcher_1", ABORTABLE), conf);
-    AUTH_B = AuthManager.getOrCreate(new ZKWatcher(conf,
-      "TestZKPermissionsWatcher_2", ABORTABLE), conf);
+    AUTH_A = new AuthManager(conf);
+    AUTH_B = new AuthManager(conf);
+    WATCHER_A = new ZKPermissionWatcher(
+        new ZKWatcher(conf, "TestZKPermissionsWatcher_1", ABORTABLE), AUTH_A, 
conf);
+    WATCHER_B = new ZKPermissionWatcher(
+        new ZKWatcher(conf, "TestZKPermissionsWatcher_2", ABORTABLE), AUTH_B, 
conf);
+    WATCHER_A.start();
+    WATCHER_B.start();
   }
 
   @AfterClass
   public static void afterClass() throws Exception {
+    WATCHER_A.close();
+    WATCHER_B.close();
     UTIL.shutdownMiniCluster();
   }
 
@@ -118,7 +126,7 @@ public class TestZKPermissionWatcher {
     ListMultimap<String, UserPermission> multimap = ArrayListMultimap.create();
     multimap.putAll(george.getShortName(), acl);
     byte[] serialized = PermissionStorage.writePermissionsAsBytes(multimap, 
conf);
-    AUTH_A.getZKPermissionWatcher().writeToZookeeper(TEST_TABLE.getName(), 
serialized);
+    WATCHER_A.writeToZookeeper(TEST_TABLE.getName(), serialized);
     final long mtimeB = AUTH_B.getMTime();
     // Wait for the update to propagate
     UTIL.waitFor(10000, 100, new Predicate<Exception>() {
@@ -146,7 +154,7 @@ public class TestZKPermissionWatcher {
     final long mtimeA = AUTH_A.getMTime();
     multimap.putAll(hubert.getShortName(), acl2);
     byte[] serialized2 = PermissionStorage.writePermissionsAsBytes(multimap, 
conf);
-    AUTH_B.getZKPermissionWatcher().writeToZookeeper(TEST_TABLE.getName(), 
serialized2);
+    WATCHER_B.writeToZookeeper(TEST_TABLE.getName(), serialized2);
     // Wait for the update to propagate
     UTIL.waitFor(10000, 100, new Predicate<Exception>() {
       @Override

Reply via email to