Repository: hbase Updated Branches: refs/heads/branch-2 1653b54de -> d15947895
HBASE-18628 Fix event pre-emption in ZKPermWatcher Instead of using an Atomic Reference to data and aborting when we detect that new data comes in, use the native cancellation/pre-emption features of Java Future. Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d1594789 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d1594789 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d1594789 Branch: refs/heads/branch-2 Commit: d15947895668baf2acbb3a6aef9984f73921c64a Parents: 1653b54 Author: Mike Drob <[email protected]> Authored: Fri Aug 18 11:11:46 2017 -0500 Committer: Mike Drob <[email protected]> Committed: Tue Aug 22 11:16:22 2017 -0500 ---------------------------------------------------------------------- .../security/access/ZKPermissionWatcher.java | 55 +++--- .../access/TestZKPermissionWatcher.java | 178 +++++++++++++++++++ .../access/TestZKPermissionsWatcher.java | 178 ------------------- 3 files changed, 204 insertions(+), 207 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/d1594789/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java index 3324b90..fc94e06 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java @@ -38,6 +38,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicReference; @@ -55,11 +56,11 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable private static final Log LOG = LogFactory.getLog(ZKPermissionWatcher.class); // parent node for permissions lists static final String ACL_NODE = "acl"; - TableAuthManager authManager; - String aclZNode; - CountDownLatch initialized = new CountDownLatch(1); - AtomicReference<List<ZKUtil.NodeAndData>> nodes = new AtomicReference<>(null); - ExecutorService executor; + private final TableAuthManager authManager; + private final String aclZNode; + private final CountDownLatch initialized = new CountDownLatch(1); + private final ExecutorService executor; + private Future<?> childrenChangedFuture; public ZKPermissionWatcher(ZooKeeperWatcher watcher, TableAuthManager authManager, Configuration conf) { @@ -82,7 +83,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable List<ZKUtil.NodeAndData> existing = ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); if (existing != null) { - refreshNodes(existing, null); + refreshNodes(existing); } return null; } @@ -126,7 +127,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable try { List<ZKUtil.NodeAndData> nodes = ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); - refreshNodes(nodes, null); + refreshNodes(nodes); } catch (KeeperException ke) { LOG.error("Error reading data from zookeeper", ke); // only option is to abort @@ -179,42 +180,37 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable } } + @Override public void nodeChildrenChanged(final String path) { waitUntilStarted(); if (path.equals(aclZNode)) { try { - List<ZKUtil.NodeAndData> nodeList = + final List<ZKUtil.NodeAndData> nodeList = ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); - while (!nodes.compareAndSet(null, nodeList)) { - try { - Thread.sleep(20); - } catch (InterruptedException e) { - LOG.warn("Interrupted while setting node list", e); - Thread.currentThread().interrupt(); + // preempt any existing nodeChildrenChanged event processing + if (childrenChangedFuture != null && !childrenChangedFuture.isDone()) { + boolean cancelled = childrenChangedFuture.cancel(true); + if (!cancelled) { + // task may have finished between our check and attempted cancel, this is fine. + if (! childrenChangedFuture.isDone()) { + LOG.warn("Could not cancel processing node children changed event, " + + "please file a JIRA and attach logs if possible."); + } } } + childrenChangedFuture = asyncProcessNodeUpdate(() -> refreshNodes(nodeList)); } catch (KeeperException ke) { LOG.error("Error reading data from zookeeper for path "+path, ke); watcher.abort("ZooKeeper error get node children for path "+path, ke); } - asyncProcessNodeUpdate(new Runnable() { - // allows subsequent nodeChildrenChanged event to preempt current processing of - // nodeChildrenChanged event - @Override - public void run() { - List<ZKUtil.NodeAndData> nodeList = nodes.get(); - nodes.set(null); - refreshNodes(nodeList, nodes); - } - }); } } - private void asyncProcessNodeUpdate(Runnable runnable) { + private Future<?> asyncProcessNodeUpdate(Runnable runnable) { if (!executor.isShutdown()) { try { - executor.submit(runnable); + return executor.submit(runnable); } catch (RejectedExecutionException e) { if (executor.isShutdown()) { LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown"); @@ -223,12 +219,13 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable } } } + return null; // No task launched so there will be nothing to cancel later } - private void refreshNodes(List<ZKUtil.NodeAndData> nodes, AtomicReference ref) { + private void refreshNodes(List<ZKUtil.NodeAndData> nodes) { for (ZKUtil.NodeAndData n : nodes) { - if (ref != null && ref.get() != null) { - // there is a newer list + if (Thread.interrupted()) { + // Use Thread.interrupted so that we clear interrupt status break; } if (n.isEmpty()) continue; http://git-wip-us.apache.org/repos/asf/hbase/blob/d1594789/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionWatcher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionWatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionWatcher.java new file mode 100644 index 0000000..76de0c6 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionWatcher.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security.access; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.Waiter.Predicate; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.SecurityTests; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test the reading and writing of access permissions to and from zookeeper. + */ +@Category({SecurityTests.class, LargeTests.class}) +public class TestZKPermissionWatcher { + private static final Log LOG = LogFactory.getLog(TestZKPermissionWatcher.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static TableAuthManager AUTH_A; + private static TableAuthManager AUTH_B; + private final static Abortable ABORTABLE = new Abortable() { + private final AtomicBoolean abort = new AtomicBoolean(false); + + @Override + public void abort(String why, Throwable e) { + LOG.info(why, e); + abort.set(true); + } + + @Override + public boolean isAborted() { + return abort.get(); + } + }; + + private static TableName TEST_TABLE = + TableName.valueOf("perms_test"); + + @BeforeClass + public static void beforeClass() throws Exception { + // setup configuration + Configuration conf = UTIL.getConfiguration(); + SecureTestUtil.enableSecurity(conf); + + // start minicluster + UTIL.startMiniCluster(); + AUTH_A = TableAuthManager.getOrCreate(new ZooKeeperWatcher(conf, + "TestZKPermissionsWatcher_1", ABORTABLE), conf); + AUTH_B = TableAuthManager.getOrCreate(new ZooKeeperWatcher(conf, + "TestZKPermissionsWatcher_2", ABORTABLE), conf); + } + + @AfterClass + public static void afterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void testPermissionsWatcher() throws Exception { + Configuration conf = UTIL.getConfiguration(); + User george = User.createUserForTesting(conf, "george", new String[] { }); + User hubert = User.createUserForTesting(conf, "hubert", new String[] { }); + + assertFalse(AUTH_A.authorizeUser(george, TEST_TABLE, null, + TablePermission.Action.READ)); + assertFalse(AUTH_A.authorizeUser(george, TEST_TABLE, null, + TablePermission.Action.WRITE)); + assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null, + TablePermission.Action.READ)); + assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null, + TablePermission.Action.WRITE)); + + assertFalse(AUTH_B.authorizeUser(george, TEST_TABLE, null, + TablePermission.Action.READ)); + assertFalse(AUTH_B.authorizeUser(george, TEST_TABLE, null, + TablePermission.Action.WRITE)); + assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null, + TablePermission.Action.READ)); + assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null, + TablePermission.Action.WRITE)); + + // update ACL: george RW + List<TablePermission> acl = new ArrayList<>(1); + acl.add(new TablePermission(TEST_TABLE, null, TablePermission.Action.READ, + TablePermission.Action.WRITE)); + final long mtimeB = AUTH_B.getMTime(); + AUTH_A.setTableUserPermissions(george.getShortName(), TEST_TABLE, acl); + // Wait for the update to propagate + UTIL.waitFor(10000, 100, new Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + return AUTH_B.getMTime() > mtimeB; + } + }); + Thread.sleep(1000); + + // check it + assertTrue(AUTH_A.authorizeUser(george, TEST_TABLE, null, + TablePermission.Action.READ)); + assertTrue(AUTH_A.authorizeUser(george, TEST_TABLE, null, + TablePermission.Action.WRITE)); + assertTrue(AUTH_B.authorizeUser(george, TEST_TABLE, null, + TablePermission.Action.READ)); + assertTrue(AUTH_B.authorizeUser(george, TEST_TABLE, null, + TablePermission.Action.WRITE)); + assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null, + TablePermission.Action.READ)); + assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null, + TablePermission.Action.WRITE)); + assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null, + TablePermission.Action.READ)); + assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null, + TablePermission.Action.WRITE)); + + // update ACL: hubert R + acl = new ArrayList<>(1); + acl.add(new TablePermission(TEST_TABLE, null, TablePermission.Action.READ)); + final long mtimeA = AUTH_A.getMTime(); + AUTH_B.setTableUserPermissions("hubert", TEST_TABLE, acl); + // Wait for the update to propagate + UTIL.waitFor(10000, 100, new Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + return AUTH_A.getMTime() > mtimeA; + } + }); + Thread.sleep(1000); + + // check it + assertTrue(AUTH_A.authorizeUser(george, TEST_TABLE, null, + TablePermission.Action.READ)); + assertTrue(AUTH_A.authorizeUser(george, TEST_TABLE, null, + TablePermission.Action.WRITE)); + assertTrue(AUTH_B.authorizeUser(george, TEST_TABLE, null, + TablePermission.Action.READ)); + assertTrue(AUTH_B.authorizeUser(george, TEST_TABLE, null, + TablePermission.Action.WRITE)); + assertTrue(AUTH_A.authorizeUser(hubert, TEST_TABLE, null, + TablePermission.Action.READ)); + assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null, + TablePermission.Action.WRITE)); + assertTrue(AUTH_B.authorizeUser(hubert, TEST_TABLE, null, + TablePermission.Action.READ)); + assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null, + TablePermission.Action.WRITE)); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/d1594789/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java deleted file mode 100644 index cb36246..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.security.access; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.Waiter.Predicate; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.SecurityTests; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -/** - * Test the reading and writing of access permissions to and from zookeeper. - */ -@Category({SecurityTests.class, LargeTests.class}) -public class TestZKPermissionsWatcher { - private static final Log LOG = LogFactory.getLog(TestZKPermissionsWatcher.class); - private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - private static TableAuthManager AUTH_A; - private static TableAuthManager AUTH_B; - private final static Abortable ABORTABLE = new Abortable() { - private final AtomicBoolean abort = new AtomicBoolean(false); - - @Override - public void abort(String why, Throwable e) { - LOG.info(why, e); - abort.set(true); - } - - @Override - public boolean isAborted() { - return abort.get(); - } - }; - - private static TableName TEST_TABLE = - TableName.valueOf("perms_test"); - - @BeforeClass - public static void beforeClass() throws Exception { - // setup configuration - Configuration conf = UTIL.getConfiguration(); - SecureTestUtil.enableSecurity(conf); - - // start minicluster - UTIL.startMiniCluster(); - AUTH_A = TableAuthManager.getOrCreate(new ZooKeeperWatcher(conf, - "TestZKPermissionsWatcher_1", ABORTABLE), conf); - AUTH_B = TableAuthManager.getOrCreate(new ZooKeeperWatcher(conf, - "TestZKPermissionsWatcher_2", ABORTABLE), conf); - } - - @AfterClass - public static void afterClass() throws Exception { - UTIL.shutdownMiniCluster(); - } - - @Test - public void testPermissionsWatcher() throws Exception { - Configuration conf = UTIL.getConfiguration(); - User george = User.createUserForTesting(conf, "george", new String[] { }); - User hubert = User.createUserForTesting(conf, "hubert", new String[] { }); - - assertFalse(AUTH_A.authorizeUser(george, TEST_TABLE, null, - TablePermission.Action.READ)); - assertFalse(AUTH_A.authorizeUser(george, TEST_TABLE, null, - TablePermission.Action.WRITE)); - assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null, - TablePermission.Action.READ)); - assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null, - TablePermission.Action.WRITE)); - - assertFalse(AUTH_B.authorizeUser(george, TEST_TABLE, null, - TablePermission.Action.READ)); - assertFalse(AUTH_B.authorizeUser(george, TEST_TABLE, null, - TablePermission.Action.WRITE)); - assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null, - TablePermission.Action.READ)); - assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null, - TablePermission.Action.WRITE)); - - // update ACL: george RW - List<TablePermission> acl = new ArrayList<>(1); - acl.add(new TablePermission(TEST_TABLE, null, TablePermission.Action.READ, - TablePermission.Action.WRITE)); - final long mtimeB = AUTH_B.getMTime(); - AUTH_A.setTableUserPermissions(george.getShortName(), TEST_TABLE, acl); - // Wait for the update to propagate - UTIL.waitFor(10000, 100, new Predicate<Exception>() { - @Override - public boolean evaluate() throws Exception { - return AUTH_B.getMTime() > mtimeB; - } - }); - Thread.sleep(1000); - - // check it - assertTrue(AUTH_A.authorizeUser(george, TEST_TABLE, null, - TablePermission.Action.READ)); - assertTrue(AUTH_A.authorizeUser(george, TEST_TABLE, null, - TablePermission.Action.WRITE)); - assertTrue(AUTH_B.authorizeUser(george, TEST_TABLE, null, - TablePermission.Action.READ)); - assertTrue(AUTH_B.authorizeUser(george, TEST_TABLE, null, - TablePermission.Action.WRITE)); - assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null, - TablePermission.Action.READ)); - assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null, - TablePermission.Action.WRITE)); - assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null, - TablePermission.Action.READ)); - assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null, - TablePermission.Action.WRITE)); - - // update ACL: hubert R - acl = new ArrayList<>(1); - acl.add(new TablePermission(TEST_TABLE, null, TablePermission.Action.READ)); - final long mtimeA = AUTH_A.getMTime(); - AUTH_B.setTableUserPermissions("hubert", TEST_TABLE, acl); - // Wait for the update to propagate - UTIL.waitFor(10000, 100, new Predicate<Exception>() { - @Override - public boolean evaluate() throws Exception { - return AUTH_A.getMTime() > mtimeA; - } - }); - Thread.sleep(1000); - - // check it - assertTrue(AUTH_A.authorizeUser(george, TEST_TABLE, null, - TablePermission.Action.READ)); - assertTrue(AUTH_A.authorizeUser(george, TEST_TABLE, null, - TablePermission.Action.WRITE)); - assertTrue(AUTH_B.authorizeUser(george, TEST_TABLE, null, - TablePermission.Action.READ)); - assertTrue(AUTH_B.authorizeUser(george, TEST_TABLE, null, - TablePermission.Action.WRITE)); - assertTrue(AUTH_A.authorizeUser(hubert, TEST_TABLE, null, - TablePermission.Action.READ)); - assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null, - TablePermission.Action.WRITE)); - assertTrue(AUTH_B.authorizeUser(hubert, TEST_TABLE, null, - TablePermission.Action.READ)); - assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null, - TablePermission.Action.WRITE)); - } -}
