Repository: hive Updated Branches: refs/heads/master b4b821e0a -> 13bc529f4
HIVE-13512 : Make initializing dag ids in TezWork thread safe for parallel compilation (Peter Slawski via Gopal V) Signed-off-by: Ashutosh Chauhan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/13bc529f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/13bc529f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/13bc529f Branch: refs/heads/master Commit: 13bc529f44318bf4cfe97c2391dca3d461dc9ec7 Parents: b4b821e Author: Peter Slawski <[email protected]> Authored: Wed Apr 13 19:54:00 2016 -0800 Committer: Ashutosh Chauhan <[email protected]> Committed: Sun May 1 18:17:00 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/ql/plan/TezWork.java | 9 ++- .../hive/ql/plan/TestTezWorkConcurrency.java | 65 ++++++++++++++++++++ 2 files changed, 72 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/13bc529f/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java index c6ef829..7a70e6b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java @@ -29,6 +29,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -69,7 +70,7 @@ public class TezWork extends AbstractOperatorDesc { private static transient final Logger LOG = LoggerFactory.getLogger(TezWork.class); - private static int counter; + private static final AtomicInteger counter = new AtomicInteger(1); private final String dagId; private final String queryName; private final Set<BaseWork> roots = new HashSet<BaseWork>(); @@ -80,8 +81,12 @@ public class TezWork extends AbstractOperatorDesc { new HashMap<Pair<BaseWork, BaseWork>, TezEdgeProperty>(); private final Map<BaseWork, VertexType> workVertexTypeMap = new HashMap<BaseWork, VertexType>(); + public TezWork(String queryId) { + this(queryId, null); + } + public TezWork(String queryId, Configuration conf) { - this.dagId = queryId + ":" + (++counter); + this.dagId = queryId + ":" + counter.getAndIncrement(); String queryName = (conf != null) ? DagUtils.getUserSpecifiedDagName(conf) : null; if (queryName == null) { queryName = this.dagId; http://git-wip-us.apache.org/repos/asf/hive/blob/13bc529f/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java new file mode 100644 index 0000000..c59fd10 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java @@ -0,0 +1,65 @@ +package org.apache.hadoop.hive.ql.plan; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.junit.Test; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; + +import static org.junit.Assert.assertEquals; + +public final class TestTezWorkConcurrency { + + @Test + public void ensureDagIdIsUnique() throws Exception { + final int threadCount = 5; + final CountDownLatch threadReadyToStartSignal = new CountDownLatch(threadCount); + final CountDownLatch startThreadSignal = new CountDownLatch(1); + final int numberOfTezWorkToCreatePerThread = 100; + + List<FutureTask<Set<String>>> tasks = Lists.newArrayList(); + for (int i = 0; i < threadCount; i++) { + tasks.add(new FutureTask<>(new Callable<Set<String>>() { + @Override + public Set<String> call() throws Exception { + threadReadyToStartSignal.countDown(); + startThreadSignal.await(); + return generateTezWorkDagIds(numberOfTezWorkToCreatePerThread); + } + })); + } + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + for (FutureTask<Set<String>> task : tasks) { + executor.execute(task); + } + threadReadyToStartSignal.await(); + startThreadSignal.countDown(); + Set<String> allTezWorkDagIds = getAllTezWorkDagIds(tasks); + assertEquals(threadCount * numberOfTezWorkToCreatePerThread, allTezWorkDagIds.size()); + } + + private static Set<String> generateTezWorkDagIds(int numberOfNames) { + Set<String> tezWorkIds = Sets.newHashSet(); + for (int i = 0; i < numberOfNames; i++) { + TezWork work = new TezWork("query-id"); + tezWorkIds.add(work.getDagId()); + } + return tezWorkIds; + } + + private static Set<String> getAllTezWorkDagIds(List<FutureTask<Set<String>>> tasks) + throws ExecutionException, InterruptedException { + Set<String> allTezWorkDagIds = Sets.newHashSet(); + for (FutureTask<Set<String>> task : tasks) { + allTezWorkDagIds.addAll(task.get()); + } + return allTezWorkDagIds; + } +}
