Repository: sentry Updated Branches: refs/heads/sentry-ha-redesign f783277f8 -> c0333a9cf
SENTRY-1637: Periodically purge Delta change tables. (Lei Xu, Reviewed by: Alex Kolbasov) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/c0333a9c Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/c0333a9c Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/c0333a9c Branch: refs/heads/sentry-ha-redesign Commit: c0333a9cf119c8792130ff8bbf67d7cf041a2a66 Parents: f783277 Author: Alexander Kolbasov <[email protected]> Authored: Tue Feb 28 21:43:03 2017 -0800 Committer: Alexander Kolbasov <[email protected]> Committed: Tue Feb 28 21:43:03 2017 -0800 ---------------------------------------------------------------------- .../hdfs/SentryHDFSServiceProcessorFactory.java | 4 +- .../SentryGenericPolicyProcessorFactory.java | 7 +++- .../thrift/SentryPolicyStoreProcessor.java | 8 ++-- .../SentryPolicyStoreProcessorFactory.java | 9 ++-- .../sentry/service/thrift/HMSFollower.java | 4 +- .../sentry/service/thrift/ProcessorFactory.java | 11 ++++- .../sentry/service/thrift/SentryService.java | 44 +++++++++++++++++++- .../sentry/service/thrift/ServiceConstants.java | 3 ++ 8 files changed, 75 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/c0333a9c/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java index db55b5a..4dc99a2 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java @@ -21,6 +21,7 @@ package org.apache.sentry.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.sentry.hdfs.service.thrift.SentryHDFSService; import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Iface; +import org.apache.sentry.provider.db.service.persistent.SentryStore; import org.apache.sentry.provider.db.service.thrift.ThriftUtil; import org.apache.sentry.service.thrift.ProcessorFactory; import org.apache.thrift.TException; @@ -52,7 +53,8 @@ public class SentryHDFSServiceProcessorFactory extends ProcessorFactory{ } @Override - public boolean register(TMultiplexedProcessor multiplexedProcessor) throws Exception { + public boolean register(TMultiplexedProcessor multiplexedProcessor, + SentryStore _) throws Exception { SentryHDFSServiceProcessor sentryServiceHandler = new SentryHDFSServiceProcessor(); LOGGER.info("Calling registerProcessor from SentryHDFSServiceProcessorFactory"); http://git-wip-us.apache.org/repos/asf/sentry/blob/c0333a9c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorFactory.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorFactory.java index 1cce1fc..9fb1de6 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorFactory.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorFactory.java @@ -18,6 +18,7 @@ package org.apache.sentry.provider.db.generic.service.thrift; import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.provider.db.service.persistent.SentryStore; import org.apache.sentry.service.thrift.ProcessorFactory; import org.apache.thrift.TMultiplexedProcessor; import org.apache.thrift.TProcessor; @@ -29,11 +30,13 @@ public class SentryGenericPolicyProcessorFactory extends ProcessorFactory { } @Override - public boolean register(TMultiplexedProcessor multiplexedProcessor) throws Exception { + public boolean register(TMultiplexedProcessor multiplexedProcessor, + SentryStore _) throws Exception { SentryGenericPolicyProcessor processHandler = new SentryGenericPolicyProcessor(conf); TProcessor processor = new SentryGenericPolicyProcessorWrapper<SentryGenericPolicyService.Iface>( processHandler); - multiplexedProcessor.registerProcessor(SentryGenericPolicyProcessor.SENTRY_GENERIC_SERVICE_NAME, processor); + multiplexedProcessor.registerProcessor( + SentryGenericPolicyProcessor.SENTRY_GENERIC_SERVICE_NAME, processor); return true; } http://git-wip-us.apache.org/repos/asf/sentry/blob/c0333a9c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java index 48e2587..30e91ae 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java @@ -75,7 +75,7 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { private static final Logger LOGGER = LoggerFactory.getLogger(SentryPolicyStoreProcessor.class); private static final Logger AUDIT_LOGGER = LoggerFactory.getLogger(Constants.AUDIT_LOGGER_NAME); - public static final String SENTRY_POLICY_SERVICE_NAME = "SentryPolicyService"; + static final String SENTRY_POLICY_SERVICE_NAME = "SentryPolicyService"; private final String name; private final Configuration conf; @@ -86,14 +86,14 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { private List<SentryPolicyStorePlugin> sentryPlugins = new LinkedList<SentryPolicyStorePlugin>(); - public SentryPolicyStoreProcessor(String name, - Configuration conf) throws Exception { + SentryPolicyStoreProcessor(String name, + Configuration conf, SentryStore store) throws Exception { super(); this.name = name; this.conf = conf; + this.sentryStore = store; this.notificationHandlerInvoker = new NotificationHandlerInvoker(conf, createHandlers(conf)); - sentryStore = new SentryStore(conf); adminGroups = ImmutableSet.copyOf(toTrimedLower(Sets.newHashSet(conf.getStrings( ServerConfig.ADMIN_GROUPS, new String[]{})))); Iterable<String> pluginClasses = ConfUtilties.CLASS_SPLITTER http://git-wip-us.apache.org/repos/asf/sentry/blob/c0333a9c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessorFactory.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessorFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessorFactory.java index 691c1fb..977152d 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessorFactory.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessorFactory.java @@ -18,6 +18,7 @@ package org.apache.sentry.provider.db.service.thrift; import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.provider.db.service.persistent.SentryStore; import org.apache.sentry.service.thrift.ProcessorFactory; import org.apache.thrift.TMultiplexedProcessor; import org.apache.thrift.TProcessor; @@ -27,13 +28,15 @@ public class SentryPolicyStoreProcessorFactory extends ProcessorFactory { super(conf); } - public boolean register(TMultiplexedProcessor multiplexedProcessor) throws Exception { + public boolean register(TMultiplexedProcessor multiplexedProcessor, + SentryStore sentryStore) throws Exception { SentryPolicyStoreProcessor sentryServiceHandler = new SentryPolicyStoreProcessor(SentryPolicyStoreProcessor.SENTRY_POLICY_SERVICE_NAME, - conf); + conf, sentryStore); TProcessor processor = new SentryProcessorWrapper<SentryPolicyService.Iface>(sentryServiceHandler); - multiplexedProcessor.registerProcessor(SentryPolicyStoreProcessor.SENTRY_POLICY_SERVICE_NAME, processor); + multiplexedProcessor.registerProcessor( + SentryPolicyStoreProcessor.SENTRY_POLICY_SERVICE_NAME, processor); return true; } } http://git-wip-us.apache.org/repos/asf/sentry/blob/c0333a9c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index c91051d..bdbb0cc 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -81,12 +81,12 @@ public class HMSFollower implements Runnable { private boolean needHiveSnapshot = true; private final LeaderStatusMonitor leaderMonitor; - HMSFollower(Configuration conf, LeaderStatusMonitor leaderMonitor) + HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor) throws Exception { LOGGER.info("HMSFollower is being initialized"); authzConf = conf; this.leaderMonitor = leaderMonitor; - sentryStore = new SentryStore(authzConf); + sentryStore = store; //TODO: Initialize currentEventID from Sentry db currentEventID = 0; } http://git-wip-us.apache.org/repos/asf/sentry/blob/c0333a9c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ProcessorFactory.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ProcessorFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ProcessorFactory.java index a3bb6ab..2a48c63 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ProcessorFactory.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ProcessorFactory.java @@ -18,6 +18,7 @@ package org.apache.sentry.service.thrift; import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.provider.db.service.persistent.SentryStore; import org.apache.thrift.TMultiplexedProcessor; public abstract class ProcessorFactory { @@ -27,5 +28,13 @@ public abstract class ProcessorFactory { this.conf = conf; } - public abstract boolean register(TMultiplexedProcessor processor) throws Exception; + /** + * Register a Thrift processor with SentryStore. + * @param processor a thrift processor. + * @param sentryStore a {@link SentryStore} + * @return true if success. + * @throws Exception + */ + public abstract boolean register(TMultiplexedProcessor processor, + SentryStore sentryStore) throws Exception; } http://git-wip-us.apache.org/repos/asf/sentry/blob/c0333a9c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java index e6021f1..132db63 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java @@ -46,6 +46,7 @@ import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.SecurityUtil; import org.apache.sentry.Command; import org.apache.sentry.core.common.utils.SigUtils; +import org.apache.sentry.provider.db.service.persistent.SentryStore; import org.apache.sentry.provider.db.service.thrift.SentryHealthCheckServletContextListener; import org.apache.sentry.provider.db.service.thrift.SentryMetrics; import org.apache.sentry.provider.db.service.thrift.SentryMetricsServletContextListener; @@ -94,6 +95,14 @@ public class SentryService implements Callable, SigUtils.SigListener { private final int webServerPort; private SentryWebServer sentryWebServer; private final long maxMessageSize; + /* + sentryStore provides the data access for sentry data. It is the singleton instance shared + between various {@link SentryPolicyService}, i.e., {@link SentryPolicyStoreProcessor} and + {@link HMSFollower}. + */ + private final SentryStore sentryStore; + private final ScheduledExecutorService sentryStoreCleanService = + Executors.newSingleThreadScheduledExecutor(); private final LeaderStatusMonitor leaderMonitor; private final boolean notificationLogEnabled; @@ -149,6 +158,7 @@ public class SentryService implements Callable, SigUtils.SigListener { + (count++)); } }); + this.sentryStore = new SentryStore(conf); this.leaderMonitor = LeaderStatusMonitor.getLeaderStatusMonitor(conf); webServerPort = conf.getInt(ServerConfig.SENTRY_WEB_PORT, ServerConfig.SENTRY_WEB_PORT_DEFAULT); @@ -162,7 +172,7 @@ public class SentryService implements Callable, SigUtils.SigListener { long period = conf.getLong(ServerConfig.SENTRY_HMSFOLLOWER_INTERVAL_MILLS, ServerConfig.SENTRY_HMSFOLLOWER_INTERVAL_MILLS_DEFAULT); hmsFollowerExecutor = Executors.newScheduledThreadPool(1); - hmsFollowerExecutor.scheduleAtFixedRate(new HMSFollower(conf, leaderMonitor), + hmsFollowerExecutor.scheduleAtFixedRate(new HMSFollower(conf, sentryStore, leaderMonitor), initDelay, period, TimeUnit.MILLISECONDS); } catch (Exception e) { //TODO: Handle @@ -182,6 +192,25 @@ public class SentryService implements Callable, SigUtils.SigListener { LOGGER.error("Failed to register signal", e); } } + + // If SENTRY_STORE_CLEAN_PERIOD_SECONDS is set to positive, the background SentryStore cleaning + // thread is enabled. Currently, it only purges the delta changes {@link MSentryChange} in + // the sentry store. + long storeCleanPeriodSecs = conf.getLong( + ServerConfig.SENTRY_STORE_CLEAN_PERIOD_SECONDS, + ServerConfig.SENTRY_STORE_CLEAN_PERIOD_SECONDS_DEFAULT); + if (storeCleanPeriodSecs > 0) { + Runnable storeCleaner = new Runnable() { + @Override + public void run() { + if (leaderMonitor.isLeader()) { + sentryStore.purgeDeltaChangeTables(); + } + } + }; + sentryStoreCleanService.scheduleWithFixedDelay( + storeCleaner, 0, storeCleanPeriodSecs, TimeUnit.SECONDS); + } } @Override @@ -232,7 +261,7 @@ public class SentryService implements Callable, SigUtils.SigListener { LOGGER.info("ProcessorFactory being used: " + clazz.getCanonicalName()); ProcessorFactory factory = (ProcessorFactory) constructor .newInstance(conf); - boolean registerStatus = factory.register(processor); + boolean registerStatus = factory.register(processor, sentryStore); if (!registerStatus) { LOGGER.error("Failed to register " + clazz.getCanonicalName()); } @@ -343,6 +372,17 @@ public class SentryService implements Callable, SigUtils.SigListener { if(hmsFollowerExecutor != null) { hmsFollowerExecutor.shutdown(); } + sentryStoreCleanService.shutdown(); + try { + if (!sentryStoreCleanService.awaitTermination(10, TimeUnit.SECONDS)) { + sentryStoreCleanService.shutdownNow(); + if (!sentryStoreCleanService.awaitTermination(10, TimeUnit.SECONDS)) { + LOGGER.error("DeltaCleanerService did not terminate"); + } + } + } catch (InterruptedException ie) { + sentryStoreCleanService.shutdownNow(); + } if (exception != null) { exception.ifExceptionThrow(); } http://git-wip-us.apache.org/repos/asf/sentry/blob/c0333a9c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java index 806d03e..d3a68c9 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java @@ -121,6 +121,9 @@ public class ServiceConstants { public static final String SENTRY_STORE_ORPHANED_PRIVILEGE_REMOVAL = "sentry.store.orphaned.privilege.removal"; public static final String SENTRY_STORE_ORPHANED_PRIVILEGE_REMOVAL_DEFAULT = "false"; + public static final String SENTRY_STORE_CLEAN_PERIOD_SECONDS = + "sentry.store.clean.period.seconds"; + public static final long SENTRY_STORE_CLEAN_PERIOD_SECONDS_DEFAULT = 43200; // 12 hours. public static final String SENTRY_HA_ENABLED = "sentry.ha.enabled"; public static final boolean SENTRY_HA_ENABLED_DEFAULT = false; public static final String SENTRY_HA_ZK_PROPERTY_PREFIX = "sentry.ha.zookeeper.";
