HIVE-15250: Reuse partitions info generated in MoveTask to its subscribers (StatsTask) (Rajesh Balamohan reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/97c3fb39
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/97c3fb39
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/97c3fb39

Branch: refs/heads/master
Commit: 97c3fb396b9d8e590f8b8fb3c8179be155137085
Parents: b91787d
Author: Prasanth Jayachandran <[email protected]>
Authored: Tue Nov 29 21:54:46 2016 -0800
Committer: Prasanth Jayachandran <[email protected]>
Committed: Tue Nov 29 21:54:46 2016 -0800

----------------------------------------------------------------------
 ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java   | 9 +++++----
 .../java/org/apache/hadoop/hive/ql/exec/StatsTask.java     | 11 ++++-------
 2 files changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/97c3fb39/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 349f115..e1381be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -423,10 +423,6 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
         List<LinkedHashMap<String, String>> dps = Utilities.getFullDPSpecs(conf, dpCtx);

-        // publish DP columns to its subscribers
-        if (dps != null && dps.size() > 0) {
-          pushFeed(FeedType.DYNAMIC_PARTITIONS, dps);
-        }
         console.printInfo(System.getProperty("line.separator"));
         long startTime = System.currentTimeMillis();
         // load the list of DP partitions and return the list of partition specs
@@ -448,6 +444,11 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
             SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask(),
             work.getLoadTableWork().getWriteType());
+        // publish DP columns to its subscribers
+        if (dps != null && dps.size() > 0) {
+          pushFeed(FeedType.DYNAMIC_PARTITIONS, dp.values());
+        }
+
         String loadTime = "\t Time taken to load dynamic partitions: " + (System.currentTimeMillis() - startTime)/1000.0 + " seconds";
         console.printInfo(loadTime);

http://git-wip-us.apache.org/repos/asf/hive/blob/97c3fb39/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index aa5d914..c22d69b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec;

 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -78,7 +79,7 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
   private static transient final Logger LOG = LoggerFactory.getLogger(StatsTask.class);

   private Table table;
-  private List<LinkedHashMap<String, String>> dpPartSpecs;
+  private Collection<Partition> dpPartSpecs;

   public StatsTask() {
     super();
@@ -89,8 +90,7 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
   protected void receiveFeed(FeedType feedType, Object feedValue) {
     // this method should be called by MoveTask when there are dynamic partitions generated
     if (feedType == FeedType.DYNAMIC_PARTITIONS) {
-      assert feedValue instanceof List<?>;
-      dpPartSpecs = (List<LinkedHashMap<String, String>>) feedValue;
+      dpPartSpecs = (Collection<Partition>) feedValue;
     }
   }

@@ -496,10 +496,7 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
         // If no dynamic partitions are generated, dpPartSpecs may not be initialized
         if (dpPartSpecs != null) {
           // load the list of DP partitions and return the list of partition specs
-          for (LinkedHashMap<String, String> partSpec : dpPartSpecs) {
-            Partition partn = db.getPartition(table, partSpec, false);
-            list.add(partn);
-          }
+          list.addAll(dpPartSpecs);
         }
       } else { // static partition
         Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
