Repository: sentry
Updated Branches:
  refs/heads/sentry-ha-redesign 2911c532b -> 7770b6ea0
SENTRY-1621: HMSFollower to retry connecting to HMS upon connection loss (Vamsee Yarlagadda, Reviewed by: Alexander Kolbasov, Hao Hao) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/7770b6ea Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/7770b6ea Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/7770b6ea Branch: refs/heads/sentry-ha-redesign Commit: 7770b6ea0cc227a778894c51d43e92d50e7f663f Parents: 2911c53 Author: Vamsee Yarlagadda <[email protected]> Authored: Thu Feb 2 20:29:51 2017 -0800 Committer: Vamsee Yarlagadda <[email protected]> Committed: Fri Feb 3 16:44:06 2017 -0800 ---------------------------------------------------------------------- .../sentry/hdfs/FullUpdateInitializer.java | 3 +- .../sentry/service/thrift/HMSFollower.java | 44 +++++++++++++------- 2 files changed, 32 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/7770b6ea/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java index a1f970b..f95dd94 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.sentry.hdfs.service.thrift.TPathChanges; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -296,7 
+297,7 @@ public class FullUpdateInitializer implements Closeable { ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE_DEFAULT); } - public UpdateableAuthzPaths createInitialUpdate() throws Exception { + public UpdateableAuthzPaths createInitialUpdate() throws ExecutionException, InterruptedException, TException { UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(new String[]{"/"}); PathsUpdate tempUpdate = new PathsUpdate(-1, false); http://git-wip-us.apache.org/repos/asf/sentry/blob/7770b6ea/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index ecd1175..7f7a7d3 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -48,9 +48,11 @@ import javax.security.auth.Subject; import javax.security.auth.login.LoginException; import java.io.File; import java.io.IOException; +import java.net.SocketException; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.util.List; +import java.util.concurrent.ExecutionException; import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE; import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE; @@ -175,6 +177,7 @@ public class HMSFollower implements Runnable { // Shutdown kerberos context if HMS connection failed to setup to avoid thread leaks. 
if (kerberosContext != null && client == null) { kerberosContext.shutDown(); + kerberosContext = null; } } } else { @@ -223,23 +226,19 @@ public class HMSFollower implements Runnable { // will be dropped. A new attempts will be made after 500 milliseconds when // HMSFollower run again. - CurrentNotificationEventId eventIDBefore = null; - CurrentNotificationEventId eventIDAfter = null; + CurrentNotificationEventId eventIDBefore = client.getCurrentNotificationEventId(); + LOGGER.info(String.format("Before fetching hive full snapshot, Current NotificationID = %s.", eventIDBefore)); try { - eventIDBefore = client.getCurrentNotificationEventId(); - LOGGER.info(String.format("Before fetching hive full snapshot, Current NotificationID = %s.", - eventIDBefore)); fetchFullUpdate(); - eventIDAfter = client.getCurrentNotificationEventId(); - LOGGER.info(String.format("After fetching hive full snapshot, Current NotificationID = %s.", - eventIDAfter)); - } catch (Exception ex) { - LOGGER.error("#### Encountered failure during fetching one hive full snapshot !! Current NotificationID = " + - eventIDAfter.toString(), ex); + } catch (ExecutionException | InterruptedException ex) { + LOGGER.error("#### Encountered failure during fetching one hive full snapshot !!", ex); return; } + CurrentNotificationEventId eventIDAfter = client.getCurrentNotificationEventId(); + LOGGER.info(String.format("After fetching hive full snapshot, Current NotificationID = %s.", eventIDAfter)); + if (!eventIDBefore.equals(eventIDAfter)) { LOGGER.error("#### Fail to get a point-in-time hive full snapshot !! 
Current NotificationID = " + eventIDAfter.toString()); @@ -259,8 +258,25 @@ public class HMSFollower implements Runnable { processNotificationEvents(response.getEvents()); } } catch (TException e) { - LOGGER.error("ThriftException occured fetching Notification entries, will try"); - e.printStackTrace(); + // If the underlying exception is around socket exception, it is better to retry connection to HMS + if (e.getCause() instanceof SocketException) { + LOGGER.error("Encountered Socket Exception during fetching Notification entries, will reconnect to HMS", e); + try { + if (client != null) { + client.close(); + client = null; + } + if (kerberosContext != null) { + kerberosContext.shutDown(); + kerberosContext = null; + } + } catch (LoginException le) { + LOGGER.warn("Failed to stop kerberos context (potential to cause thread leak)", le); + throw new RuntimeException(le); + } + } else { + LOGGER.error("ThriftException occured fetching Notification entries, will try", e); + } } catch (SentryInvalidInputException|SentryInvalidHMSEventException e) { throw new RuntimeException(e); } @@ -269,7 +285,7 @@ public class HMSFollower implements Runnable { /** * Retrieve HMS full snapshot. */ - private void fetchFullUpdate() throws Exception { + private void fetchFullUpdate() throws ExecutionException, InterruptedException, TException { FullUpdateInitializer updateInitializer = null; try {
