Repository: sentry Updated Branches: refs/heads/sentry-ha-redesign 4a1365de9 -> 42b092a56
SENTRY-1780: FullUpdateInitializer does not kill the threads whenever getFullHMSSnapshot throws an exception (Alex Kolbasov, reviewed by Na Li and Vamsee Yarlagadda) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/42b092a5 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/42b092a5 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/42b092a5 Branch: refs/heads/sentry-ha-redesign Commit: 42b092a56f51c301191f0296fb5045bc16dd6b3d Parents: 4a1365d Author: Alexander Kolbasov <[email protected]> Authored: Mon Jun 5 23:30:17 2017 -0700 Committer: Alexander Kolbasov <[email protected]> Committed: Mon Jun 5 23:30:17 2017 -0700 ---------------------------------------------------------------------- .../sentry/hdfs/FullUpdateInitializer.java | 80 ++++++++++---------- .../sentry/service/thrift/HMSFollower.java | 17 ++--- 2 files changed, 49 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/42b092a5/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java index efd3fa3..cf9774c 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java @@ -18,7 +18,6 @@ package org.apache.sentry.hdfs; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; @@ -147,11 +146,11 @@ public final class FullUpdateInitializer implements AutoCloseable { } private static final class CallResult { - private final TException failure; + private final Exception failure; private final boolean successStatus; private final ObjectMapping objectMapping; - CallResult(TException ex) { + CallResult(Exception ex) { failure = ex; successStatus = false; objectMapping = emptyObjectMapping; @@ -171,7 +170,7 @@ public final class FullUpdateInitializer implements AutoCloseable { return objectMapping; } - public TException getFailure() { + public Exception getFailure() { return failure; } } @@ -184,53 +183,54 @@ public final class FullUpdateInitializer implements AutoCloseable { private final class RetryStrategy { private int retryStrategyMaxRetries = 0; private final int retryStrategyWaitDurationMillis; - private int retries; private RetryStrategy(int retryStrategyMaxRetries, int retryStrategyWaitDurationMillis) { this.retryStrategyMaxRetries = retryStrategyMaxRetries; - retries = 0; // Assign default wait duration if negative value is provided. - if (retryStrategyWaitDurationMillis > 0) { - this.retryStrategyWaitDurationMillis = retryStrategyWaitDurationMillis; - } else { - this.retryStrategyWaitDurationMillis = 1000; - } + this.retryStrategyWaitDurationMillis = (retryStrategyWaitDurationMillis > 0) ? + retryStrategyWaitDurationMillis : 1000; } + @SuppressWarnings({"squid:S1141", "squid:S2142"}) public CallResult exec() { - // Retry logic is happening inside callable/task to avoid // synchronous waiting on getting the result. // Retry the failure task until reach the max retry number. // Wait configurable duration for next retry. - TException exception = null; - for (int i = 0; i < retryStrategyMaxRetries; i++) { - try { - return new CallResult(doTask()); - } catch (TException ex) { - LOGGER.debug("Failed to execute task on " + (i + 1) + " attempts." + - " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " + - ex.toString(), ex); - exception = ex; - + // + // Only thrift exceptions are retried. + // Other exceptions are propagated up the stack. + Exception exception = null; + try { + // We catch all exceptions except Thrift exceptions which are retried + for (int i = 0; i < retryStrategyMaxRetries; i++) { + //noinspection NestedTryStatement try { - Thread.sleep(retryStrategyWaitDurationMillis); - } catch (InterruptedException ignored) { - // Skip the rest retries if get InterruptedException. - // And set the corresponding retries number. - LOGGER.warn("Interrupted during update fetch during iteration " + (i + 1)); - retries = i; - i = retryStrategyMaxRetries; + return new CallResult(doTask()); + } catch (TException ex) { + LOGGER.debug("Failed to execute task on " + (i + 1) + " attempts." + + " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " + + ex.toString(), ex); + exception = ex; + + try { + Thread.sleep(retryStrategyWaitDurationMillis); + } catch (InterruptedException ignored) { + // Skip the rest retries if get InterruptedException. + // And set the corresponding retries number. + LOGGER.warn("Interrupted during update fetch during iteration " + (i + 1)); + break; + } } } - - retries = i; + } catch (Exception ex) { + exception = ex; } - - // Task fails, return the failure flag. - LOGGER.error("Task did not complete successfully after " + (retries + 1) - + " tries", exception); + LOGGER.error("Failed to execute task", exception); + // We will fail in the end, so we are shutting down the pool to prevent + // new tasks from being scheduled. + threadPool.shutdown(); return new CallResult(exception); } } @@ -291,6 +291,7 @@ public final class FullUpdateInitializer implements AutoCloseable { } @Override + @SuppressWarnings({"squid:S2629", "squid:S135"}) ObjectMapping doTask() throws TException { List<Table> tables = client.getTableObjectsByName(dbName, tableNames); if (LOGGER.isDebugEnabled()) { @@ -345,8 +346,7 @@ public final class FullUpdateInitializer implements AutoCloseable { ObjectMapping doTask() throws TException { Database db = client.getDatabase(dbName); if (!dbName.equalsIgnoreCase(db.getName())) { - LOGGER.warn(String.format("Database name %s does not match %s", - db.getName(), dbName)); + LOGGER.warn("Database name {} does not match {}", db.getName(), dbName); return emptyObjectMapping; } List<String> allTblStr = client.getAllTables(dbName); @@ -388,8 +388,9 @@ public final class FullUpdateInitializer implements AutoCloseable { * @throws ExecutionException if there was a scheduling error * @throws InterruptedException if processing was interrupted */ + @SuppressWarnings("squid:S00112") public Map<String, Set<String>> getFullHMSSnapshot() - throws TException, ExecutionException, InterruptedException { + throws Exception { // Get list of all HMS databases List<String> allDbStr = client.getAllDatabases(); // Schedule async task for each database responsible for fetching per-database @@ -410,7 +411,7 @@ public final class FullUpdateInitializer implements AutoCloseable { Future<CallResult> result = results.pop(); // Wait for the task to complete CallResult callResult = result.get(); - // Fail if we got Thrift errors + // Fail if we got errors if (!callResult.success()) { throw callResult.getFailure(); } @@ -438,6 +439,7 @@ public final class FullUpdateInitializer implements AutoCloseable { threadPool.awaitTermination(1, TimeUnit.SECONDS); } catch (InterruptedException ignored) { LOGGER.warn("Interrupted shutdown"); + Thread.currentThread().interrupt(); } } http://git-wip-us.apache.org/repos/asf/sentry/blob/42b092a5/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index d410a6c..78dc0ac 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -54,6 +54,7 @@ import java.io.File; import java.io.IOException; import java.net.SocketException; import java.security.PrivilegedExceptionAction; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -273,14 +274,9 @@ public class HMSFollower implements Runnable, AutoCloseable { CurrentNotificationEventId eventIDBefore = client.getCurrentNotificationEventId(); LOGGER.info(String.format("Before fetching hive full snapshot, Current NotificationID = %s.", eventIDBefore)); - try { - pathsFullSnapshot = fetchFullUpdate(); - if(pathsFullSnapshot.isEmpty()) { - LOGGER.info("Hive full snapshot is Empty. Perhaps, HMS does not have any data"); - return; - } - } catch (ExecutionException | InterruptedException ex) { - LOGGER.error("#### Encountered failure during fetching hive full snapshot !!", ex); + pathsFullSnapshot = fetchFullUpdate(); + if(pathsFullSnapshot.isEmpty()) { + LOGGER.info("Hive full snapshot is Empty. Perhaps, HMS does not have any data"); return; } @@ -385,12 +381,15 @@ public class HMSFollower implements Runnable, AutoCloseable { * @throws ExecutionException */ private Map<String, Set<String>> fetchFullUpdate() - throws InterruptedException, TException, ExecutionException { + throws TException, ExecutionException { LOGGER.info("Request full HMS snapshot"); try (FullUpdateInitializer updateInitializer = new FullUpdateInitializer(client, authzConf)) { Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot(); LOGGER.info("Obtained full HMS snapshot"); return pathsUpdate; + } catch (Exception ignored) { + // Caller will retry later + return Collections.emptyMap(); } }
