SENTRY-1317: Implement fencing required for active/standby (Colin P. McCabe, Reviewed by: Hao Hao and Sravya Tirukkovalur)
New fencing and active/passive code - Activator to store the state about whether the daemon is active or not, as well as manage fencing - Create Fencer to implement SQL fencing. - Add SqlAccessor to talk directly to SQL databases LeaderStatus: generate shorter incarnation IDs by using base64. LeaderStatusAdaptor: implement close() Remove old code which is no longer used - HAContext - PluginCacheSyncUtil - TestHAUpdateForwarder - ServiceRegister SentryStore - move DataNucleus properties setup into a utility function - Remove unused DEFAULT_DATA_DIR variable (it's not used anywhere in the code) - SentryStore should maintain a reference to the Activator Add SentryStandbyException to indicate that the daemon is currently standby Move SENTRY_ZK_JAAS_NAME from HAContext to SentryConstants DelegateSentryStore: make some fields final Change-Id: Ic41711ecbc218bb21e3ca3120998866d65e16493 Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/ff7823b6 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/ff7823b6 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/ff7823b6 Branch: refs/heads/sentry-ha-redesign Commit: ff7823b66f5fc6cb97b5b760ab091b217c38f7d4 Parents: 113c5ea Author: Sravya Tirukkovalur <[email protected]> Authored: Tue Jul 12 13:03:23 2016 -0700 Committer: Sravya Tirukkovalur <[email protected]> Committed: Tue Jul 12 13:03:23 2016 -0700 ---------------------------------------------------------------------- .../exception/SentryStandbyException.java | 33 ++ .../core/common/utils/SentryConstants.java | 3 + .../apache/sentry/hdfs/PluginCacheSyncUtil.java | 251 --------------- .../sentry/hdfs/SentryHdfsMetricsUtil.java | 8 - .../sentry/hdfs/TestHAUpdateForwarder.java | 66 ---- .../provider/db/service/persistent/Fencer.java | 242 +++++++++++++++ .../db/service/persistent/HAContext.java | 262 ---------------- .../db/service/persistent/SentryStore.java | 32 +- 
.../db/service/persistent/ServiceRegister.java | 52 ---- .../db/service/persistent/SqlAccessor.java | 309 +++++++++++++++++++ .../thrift/SentryPolicyStoreProcessor.java | 31 +- .../apache/sentry/service/thrift/Activator.java | 112 +++++++ .../sentry/service/thrift/Activators.java | 69 +++++ .../sentry/service/thrift/LeaderStatus.java | 31 +- .../service/thrift/LeaderStatusAdaptor.java | 41 ++- .../sentry/service/thrift/SentryService.java | 33 +- .../sentry/service/thrift/ServiceConstants.java | 4 + .../persistent/SentryStoreIntegrationBase.java | 15 + .../TestPrivilegeOperatePersistence.java | 22 +- .../db/service/persistent/TestSentryStore.java | 14 + .../persistent/TestSentryStoreImportExport.java | 16 +- .../service/persistent/TestSentryVersion.java | 18 ++ .../thrift/SentryServiceIntegrationBase.java | 3 +- .../sentry/service/thrift/TestLeaderStatus.java | 26 ++ 24 files changed, 982 insertions(+), 711 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryStandbyException.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryStandbyException.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryStandbyException.java new file mode 100644 index 0000000..73c7e4e --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryStandbyException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.core.common.exception; + +/** + * An exception which indicates that the current server is standby. + */ +public class SentryStandbyException extends SentryUserException { + private static final long serialVersionUID = 2162010615815L; + + public SentryStandbyException(String msg) { + super(msg); + } + + public SentryStandbyException(String msg, String reason) { + super(msg, reason); + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java index 3da4906..c094058 100644 --- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java @@ -40,4 +40,7 @@ public class SentryConstants { public static final String RESOURCE_WILDCARD_VALUE_ALL = "ALL"; public static final String RESOURCE_WILDCARD_VALUE_SOME = "+"; public static final String ACCESS_ALLOW_URI_PER_DB_POLICYFILE = "sentry.allow.uri.db.policyfile"; + + public 
static final String SENTRY_ZK_JAAS_NAME = "Sentry"; + public static final String CURRENT_INCARNATION_ID_KEY = "current.incarnation.key"; } http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java deleted file mode 100644 index 4ce16c7..0000000 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java +++ /dev/null @@ -1,251 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sentry.hdfs; - -import java.io.IOException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import com.codahale.metrics.Timer; -import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; -import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; -import org.apache.curator.utils.ZKPaths; -import org.apache.hadoop.conf.Configuration; -import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; -import org.apache.sentry.hdfs.Updateable.Update; -import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException; -import org.apache.sentry.provider.db.service.persistent.HAContext; -import org.apache.zookeeper.KeeperException.NoNodeException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; - -/** - * Utility class for handling the cache update syncup via Curator path cache It - * creates the path cache, a distributed lock and counter. The updated API - * updates the counter, creates a znode zpath/counter and writes the data to it. 
- * The caller should provider the cache callback handler class that posts the - * update object to the required cache - */ -public class PluginCacheSyncUtil { - private static final Logger LOGGER = LoggerFactory - .getLogger(PluginCacheSyncUtil.class); - public static final long CACHE_GC_SIZE_THRESHOLD_HWM = 100; - public static final long CACHE_GC_SIZE_THRESHOLD_LWM = 50; - public static final long CACHE_GC_SIZE_MAX_CLEANUP = 1000; - public static final long ZK_COUNTER_INIT_VALUE = 4; - public static final long GC_COUNTER_INIT_VALUE = ZK_COUNTER_INIT_VALUE + 1; - - private final String zkPath; - private final HAContext haContext; - private final PathChildrenCache cache; - private InterProcessSemaphoreMutex updatorLock, gcLock; - private int lockTimeout; - private DistributedAtomicLong updateCounter, gcCounter; - private final ScheduledExecutorService gcSchedulerForZk = Executors - .newScheduledThreadPool(1); - - public PluginCacheSyncUtil(String zkPath, final Configuration conf, - PathChildrenCacheListener cacheListener) throws SentryPluginException { - this.zkPath = zkPath; - // Init ZK connection - try { - haContext = HAContext.getHAContext(conf); - } catch (Exception e) { - throw new SentryPluginException("Error creating HA context ", e); - } - haContext.startCuratorFramework(); - - // Init path cache - cache = new PathChildrenCache(haContext.getCuratorFramework(), zkPath - + "/cache", true); - // path cache callback - cache.getListenable().addListener(cacheListener); - try { - cache.start(); - } catch (Exception e) { - throw new SentryPluginException("Error creating ZK PathCache ", e); - } - updatorLock = new InterProcessSemaphoreMutex( - haContext.getCuratorFramework(), zkPath + "/lock"); - lockTimeout = conf.getInt( - ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS, - ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT); - gcLock = new InterProcessSemaphoreMutex( - haContext.getCuratorFramework(), zkPath + "/gclock"); - - updateCounter = new 
DistributedAtomicLong(haContext.getCuratorFramework(), - zkPath + "/counter", haContext.getRetryPolicy()); - try { - updateCounter.initialize(ZK_COUNTER_INIT_VALUE); - } catch (Exception e) { - LOGGER.error("Error initializing counter for zpath " + zkPath, e); - } - - // GC setup - gcCounter = new DistributedAtomicLong(haContext.getCuratorFramework(), - zkPath + "/gccounter", haContext.getRetryPolicy()); - try { - gcCounter.initialize(GC_COUNTER_INIT_VALUE); - } catch (Exception e) { - LOGGER.error("Error initializing counter for zpath " + zkPath, e); - } - final Runnable gcRunner = new Runnable() { - public void run() { - gcPluginCache(conf); - } - }; - gcSchedulerForZk.scheduleAtFixedRate(gcRunner, 10, 10, TimeUnit.MINUTES); - } - - public void handleCacheUpdate(Update update) throws SentryPluginException { - final Timer.Context timerContext = SentryHdfsMetricsUtil.getCacheSyncToZKTimer.time(); - // post message to ZK cache - try { - // Acquire ZK lock for update cache sync. This ensures that the counter - // increment and znode creation is atomic operation - if (!updatorLock.acquire(lockTimeout, TimeUnit.MILLISECONDS)) { - throw new SentryPluginException( - "Failed to get ZK lock for update cache syncup"); - } - } catch (Exception e1) { - // Stop timer in advance - timerContext.stop(); - SentryHdfsMetricsUtil.getFailedCacheSyncToZK.inc(); - throw new SentryPluginException( - "Error getting ZK lock for update cache syncup" + e1, e1); - } - boolean failed = false; - try { - try { - // increment the global sequence counter if this is not a full update - if (!update.hasFullImage()) { - update.setSeqNum(updateCounter.increment().postValue()); - } else { - if (updateCounter.get().preValue() < update.getSeqNum()) { - updateCounter.add(update.getSeqNum() - updateCounter.get().preValue()); - } - } - } catch (Exception e1) { - failed = true; - throw new SentryPluginException( - "Error setting ZK counter for update cache syncup" + e1, e1); - } - - // Create a new znode 
with the sequence number and write the update data - // into it - String updateSeq = String.valueOf(update.getSeqNum()); - String newPath = ZKPaths.makePath(zkPath + "/cache", updateSeq); - try { - haContext.getCuratorFramework().create().creatingParentsIfNeeded() - .forPath(newPath, update.serialize()); - } catch (Exception e) { - failed = true; - throw new SentryPluginException("error posting update to ZK ", e); - } - } finally { - // release the ZK lock - try { - updatorLock.release(); - } catch (Exception e) { - // Stop timer in advance - timerContext.stop(); - SentryHdfsMetricsUtil.getFailedCacheSyncToZK.inc(); - throw new SentryPluginException( - "Error releasing ZK lock for update cache syncup" + e, e); - } - timerContext.stop(); - if (failed) { - SentryHdfsMetricsUtil.getFailedCacheSyncToZK.inc(); - } - } - } - - public static void setUpdateFromChildEvent(PathChildrenCacheEvent cacheEvent, - Update update) throws IOException { - byte eventData[] = cacheEvent.getData().getData(); - update.deserialize(eventData); - String seqNum = ZKPaths.getNodeFromPath(cacheEvent.getData().getPath()); - update.setSeqNum(Integer.valueOf(seqNum)); - } - - public void close() throws IOException { - cache.close(); - } - - public long getUpdateCounter() throws Exception { - return updateCounter.get().preValue(); - } - - /** - * Cleanup old znode of the plugin cache. The last cleaned and last created - * node counters are stored in ZK. If the number of available nodes are more - * than the high water mark, then we delete the old nodes till we reach low - * water mark. 
The scheduler periodically runs the cleanup routine - * @param conf - */ - @VisibleForTesting - public void gcPluginCache(Configuration conf) { - try { - // If we can acquire gc lock, then continue with znode cleanup - if (!gcLock.acquire(500, TimeUnit.MILLISECONDS)) { - return; - } - - // If we have passed the High watermark, then start the cleanup - Long updCount = updateCounter.get().preValue(); - Long gcCount = gcCounter.get().preValue(); - if (updCount - gcCount > CACHE_GC_SIZE_THRESHOLD_HWM) { - Long numNodesToClean = Math.min(updCount - gcCount - - CACHE_GC_SIZE_THRESHOLD_LWM, CACHE_GC_SIZE_MAX_CLEANUP); - for (Long nodeNum = gcCount; nodeNum < gcCount + numNodesToClean; nodeNum++) { - String pathToDelete = ZKPaths.makePath(zkPath + "/cache", - Long.toString(nodeNum)); - try { - haContext.getCuratorFramework().delete().forPath(pathToDelete); - gcCounter.increment(); - LOGGER.debug("Deleted znode " + pathToDelete); - } catch (NoNodeException eN) { - // We might have endup with holes in the node counter due to network/ZK errors - // Ignore the delete error if the node doesn't exist and move on - gcCounter.increment(); - } catch (Exception e) { - LOGGER.info("Error cleaning up node " + pathToDelete, e); - break; - } - } - } - } catch (Exception e) { - LOGGER.warn("Error cleaning the cache", e); - } finally { - if (gcLock.isAcquiredInThisProcess()) { - try { - gcLock.release(); - } catch (Exception e) { - LOGGER.warn("Error releasing gc lock", e); - } - } - } - } - -} http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java index 5bf2f6e..e68c708 100644 --- 
a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java @@ -91,14 +91,6 @@ public class SentryHdfsMetricsUtil { MetricRegistry.name(MetastorePlugin.class, "apply-local-update", "path-change-size")); - // Metrics for handleCacheUpdate to ZK in PluginCacheSyncUtil - // The time used for each handleCacheUpdate - public static final Timer getCacheSyncToZKTimer = sentryMetrics.getTimer( - MetricRegistry.name(PluginCacheSyncUtil.class, "cache-sync-to-zk")); - // The number of failed handleCacheUpdate - public static final Counter getFailedCacheSyncToZK = sentryMetrics.getCounter( - MetricRegistry.name(PluginCacheSyncUtil.class, "cache-sync-to-zk", "failed-num")); - private SentryHdfsMetricsUtil() { // Make constructor private to avoid instantiation } http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java deleted file mode 100644 index 5246e05..0000000 --- a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sentry.hdfs; - -import static org.junit.Assert.assertEquals; - -import java.util.List; - -import org.apache.curator.test.TestingServer; -import org.apache.sentry.hdfs.service.thrift.TRoleChanges; -import org.apache.sentry.provider.db.service.persistent.HAContext; -import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.google.common.collect.Lists; - -public class TestHAUpdateForwarder extends TestUpdateForwarder { - - private TestingServer server; - - @Before - public void setup() throws Exception { - server = new TestingServer(); - server.start(); - testConf.set(ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM, - server.getConnectString()); - testConf.setBoolean(ServerConfig.SENTRY_HA_ENABLED, true); - } - - @Override - @After - public void cleanup() throws Exception { - super.cleanup(); - server.stop(); - HAContext.clearServerContext(); - } - - @Test - public void testThriftSerializer() throws Exception { - List<String> addGroups = Lists.newArrayList("g1", "g2", "g3"); - List<String> delGroups = Lists.newArrayList("d1", "d2", "d3"); - String roleName = "testRole1"; - - TRoleChanges roleUpdate = new TRoleChanges(roleName, addGroups, delGroups); - TRoleChanges newRoleUpdate = (TRoleChanges) ThriftSerializer.deserialize( - roleUpdate, ThriftSerializer.serialize(roleUpdate)); - assertEquals(roleUpdate, newRoleUpdate); - } -} 
http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/Fencer.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/Fencer.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/Fencer.java new file mode 100644 index 0000000..14cdde3 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/Fencer.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sentry.provider.db.service.persistent; + +import java.util.List; + +import com.google.common.base.Joiner; + +import javax.jdo.JDOException; +import javax.jdo.JDOFatalDataStoreException; +import javax.jdo.PersistenceManager; +import javax.jdo.PersistenceManagerFactory; +import javax.jdo.Query; +import javax.jdo.Transaction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Fences the SQL database.<p/> + * + * Fencing ensures that any SQL requests that were sent by a previously active + * (but now standby) sentry daemon will not be honored. It also ensures that if + * users start up multiple non-HA sentry daemons, only one can become + * active.<p/> + * + * The fencer uses a special SQL table, the SENTRY_FENCE table. When a sentry + * process becomes active, it renames this table so that the name contains the + * current "incarnation ID." The incarnation ID is a randomly generated 128-bit + * ID, which changes each time the process is restarted. From that point + * forward, the sentry process includes a SELECT query for the SENTRY_FENCE + * table in all update transactions. This ensures that if the SENTRY_FENCE + * table is subsequently renamed again, those update transactions will not + * succeed.<p/> + * + * It is important to distinguish between fencing and leader election. + * ZooKeeper is responsible for leader election and ensures that there is only + * ever one active sentry daemon at any one time. However, sentry exists in an + * asynchronous network where requests from a previously active daemon may be + * arbitrarily delayed before reaching the SQL database. There is also a delay + * between a process being "de-leadered" by ZooKeeper, and the process itself + * becoming aware of this situation. Java's garbage collection pauses tend to + * expose these kinds of race conditions. 
The SQL database must be prepared to + * reject these stale updates.<p/> + * + * Given that we need this SQL fencing, why bother with ZooKeeper at all? + * ZooKeeper detects when nodes have stopped responding, and elects a new + * leader. The SQL fencing code cannot do that.<p/> + */ +public class Fencer { + private static final Logger LOGGER = LoggerFactory + .getLogger(Fencer.class); + + /** + * The base name of the sentry fencer table.<p/> + * + * We will append the incarnation ID on to this base name to make the final + * table name. + */ + private final static String SENTRY_FENCE_TABLE_BASE = "SENTRY_FENCE"; + + /** + * The update log table name, including the incarnation ID. + */ + private final String tableIncarnationName; + + /** + * The SQL accessor that we're using. + */ + private final SqlAccessor sql; + + /** + * Create an accessor for the update log. + * + * @param incarnationId The ID of the current sentry daemon incarnation. + * @param pmf The PersistenceManagerFactory to use. + */ + public Fencer(String incarnationId, PersistenceManagerFactory pmf) { + this.tableIncarnationName = String. + format("%s_%s", SENTRY_FENCE_TABLE_BASE, incarnationId); + this.sql = SqlAccessor.get(pmf); + } + + /** + * Finds the name of the fencing table.<p/> + * + * The name of the update log table will always begin with SENTRY_UPDATE_LOG, + * but it may have the ID of a previous sentry incarnation tacked on to it. + * + * @return the current name of the update log table, or null if there is none. + * + * @throws JDOFatalDataStoreException If there is more than one sentry + * fencing table. + * JDOException If there was a JDO error. + */ + private String findFencingTable(PersistenceManagerFactory pmf) { + // Perform a SQL query to find the name of the update log table. 
+ PersistenceManager pm = pmf.getPersistenceManager(); + Query query = pm.newQuery(SqlAccessor.JDO_SQL_ESCAPE, + sql.getFindTableByPrefixSql(SENTRY_FENCE_TABLE_BASE)); + Transaction tx = pm.currentTransaction(); + try { + tx.begin(); + List<Object> results = (List<Object>) query.execute(); + if (results.isEmpty()) { + return null; + } else if (results.size() != 1) { + throw new JDOFatalDataStoreException( + "Found more than one table whose name begins with " + + "SENTRY_UPDATE_LOG: " + Joiner.on(",").join(results)); + } + String tableName = (String)results.get(0); + if (!tableName.startsWith(SENTRY_FENCE_TABLE_BASE)) { + throw new JDOFatalDataStoreException( + "The result of our attempt to locate the update log table was " + + "a table name which did not begin with " + + SENTRY_FENCE_TABLE_BASE + ", named " + tableName); + } + LOGGER.info("Found sentry update log table named " + tableName); + tx.commit(); + return tableName; + } finally { + if (tx.isActive()) { + tx.rollback(); + } + query.closeAll(); + } + } + + /** + * Creates the fencing table. + * + * @param pmf The PersistenceManagerFactory to use. + * + * @throws JDOException If there was a JDO error. + */ + private void createFenceTable(PersistenceManagerFactory pmf) { + PersistenceManager pm = pmf.getPersistenceManager(); + Transaction tx = pm.currentTransaction(); + Query query = null; + try { + tx.begin(); + query = pm.newQuery(SqlAccessor.JDO_SQL_ESCAPE, + sql.getCreateTableSql(tableIncarnationName)); + query.execute(); + tx.commit(); + } finally { + if (query != null) { + query.closeAll(); + } + if (tx.isActive()) { + tx.rollback(); + } + pm.close(); + } + } + + /** + * Renames one table to another. + * + * @param pmf The PersistenceManagerFactory to use. + * @param src The table to rename + * @param dst The new name of the table. + * + * @throws JDOException If there was a JDO error. 
+ */ + private void renameTable(PersistenceManagerFactory pmf, String src, + String dst) { + boolean success = false; + PersistenceManager pm = pmf.getPersistenceManager(); + Transaction tx = pm.currentTransaction(); + Query query = null; + try { + tx.begin(); + query = pm.newQuery(SqlAccessor.JDO_SQL_ESCAPE, + sql.getRenameTableSql(src, dst)); + query.execute(); + tx.commit(); + success = true; + } finally { + if (query != null) { + query.closeAll(); + } + if (!success) { + LOGGER.info("Failed to rename table " + src + " to " + dst); + tx.rollback(); + } + pm.close(); + } + } + + /** + * Renames the update log table so that only this incarnation can modify it. + * + * @param pmf The PersistenceManagerFactory to use. + * + * @throws JDOException If there was a JDO error. + */ + public void fence(PersistenceManagerFactory pmf) { + String curTableName = findFencingTable(pmf); + if (curTableName == null) { + createFenceTable(pmf); + LOGGER.info("Created sentry fence table."); + } else if (curTableName.equals(tableIncarnationName)) { + LOGGER.info("Sentry fence table is already named " + + tableIncarnationName); + } else { + renameTable(pmf, curTableName, tableIncarnationName); + LOGGER.info("Renamed sentry fence table " + curTableName + " to " + + tableIncarnationName); + } + } + + /** + * Attempt to append an UpdateLogEntry to the update log. 
+ */ + void verify(PersistenceManager pm) { + Query query = pm.newQuery(SqlAccessor.JDO_SQL_ESCAPE, + sql.getFetchAllRowsSql(tableIncarnationName)); + query.execute(); + } + + String getTableIncarnationName() { + return tableIncarnationName; + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java deleted file mode 100644 index cacc29f..0000000 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java +++ /dev/null @@ -1,262 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.sentry.provider.db.service.persistent; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; - -import org.apache.curator.RetryPolicy; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.api.ACLProvider; -import org.apache.curator.framework.imps.CuratorFrameworkState; -import org.apache.curator.framework.imps.DefaultACLProvider; -import org.apache.curator.retry.RetryNTimes; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.sentry.service.thrift.JaasConfiguration; -import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; -import org.apache.zookeeper.ZooDefs.Perms; -import org.apache.zookeeper.client.ZooKeeperSaslClient; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Id; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; - -/** - * Stores the HA related context - */ -public class HAContext { - - private static final Logger LOGGER = LoggerFactory.getLogger(HAContext.class); - private static volatile HAContext serverHAContext = null; - private static boolean aclChecked = false; - - public final static String SENTRY_SERVICE_REGISTER_NAMESPACE = "sentry-service"; - public static final String SENTRY_ZK_JAAS_NAME = "SentryClient"; - private final String zookeeperQuorum; - private final int retriesMaxCount; - private final int sleepMsBetweenRetries; - private final String namespace; - - private final boolean zkSecure; - private List<ACL> saslACL; - - private final CuratorFramework curatorFramework; - private final RetryPolicy retryPolicy; - - protected HAContext(Configuration conf) throws 
Exception { - this.zookeeperQuorum = conf.get(ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM, - ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM_DEFAULT); - this.retriesMaxCount = conf.getInt(ServerConfig.SENTRY_HA_ZOOKEEPER_RETRIES_MAX_COUNT, - ServerConfig.SENTRY_HA_ZOOKEEPER_RETRIES_MAX_COUNT_DEFAULT); - this.sleepMsBetweenRetries = conf.getInt(ServerConfig.SENTRY_HA_ZOOKEEPER_SLEEP_BETWEEN_RETRIES_MS, - ServerConfig.SENTRY_HA_ZOOKEEPER_SLEEP_BETWEEN_RETRIES_MS_DEFAULT); - this.namespace = conf.get(ServerConfig.SENTRY_HA_ZOOKEEPER_NAMESPACE, - ServerConfig.SENTRY_HA_ZOOKEEPER_NAMESPACE_DEFAULT); - this.zkSecure = conf.getBoolean(ServerConfig.SENTRY_HA_ZOOKEEPER_SECURITY, - ServerConfig.SENTRY_HA_ZOOKEEPER_SECURITY_DEFAULT); - ACLProvider aclProvider; - validateConf(); - if (zkSecure) { - LOGGER.info("Connecting to ZooKeeper with SASL/Kerberos and using 'sasl' ACLs"); - setJaasConfiguration(conf); - System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, - SENTRY_ZK_JAAS_NAME); - saslACL = Lists.newArrayList(); - saslACL.add(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal(conf, - ServerConfig.PRINCIPAL)))); - saslACL.add(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal(conf, - ServerConfig.SERVER_HA_ZOOKEEPER_CLIENT_PRINCIPAL)))); - aclProvider = new SASLOwnerACLProvider(); - String allowConnect = conf.get(ServerConfig.ALLOW_CONNECT); - - if (!Strings.isNullOrEmpty(allowConnect)) { - for (String principal : Arrays.asList(allowConnect.split("\\s*,\\s*"))) { - LOGGER.info("Adding acls for " + principal); - saslACL.add(new ACL(Perms.ALL, new Id("sasl", principal))); - } - } - } else { - LOGGER.info("Connecting to ZooKeeper without authentication"); - aclProvider = new DefaultACLProvider(); - } - - retryPolicy = new RetryNTimes(retriesMaxCount, sleepMsBetweenRetries); - this.curatorFramework = CuratorFrameworkFactory.builder() - .namespace(this.namespace) - .connectString(this.zookeeperQuorum) - .retryPolicy(retryPolicy) - .aclProvider(aclProvider) - .build(); - 
startCuratorFramework(); - } - - /** - * Use common HAContext (ie curator framework connection to ZK) - * - * @param conf - * @throws Exception - */ - public static HAContext getHAContext(Configuration conf) throws Exception { - if (serverHAContext == null) { - serverHAContext = new HAContext(conf); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - LOGGER.info("ShutdownHook closing curator framework"); - try { - clearServerContext(); - } catch (Throwable t) { - LOGGER.error("Error stopping SentryService", t); - } - } - }); - - } - return serverHAContext; - } - - // HA context for server which verifies the ZK ACLs on namespace - public static HAContext getHAServerContext(Configuration conf) throws Exception { - HAContext serverContext = getHAContext(conf); - serverContext.checkAndSetACLs(); - return serverContext; - } - - @VisibleForTesting - public static synchronized void clearServerContext() { - if (serverHAContext != null) { - serverHAContext.getCuratorFramework().close(); - serverHAContext = null; - } - } - - public void startCuratorFramework() { - if (curatorFramework.getState() != CuratorFrameworkState.STARTED) { - curatorFramework.start(); - } - } - - public CuratorFramework getCuratorFramework() { - return this.curatorFramework; - } - - public String getZookeeperQuorum() { - return zookeeperQuorum; - } - - public static boolean isHaEnabled(Configuration conf) { - return conf.getBoolean(ServerConfig.SENTRY_HA_ENABLED, ServerConfig.SENTRY_HA_ENABLED_DEFAULT); - } - - public String getNamespace() { - return namespace; - } - - public RetryPolicy getRetryPolicy() { - return retryPolicy; - } - - private void validateConf() { - Preconditions.checkNotNull(zookeeperQuorum, "Zookeeper Quorum should not be null."); - Preconditions.checkNotNull(namespace, "Zookeeper namespace should not be null."); - } - - protected String getServicePrincipal(Configuration conf, String confProperty) - throws IOException { - String principal = 
conf.get(confProperty); - Preconditions.checkNotNull(principal); - Preconditions.checkArgument(principal.length() != 0, "Server principal is not right."); - return principal.split("[/@]")[0]; - } - - private void checkAndSetACLs() throws Exception { - if (zkSecure && !aclChecked) { - // If znodes were previously created without security enabled, and now it is, we need to go through all existing znodes - // and set the ACLs for them. This is done just once at the startup - // We can't get the namespace znode through curator; have to go through zk client - startCuratorFramework(); - String newNamespace = "/" + curatorFramework.getNamespace(); - if (curatorFramework.getZookeeperClient().getZooKeeper().exists(newNamespace, null) != null) { - List<ACL> acls = curatorFramework.getZookeeperClient().getZooKeeper().getACL(newNamespace, new Stat()); - if (acls.isEmpty() || !acls.get(0).getId().getScheme().equals("sasl")) { - LOGGER.info("'sasl' ACLs not set; setting..."); - List<String> children = curatorFramework.getZookeeperClient().getZooKeeper().getChildren(newNamespace, null); - for (String child : children) { - checkAndSetACLs("/" + child); - } - curatorFramework.getZookeeperClient().getZooKeeper().setACL(newNamespace, saslACL, -1); - } - } - aclChecked = true; - - } - } - - private void checkAndSetACLs(String path) throws Exception { - LOGGER.info("Setting acls on " + path); - List<String> children = curatorFramework.getChildren().forPath(path); - for (String child : children) { - checkAndSetACLs(path + "/" + child); - } - curatorFramework.setACL().withACL(saslACL).forPath(path); - } - - // This gets ignored during most tests, see ZKXTestCaseWithSecurity#setupZKServer() - private void setJaasConfiguration(Configuration conf) throws IOException { - if ("false".equalsIgnoreCase(conf.get( - ServerConfig.SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE, - ServerConfig.SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE_DEFAULT))) { - String keytabFile = 
conf.get(ServerConfig.SERVER_HA_ZOOKEEPER_CLIENT_KEYTAB); - Preconditions.checkArgument(keytabFile.length() != 0, "Keytab File is not right."); - String principal = conf.get(ServerConfig.SERVER_HA_ZOOKEEPER_CLIENT_PRINCIPAL); - principal = SecurityUtil.getServerPrincipal(principal, - conf.get(ServerConfig.RPC_ADDRESS, ServerConfig.RPC_ADDRESS_DEFAULT)); - Preconditions.checkArgument(principal.length() != 0, "Kerberos principal is not right."); - - // This is equivalent to writing a jaas.conf file and setting the system property, "java.security.auth.login.config", to - // point to it (but this way we don't have to write a file, and it works better for the tests) - JaasConfiguration.addEntryForKeytab(SENTRY_ZK_JAAS_NAME, principal, keytabFile); - } else { - // Create jaas conf for ticket cache - JaasConfiguration.addEntryForTicketCache(SENTRY_ZK_JAAS_NAME); - } - javax.security.auth.login.Configuration.setConfiguration(JaasConfiguration.getInstance()); - } - - public class SASLOwnerACLProvider implements ACLProvider { - @Override - public List<ACL> getDefaultAcl() { - return saslACL; - } - - @Override - public List<ACL> getAclForPath(String path) { - return saslACL; - } - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java index 7dad496..6e367e5 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java @@ -71,6 +71,8 @@ import 
org.apache.sentry.provider.db.service.thrift.TSentryMappingData; import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege; import org.apache.sentry.provider.db.service.thrift.TSentryPrivilegeMap; import org.apache.sentry.provider.db.service.thrift.TSentryRole; +import org.apache.sentry.service.thrift.Activator; +import org.apache.sentry.service.thrift.Activators; import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope; import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; import org.datanucleus.store.rdbms.exceptions.MissingTableException; @@ -102,7 +104,6 @@ public class SentryStore { public static final String NULL_COL = "__NULL__"; public static int INDEX_GROUP_ROLES_MAP = 0; public static int INDEX_USER_ROLES_MAP = 1; - static final String DEFAULT_DATA_DIR = "sentry_policy_db"; private static final Set<String> ALL_ACTIONS = Sets.newHashSet(AccessConstants.ALL, AccessConstants.SELECT, AccessConstants.INSERT, AccessConstants.ALTER, @@ -116,6 +117,11 @@ public class SentryStore { AccessConstants.ACTION_ALL.toLowerCase(), AccessConstants.SELECT, AccessConstants.INSERT); /** + * The activator object which tells us whether the current daemon is active. + */ + private final Activator act; + + /** * Commit order sequence id. This is used by notification handlers * to know the order in which events where committed to the database. 
* This instance variable is incremented in incrementGetSequenceId @@ -128,10 +134,8 @@ public class SentryStore { private PrivCleaner privCleaner = null; private Thread privCleanerThread = null; - public SentryStore(Configuration conf) throws SentryNoSuchObjectException, - SentryAccessDeniedException, SentrySiteConfigurationException, IOException { - commitSequenceId = 0; - this.conf = conf; + public static Properties getDataNucleusProperties(Configuration conf) + throws SentrySiteConfigurationException, IOException { Properties prop = new Properties(); prop.putAll(ServerConfig.SENTRY_STORE_DEFAULTS); String jdbcUrl = conf.get(ServerConfig.SENTRY_STORE_JDBC_URL, "").trim(); @@ -164,8 +168,19 @@ public class SentryStore { prop.setProperty(key, entry.getValue()); } } + // Disallow operations outside of transactions + prop.setProperty("datanucleus.NontransactionalRead", "false"); + prop.setProperty("datanucleus.NontransactionalWrite", "false"); + return prop; + } - + public SentryStore(Configuration conf) + throws SentryNoSuchObjectException, SentryAccessDeniedException, + SentrySiteConfigurationException, IOException { + this.act = Activators.INSTANCE.get(conf); + commitSequenceId = 0; + this.conf = conf; + Properties prop = getDataNucleusProperties(conf); boolean checkSchemaVersion = conf.get( ServerConfig.SENTRY_VERIFY_SCHEM_VERSION, ServerConfig.SENTRY_VERIFY_SCHEM_VERSION_DEFAULT).equalsIgnoreCase( @@ -175,11 +190,6 @@ public class SentryStore { prop.setProperty("datanucleus.autoCreateSchema", "true"); prop.setProperty("datanucleus.fixedDatastore", "false"); } - - // Disallow operations outside of transactions - prop.setProperty("datanucleus.NontransactionalRead", "false"); - prop.setProperty("datanucleus.NontransactionalWrite", "false"); - pmf = JDOHelper.getPersistenceManagerFactory(prop); verifySentryStoreSchema(checkSchemaVersion); 
http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java deleted file mode 100644 index 79dfe48..0000000 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.sentry.provider.db.service.persistent; - -import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; -import org.apache.curator.x.discovery.ServiceInstance; -import org.apache.curator.x.discovery.details.InstanceSerializer; - -public class ServiceRegister { - - private HAContext haContext; - - public ServiceRegister(HAContext haContext) { - this.haContext = haContext; - } - - public void regService(String host, int port) throws Exception { - - haContext.startCuratorFramework(); - ServiceInstance<Void> serviceInstance = ServiceInstance.<Void>builder() - .address(host) - .port(port) - .name(HAContext.SENTRY_SERVICE_REGISTER_NAMESPACE) - .build(); - - InstanceSerializer<Void> instanceSerializer = new FixedJsonInstanceSerializer<Void>(Void.class); - ServiceDiscoveryBuilder.builder(Void.class) - .basePath(HAContext.SENTRY_SERVICE_REGISTER_NAMESPACE) - .client(haContext.getCuratorFramework()) - .serializer(instanceSerializer) - .thisInstance(serviceInstance) - .build() - .start(); - } - -} http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SqlAccessor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SqlAccessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SqlAccessor.java new file mode 100644 index 0000000..9879e67 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SqlAccessor.java @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.provider.db.service.persistent; + +import java.sql.Connection; + +import javax.jdo.PersistenceManager; +import javax.jdo.PersistenceManagerFactory; +import javax.jdo.datastore.JDOConnection; + +/** + * An accessor for a SQL database. + * + * SqlAccessor objects generate raw SQL statements in a variety of dialects. + * We use this to do stuff that the DataNucleus architects didn't anticipate, + * like rename tables or search for tables by a prefix name.<p/> + * + * This class exists only to implement fencing. While it's theoretically + * possible to do other things with it, it is almost always better to use the + * functionality provided by DataNucleus if it is at all possible.<p/> + * + * Note: do NOT pass any untrusted user input into these functions. You must + * NOT create SQL statements from unsanitized user input because they may expose + * you to SQL injection attacks. Use prepared statements if you need to do that + * (yes, it's possible via DataNucleus.)<p/> + */ +abstract class SqlAccessor { + /** + * The string which we can use with PersistenceManager#newQuery to perform raw + * SQL operations. + */ + final static String JDO_SQL_ESCAPE = "javax.jdo.query.SQL"; + + /** + * Get an accessor for the SQL database that we're using. + * + * @return The singleton accessor instance for the SQL database we are using. 
+ * + * @throws RuntimeException If there was an error loading the SqlAccessor. + * This could happen because we don't know the + * type of database that we're using. In theory + * it could also happen because JDO is being run + * against something that is not a SQL databse at + * all. + */ + static SqlAccessor get(PersistenceManagerFactory pmf) { + String productName = getProductNameString(pmf).toLowerCase(); + if (productName.contains("postgresql")) { + return PostgresSqlAccessor.INSTANCE; + } else if (productName.contains("mysql")) { + return MySqlSqlAccessor.INSTANCE; + } else if (productName.contains("oracle")) { + return OracleSqlAccessor.INSTANCE; + } else if (productName.contains("derby")) { + return DerbySqlAccessor.INSTANCE; + } else if (productName.contains("db2")) { + return Db2SqlAccessor.INSTANCE; + } else { + throw new RuntimeException("Unknown database type " + + "'" + productName + "'. Supported database types are " + + "postgres, mysql, oracle, mssql, and derby."); + } + } + + /** + * @return An string describing the type of database that we're using. + */ + static private String getProductNameString(PersistenceManagerFactory pmf) { + PersistenceManager pm = pmf.getPersistenceManager(); + JDOConnection jdoConn = pm.getDataStoreConnection(); + try { + return ((Connection)jdoConn.getNativeConnection()).getMetaData(). + getDatabaseProductName(); + } catch (Throwable t) { + throw new RuntimeException("Error retrieving database product " + + "name", t); + } finally { + // We must release the connection before we call other pm methods. + jdoConn.close(); + } + } + + /** + * Get the name of this database. + * + * @return The name of this databse. + */ + abstract String getDatabaseName(); + + /** + * Get the SQL for finding a table that starts with the given prefix. + * + * @param prefix The prefix of the table to find. + * @return The SQL. 
+ */ + abstract String getFindTableByPrefixSql(String prefix); + + /** + * Get the SQL for creating a table with the given name. + * + * @param name The name of the table to create. + * @return The SQL. + */ + abstract String getCreateTableSql(String name); + + /** + * Get the SQL for renaming a table. + * + * @param src The name of the table to rename. + * @param dst The new name to give to the table. + * @return The SQL. + */ + abstract String getRenameTableSql(String src, String dst); + + /** + * Get the SQL for fetching all rows from the given table. + * + * @param name The table name. + * @return The SQL. + */ + abstract String getFetchAllRowsSql(String name); + + /** + * The postgres database type.<p/> + * + * Postgres is case-senstitive, but will translate all identifiers to + * lowercase unless you quote them. So we quote all identifiers when using + * postgres. + */ + private static class PostgresSqlAccessor extends SqlAccessor { + static final PostgresSqlAccessor INSTANCE = new PostgresSqlAccessor(); + + @Override + String getDatabaseName() { + return "postgres"; + } + + @Override + String getFindTableByPrefixSql(String prefix) { + return "SELECT table_name FROM information_schema.tables " + + "WHERE table_name LIKE '" + prefix + "%'"; + } + + @Override + String getCreateTableSql(String name) { + return "CREATE TABLE \"" + name + "\" (\"VAL\" VARCHAR(512))"; + } + + @Override + String getRenameTableSql(String src, String dst) { + return "ALTER TABLE \"" + src + "\" RENAME TO \"" + dst + "\""; + } + + @Override + String getFetchAllRowsSql(String tableName) { + return "SELECT * FROM \"" + tableName + "\""; + } + } + + /** + * The MySQL database type.<p/> + * + * MySQL can't handle quotes unless specifically configured to accept them. 
+ */ + private static class MySqlSqlAccessor extends SqlAccessor { + static final MySqlSqlAccessor INSTANCE = new MySqlSqlAccessor(); + + @Override + String getDatabaseName() { + return "mysql"; + } + + @Override + String getFindTableByPrefixSql(String prefix) { + return "SELECT table_name FROM information_schema.tables " + + "WHERE table_name LIKE " + prefix + "%"; + } + + @Override + String getCreateTableSql(String name) { + return "CREATE TABLE " + name + " (VAL VARCHAR(512))"; + } + + @Override + String getRenameTableSql(String src, String dst) { + return "RENAME TABLE " + src + " TO " + dst; + } + + @Override + String getFetchAllRowsSql(String tableName) { + return "SELECT * FROM " + tableName; + } + } + + /** + * The Oracle database type.<p/> + */ + private static class OracleSqlAccessor extends SqlAccessor { + static final OracleSqlAccessor INSTANCE = new OracleSqlAccessor(); + + @Override + String getDatabaseName() { + return "oracle"; + } + + @Override + String getFindTableByPrefixSql(String prefix) { + return "SELECT table_name FROM all_tables " + + "WHERE table_name LIKE " + prefix + "%"; + } + + @Override + String getCreateTableSql(String name) { + return "CREATE TABLE " + name + " (VAL VARCHAR(512))"; + } + + @Override + String getRenameTableSql(String src, String dst) { + return "RENAME TABLE " + src + " TO " + dst; + } + + @Override + String getFetchAllRowsSql(String tableName) { + return "SELECT * FROM " + tableName; + } + } + + /** + * The Derby database type.</p> + */ + private static class DerbySqlAccessor extends SqlAccessor { + static final DerbySqlAccessor INSTANCE = new DerbySqlAccessor(); + + @Override + String getFindTableByPrefixSql(String prefix) { + return "SELECT tablename FROM SYS.SYSTABLES " + + "WHERE tablename LIKE '" + prefix + "%'"; + } + + @Override + String getCreateTableSql(String name) { + return "CREATE TABLE " + name + " (VAL VARCHAR(512))"; + } + + @Override + String getRenameTableSql(String src, String dst) { + return 
"RENAME TABLE " + src + " TO " + dst; + } + + @Override + String getDatabaseName() { + return "derby"; + } + + @Override + String getFetchAllRowsSql(String tableName) { + return "SELECT * FROM " + tableName; + } + } + + /** + * The DB2 database type.</p> + */ + private static class Db2SqlAccessor extends SqlAccessor { + static final Db2SqlAccessor INSTANCE = new Db2SqlAccessor(); + + @Override + String getFindTableByPrefixSql(String prefix) { + return "SELECT tablename FROM SYS.SYSTABLES " + + "WHERE tablename LIKE '" + prefix + "%'"; + } + + @Override + String getCreateTableSql(String name) { + return "CREATE TABLE " + name + " (VAL VARCHAR(512))"; + } + + @Override + String getRenameTableSql(String src, String dst) { + return "RENAME TABLE " + src + " TO " + dst; + } + + @Override + String getDatabaseName() { + return "db2"; + } + + @Override + String getFetchAllRowsSql(String tableName) { + return "SELECT * FROM " + tableName; + } + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java index 3de1f65..19daa75 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java @@ -46,9 +46,7 @@ import org.apache.sentry.provider.db.log.entity.JsonLogEntity; import org.apache.sentry.provider.db.log.entity.JsonLogEntityFactory; import org.apache.sentry.provider.db.log.util.Constants; import 
org.apache.sentry.provider.db.service.persistent.CommitContext; -import org.apache.sentry.provider.db.service.persistent.HAContext; import org.apache.sentry.provider.db.service.persistent.SentryStore; -import org.apache.sentry.provider.db.service.persistent.ServiceRegister; import org.apache.sentry.provider.db.service.thrift.PolicyStoreConstants.PolicyStoreServerConfig; import org.apache.sentry.service.thrift.SentryServiceUtil; import org.apache.sentry.service.thrift.ServiceConstants; @@ -84,30 +82,18 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { private final SentryStore sentryStore; private final NotificationHandlerInvoker notificationHandlerInvoker; private final ImmutableSet<String> adminGroups; - private boolean isReady; SentryMetrics sentryMetrics; - private HAContext haContext; private List<SentryPolicyStorePlugin> sentryPlugins = new LinkedList<SentryPolicyStorePlugin>(); - public SentryPolicyStoreProcessor(String name, Configuration conf) throws Exception { + public SentryPolicyStoreProcessor(String name, + Configuration conf) throws Exception { super(); this.name = name; this.conf = conf; this.notificationHandlerInvoker = new NotificationHandlerInvoker(conf, createHandlers(conf)); - isReady = false; - if (conf.getBoolean(ServerConfig.SENTRY_HA_ENABLED, - ServerConfig.SENTRY_HA_ENABLED_DEFAULT)) { - haContext = HAContext.getHAServerContext(conf); - sentryStore = new SentryStore(conf); - ServiceRegister reg = new ServiceRegister(haContext); - reg.regService(conf.get(ServerConfig.RPC_ADDRESS), - conf.getInt(ServerConfig.RPC_PORT,ServerConfig.RPC_PORT_DEFAULT)); - } else { - sentryStore = new SentryStore(conf); - } - isReady = true; + sentryStore = new SentryStore(conf); adminGroups = ImmutableSet.copyOf(toTrimedLower(Sets.newHashSet(conf.getStrings( ServerConfig.ADMIN_GROUPS, new String[]{})))); Iterable<String> pluginClasses = ConfUtilties.CLASS_SPLITTER @@ -149,16 +135,7 @@ public class SentryPolicyStoreProcessor 
implements SentryPolicyService.Iface { } public void stop() { - if (isReady) { - sentryStore.stop(); - } - if (haContext != null) { - try { - haContext.getCuratorFramework().close(); - } catch (Exception e) { - LOGGER.warn("Error in stopping processor", e); - } - } + sentryStore.stop(); } public void registerPlugin(SentryPolicyStorePlugin plugin) throws SentryPluginException { http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activator.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activator.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activator.java new file mode 100644 index 0000000..0b7ddf5 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activator.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sentry.service.thrift; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.provider.db.service.persistent.Fencer; +import org.apache.sentry.provider.db.service.persistent.SentryStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jdo.JDOHelper; +import javax.jdo.PersistenceManagerFactory; + +/** + * The activator is used to access and modify the activation state of the sentry daemon; it runs the fencer before it marks itself active. NOTE(review): close() does not reach pmf.close() if leaderStatus.close() throws.<p/> + */ +public class Activator implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(Activator.class); + + /** + * The DataNucleus PersistenceManagerFactory to use; built in the constructor and closed by close(). + */ + private final PersistenceManagerFactory pmf; + + /** + * The handler for LeaderStatus callbacks. + */ + private final TransitionHandler handler; + + /** + * LeaderStatus generates callbacks to let us know when we are active or + * standby. When HA is enabled, it manages ZK sessions. + */ + private final LeaderStatus leaderStatus; + + /** + * The fencer object, invoked on each standby-to-active transition. + */ + private final Fencer fencer; + + /** + * True if the Activator is active. After construction it is read and written only while holding this Activator's monitor. 
+ */ + private boolean active; + + public Activator(Configuration conf) throws Exception { + Properties props = SentryStore.getDataNucleusProperties(conf); + this.pmf = JDOHelper.getPersistenceManagerFactory(props); + this.handler = new TransitionHandler(); + this.leaderStatus = new LeaderStatus(handler, conf); + this.fencer = new Fencer(this.leaderStatus.getIncarnationId(), pmf); + this.active = false; + this.leaderStatus.start(); + } + + @Override + public void close() throws IOException { + this.leaderStatus.close(); + this.pmf.close(); + } + + private class TransitionHandler implements LeaderStatus.Listener { + @Override + public void becomeActive() throws Exception { + synchronized (Activator.this) { + if (!active) { + LOGGER.info("Activating " + leaderStatus.getIncarnationId()); + fencer.fence(pmf); + active = true; + } + } + } + + @Override + public void becomeStandby() { + synchronized (Activator.this) { + if (active) { + LOGGER.info("Deactivating " + leaderStatus.getIncarnationId()); + active = false; + } + } + } + } + + synchronized boolean isActive() { + return active; + } + + public synchronized String getIncarnationId() { + return leaderStatus.getIncarnationId(); + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activators.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activators.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activators.java new file mode 100644 index 0000000..37b0219 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activators.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.service.thrift; + +import java.util.HashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.core.common.utils.SentryConstants; + +/** + * A global map from incarnation IDs to Activator objects.<p/> + * + * This is used to access the current global Activator. Normally there will + * only be one Activator used in a sentry process. There may be multiple + * Activator objects in the case where we are running unit tests. 
+ */ +public enum Activators { + INSTANCE; + + private final HashMap<String, Activator> acts = new HashMap<String, Activator>(); + + Activators() {} + + public synchronized void put(Activator act) { + acts.put(act.getIncarnationId(), act); + } + + public Activator get(Configuration conf) { + String key = conf.get(SentryConstants.CURRENT_INCARNATION_ID_KEY); + if (key == null) { + throw new RuntimeException("No " + + SentryConstants.CURRENT_INCARNATION_ID_KEY + "set."); + } + return get(key); + } + + public synchronized Activator get(String incarnationId) { + Activator act = acts.get(incarnationId); + if (act == null) { + throw new RuntimeException("No activator found with " + + "incarnationId " + incarnationId); + } + return act; + } + + public synchronized void remove(Activator act) { + Activator removed = acts.remove(act.getIncarnationId()); + if (removed == null) { + throw new RuntimeException("No activator found with " + + "incarnationId " + act.getIncarnationId()); + } + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatus.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatus.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatus.java index e846766..e32e1db 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatus.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatus.java @@ -16,10 +16,10 @@ */ package org.apache.sentry.service.thrift; -import org.apache.commons.codec.binary.Hex; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.datanucleus.util.Base64; import java.io.Closeable; import 
java.io.IOException; @@ -79,13 +79,36 @@ final class LeaderStatus implements Closeable { private final AtomicBoolean closed = new AtomicBoolean(false); /** - * Generate a 128-bit random ID. + * Generate a very long random ID. + * + * We want a name that doesn't start with a number, and which + * contains only letters and numbers. This is important because + * the incarnation ID gets used in SQL databases to name a table. */ static String generateIncarnationId() { SecureRandom srand = new SecureRandom(); - byte[] buf = new byte[32]; + byte[] buf = new byte[33]; srand.nextBytes(buf); - return "sentry_" + Hex.encodeHexString(buf); + char[] cbuf = Base64.encode(buf); + StringBuilder bld = new StringBuilder(); + for (int i = 0; i < cbuf.length; i++) { + boolean safe; + if (i == 0) { + // Some databases can't handle identifiers that start with numbers, + // so always start with a letter. Also replace '+' or '/' with + // something safe. + safe = Character.isLetter(cbuf[i]); + } else { + // Replace '+' or '/' with something safe. 
+ safe = Character.isLetterOrDigit(cbuf[i]); + } + if (!safe) { + bld.append((char)('a' + srand.nextInt(26))); + } else { + bld.append(cbuf[i]); + } + } + return bld.toString(); } LeaderStatus(Listener listener, Configuration conf) throws Exception { http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusAdaptor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusAdaptor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusAdaptor.java index 80a6571..33a5e7b 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusAdaptor.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusAdaptor.java @@ -81,6 +81,11 @@ final class LeaderStatusAdaptor private long becomeLeaderCount = 0; /** + * True only if this LeaderStatusAdaptor is closed. + */ + private boolean isClosed = false; + + /** * True only if this incarnation is currently active. */ private boolean isActive = false; @@ -112,9 +117,36 @@ final class LeaderStatusAdaptor this.leaderSelector.start(); } + /** + * Shut down the LeaderStatusAdaptor and wait for it to transition to + * standby. + */ @Override public void close() throws IOException { + // If the adaptor is already closed, calling close again is a no-op. + // Setting isClosed also prevents activation after this point. + lock.lock(); + try { + if (isClosed) { + return; + } + isClosed = true; + } finally { + lock.unlock(); + } + + // Shut down our Curator hooks. leaderSelector.close(); + + // Wait for the adaptor to transition to standby state. 
+ lock.lock(); + try { + while (isActive) { + cond.awaitUninterruptibly(); + } + } finally { + lock.unlock(); + } } /** @@ -148,9 +180,14 @@ final class LeaderStatusAdaptor public void takeLeadership(CuratorFramework client) throws Exception { lock.lock(); try { + if (isClosed) { + LOG.info("LeaderStatusAdaptor: can't become active because the " + + "adaptor is closed."); + return; + } isActive = true; becomeLeaderCount++; - LOG.info("SentryLeaderSelectorClient: becoming active. " + + LOG.info("LeaderStatusAdaptor: becoming active. " + "becomeLeaderCount=" + becomeLeaderCount); listener.becomeActive(); while (isActive) { @@ -158,7 +195,7 @@ final class LeaderStatusAdaptor } } finally { isActive = false; - LOG.info("SentryLeaderSelectorClient: becoming standby"); + LOG.info("LeaderStatusAdaptor: becoming standby"); try { listener.becomeStandby(); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java index 809af06..531ab35 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java @@ -49,6 +49,7 @@ import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.SecurityUtil; import org.apache.sentry.Command; +import org.apache.sentry.core.common.utils.SentryConstants; import org.apache.sentry.provider.db.service.thrift.SentryHealthCheckServletContextListener; import 
org.apache.sentry.provider.db.service.thrift.SentryMetrics; import org.apache.sentry.provider.db.service.thrift.SentryMetricsServletContextListener; @@ -95,11 +96,10 @@ public class SentryService implements Callable { private SentryWebServer sentryWebServer; private long maxMessageSize; private final boolean isHA; - private volatile boolean isActive = false; + private final Activator act; SentryMetrics sentryMetrics; - private final LeaderStatus leaderStatus; - public SentryService(Configuration conf) { + public SentryService(Configuration conf) throws Exception { this.conf = conf; int port = conf .getInt(ServerConfig.RPC_PORT, ServerConfig.RPC_PORT_DEFAULT); @@ -153,25 +153,10 @@ public class SentryService implements Callable { + (count++)); } }); - try { - leaderStatus = new LeaderStatus( - new LeaderStatus.Listener() { - @Override - public void becomeActive() throws Exception { - LOGGER.info("Activating " + leaderStatus.getIncarnationId()); - isActive = true; - } - - @Override - public void becomeStandby() { - LOGGER.info("Deactivating " + leaderStatus.getIncarnationId()); - isActive = false; - } - }, conf); - leaderStatus.start(); // TODO: move this into call? 
- } catch (Exception e) { - throw new RuntimeException(e); - } + this.act = new Activator(conf); + conf.set(SentryConstants.CURRENT_INCARNATION_ID_KEY, + this.act.getIncarnationId()); + Activators.INSTANCE.put(act); webServerPort = conf.getInt(ServerConfig.SENTRY_WEB_PORT, ServerConfig.SENTRY_WEB_PORT_DEFAULT); status = Status.NOT_STARTED; } @@ -307,7 +292,7 @@ public class SentryService implements Callable { public synchronized void stop() throws Exception{ MultiException exception = null; LOGGER.info("Attempting to stop..."); - leaderStatus.close(); + act.close(); if (isRunning()) { LOGGER.info("Attempting to stop sentry thrift service..."); try { @@ -462,7 +447,7 @@ public class SentryService implements Callable { return new Gauge<Boolean>() { @Override public Boolean getValue() { - return isActive; + return act.isActive(); } }; } http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java index 0ab8192..abc3f58 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java @@ -141,6 +141,7 @@ public class ServiceConstants { .put("datanucleus.transactionIsolation", "read-committed") .put("datanucleus.cache.level2", "false") .put("datanucleus.cache.level2.type", "none") + .put("datanucleus.query.sql.allowAll", "true") .put("datanucleus.identifierFactory", "datanucleus1") .put("datanucleus.rdbms.useLegacyNativeValueStrategy", "true") .put("datanucleus.plugin.pluginRegistryBundleCheck", "LOG") @@ -258,4 +259,7 @@ 
public class ServiceConstants { TABLE, COLUMN } + + public static final String SENTRY_ZK_JAAS_NAME = "Sentry"; + public static final String CURRENT_INCARNATION_ID_KEY = "current.incarnation.key"; } http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/SentryStoreIntegrationBase.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/SentryStoreIntegrationBase.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/SentryStoreIntegrationBase.java index f14b586..5999580 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/SentryStoreIntegrationBase.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/SentryStoreIntegrationBase.java @@ -21,7 +21,11 @@ import java.io.File; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; import org.apache.sentry.provider.file.PolicyFile; +import org.apache.sentry.service.thrift.Activator; +import org.apache.sentry.service.thrift.Activators; +import org.apache.sentry.service.thrift.ServiceConstants; import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; import org.junit.After; import org.junit.AfterClass; @@ -34,6 +38,7 @@ public abstract class SentryStoreIntegrationBase { private static File dataDir; private static File policyFilePath; protected static Configuration conf; + protected static Activator act; protected static DelegateSentryStore sentryStore; protected static PolicyFile policyFile; @@ -57,6 +62,9 @@ public abstract class SentryStoreIntegrationBase { policyFilePath = new File(Files.createTempDir(), 
"local_policy_file.ini"); conf.set(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, policyFilePath.getPath()); + act = new Activator(conf); + conf.set(ServiceConstants.CURRENT_INCARNATION_ID_KEY, act.getIncarnationId()); + Activators.INSTANCE.put(act); } @After @@ -66,6 +74,9 @@ public abstract class SentryStoreIntegrationBase { @AfterClass public static void teardown() { + if (act != null) { + IOUtils.cleanup(null, act); + } if (sentryStore != null) { sentryStore.close(); } @@ -75,6 +86,10 @@ public abstract class SentryStoreIntegrationBase { if (policyFilePath != null) { FileUtils.deleteQuietly(policyFilePath); } + if (act != null) { + Activators.INSTANCE.remove(act); + act = null; + } } public static void addGroupsToUser(String user, String... groupNames) { http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/TestPrivilegeOperatePersistence.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/TestPrivilegeOperatePersistence.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/TestPrivilegeOperatePersistence.java index 799d5ef..7c66db4 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/TestPrivilegeOperatePersistence.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/TestPrivilegeOperatePersistence.java @@ -37,6 +37,8 @@ import org.apache.sentry.core.model.sqoop.SqoopActionConstant; import org.apache.sentry.core.common.exception.SentryGrantDeniedException; import org.apache.sentry.provider.db.generic.service.persistent.PrivilegeObject.Builder; import org.apache.sentry.provider.file.PolicyFile; +import 
org.apache.sentry.service.thrift.Activator; +import org.apache.sentry.service.thrift.Activators; import org.apache.sentry.service.thrift.ServiceConstants; import org.junit.Before; import org.junit.Test; @@ -987,8 +989,14 @@ public class TestPrivilegeOperatePersistence extends SentryStoreIntegrationBase Configuration confCopy = new Configuration(conf); confCopy.set(String.format(ServiceConstants.ServerConfig.SENTRY_COMPONENT_ACTION_FACTORY_FORMAT, externalComponent), InvalidActionFactory.class.getName()); - SentryStoreLayer store = new DelegateSentryStore(confCopy); + Activator act = new Activator(confCopy); + confCopy.set(ServiceConstants.CURRENT_INCARNATION_ID_KEY, + act.getIncarnationId()); + Activators.INSTANCE.put(act); + SentryStoreLayer store = new DelegateSentryStore(confCopy); testGrantPrivilege(store, externalComponent); + act.close(); + Activators.INSTANCE.remove(act); } @Test @@ -997,8 +1005,14 @@ public class TestPrivilegeOperatePersistence extends SentryStoreIntegrationBase Configuration confCopy = new Configuration(conf); confCopy.set(String.format(ServiceConstants.ServerConfig.SENTRY_COMPONENT_ACTION_FACTORY_FORMAT, externalComponent), MyComponentActionFactory.class.getName()); + Activator act = new Activator(confCopy); + confCopy.set(ServiceConstants.CURRENT_INCARNATION_ID_KEY, + act.getIncarnationId()); + Activators.INSTANCE.put(act); SentryStoreLayer store = new DelegateSentryStore(confCopy); testGrantPrivilege(store, externalComponent); + act.close(); + Activators.INSTANCE.remove(act); } @Test @@ -1007,8 +1021,14 @@ public class TestPrivilegeOperatePersistence extends SentryStoreIntegrationBase Configuration confCopy = new Configuration(conf); confCopy.set(String.format(ServiceConstants.ServerConfig.SENTRY_COMPONENT_ACTION_FACTORY_FORMAT, "mycomponent"), MyComponentActionFactory.class.getName()); + Activator act = new Activator(confCopy); + confCopy.set(ServiceConstants.CURRENT_INCARNATION_ID_KEY, + act.getIncarnationId()); + 
Activators.INSTANCE.put(act); SentryStoreLayer store = new DelegateSentryStore(confCopy); testGrantPrivilege(store, externalComponent); + act.close(); + Activators.INSTANCE.remove(act); } private void testGrantPrivilege(SentryStoreLayer sentryStore, String component) throws SentryUserException { http://git-wip-us.apache.org/repos/asf/sentry/blob/ff7823b6/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java index 3ef1cf7..6e00505 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java @@ -27,6 +27,7 @@ import java.util.Set; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.alias.CredentialProvider; import org.apache.hadoop.security.alias.CredentialProviderFactory; import org.apache.hadoop.security.alias.UserProvider; @@ -44,6 +45,9 @@ import org.apache.sentry.provider.db.service.thrift.TSentryGroup; import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege; import org.apache.sentry.provider.db.service.thrift.TSentryRole; import org.apache.sentry.provider.file.PolicyFile; +import org.apache.sentry.service.thrift.Activator; +import org.apache.sentry.service.thrift.Activators; +import org.apache.sentry.service.thrift.ServiceConstants; import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; import org.junit.After; import org.junit.AfterClass; @@ -67,6 +71,7 @@ 
public class TestSentryStore extends org.junit.Assert { final long NUM_PRIVS = 60; // > SentryStore.PrivCleaner.NOTIFY_THRESHOLD private static Configuration conf = null; private static char[] passwd = new char[] { '1', '2', '3'}; + private static Activator act; @BeforeClass public static void setup() throws Exception { @@ -89,6 +94,10 @@ public class TestSentryStore extends org.junit.Assert { policyFilePath = new File(dataDir, "local_policy_file.ini"); conf.set(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, policyFilePath.getPath()); + act = new Activator(conf); + conf.set(ServiceConstants.CURRENT_INCARNATION_ID_KEY, + act.getIncarnationId()); + Activators.INSTANCE.put(act); sentryStore = new SentryStore(conf); } @@ -107,12 +116,17 @@ public class TestSentryStore extends org.junit.Assert { @AfterClass public static void teardown() { + IOUtils.cleanup(null, act); if (sentryStore != null) { sentryStore.stop(); } if (dataDir != null) { FileUtils.deleteQuietly(dataDir); } + if (act != null) { + Activators.INSTANCE.remove(act); + act = null; + } } @Test
