Repository: sentry
Updated Branches:
  refs/heads/sentry-ha-redesign 64b175ddb -> 57ded0a9e
SENTRY-1684: FullUpdateInitializer has a race condition in handling results list (Alex Kolbasov, reviewed by Hao Hao) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/57ded0a9 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/57ded0a9 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/57ded0a9 Branch: refs/heads/sentry-ha-redesign Commit: 57ded0a9e8eba50192b09813bfebcbb3ea1d0508 Parents: 64b175d Author: Alexander Kolbasov <[email protected]> Authored: Thu Apr 6 17:03:18 2017 -0700 Committer: Alexander Kolbasov <[email protected]> Committed: Thu Apr 6 17:03:18 2017 -0700 ---------------------------------------------------------------------- .../sentry/hdfs/FullUpdateInitializer.java | 45 ++++++++++---------- 1 file changed, 22 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/57ded0a9/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java index 7be63ea..90aaaef 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java @@ -18,7 +18,6 @@ package org.apache.sentry.hdfs; import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; @@ -29,11 +28,14 @@ import org.apache.sentry.hdfs.service.thrift.TPathChanges; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; -import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.Map; import java.util.HashMap; +import java.util.Vector; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -50,7 +52,7 @@ public final class FullUpdateInitializer implements AutoCloseable { private final HiveMetaStoreClient client; private final int maxPartitionsPerCall; private final int maxTablesPerCall; - private final List<Future<CallResult>> results = new ArrayList<Future<CallResult>>(); + private final Collection<Future<CallResult>> results = new Vector<>(); private final AtomicInteger taskCounter = new AtomicInteger(0); private final int maxRetries; private final int waitDurationMillis; @@ -212,8 +214,8 @@ public final class FullUpdateInitializer implements AutoCloseable { TPathChanges tblPathChange; // Table names are case insensitive String tableName = tbl.getTableName().toLowerCase(); + Preconditions.checkArgument(tbl.getDbName().equalsIgnoreCase(db.getName())); synchronized (update) { - Preconditions.checkArgument(tbl.getDbName().equalsIgnoreCase(db.getName())); tblPathChange = update.newPathChange(db.getName() + "." 
+ tableName); } if (tbl.getSd().getLocation() != null) { @@ -230,9 +232,7 @@ public final class FullUpdateInitializer implements AutoCloseable { Callable<CallResult> partTask = new PartitionTask(db.getName(), tableName, partsToFetch, tblPathChange); - synchronized (results) { - results.add(threadPool.submit(partTask)); - } + results.add(threadPool.submit(partTask)); } } } @@ -268,9 +268,7 @@ public final class FullUpdateInitializer implements AutoCloseable { i + maxTablesPerCall, allTblStr.size())); Callable<CallResult> tableTask = new TableTask(db, tablesToFetch, update); - synchronized (results) { - results.add(threadPool.submit(tableTask)); - } + results.add(threadPool.submit(tableTask)); } } } @@ -305,9 +303,9 @@ public final class FullUpdateInitializer implements AutoCloseable { results.add(threadPool.submit(dbTask)); } - while (taskCounter.get() > 0) { - Thread.sleep(1000); + while (taskCounter.get() != 0) { // Wait until no more tasks remain + Thread.sleep(250); } for (Future<CallResult> result : results) { @@ -334,19 +332,20 @@ public final class FullUpdateInitializer implements AutoCloseable { * @return mapping of hiveObj -> [Paths]. 
*/ private Map<String, Set<String>> getAuthzObjToPathMapping(PathsUpdate pathsUpdate) { - Map<String, Set<String>> authzObjToPath = new HashMap<>(); List<TPathChanges> tPathChanges = pathsUpdate.getPathChanges(); - - if (!tPathChanges.isEmpty()) { - for (TPathChanges pathChanges : tPathChanges) { - // Only processes TPathChanges.addPaths - List<List<String>> addPaths = pathChanges.getAddPaths(); - Set<String> paths = Sets.newHashSet(); - for (List<String> addPath : addPaths) { - paths.add(PathsUpdate.cancatePath(addPath)); - } - authzObjToPath.put(pathChanges.getAuthzObj(), paths); + if (tPathChanges.isEmpty()) { + return Collections.emptyMap(); + } + Map<String, Set<String>> authzObjToPath = new HashMap<>(tPathChanges.size()); + + for (TPathChanges pathChanges : tPathChanges) { + // Only processes TPathChanges.addPaths + List<List<String>> addPaths = pathChanges.getAddPaths(); + Set<String> paths = new HashSet<>(addPaths.size()); + for (List<String> addPath : addPaths) { + paths.add(PathsUpdate.cancatePath(addPath)); } + authzObjToPath.put(pathChanges.getAuthzObj(), paths); } return authzObjToPath;
