Repository: sentry Updated Branches: refs/heads/sentry-ha-redesign cf8a16f2a -> 6125ac9bd
SENTRY-1583: Refactor ZK/Curator code (Alexander Kolbasov via Vamsee Yarlagadda, Reviewed by: Misha Dmitriev, Vamsee Yarlagadda) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/6125ac9b Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/6125ac9b Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/6125ac9b Branch: refs/heads/sentry-ha-redesign Commit: 6125ac9bd541cb141beafa844fc7e32157c532d0 Parents: cf8a16f Author: Vamsee Yarlagadda <vam...@cloudera.com> Authored: Fri Jan 6 20:44:34 2017 -0800 Committer: Vamsee Yarlagadda <vam...@cloudera.com> Committed: Fri Jan 6 20:44:43 2017 -0800 ---------------------------------------------------------------------- pom.xml | 2 +- .../core/common/utils/SentryConstants.java | 1 - .../db/service/persistent/HAContext.java | 270 +++++++++++++++++++ .../db/service/thrift/SentryMetrics.java | 2 +- .../apache/sentry/service/thrift/Activator.java | 151 ----------- .../sentry/service/thrift/Activators.java | 75 ------ .../sentry/service/thrift/LeaderStatus.java | 181 ------------- .../service/thrift/LeaderStatusAdaptor.java | 207 -------------- .../service/thrift/LeaderStatusMonitor.java | 270 +++++++++++++++++++ .../sentry/service/thrift/SentryService.java | 85 +++--- .../sentry/service/thrift/ServiceConstants.java | 6 +- .../db/service/thrift/TestActivator.java | 8 - .../thrift/TestSentryServiceMetrics.java | 1 - .../sentry/service/thrift/TestLeaderStatus.java | 245 ----------------- 14 files changed, 587 insertions(+), 917 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/6125ac9b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index df26edf..b9282e7 100644 --- a/pom.xml +++ b/pom.xml @@ -62,7 +62,7 @@ limitations under the License. 
<commons-pool2.version>2.2</commons-pool2.version> <commons.lang.version>2.6</commons.lang.version> <commons.logging.version>1.2</commons.logging.version> - <curator.version>2.7.1</curator.version> + <curator.version>2.11.1</curator.version> <datanucleus.maven.plugin.version>4.0.1</datanucleus.maven.plugin.version> <derby.version>10.10.2.0</derby.version> <easymock.version>3.0</easymock.version> http://git-wip-us.apache.org/repos/asf/sentry/blob/6125ac9b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java index c094058..4ed1361 100644 --- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java @@ -42,5 +42,4 @@ public class SentryConstants { public static final String ACCESS_ALLOW_URI_PER_DB_POLICYFILE = "sentry.allow.uri.db.policyfile"; public static final String SENTRY_ZK_JAAS_NAME = "Sentry"; - public static final String CURRENT_INCARNATION_ID_KEY = "current.incarnation.key"; } http://git-wip-us.apache.org/repos/asf/sentry/blob/6125ac9b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java new file mode 100644 index 0000000..00eec4e --- /dev/null +++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.provider.db.service.persistent; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.imps.DefaultACLProvider; +import org.apache.curator.framework.recipes.leader.LeaderSelector; +import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.sentry.service.thrift.JaasConfiguration; +import org.apache.zookeeper.ZooDefs.Perms; +import org.apache.zookeeper.client.ZooKeeperSaslClient; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import 
org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.sentry.service.thrift.ServiceConstants.ServerConfig.*; + +/** + * HAContext stores the global ZooKeeper related context. + * <p> + * This class is a singleton - only one ZooKeeper context is maintained. + */ +public final class HAContext implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(HAContext.class); + private static HAContext serverHAContext = null; + private static boolean aclUnChecked = true; + + private static final String SENTRY_ZK_JAAS_NAME = "SentryClient"; + private final String zookeeperQuorum; + private final String namespace; + + private final boolean zkSecure; + private final List<ACL> saslACL; + + private final CuratorFramework curatorFramework; + + private HAContext(Configuration conf) throws IOException { + this.zookeeperQuorum = conf.get(SENTRY_HA_ZOOKEEPER_QUORUM, ""); + int retriesMaxCount = conf.getInt(SENTRY_HA_ZOOKEEPER_RETRIES_MAX_COUNT, + SENTRY_HA_ZOOKEEPER_RETRIES_MAX_COUNT_DEFAULT); + int sleepMsBetweenRetries = conf.getInt(SENTRY_HA_ZOOKEEPER_SLEEP_BETWEEN_RETRIES_MS, + SENTRY_HA_ZOOKEEPER_SLEEP_BETWEEN_RETRIES_MS_DEFAULT); + this.namespace = conf.get(SENTRY_HA_ZOOKEEPER_NAMESPACE, + SENTRY_HA_ZOOKEEPER_NAMESPACE_DEFAULT); + this.zkSecure = conf.getBoolean(SENTRY_HA_ZOOKEEPER_SECURITY, + SENTRY_HA_ZOOKEEPER_SECURITY_DEFAULT); + this.validateConf(); + ACLProvider aclProvider; + if (zkSecure) { + LOGGER.info("Connecting to ZooKeeper with SASL/Kerberos and using 'sasl' ACLs"); + this.setJaasConfiguration(conf); + System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, + SENTRY_ZK_JAAS_NAME); + saslACL = Lists.newArrayList(); + saslACL.add(new ACL(Perms.ALL, new Id("sasl", 
getServicePrincipal(conf, + PRINCIPAL)))); + saslACL.add(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal(conf, + SERVER_HA_ZOOKEEPER_CLIENT_PRINCIPAL)))); + aclProvider = new SASLOwnerACLProvider(); + String allowConnect = conf.get(ALLOW_CONNECT); + + if (!Strings.isNullOrEmpty(allowConnect)) { + for (String principal : allowConnect.split("\\s*,\\s*")) { + LOGGER.info("Adding acls for " + principal); + saslACL.add(new ACL(Perms.ALL, new Id("sasl", principal))); + } + } + } else { + saslACL = null; + LOGGER.info("Connecting to ZooKeeper without authentication"); + aclProvider = new DefaultACLProvider(); + } + + RetryPolicy retryPolicy = new ExponentialBackoffRetry(sleepMsBetweenRetries, retriesMaxCount); + this.curatorFramework = CuratorFrameworkFactory.builder() + .namespace(this.namespace) + .connectString(this.zookeeperQuorum) + .retryPolicy(retryPolicy) + .aclProvider(aclProvider) + .build(); + } + + private void start() { + if (curatorFramework.getState() != CuratorFrameworkState.STARTED) { + curatorFramework.start(); + } + } + + /** + * Create a singleton instance of ZooKeeper context (if needed) and return it. + * The instance returned is already running. + * + * @param conf Configuration, The following keys are used: + * <ul> + * <li>SENTRY_HA_ZOOKEEPER_QUORUM</li> + * <li>SENTRY_HA_ZOOKEEPER_RETRIES_MAX_COUNT</li> + * <li>SENTRY_HA_ZOOKEEPER_SLEEP_BETWEEN_RETRIES_MS</li> + * <li>SENTRY_HA_ZOOKEEPER_NAMESPACE</li> + * <li>SENTRY_HA_ZOOKEEPER_SECURITY</li> + * <li>LOGIN_CONTEXT_NAME_KEY</li> + * <li>PRINCIPAL</li> + * <li>SERVER_HA_ZOOKEEPER_CLIENT_PRINCIPAL</li> + * <li>ALLOW_CONNECT</li> + * <li>SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE</li> + * <li>SERVER_HA_ZOOKEEPER_CLIENT_KEYTAB</li> + * <li>RPC_ADDRESS</li> + * </ul> + * @return Global ZooKeeper context. 
+ * @throws Exception + */ + static synchronized HAContext getHAContext(Configuration conf) throws IOException { + if (serverHAContext != null) { + return serverHAContext; + } + serverHAContext = new HAContext(conf); + + serverHAContext.start(); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + LOGGER.info("ShutdownHook closing curator framework"); + try { + serverHAContext.close(); + } catch (Throwable t) { + LOGGER.error("Error stopping curator framework", t); + } + } + }); + return serverHAContext; + } + + /** + * HA context for server which verifies the ZK ACLs on namespace + * + * @param conf Configuration - see {@link #getHAContext(Configuration)} + * @return Server ZK context + * @throws Exception + */ + public static HAContext getHAServerContext(Configuration conf) throws Exception { + HAContext serverContext = getHAContext(conf); + serverContext.checkAndSetACLs(); + return serverContext; + } + + private void validateConf() { + checkNotNull(zookeeperQuorum, "Zookeeper Quorum should not be null."); + checkNotNull(namespace, "Zookeeper namespace should not be null."); + } + + private static String getServicePrincipal(Configuration conf, String confProperty) + throws IOException { + String principal = checkNotNull(conf.get(confProperty)); + checkArgument(!principal.isEmpty(), "Server principal is empty."); + return principal.split("[/@]")[0]; + } + + private void checkAndSetACLs() throws Exception { + if (zkSecure && aclUnChecked) { + // If znodes were previously created without security enabled, and now it is, we need to go + // through all existing znodes and set the ACLs for them. 
This is done just once at the startup + // We can't get the namespace znode through curator; have to go through zk client + String newNamespace = "/" + curatorFramework.getNamespace(); + if (curatorFramework.getZookeeperClient().getZooKeeper().exists(newNamespace, null) != null) { + List<ACL> acls = curatorFramework.getZookeeperClient().getZooKeeper().getACL(newNamespace, new Stat()); + if (acls.isEmpty() || !acls.get(0).getId().getScheme().equals("sasl")) { + LOGGER.info("'sasl' ACLs not set; setting..."); + List<String> children = curatorFramework.getZookeeperClient().getZooKeeper().getChildren(newNamespace, + null); + for (String child : children) { + this.checkAndSetACLs("/" + child); + } + curatorFramework.getZookeeperClient().getZooKeeper().setACL(newNamespace, saslACL, -1); + } + } + aclUnChecked = false; + } + } + + private void checkAndSetACLs(String path) throws Exception { + LOGGER.info("Setting acls on " + path); + List<String> children = curatorFramework.getChildren().forPath(path); + for (String child : children) { + this.checkAndSetACLs(path + "/" + child); + } + curatorFramework.setACL().withACL(saslACL).forPath(path); + } + + // This gets ignored during most tests, see ZKXTestCaseWithSecurity#setupZKServer() + private void setJaasConfiguration(Configuration conf) throws IOException { + if ("false".equalsIgnoreCase(conf.get( + SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE, + SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE_DEFAULT))) { + String keytabFile = conf.get(SERVER_HA_ZOOKEEPER_CLIENT_KEYTAB); + checkArgument(!keytabFile.isEmpty(), "Keytab File is empty."); + String principal = conf.get(SERVER_HA_ZOOKEEPER_CLIENT_PRINCIPAL); + principal = SecurityUtil.getServerPrincipal(principal, + conf.get(RPC_ADDRESS, RPC_ADDRESS_DEFAULT)); + checkArgument(!principal.isEmpty(), "Kerberos principal is empty."); + + // This is equivalent to writing a jaas.conf file and setting the system property, + // "java.security.auth.login.config", to point to it (but this way we 
don't have to write + // a file, and it works better for the tests) + JaasConfiguration.addEntryForKeytab(SENTRY_ZK_JAAS_NAME, principal, keytabFile); + } else { + // Create jaas conf for ticket cache + JaasConfiguration.addEntryForTicketCache(SENTRY_ZK_JAAS_NAME); + } + javax.security.auth.login.Configuration.setConfiguration(JaasConfiguration.getInstance()); + } + + /** + * Create a new Curator leader szselector + * @param path Zookeeper path + * @param listener Curator listener for leader selection changes + * @return an instance of leader selector associated with the running curator framework + */ + public LeaderSelector newLeaderSelector(String path, LeaderSelectorListener listener) { + return new LeaderSelector(this.curatorFramework, path, listener); + } + + @Override + public void close() throws Exception { + this.curatorFramework.close(); + } + + private class SASLOwnerACLProvider implements ACLProvider { + @Override + public List<ACL> getDefaultAcl() { + return saslACL; + } + + @Override + public List<ACL> getAclForPath(String path) { + return saslACL; + } + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/6125ac9b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java index f80605c..39539b9 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java @@ -134,7 +134,7 @@ public final class SentryMetrics { public void addSentryServiceGauges(SentryService sentryservice) { if(!sentryServiceGaugesAdded) { 
addGauge(SentryService.class, "is_active", sentryservice.getIsActiveGauge()); - addGauge(SentryService.class, "is_ha", sentryservice.getIsHAGauge()); + addGauge(SentryService.class, "activated", sentryservice.getBecomeActiveCount()); sentryServiceGaugesAdded = true; } } http://git-wip-us.apache.org/repos/asf/sentry/blob/6125ac9b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activator.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activator.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activator.java deleted file mode 100644 index c3df4d8..0000000 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activator.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.sentry.service.thrift; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Properties; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.conf.Configuration; -import org.apache.sentry.core.common.exception.SentryStandbyException; -import org.apache.sentry.provider.db.service.persistent.Fencer; -import org.apache.sentry.provider.db.service.persistent.SentryStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jdo.JDOHelper; -import javax.jdo.PersistenceManager; -import javax.jdo.PersistenceManagerFactory; - -/** - * The activator is used to access and modify the activation state of the sentry daemon. - * In active / active mode, only one daemon can fetch snapshots from HMS and write to the - * the backend DB, thus we can use the Activator to mark it. - */ -public class Activator implements Closeable { - private static final Logger LOGGER = LoggerFactory.getLogger(Activator.class); - - /** - * The DataNucleus PersistenceManagerFactory to use. - */ - private final PersistenceManagerFactory pmf; - - /** - * The handler for LeaderStatus callbacks. - */ - private final TransitionHandler handler; - - /** - * LeaderStatus generates callbacks to let us know when we are active or - * standby. When HA is enabled, it manages ZK sessions. - */ - private final LeaderStatus leaderStatus; - - /** - * The fencer object. - */ - private final Fencer fencer; - - /** - * True if the Activator is active. - * - * This variable can be read without taking the lock, but must not be - * written unless we hold the Activator lock. 
- */ - private volatile boolean active = false; - - public Activator(Configuration conf) throws Exception { - Properties props = SentryStore.getDataNucleusProperties(conf); - this.pmf = JDOHelper.getPersistenceManagerFactory(props); - this.handler = new TransitionHandler(); - this.leaderStatus = new LeaderStatus(handler, conf); - this.fencer = new Fencer(this.leaderStatus.getIncarnationId(), pmf); - this.leaderStatus.start(); - } - - @Override - public void close() throws IOException { - this.leaderStatus.close(); - this.pmf.close(); - } - - /** - * Deactivates this activator. - */ - @VisibleForTesting - public void deactivate() throws IOException { - leaderStatus.becomeStandby(); - } - - private class TransitionHandler implements LeaderStatus.Listener { - @Override - public void becomeActive() throws Exception { - synchronized (Activator.this) { - if (!active) { - LOGGER.info("Activating " + leaderStatus.getIncarnationId()); - fencer.fence(pmf); - active = true; - } - } - } - - @Override - public void becomeStandby() { - synchronized (Activator.this) { - if (active) { - LOGGER.info("Deactivating " + leaderStatus.getIncarnationId()); - active = false; - } - } - } - } - - /** - * Returns true if this Activator considers itself active. - * Note that you must still use checkSqlFencing or another - * means of fencing when performing modification operations. - */ - public boolean isActive() { - return active; - } - - public synchronized String getIncarnationId() { - return leaderStatus.getIncarnationId(); - } - - public Fencer getFencer() { - return fencer; - } - - /** - * Verify that the current SQL transaction is safe. - */ - public void checkSqlFencing(PersistenceManager pm) - throws SentryStandbyException { - // Before invoking the fencer, first check if we believe that we are - // active. This avoids wasted effort. 
- if (!active) { - throw new SentryStandbyException("The daemon is not active"); - } - // If we believe that we are active, add a query to the current transaction - // which will confirm that fact. - fencer.checkSqlFencing(pm); - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/6125ac9b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activators.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activators.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activators.java deleted file mode 100644 index 2926eeb..0000000 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activators.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sentry.service.thrift; - -import java.util.HashMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.sentry.core.common.utils.SentryConstants; - -/** - * A global map from incarnation IDs to Activator objects.<p/> - * - * This is used to access the current global Activator. 
Normally there will - * only be one Activator used in a sentry process. There may be multiple - * Activator objects in the case where we are running unit tests. - */ -public enum Activators { - INSTANCE; - - private final HashMap<String, Activator> acts = new HashMap<String, Activator>(); - - Activators() {} - - public synchronized void put(Activator act) { - acts.put(act.getIncarnationId(), act); - } - - public Activator create(Configuration conf) throws Exception { - Activator act = new Activator(conf); - put(act); - return act; - } - - public Activator get(Configuration conf) { - String key = conf.get(SentryConstants.CURRENT_INCARNATION_ID_KEY); - if (key == null) { - throw new RuntimeException("No " + - SentryConstants.CURRENT_INCARNATION_ID_KEY + "set."); - } - return get(key); - } - - private synchronized Activator get(String incarnationId) { - Activator act = acts.get(incarnationId); - if (act == null) { - throw new RuntimeException("No activator found with " + - "incarnationId " + incarnationId); - } - return act; - } - - public synchronized void remove(Activator act) { - Activator removed = acts.remove(act.getIncarnationId()); - if (removed == null) { - throw new RuntimeException("No activator found with " + - "incarnationId " + act.getIncarnationId()); - } - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/6125ac9b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatus.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatus.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatus.java deleted file mode 100644 index 2467921..0000000 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatus.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or 
more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sentry.service.thrift; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.datanucleus.util.Base64; - -import java.io.Closeable; -import java.io.IOException; -import java.security.SecureRandom; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.apache.sentry.service.thrift.ServiceConstants.ServerConfig.SENTRY_HA_ENABLED; -import static org.apache.sentry.service.thrift.ServiceConstants.ServerConfig.SENTRY_HA_ENABLED_DEFAULT; - -/** - * Determines the leadership status of the Sentry daemon. - * It handles both highly-available and non-highly-available configurations. - */ -final class LeaderStatus implements Closeable { - private static final Log LOG = LogFactory.getLog(LeaderStatus.class); - - /** - * Callback functions which are invoked when the leader status changes. - */ - interface Listener { - /** - * Attempt to become the leader. - * - * @throws Exception On error. If an exception is thrown, we will - * relinquish leadership. - */ - void becomeActive() throws Exception; - - /** - * Become the standby. 
All exceptions thrown from this function will - * be ignored. - */ - void becomeStandby(); - } - - /** - * The listener to invoke when our leadership status changes. - */ - private final Listener listener; - - /** - * The unique ID of this potential leader. - */ - private final String incarnationId; - - /** - * The LeaderStatusThread, or null if HA is disabled. - */ - private final LeaderStatusAdaptor leaderStatusAdaptor; - - /** - * True if this object has been closed.<p/> - * - * This is an AtomicBoolean so that multiple calls to close only result in one - * close action. - */ - private final AtomicBoolean closed = new AtomicBoolean(false); - - /** - * Generate a very long random ID. - * - * We want a name that doesn't start with a number, and which - * contains only letters and numbers. This is important because - * the incarnation ID gets used in SQL databases to name a table. - */ - static String generateIncarnationId() { - SecureRandom srand = new SecureRandom(); - // Why 12? Base64 encodes 12 bytes to a 16 length char array(12 * 8 /6). - // We need a encoded string of length <= 17, as the length of a - // fencing table which is prefixed with SENTRY_FENCE_(13 chars) - // cannot be greater than 30 chars(Oracle limitation) - byte[] buf = new byte[12]; - srand.nextBytes(buf); - char[] cbuf = Base64.encode(buf); - StringBuilder bld = new StringBuilder(); - for (int i = 0; i < cbuf.length; i++) { - boolean safe; - if (i == 0) { - // Some databases can't handle identiifers that start with numbers, - // so always start with a letter. Also replace '+' or '/' with - // something safe. - safe = Character.isLetter(cbuf[i]); - } else { - // Replace '+' or '/' with something safe. 
- safe = Character.isLetterOrDigit(cbuf[i]); - } - if (!safe) { - bld.append((char)('a' + srand.nextInt(26))); - } else { - bld.append(cbuf[i]); - } - } - return bld.toString(); - } - - LeaderStatus(Listener listener, Configuration conf) throws Exception { - this.listener = listener; - this.incarnationId = generateIncarnationId(); - boolean isHa = conf. - getBoolean(SENTRY_HA_ENABLED, SENTRY_HA_ENABLED_DEFAULT); - if (isHa) { - this.leaderStatusAdaptor = new LeaderStatusAdaptor(incarnationId, conf, listener); - } else { - LOG.info("LeaderStatus(incarnationId=" + incarnationId + - "): HA is disabled."); - this.leaderStatusAdaptor = null; - } - } - - public String getIncarnationId() { - return incarnationId; - } - - public void start() throws Exception { - if (this.leaderStatusAdaptor != null) { - this.leaderStatusAdaptor.start(); - } else { - this.listener.becomeActive(); - } - } - - /** - * Called by tests to force deactivate(standby) a daemon, - * so that another daemon becomes active. - * @throws IOException - */ - @VisibleForTesting - public void becomeStandby() throws IOException { - if (leaderStatusAdaptor != null) { - leaderStatusAdaptor.deactivate(); - } else { - try { - listener.becomeStandby(); - } catch (Throwable t) { - LOG.error("becomeStandby: " + incarnationId + - " threw an unexpected exception", t); - } - } - } - - @Override - public void close() throws IOException { - if (closed.compareAndSet(false, true)) { - if (leaderStatusAdaptor != null) { - // Shut down in the HA case. - leaderStatusAdaptor.close(); - } else { - // Shut down in the non-HA case. 
- try { - listener.becomeStandby(); - } catch (Throwable t) { - LOG.error("becomeStandby: " + incarnationId + - " threw an unexpected exception", t); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/6125ac9b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusAdaptor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusAdaptor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusAdaptor.java deleted file mode 100644 index e87b3d1..0000000 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusAdaptor.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sentry.service.thrift; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.recipes.leader.LeaderSelector; -import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.hadoop.conf.Configuration; - -import java.io.Closeable; -import java.io.IOException; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import static org.apache.sentry.service.thrift.ServiceConstants.ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM; -import static org.apache.sentry.service.thrift.ServiceConstants.ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM_DEFAULT; -import static org.apache.sentry.service.thrift.ServiceConstants.ServerConfig.SENTRY_HA_ZOOKEEPER_NAMESPACE; -import static org.apache.sentry.service.thrift.ServiceConstants.ServerConfig.SENTRY_HA_ZOOKEEPER_NAMESPACE_DEFAULT; - -/** - * Determines the leadership status of the Sentry daemon. - */ -final class LeaderStatusAdaptor - extends LeaderSelectorListenerAdapter implements Closeable { - private static final Log LOG = - LogFactory.getLog(LeaderStatusAdaptor.class); - - private final String LEADER_SELECTOR_SUFFIX = "leader"; - - /** - * The ZooKeeper path prefix to use. - */ - private final String zkNamespace; - - /** - * The Curator framework object. - */ - private final CuratorFramework framework; - - /** - * The listener which we should notify about HA state changes. - */ - private final LeaderStatus.Listener listener; - - /** - * The Curator LeaderSelector object. - */ - private final LeaderSelector leaderSelector; - - /** - * The lock which protects isActive. 
- */ - private final ReentrantLock lock = new ReentrantLock(); - - /** - * A condition variable which the takeLeadership function will wait on. - */ - private final Condition cond = lock.newCondition(); - - /** - * The number of times this incarnation has become the leader. - */ - private long becomeLeaderCount = 0; - - /** - * True only if this LeaderStatusAdaptor is closed. - */ - private boolean isClosed = false; - - /** - * True only if this incarnation is currently active. - */ - private boolean isActive = false; - - LeaderStatusAdaptor(String incarnationId, Configuration conf, - LeaderStatus.Listener listener) { - this.zkNamespace = conf.get(SENTRY_HA_ZOOKEEPER_NAMESPACE, - SENTRY_HA_ZOOKEEPER_NAMESPACE_DEFAULT); - String zkServers = conf.get(SENTRY_HA_ZOOKEEPER_QUORUM, - SENTRY_HA_ZOOKEEPER_QUORUM_DEFAULT); - if ((zkServers == null) || (zkServers.trim().isEmpty())) { - throw new RuntimeException("You must configure some ZooKeeper " + - "servers via " + SENTRY_HA_ZOOKEEPER_QUORUM + " when enabling HA"); - } - this.framework = CuratorFrameworkFactory.newClient(zkServers, - new ExponentialBackoffRetry(1000, 3)); - this.framework.start(); - this.listener = listener; - this.leaderSelector = new LeaderSelector(this.framework, - this.zkNamespace + "/" + LEADER_SELECTOR_SUFFIX, this); - this.leaderSelector.setId(incarnationId); - this.leaderSelector.autoRequeue(); - LOG.info("Created LeaderStatusAdaptor(zkNamespace=" + zkNamespace + - ", incarnationId=" + incarnationId + - ", zkServers='" + zkServers + "')"); - } - - public void start() { - this.leaderSelector.start(); - } - - /** - * Shut down the LeaderStatusAdaptor and wait for it to transition to - * standby. - */ - @Override - public void close() throws IOException { - // If the adaptor is already closed, calling close again is a no-op. - // Setting isClosed also prevents activation after this point. 
- lock.lock(); - try { - if (isClosed) { - return; - } - isClosed = true; - } finally { - lock.unlock(); - } - - // Shut down our Curator hooks. - leaderSelector.close(); - - // Wait for the adaptor to transition to standby state. - lock.lock(); - try { - while (isActive) { - cond.awaitUninterruptibly(); - } - } finally { - lock.unlock(); - } - } - - /** - * @return true if this client is the current leader. - */ - public boolean isActive() { - lock.lock(); - try { - return isActive; - } finally { - lock.unlock(); - } - } - - /** - * Deactivate the current client, if it is active. - */ - public void deactivate() { - lock.lock(); - try { - if (isActive) { - isActive = false; - cond.signal(); - } - } finally { - lock.unlock(); - } - } - - @Override - public void takeLeadership(CuratorFramework client) throws Exception { - lock.lock(); - try { - if (isClosed) { - LOG.info("LeaderStatusAdaptor: can't become active because the " + - "adaptor is closed."); - return; - } - isActive = true; - becomeLeaderCount++; - LOG.info("LeaderStatusAdaptor: becoming active. 
" + - "becomeLeaderCount=" + becomeLeaderCount); - listener.becomeActive(); - while (isActive) { - cond.await(); - } - } finally { - deactivate(); - LOG.info("LeaderStatusAdaptor: becoming standby"); - try { - listener.becomeStandby(); - } catch (Throwable t) { - LOG.error("becomeStandby threw unexpected exception", t); - } - lock.unlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/6125ac9b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusMonitor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusMonitor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusMonitor.java new file mode 100644 index 0000000..d1999b2 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusMonitor.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sentry.service.thrift; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.leader.LeaderSelector; +import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; +import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; +import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.provider.db.service.persistent.HAContext; + +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.sentry.service.thrift.ServiceConstants.ServerConfig.*; + +/** + * LeaderStatusMonitor participates in the distributed leader election protocol + * and allows clients to access the global leadership status. + * <p> + * LeaderStatusMonitor is a singleton that uses Curator framework via + * {@link HAContext}. The leadership status can be accessed via the + * {@link #isLeader()} method.<p> + * + * Usually leadership re-election is initiated by the Curator framework when one + * of the nodes disconnects from ZooKeeper, but LeaderStatusMonitor also supports + * voluntary release of the leadership via the {@link #deactivate()} method. This is + * intended to be used for debugging purposes. + * <p> + * The class also simulates leader election in non-HA environments. In such cases its + * {@link #isLeader()} method always returns True. The non-HA environment is determined + * by the absence of the SENTRY_HA_ZOOKEEPER_QUORUM in the configuration. 
+ * + * <h2>Implementation notes</h2> + * + * <h3>Initialization</h3> + * + * Class initialization is split between the constructor and the {@link #init()} method. + * There are two reasons for it: + * <ul> + * <li>We do not want to pass <strong>this</strong> reference to + * {@link HAContext#newLeaderSelector(String, LeaderSelectorListener)} + * until it is fully initialized</li> + * <li>We do not want to call {@link LeaderSelector#start()} method in constructor</li> + * </ul> + * + * Since LeaderStatusMonitor is a singleton and an instance can only be obtained via the + * {@link #getLeaderStatusMonitor(Configuration)} method, we hide this construction split + * from the callers. + * + * <h3>Synchronization</h3> + * Singleton synchronization is achieved using the synchronized class builder + * {@link #getLeaderStatusMonitor(Configuration)} + * <p> + * Upon becoming a leader, the code loops in {@link #takeLeadership(CuratorFramework)} + * until it receives a deactivation signal from {@link #deactivate()}. This is synchronized + * using a {@link #lock} and condition variable {@link #cond}. + * <p> + * Access to the leadership status {@link #isLeader} is also protected by the {@link #lock}. + * This isn't strictly necessary and a volatile field would be sufficient, but since we + * already use the {@link #lock} this is more straightforward. 
+ */ +@ThreadSafe +final class LeaderStatusMonitor + extends LeaderSelectorListenerAdapter implements AutoCloseable { + + private static final Log LOG = + LogFactory.getLog(LeaderStatusMonitor.class); + + private static final String LEADER_SELECTOR_SUFFIX = "leader"; + + /** Unique instance of the singleton object */ + private static LeaderStatusMonitor leaderStatusMonitor = null; + + private final String zkNamespace; + private final HAContext haContext; + + /** Unique string describing this instance */ + private final String incarnationId = generateIncarnationId(); + + /** True when not using ZooKeeper */ + private final boolean isSingleNodeMode; + + /** Lock and condition used to signal the leader to voluntarily release leadership */ + private final Lock lock = new ReentrantLock(); + /** Condition variable used to synchronize voluntary leadership release */ + private final Condition cond = lock.newCondition(); + /** Leadership status - true if leader. */ + @GuardedBy("lock") + private boolean isLeader = false; + + /** Curator framework leader monitor */ + private LeaderSelector leaderSelector = null; + + /** The number of times this incarnation has become the leader. */ + private final AtomicLong leaderCount = new AtomicLong(0); + + /** + * Constructor. Initialize state and create HA context if configuration + * specifies ZooKeeper servers. + * @param conf Configuration. The fields we are interested in are: + * <ul> + * <li>SENTRY_HA_ZOOKEEPER_NAMESPACE</li> + * <li>SENTRY_HA_ZOOKEEPER_QUORUM</li> + * </ul> + * Configuration is also passed to the + * {@link HAContext#newLeaderSelector(String, LeaderSelectorListener)} + * which uses more properties. 
+ * @throws Exception + */ + private LeaderStatusMonitor(Configuration conf) throws Exception { + // Only enable HA configuration if zookeeper is configured + zkNamespace = conf.get(SENTRY_HA_ZOOKEEPER_NAMESPACE, ""); + String zkServers = conf.get(SENTRY_HA_ZOOKEEPER_QUORUM, ""); + if (zkServers.isEmpty()) { + isSingleNodeMode = true; + haContext = null; + isLeader = true; + LOG.info("Leader election protocol disabled, assuming single active server"); + return; + } + isSingleNodeMode = false; + haContext = HAContext.getHAServerContext(conf); + + LOG.info("Created LeaderStatusMonitor(zkNamespace=" + zkNamespace + + ", incarnationId=" + incarnationId + + ", zkServers='" + zkServers + "')"); + } + + /** + * Second half of the constructor links this object with {@link HAContext} and + * starts leader election protocol. + */ + private void init() { + if (isSingleNodeMode) { + return; + } + + leaderSelector = haContext.newLeaderSelector(zkNamespace + "/" + + LEADER_SELECTOR_SUFFIX, this); + leaderSelector.setId(incarnationId); + leaderSelector.autoRequeue(); + leaderSelector.start(); + } + + /** + * + * @param conf Configuration. See {@link #LeaderStatusMonitor(Configuration)} for details. + * @return A global LeaderStatusMonitor instance. + * @throws Exception + */ + @SuppressWarnings("LawOfDemeter") + static synchronized LeaderStatusMonitor getLeaderStatusMonitor(Configuration conf) + throws Exception { + if (leaderStatusMonitor == null) { + leaderStatusMonitor = new LeaderStatusMonitor(conf); + leaderStatusMonitor.init(); + } + return leaderStatusMonitor; + } + + /** + * @return number of times this leader was elected. Used for metrics. + */ + long getLeaderCount() { + return leaderCount.get(); + } + + /** + * Shut down the LeaderStatusMonitor and wait for it to transition to + * standby. + */ + @Override + public void close() { + if (leaderSelector != null) { + // Shut down our Curator hooks. 
+ leaderSelector.close(); + } + } + + /** + * Deactivate the current client, if it is active. + * In non-HA case this is a no-op. + */ + void deactivate() { + if (isSingleNodeMode) { + return; + } + lock.lock(); + try { + cond.signal(); + } finally { + lock.unlock(); + } + } + + /** + * @return true iff we are the leader. + * In non-HA case always returns true + */ + boolean isLeader() { + if (isSingleNodeMode) { + return true; + } + lock.lock(); + @SuppressWarnings("FieldAccessNotGuarded") + boolean leader = isLeader; + lock.unlock(); + return leader; + } + + /** + * Curator framework callback which is called when we become a leader. + * Should return only when we decide to resign. + */ + @Override + public void takeLeadership(CuratorFramework client) throws Exception { + leaderCount.incrementAndGet(); + LOG.info("LeaderStatusMonitor: becoming active. " + + "leaderCount=" + leaderCount); + lock.lock(); + try { + isLeader = true; + // Wait until we are interrupted or receive a signal + cond.await(); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + LOG.info("LeaderStatusMonitor: interrupted"); + } finally { + isLeader = false; + lock.unlock(); + LOG.info("LeaderStatusMonitor: becoming standby"); + } + } + + /** + * Generate ID for the activator. <p> + * + * Ideally we would like something like host@pid, but Java doesn't provide a good + * way to determine pid value, so we use + * {@link RuntimeMXBean#getName()} which usually contains host + * name and pid. 
+ */ + private static String generateIncarnationId() { + return ManagementFactory.getRuntimeMXBean().getName(); + } + +} http://git-wip-us.apache.org/repos/asf/sentry/blob/6125ac9b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java index a9d49f8..bd4ada4 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -45,7 +45,7 @@ import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.SecurityUtil; import org.apache.sentry.Command; -import org.apache.sentry.core.common.utils.SentryConstants; +import org.apache.sentry.core.common.utils.SigUtils; import org.apache.sentry.provider.db.service.thrift.SentryHealthCheckServletContextListener; import org.apache.sentry.provider.db.service.thrift.SentryMetrics; import org.apache.sentry.provider.db.service.thrift.SentryMetricsServletContextListener; @@ -67,20 +67,22 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -public class SentryService implements Callable { +import static org.apache.sentry.core.common.utils.SigUtils.registerSigListener; - private static final Logger LOGGER = LoggerFactory - .getLogger(SentryService.class); +public class SentryService implements Callable, SigUtils.SigListener { - private static 
enum Status { - NOT_STARTED(), STARTED(); + private static final Logger LOGGER = LoggerFactory.getLogger(SentryService.class); + + private enum Status { + NOT_STARTED, + STARTED, } private final Configuration conf; private final InetSocketAddress address; private final int maxThreads; private final int minThreads; - private boolean kerberos; + private final boolean kerberos; private final String principal; private final String[] principalParts; private final String keytab; @@ -89,12 +91,10 @@ public class SentryService implements Callable { private Future serviceStatus; private TServer thriftServer; private Status status; - private int webServerPort; + private final int webServerPort; private SentryWebServer sentryWebServer; - private long maxMessageSize; - private final boolean isHA; - private final Activator act; - SentryMetrics sentryMetrics; + private final long maxMessageSize; + private final LeaderStatusMonitor leaderMonitor; public SentryService(Configuration conf) throws Exception { this.conf = conf; @@ -139,8 +139,6 @@ public class SentryService implements Callable { principalParts = null; keytab = null; } - isHA = conf.getBoolean(ServerConfig.SENTRY_HA_ENABLED, - ServerConfig.SENTRY_HA_ENABLED_DEFAULT); serviceExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { private int count = 0; @@ -150,19 +148,28 @@ public class SentryService implements Callable { + (count++)); } }); - this.act = Activators.INSTANCE.create(conf); - conf.set(SentryConstants.CURRENT_INCARNATION_ID_KEY, - this.act.getIncarnationId()); + this.leaderMonitor = LeaderStatusMonitor.getLeaderStatusMonitor(conf); webServerPort = conf.getInt(ServerConfig.SENTRY_WEB_PORT, ServerConfig.SENTRY_WEB_PORT_DEFAULT); - //TODO: Enable only if Hive is using Sentry? 
try { hmsFollowerExecutor = Executors.newScheduledThreadPool(1); - hmsFollowerExecutor.scheduleAtFixedRate(new HMSFollower(conf), 60000, 500, TimeUnit.MILLISECONDS); - }catch(Exception e) { + hmsFollowerExecutor.scheduleAtFixedRate(new HMSFollower(conf), + 60000, 500, TimeUnit.MILLISECONDS); + } catch(Exception e) { //TODO: Handle LOGGER.error("Could not start HMSFollower"); } status = Status.NOT_STARTED; + + // Enable signal handler for HA leader/follower status if configured + String sigName = conf.get(ServerConfig.SERVER_HA_STANDBY_SIG); + if ((sigName != null) && !sigName.isEmpty()) { + LOGGER.info("Registering signal handler " + sigName + " for HA"); + try { + registerSigListener(sigName, this); + } catch (Exception e) { + LOGGER.error("Failed to register signal", e); + } + } } @Override @@ -251,8 +258,7 @@ public class SentryService implements Callable { } private void addSentryServiceGauge() { - sentryMetrics = SentryMetrics.getInstance(); - sentryMetrics.addSentryServiceGauges(this); + SentryMetrics.getInstance().addSentryServiceGauges(this); } private void startSentryWebServer() throws Exception{ @@ -295,7 +301,7 @@ public class SentryService implements Callable { public synchronized void stop() throws Exception{ MultiException exception = null; LOGGER.info("Attempting to stop..."); - act.close(); + leaderMonitor.close(); if (isRunning()) { LOGGER.info("Attempting to stop sentry thrift service..."); try { @@ -337,16 +343,8 @@ public class SentryService implements Callable { * to the backend DB. 
*/ @VisibleForTesting - public synchronized void becomeStandby() throws Exception{ - try { - if(act.isActive()) { - LOGGER.info("Server with incarnation id: " + act.getIncarnationId() + - " becoming standby"); - act.deactivate(); - } - } catch (Exception e) { - LOGGER.error("Error while deactivating the active sentry daemon", e); - } + public synchronized void becomeStandby() { + leaderMonitor.deactivate(); } private MultiException addMultiException(MultiException exception, Exception e) { @@ -463,24 +461,27 @@ public class SentryService implements Callable { return thriftServer.getEventHandler(); } - @VisibleForTesting - public Activator getActivator() {return act;} - public Gauge<Boolean> getIsActiveGauge() { return new Gauge<Boolean>() { @Override public Boolean getValue() { - return act.isActive(); + return leaderMonitor.isLeader(); } }; } - public Gauge<Boolean> getIsHAGauge() { - return new Gauge<Boolean>() { + public Gauge<Long> getBecomeActiveCount() { + return new Gauge<Long>() { @Override - public Boolean getValue() { - return isHA; + public Long getValue() { + return leaderMonitor.getLeaderCount(); } }; } + + @Override + public void onSignal(String signalName) { + // Become follower + leaderMonitor.deactivate(); + } } http://git-wip-us.apache.org/repos/asf/sentry/blob/6125ac9b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java index d80fa1e..c4fdf1d 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java @@ -124,18 +124,18 @@ public class 
ServiceConstants { public static final String SENTRY_HA_ZOOKEEPER_SECURITY = SENTRY_HA_ZK_PROPERTY_PREFIX + "security"; public static final boolean SENTRY_HA_ZOOKEEPER_SECURITY_DEFAULT = false; public static final String SENTRY_HA_ZOOKEEPER_QUORUM = SENTRY_HA_ZK_PROPERTY_PREFIX + "quorum"; - public static final String SENTRY_HA_ZOOKEEPER_QUORUM_DEFAULT = ""; public static final String SENTRY_HA_ZOOKEEPER_RETRIES_MAX_COUNT = SENTRY_HA_ZK_PROPERTY_PREFIX + "session.retries.max.count"; public static final int SENTRY_HA_ZOOKEEPER_RETRIES_MAX_COUNT_DEFAULT = 3; public static final String SENTRY_HA_ZOOKEEPER_SLEEP_BETWEEN_RETRIES_MS = SENTRY_HA_ZK_PROPERTY_PREFIX + "session.sleep.between.retries.ms"; public static final int SENTRY_HA_ZOOKEEPER_SLEEP_BETWEEN_RETRIES_MS_DEFAULT = 100; public static final String SENTRY_HA_ZOOKEEPER_NAMESPACE = SENTRY_HA_ZK_PROPERTY_PREFIX + "namespace"; - public static final String SENTRY_HA_ZOOKEEPER_NAMESPACE_DEFAULT = "/sentry"; + public static final String SENTRY_HA_ZOOKEEPER_NAMESPACE_DEFAULT = "sentry"; // principal and keytab for client to be able to connect to secure ZK. 
Needed for Sentry HA with secure ZK public static final String SERVER_HA_ZOOKEEPER_CLIENT_PRINCIPAL = "sentry.zookeeper.client.principal"; public static final String SERVER_HA_ZOOKEEPER_CLIENT_KEYTAB = "sentry.zookeeper.client.keytab"; public static final String SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE = "sentry.zookeeper.client.ticketcache"; public static final String SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE_DEFAULT = "false"; + public static final String SERVER_HA_STANDBY_SIG = "sentry.ha.standby.signal"; public static final ImmutableMap<String, String> SENTRY_STORE_DEFAULTS = ImmutableMap.<String, String>builder() .put("datanucleus.connectionPoolingType", "BoneCP") @@ -222,9 +222,7 @@ public class ServiceConstants { // HA configuration public static final String SENTRY_HA_ENABLED = "sentry.ha.enabled"; - public static final boolean SENTRY_HA_ENABLED_DEFAULT = ServerConfig.SENTRY_HA_ENABLED_DEFAULT; public static final String SENTRY_HA_ZOOKEEPER_QUORUM = ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM; - public static final String SERVER_HA_ZOOKEEPER_QUORUM_DEFAULT = ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM_DEFAULT; public static final String SENTRY_HA_ZOOKEEPER_NAMESPACE = ServerConfig.SENTRY_HA_ZOOKEEPER_NAMESPACE; public static final String SERVER_HA_ZOOKEEPER_NAMESPACE_DEFAULT = ServerConfig.SENTRY_HA_ZOOKEEPER_NAMESPACE_DEFAULT; http://git-wip-us.apache.org/repos/asf/sentry/blob/6125ac9b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestActivator.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestActivator.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestActivator.java index 60cfc73..5227c45 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestActivator.java +++ 
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestActivator.java @@ -37,14 +37,6 @@ public class TestActivator extends SentryServiceIntegrationBase { } - @Test - public void testStopActive() throws Exception { - Assert.assertEquals(Boolean.TRUE,server.getActivator().isActive()); - //stop the current service. - server.becomeStandby(); - Assert.assertEquals(Boolean.FALSE,server.getActivator().isActive()); - } - @Override @After public void after() { http://git-wip-us.apache.org/repos/asf/sentry/blob/6125ac9b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceMetrics.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceMetrics.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceMetrics.java index bc375e3..a33b03a 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceMetrics.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceMetrics.java @@ -56,7 +56,6 @@ public class TestSentryServiceMetrics extends SentryServiceIntegrationBase { //More Cases to be added once Sentry HA is implemented //Check for gauges with the server handle. 
- Assert.assertEquals(Boolean.FALSE,server.getIsHAGauge().getValue()); Assert.assertEquals(Boolean.TRUE,server.getIsActiveGauge().getValue()); } http://git-wip-us.apache.org/repos/asf/sentry/blob/6125ac9b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestLeaderStatus.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestLeaderStatus.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestLeaderStatus.java deleted file mode 100644 index c796fab..0000000 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestLeaderStatus.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sentry.service.thrift; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.curator.test.TestingServer; -import org.apache.curator.utils.CloseableUtils; -import org.apache.hadoop.conf.Configuration; -import org.junit.Assert; -import org.junit.Test; - -import java.io.Closeable; -import java.io.IOException; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.Semaphore; - -import static org.apache.sentry.service.thrift.ServiceConstants.ClientConfig.SENTRY_HA_ENABLED; -import static org.apache.sentry.service.thrift.ServiceConstants.ClientConfig.SENTRY_HA_ZOOKEEPER_QUORUM; - -final public class TestLeaderStatus { - private static final Log LOG = - LogFactory.getLog(TestLeaderStatus.class); - - /** - * Test that when the configuration is non-HA, we always become active. - */ - @Test(timeout = 60000) - public void testNonHaLeaderStatus() throws Exception { - Configuration conf = new Configuration(); - conf.set(SENTRY_HA_ZOOKEEPER_QUORUM, ""); - final Semaphore activeSem = new Semaphore(0); - final Semaphore standbySem = new Semaphore(0); - LeaderStatus status = new LeaderStatus(new LeaderStatus.Listener() { - @Override - public void becomeActive() throws Exception { - LOG.info("testNonHaLeaderStatus: becoming active"); - activeSem.release(2); - } - - @Override - public void becomeStandby() { - activeSem.acquireUninterruptibly(); - LOG.info("testNonHaLeaderStatus: becoming standby"); - standbySem.release(); - } - }, conf); - status.start(); - activeSem.acquire(); - status.close(); - standbySem.acquire(); - } - - private static class CurrentTestActive { - private String incarnationId; - private String error = null; - - CurrentTestActive() { - this.incarnationId = null; - this.error = null; - } - - synchronized void set(String incarnationId) { - if (this.incarnationId != null) { - error("set: there is already an 
" + - "active incarnation " + this.incarnationId); - return; - } - this.incarnationId = incarnationId; - } - - synchronized void unset(String incarnationId) { - if (this.incarnationId == null) { - error("unset: there is no active incarnation."); - return; - } - if (!this.incarnationId.equals(incarnationId)) { - error("unset: can't deactivate " + - incarnationId + " because " + this.incarnationId + - " is the current active incarnation."); - return; - } - this.incarnationId = null; - } - - synchronized String get() { - return this.incarnationId; - } - - synchronized String getError() { - return error; - } - - synchronized void error(String error) { - if (this.error == null) { - this.error = error; - } - LOG.error(error); - } - - String busyWaitForActive() throws InterruptedException { - for (; ; ) { - String cur = get(); - if (cur != null) { - return cur; - } - Thread.sleep(2); - } - } - - String busyWaitForNextActive(String prevIncarnation) - throws InterruptedException { - for (; ; ) { - String cur = get(); - if ((cur != null) && (!cur.equals(prevIncarnation))) { - return cur; - } - Thread.sleep(2); - } - } - } - - static class LeaderStatusContext implements Closeable { - final LeaderStatus status; - - LeaderStatusContext(final CurrentTestActive active, - Configuration conf) throws Exception { - this.status = new LeaderStatus(new LeaderStatus.Listener() { - @Override - public void becomeActive() throws Exception { - LOG.info("LeaderStatusContext " + status.getIncarnationId() + - " becoming active"); - active.set(status.getIncarnationId()); - } - - @Override - public void becomeStandby() { - LOG.info("LeaderStatusContext " + status.getIncarnationId() + - " becoming standby"); - active.unset(status.getIncarnationId()); - } - }, conf); - this.status.start(); - } - - @Override - public void close() throws IOException { - this.status.close(); - } - - @Override - public String toString() { - return "LeaderStatusContext(" + status.getIncarnationId() + ")"; - } - - String 
getIncarnationId() { - return status.getIncarnationId(); - } - } - - @Test(timeout = 120000) - public void testRacingClients() throws Exception { - final int NUM_CLIENTS = 3; - final Configuration conf = new Configuration(); - TestingServer server = new TestingServer(); - server.start(); - conf.setBoolean(SENTRY_HA_ENABLED, true); - conf.set(SENTRY_HA_ZOOKEEPER_QUORUM, server.getConnectString()); - final CurrentTestActive active = new CurrentTestActive(); - List<LeaderStatusContext> contexts = new LinkedList<>(); - for (int i = 0; i < NUM_CLIENTS; i++) { - try { - contexts.add(new LeaderStatusContext(active, conf)); - } catch (Throwable t) { - LOG.error("error creating LeaderStatusContext", t); - throw new RuntimeException(t); - } - } - LOG.info("Created " + NUM_CLIENTS + " SentryLeaderSelectorClient " + - "objects."); - String curIncarnation = active.busyWaitForActive(); - LOG.info("Closing LeaderStatus(" + curIncarnation + ")."); - for (Iterator<LeaderStatusContext> iter = contexts.iterator(); - iter.hasNext(); ) { - LeaderStatusContext context = iter.next(); - if (context.getIncarnationId().equals(curIncarnation)) { - CloseableUtils.closeQuietly(context); - iter.remove(); - } - } - active.busyWaitForNextActive(curIncarnation); - for (Iterator<LeaderStatusContext> iter = contexts.iterator(); - iter.hasNext(); ) { - LeaderStatusContext context = iter.next(); - CloseableUtils.closeQuietly(context); - iter.remove(); - } - LOG.info("Closed all " + NUM_CLIENTS + " SentryLeaderSelectorClient " + - "objects."); - Assert.assertTrue(null == active.getError()); - server.close(); - } - - @Test(timeout = 60000) - public void testGenerateIncarnationIDs() throws Exception { - final int NUM_UNIQUE_IDS = 10000; - HashSet<String> ids = new HashSet<String>(); - for (int i = 0; i < NUM_UNIQUE_IDS; i++) { - ids.add(LeaderStatus.generateIncarnationId()); - } - - // Assert that there were no ID collisions - Assert.assertEquals(NUM_UNIQUE_IDS, ids.size()); - - // Assert that all IDs 
are 16 characters long and begin with a letter. - for (String id : ids) { - Assert.assertEquals(16, id.length()); - Assert.assertTrue(Character.isAlphabetic(id.charAt(0))); - } - - // Assert that IDs contain only alphanumeric characters - for (String id : ids) { - for (int i = 0; i < id.length(); i++) { - Assert.assertTrue(Character.isLetterOrDigit(id.charAt(i))); - } - } - } -}