SENTRY-1798: Provide names for HMSFollower and cleaner threads (Alex Kolbasov, reviewed by Vamsee Yarlagadda)
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/0c725668 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/0c725668 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/0c725668 Branch: refs/heads/master Commit: 0c7256686ff2d93da37821278547b00c14feb787 Parents: d4b64fa Author: Alexander Kolbasov <[email protected]> Authored: Wed Jul 12 21:14:16 2017 +0200 Committer: Alexander Kolbasov <[email protected]> Committed: Wed Jul 12 21:14:46 2017 +0200 ---------------------------------------------------------------------- .../sentry/hdfs/SentryAuthorizationInfo.java | 43 ++++++++--------- .../db/service/persistent/HAContext.java | 20 +++++--- .../service/thrift/FullUpdateInitializer.java | 12 ++++- .../service/thrift/SentryKerberosContext.java | 15 +++--- .../sentry/service/thrift/SentryService.java | 49 ++++++++++++-------- 5 files changed, 86 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/0c725668/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java index 680db7a..2724a55 100644 --- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java +++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.sentry.hdfs; import java.util.*; @@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.AclEntry; @@ -40,6 +42,8 @@ public class SentryAuthorizationInfo implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(SentryAuthorizationInfo.class); + private static final String SENTRY_AUTHORIZATION_INFO_THREAD_NAME = "sentry-auth-info-refresher"; + private SentryUpdater updater; private volatile UpdateableAuthzPaths authzPaths; private volatile UpdateableAuthzPermissions authzPermissions; @@ -83,12 +87,15 @@ public class SentryAuthorizationInfo implements Runnable { SentryAuthorizationConstants.CACHE_REFRESH_RETRY_WAIT_KEY, SentryAuthorizationConstants.CACHE_REFRESH_RETRY_WAIT_DEFAULT); - LOG.debug("Sentry authorization will enforced in the following HDFS " + - "locations: [{}]", StringUtils.arrayToString(newPathPrefixes)); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Sentry authorization will enforced in the following HDFS locations: [{}]", + StringUtils.arrayToString(newPathPrefixes)); + } setPrefixPaths(newPathPrefixes); - LOG.debug("Refresh interval [{}]ms, retry wait [{}], stale threshold " + - "[{}]ms", new Object[] - {refreshIntervalMillisec, retryWaitMillisec, staleThresholdMillisec}); + LOG.debug("Refresh interval [{}]ms, retry wait [{}]", + refreshIntervalMillisec, retryWaitMillisec); + LOG.debug("stale threshold [{}]ms", staleThresholdMillisec); authzPaths = new UpdateableAuthzPaths(newPathPrefixes); authzPermissions = new UpdateableAuthzPermissions(); @@ -222,17 +229,11 @@ public class SentryAuthorizationInfo implements Runnable { if (!success) { waitUntil = System.currentTimeMillis() + retryWaitMillisec; } - executor = Executors.newSingleThreadScheduledExecutor( - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, SentryAuthorizationInfo.class.getName() + - "-refresher"); - t.setDaemon(true); - return t; - } - } - ); + ThreadFactory sentryAuthInfoRefresherThreadFactory = new ThreadFactoryBuilder() + .setNameFormat(SENTRY_AUTHORIZATION_INFO_THREAD_NAME) + .setDaemon(true) + .build(); + executor = Executors.newSingleThreadScheduledExecutor(sentryAuthInfoRefresherThreadFactory); executor.scheduleWithFixedDelay(this, refreshIntervalMillisec, refreshIntervalMillisec, TimeUnit.MILLISECONDS); } @@ -268,7 +269,7 @@ public class SentryAuthorizationInfo implements Runnable { public boolean isManaged(String[] pathElements) { return isUnderPrefix(pathElements); } - + public boolean doesBelongToAuthzObject(String[] pathElements) { lock.readLock().lock(); try { @@ -296,12 +297,12 @@ public class SentryAuthorizationInfo implements Runnable { // Apparently setFAcl throws error if 'group::---' is not present AclEntry noGroup = AclEntry.parseAclEntry("group::---", true); - Set<AclEntry> retSet = new HashSet<AclEntry>(); + Set<AclEntry> retSet = new HashSet<>(); retSet.add(noGroup); if (authzObjs == null) { - retSet.addAll(Collections.EMPTY_LIST); - return new ArrayList<AclEntry>(retSet); + retSet.addAll(Collections.<AclEntry>emptyList()); + return new ArrayList<>(retSet); } // No duplicate acls should be added. @@ -309,7 +310,7 @@ public class SentryAuthorizationInfo implements Runnable { retSet.addAll(authzPermissions.getAcls(authzObj)); } - return new ArrayList<AclEntry>(retSet); + return new ArrayList<>(retSet); } finally { lock.readLock().unlock(); } http://git-wip-us.apache.org/repos/asf/sentry/blob/0c725668/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java index 0e5d606..71865ca 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java @@ -20,6 +20,7 @@ package org.apache.sentry.provider.db.service.persistent; import com.google.common.base.Strings; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; +import java.util.concurrent.ThreadFactory; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -59,6 +61,7 @@ public final class HAContext implements AutoCloseable { private static boolean aclUnChecked = true; private static final String SENTRY_ZK_JAAS_NAME = "SentryClient"; + private static final String SHUTDOWN_THREAD_NAME = "ha-context-shutdown"; private final String zookeeperQuorum; private final String namespace; @@ -97,7 +100,7 @@ public final class HAContext implements AutoCloseable { if (!Strings.isNullOrEmpty(allowConnect)) { for (String principal : allowConnect.split("\\s*,\\s*")) { - LOGGER.info("Adding acls for " + principal); + LOGGER.info("Adding acls for {}", principal); saslACL.add(new ACL(Perms.ALL, new Id("sasl", principal))); } } @@ -151,7 +154,13 @@ public final class HAContext implements AutoCloseable { serverHAContext = new HAContext(conf); serverHAContext.start(); - Runtime.getRuntime().addShutdownHook(new Thread() { + ThreadFactory haContextShutdownThreadFactory = new ThreadFactoryBuilder() + .setDaemon(false) + .setNameFormat(SHUTDOWN_THREAD_NAME) + .build(); + Runtime.getRuntime() + .addShutdownHook(haContextShutdownThreadFactory + .newThread(new Runnable() { @Override public void run() { LOGGER.info("ShutdownHook closing curator framework"); @@ -163,7 +172,7 @@ public final class HAContext implements AutoCloseable { LOGGER.error("Error stopping curator framework", t); } } - }); + })); return serverHAContext; } @@ -202,8 +211,7 @@ public final class HAContext implements AutoCloseable { checkNotNull(namespace, "Zookeeper namespace should not be null."); } - private static String getServicePrincipal(Configuration conf, String confProperty) - throws IOException { + private static String getServicePrincipal(Configuration conf, String confProperty) { String principal = checkNotNull(conf.get(confProperty)); checkArgument(!principal.isEmpty(), "Server principal is empty."); return principal.split("[/@]")[0]; @@ -232,7 +240,7 @@ public final class HAContext implements AutoCloseable { } private void checkAndSetACLs(String path) throws Exception { - LOGGER.info("Setting acls on " + path); + LOGGER.info("Setting acls on {}", path); List<String> children = curatorFramework.getChildren().forPath(path); for (String child : children) { this.checkAndSetACLs(path + "/" + child); http://git-wip-us.apache.org/repos/asf/sentry/blob/0c725668/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java index b74fa50..5af1e4f 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java @@ -19,6 +19,7 @@ package org.apache.sentry.service.thrift; import com.codahale.metrics.Counter; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Partition; @@ -46,6 +47,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import static com.codahale.metrics.MetricRegistry.name; @@ -94,6 +96,8 @@ public final class FullUpdateInitializer implements AutoCloseable { * */ + + private static final String FULL_UPDATE_INITIALIZER_THREAD_NAME = "hms-fetch-%d"; private final ExecutorService threadPool; private final int maxPartitionsPerCall; private final int maxTablesPerCall; @@ -426,9 +430,15 @@ public final class FullUpdateInitializer implements AutoCloseable { waitDurationMillis = conf.getInt( ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS, ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT); + + ThreadFactory fullUpdateInitThreadFactory = new ThreadFactoryBuilder() + .setNameFormat(FULL_UPDATE_INITIALIZER_THREAD_NAME) + .setDaemon(false) + .build(); threadPool = Executors.newFixedThreadPool(conf.getInt( ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT)); + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT), + fullUpdateInitThreadFactory); } /** http://git-wip-us.apache.org/repos/asf/sentry/blob/0c725668/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java index edb8006..efb8ae6 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java @@ -21,6 +21,7 @@ package org.apache.sentry.service.thrift; import java.io.File; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ThreadFactory; import javax.security.auth.Subject; import javax.security.auth.kerberos.KerberosPrincipal; @@ -28,21 +29,22 @@ import javax.security.auth.kerberos.KerberosTicket; import javax.security.auth.login.LoginContext; import javax.security.auth.login.LoginException; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Sets; public class SentryKerberosContext implements Runnable { + + private static final String KERBEROS_RENEWER_THREAD_NAME = "kerberos-renewer-%d"; private static final float TICKET_RENEW_WINDOW = 0.80f; private static final Logger LOGGER = LoggerFactory .getLogger(SentryKerberosContext.class); private LoginContext loginContext; private Subject subject; private final javax.security.auth.login.Configuration kerberosConfig; - private Thread renewerThread; - private boolean shutDownRenewer = false; public SentryKerberosContext(String principal, String keyTab, boolean server) @@ -100,12 +102,10 @@ public class SentryKerberosContext implements Runnable { return null; } - @Deprecated private long getRefreshTime(KerberosTicket tgt) { long start = tgt.getStartTime().getTime(); long end = tgt.getEndTime().getTime(); - LOGGER.debug("Ticket start time: " + start); - LOGGER.debug("Ticket End time: " + end); + LOGGER.debug("Ticket start time: {}, end time: {}", start, end); return start + (long) ((end - start) * TICKET_RENEW_WINDOW); } @@ -145,7 +145,10 @@ public class SentryKerberosContext implements Runnable { } public void startRenewerThread() { - renewerThread = new Thread(this); + ThreadFactory renewerThreadFactory = new ThreadFactoryBuilder() + .setNameFormat(KERBEROS_RENEWER_THREAD_NAME) + .build(); + renewerThread = renewerThreadFactory.newThread(this); renewerThread.start(); } http://git-wip-us.apache.org/repos/asf/sentry/blob/0c725668/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java index 8b1d8fc..0c7173f 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java @@ -34,6 +34,7 @@ import javax.security.auth.Subject; import com.codahale.metrics.Gauge; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; @@ -77,6 +78,11 @@ public class SentryService implements Callable, SigUtils.SigListener { private static final Logger LOGGER = LoggerFactory.getLogger(SentryService.class); private HiveSimpleConnectionFactory hiveConnectionFactory; + private static final String SENTRY_SERVICE_THREAD_NAME = "sentry-service"; + private static final String HMSFOLLOWER_THREAD_NAME = "hms-follower"; + private static final String STORE_CLEANER_THREAD_NAME = "store-cleaner"; + private static final String SERVICE_SHUTDOWN_THREAD_NAME = "service-shutdown"; + private enum Status { NOT_STARTED, STARTED, @@ -121,7 +127,7 @@ public class SentryService implements Callable, SigUtils.SigListener { this.address = NetUtils.createSocketAddr( conf.get(ServerConfig.RPC_ADDRESS, ServerConfig.RPC_ADDRESS_DEFAULT), port); - LOGGER.info("Configured on address " + address); + LOGGER.info("Configured on address {}", address); kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase( conf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS).trim()); maxThreads = conf.getInt(ServerConfig.RPC_MAX_THREADS, @@ -138,7 +144,7 @@ public class SentryService implements Callable, SigUtils.SigListener { } catch(IOException io) { throw new RuntimeException("Can't translate kerberos principal'", io); } - LOGGER.info("Using kerberos principal: " + principal); + LOGGER.info("Using kerberos principal: {}", principal); principalParts = SaslRpcServer.splitKerberosName(principal); Preconditions.checkArgument(principalParts.length == 3, @@ -147,21 +153,16 @@ public class SentryService implements Callable, SigUtils.SigListener { ServerConfig.KEY_TAB + " is required"); File keytabFile = new File(keytab); Preconditions.checkState(keytabFile.isFile() && keytabFile.canRead(), - "Keytab " + keytab + " does not exist or is not readable."); + "Keytab %s does not exist or is not readable.", keytab); } else { principal = null; principalParts = null; keytab = null; } - serviceExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { - private int count = 0; - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, SentryService.class.getSimpleName() + "-" - + (count++)); - } - }); + ThreadFactory sentryServiceThreadFactory = new ThreadFactoryBuilder() + .setNameFormat(SENTRY_SERVICE_THREAD_NAME) + .build(); + serviceExecutor = Executors.newSingleThreadExecutor(sentryServiceThreadFactory); this.sentryStore = new SentryStore(conf); this.leaderMonitor = LeaderStatusMonitor.getLeaderStatusMonitor(conf); webServerPort = conf.getInt(ServerConfig.SENTRY_WEB_PORT, ServerConfig.SENTRY_WEB_PORT_DEFAULT); @@ -176,7 +177,7 @@ public class SentryService implements Callable, SigUtils.SigListener { // Enable signal handler for HA leader/follower status if configured String sigName = conf.get(ServerConfig.SERVER_HA_STANDBY_SIG); if ((sigName != null) && !sigName.isEmpty()) { - LOGGER.info("Registering signal handler " + sigName + " for HA"); + LOGGER.info("Registering signal handler {} for HA", sigName); try { registerSigListener(sigName, this); } catch (Exception e) { @@ -269,7 +270,7 @@ public class SentryService implements Callable, SigUtils.SigListener { .protocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize)) .minWorkerThreads(minThreads).maxWorkerThreads(maxThreads); thriftServer = new TThreadPoolServer(args); - LOGGER.info("Serving on " + address); + LOGGER.info("Serving on {}", address); startSentryWebServer(); // thriftServer.serve() does not return until thriftServer is stopped. Need to log before @@ -309,7 +310,10 @@ public class SentryService implements Callable, SigUtils.SigListener { long period = conf.getLong(ServerConfig.SENTRY_HMSFOLLOWER_INTERVAL_MILLS, ServerConfig.SENTRY_HMSFOLLOWER_INTERVAL_MILLS_DEFAULT); try { - hmsFollowerExecutor = Executors.newScheduledThreadPool(1); + ThreadFactory hmsFollowerThreadFactory = new ThreadFactoryBuilder() + .setNameFormat(HMSFOLLOWER_THREAD_NAME) + .build(); + hmsFollowerExecutor = Executors.newScheduledThreadPool(1, hmsFollowerThreadFactory); hmsFollowerExecutor.scheduleAtFixedRate(hmsFollower, initDelay, period, TimeUnit.MILLISECONDS); } catch (IllegalArgumentException e) { @@ -386,7 +390,10 @@ public class SentryService implements Callable, SigUtils.SigListener { } }; - sentryStoreCleanService = Executors.newSingleThreadScheduledExecutor(); + ThreadFactory sentryStoreCleanerThreadFactory = new ThreadFactoryBuilder() + .setNameFormat(STORE_CLEANER_THREAD_NAME) + .build(); + sentryStoreCleanService = Executors.newSingleThreadScheduledExecutor(sentryStoreCleanerThreadFactory); sentryStoreCleanService.scheduleWithFixedDelay( storeCleaner, 0, storeCleanPeriodSecs, TimeUnit.SECONDS); @@ -418,7 +425,7 @@ public class SentryService implements Callable, SigUtils.SigListener { Boolean sentryReportingEnable = conf.getBoolean(ServerConfig.SENTRY_WEB_ENABLE, ServerConfig.SENTRY_WEB_ENABLE_DEFAULT); if(sentryReportingEnable) { - List<EventListener> listenerList = new ArrayList<EventListener>(); + List<EventListener> listenerList = new ArrayList<>(); listenerList.add(new SentryHealthCheckServletContextListener()); listenerList.add(new SentryMetricsServletContextListener()); sentryWebServer = new SentryWebServer(listenerList, webServerPort, conf); @@ -569,7 +576,11 @@ public class SentryService implements Callable, SigUtils.SigListener { Configuration serverConf = loadConfig(configFileName); final SentryService server = new SentryService(serverConf); server.start(); - Runtime.getRuntime().addShutdownHook(new Thread() { + + ThreadFactory serviceShutdownThreadFactory = new ThreadFactoryBuilder() + .setNameFormat(SERVICE_SHUTDOWN_THREAD_NAME) + .build(); + Runtime.getRuntime().addShutdownHook(serviceShutdownThreadFactory.newThread(new Runnable() { @Override public void run() { LOGGER.info("ShutdownHook shutting down server"); @@ -580,7 +591,7 @@ public class SentryService implements Callable, SigUtils.SigListener { System.exit(1); } } - }); + })); // Let's wait on the service to stop try {
