Repository: hive
Updated Branches:
  refs/heads/master 519306439 -> d297b5108
HIVE-14204 : Optimize loading dynamic partitions (Rajesh Balamohan via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d297b510
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d297b510
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d297b510

Branch: refs/heads/master
Commit: d297b51087da908c81aa7a04263a00b3420c4d70
Parents: 5193064
Author: Rajesh Balamohan <rbalamohan at apache dot org>
Authored: Mon Jul 11 06:20:00 2016 -0800
Committer: Ashutosh Chauhan <[email protected]>
Committed: Thu Aug 4 14:00:39 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../hadoop/hive/metastore/ObjectStore.java      |   3 +-
 .../apache/hadoop/hive/metastore/Warehouse.java |   1 -
 .../hadoop/hive/ql/lockmgr/DbLockManager.java   |   7 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |  52 +----
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 210 +++++++++++++------
 6 files changed, 155 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d297b510/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9f5f619..7cc15e2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2482,6 +2482,9 @@ public class HiveConf extends Configuration {
     HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 15, new SizeValidator(0L, true, 1024L, true), "Number of threads"
         + " used to move files in move task. Set it to 0 to disable multi-threaded file moves. This parameter is also used by"
         + " MSCK to check tables."),
+    HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT("hive.load.dynamic.partitions.thread", 15,
+        new SizeValidator(1L, true, 1024L, true),
+        "Number of threads used to load dynamic partitions."),
     // If this is set all move tasks at the end of a multi-insert query will only begin once all
     // outputs are ready
     HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES(

http://git-wip-us.apache.org/repos/asf/hive/blob/d297b510/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 5adfa02..83a3e39 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -2270,7 +2270,8 @@ public class ObjectStore implements RawStore, Configurable {
     List<FieldSchema> partCols = table.getPartitionKeys();
     int numPartKeys = partCols.size();
     if (part_vals.size() > numPartKeys) {
-      throw new MetaException("Incorrect number of partition values");
+      throw new MetaException("Incorrect number of partition values." +
+          " numPartKeys=" + numPartKeys + ", part_val=" + part_vals.size());
     }
     partCols = partCols.subList(0, part_vals.size());
     // Construct a pattern of the form: partKey=partVal/partKey2=partVal2/...
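For context on the new knob above: a minimal sketch (not part of this patch) of how the hive.load.dynamic.partitions.thread property can be read and turned into a worker pool, following the same lookup that Hive.loadDynamicPartitions() performs further down in this diff. It assumes this commit's HiveConf and Guava are on the classpath; the class name DynPartPoolSketch is made up for illustration.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hive.conf.HiveConf;

public class DynPartPoolSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Same lookup the patch performs in loadDynamicPartitions(): the fallback of 1
    // only applies if the property is missing; HiveConf itself defaults it to 15.
    int poolSize = conf.getInt(
        HiveConf.ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT.varname, 1);
    ExecutorService pool = Executors.newFixedThreadPool(poolSize,
        new ThreadFactoryBuilder()
            .setDaemon(true)
            .setNameFormat("load-dynamic-partitions-%d")
            .build());
    System.out.println("load-dynamic-partitions pool size: " + poolSize);
    pool.shutdown();
  }
}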
http://git-wip-us.apache.org/repos/asf/hive/blob/d297b510/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java index d624d1b..6aca1b7 100755 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java @@ -32,7 +32,6 @@ import java.util.Map.Entry; import java.util.regex.Matcher; import java.util.regex.Pattern; - import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/hive/blob/d297b510/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index b4ae1d1..45ead16 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.lockmgr; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient; import org.apache.hadoop.hive.ql.exec.DDLTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,8 +26,6 @@ import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.thrift.TException; @@ -54,11 +53,11 @@ public class DbLockManager implements HiveLockManager{ private long MAX_SLEEP; //longer term we should always have a txn id and then we won't need to track locks here at all private Set<DbHiveLock> locks; - private DbTxnManager.SynchronizedMetaStoreClient client; + private SynchronizedMetaStoreClient client; private long nextSleep = 50; private final HiveConf conf; - DbLockManager(DbTxnManager.SynchronizedMetaStoreClient client, HiveConf conf) { + DbLockManager(SynchronizedMetaStoreClient client, HiveConf conf) { locks = new HashSet<>(); this.client = client; this.conf = conf; http://git-wip-us.apache.org/repos/asf/hive/blob/d297b510/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 02c17b5..a446999 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.lockmgr; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hive.common.util.ShutdownHookManager; import org.slf4j.Logger; @@ -26,7 +27,6 @@ import 
org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.LockComponentBuilder; import org.apache.hadoop.hive.metastore.LockRequestBuilder; import org.apache.hadoop.hive.metastore.api.*; @@ -711,54 +711,4 @@ public class DbTxnManager extends HiveTxnManagerImpl { } } } - - /** - * Synchronized MetaStoreClient wrapper - */ - final class SynchronizedMetaStoreClient { - private final IMetaStoreClient client; - SynchronizedMetaStoreClient(IMetaStoreClient client) { - this.client = client; - } - - synchronized long openTxn(String user) throws TException { - return client.openTxn(user); - } - - synchronized void commitTxn(long txnid) throws TException { - client.commitTxn(txnid); - } - - synchronized void rollbackTxn(long txnid) throws TException { - client.rollbackTxn(txnid); - } - - synchronized void heartbeat(long txnid, long lockid) throws TException { - client.heartbeat(txnid, lockid); - } - - synchronized ValidTxnList getValidTxns(long currentTxn) throws TException { - return client.getValidTxns(currentTxn); - } - - synchronized LockResponse lock(LockRequest request) throws TException { - return client.lock(request); - } - - synchronized LockResponse checkLock(long lockid) throws TException { - return client.checkLock(lockid); - } - - synchronized void unlock(long lockid) throws TException { - client.unlock(lockid); - } - - synchronized ShowLocksResponse showLocks(ShowLocksRequest showLocksRequest) throws TException { - return client.showLocks(showLocksRequest); - } - - synchronized void close() { - client.close(); - } - } } http://git-wip-us.apache.org/repos/asf/hive/blob/d297b510/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index deaaac4..8cb5e8a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -55,6 +55,8 @@ import com.google.common.collect.ImmutableMap; import javax.jdo.JDODataStoreException; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -80,7 +82,6 @@ import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.Database; @@ -126,6 +127,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.tez.InPlaceUpdates; import org.apache.hadoop.hive.ql.index.HiveIndexHandler; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient; import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils; import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; import org.apache.hadoop.hive.ql.plan.DropTableDesc; @@ -163,6 +165,7 @@ public 
class Hive { private HiveConf conf = null; private IMetaStoreClient metaStoreClient; + private SynchronizedMetaStoreClient syncMetaStoreClient; private UserGroupInformation owner; // metastore calls timing information @@ -1499,8 +1502,10 @@ public class Hive { Map<String, String> partSpec, boolean replace, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) throws HiveException { + Path tblDataLocationPath = tbl.getDataLocation(); try { + Partition oldPart = getPartition(tbl, partSpec, false); /** * Move files before creating the partition since down stream processes * check for existence of partition in metadata before accessing the data. @@ -1508,12 +1513,7 @@ public class Hive { * processes might move forward with partial data */ - Partition oldPart = getPartition(tbl, partSpec, false); - Path oldPartPath = null; - if(oldPart != null) { - oldPartPath = oldPart.getDataLocation(); - } - + Path oldPartPath = (oldPart != null) ? oldPart.getDataLocation() : null; Path newPartPath = null; if (inheritTableSpecs) { @@ -1585,7 +1585,8 @@ public class Hive { } MetaStoreUtils.populateQuickStats(HiveStatsUtils.getFileStatusRecurse(newPartPath, -1, newPartPath.getFileSystem(conf)), newTPart.getParameters()); try { - getMSC().add_partition(newTPart.getTPartition()); + LOG.debug("Adding new partition " + newTPart.getSpec()); + getSychronizedMSC().add_partition(newTPart.getTPartition()); } catch (AlreadyExistsException aee) { // With multiple users concurrently issuing insert statements on the same partition has // a side effect that some queries may not see a partition at the time when they're issued, @@ -1620,14 +1621,15 @@ public class Hive { } private void setStatsPropAndAlterPartition(boolean hasFollowingStatsTask, Table tbl, - Partition newTPart) throws HiveException, InvalidOperationException { + Partition newTPart) throws MetaException, TException { EnvironmentContext environmentContext = null; if (hasFollowingStatsTask) { environmentContext = new EnvironmentContext(); environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE); } - alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newTPart.getTPartition()), - environmentContext); + LOG.debug("Altering existing partition " + newTPart.getSpec()); + getSychronizedMSC().alter_partition(tbl.getDbName(), tbl.getTableName(), + newTPart.getTPartition(), environmentContext); } /** @@ -1712,6 +1714,43 @@ private void constructOneLBLocationMap(FileStatus fSta, return skewedColValueLocationMaps; } + /** + * Get the valid partitions from the path + * @param numDP number of dynamic partitions + * @param loadPath + * @return Set of valid partitions + * @throws HiveException + */ + private Set<Path> getValidPartitionsInPath(int numDP, Path loadPath) throws HiveException { + Set<Path> validPartitions = new HashSet<Path>(); + try { + FileSystem fs = loadPath.getFileSystem(conf); + FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs); + // Check for empty partitions + for (FileStatus s : leafStatus) { + if (!s.isDirectory()) { + throw new HiveException("partition " + s.getPath() + " is not a directory!"); + } + validPartitions.add(s.getPath()); + } + } catch (IOException e) { + throw new HiveException(e); + } + + int partsToLoad = validPartitions.size(); + if (partsToLoad == 0) { + LOG.warn("No partition is generated by dynamic partitioning"); + } + + if (partsToLoad > 
conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)) { + throw new HiveException("Number of dynamic partitions created is " + partsToLoad + + ", which is more than " + + conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS) + +". To solve this try to set " + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname + + " to at least " + partsToLoad + '.'); + } + return validPartitions; + } /** * Given a source directory name of the load path, load all dynamically generated partitions @@ -1728,67 +1767,97 @@ private void constructOneLBLocationMap(FileStatus fSta, * @return partition map details (PartitionSpec and Partition) * @throws HiveException */ - public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath, - String tableName, Map<String, String> partSpec, boolean replace, - int numDP, boolean listBucketingEnabled, boolean isAcid, long txnId, boolean hasFollowingStatsTask, - AcidUtils.Operation operation) + public Map<Map<String, String>, Partition> loadDynamicPartitions(final Path loadPath, + final String tableName, final Map<String, String> partSpec, final boolean replace, + final int numDP, final boolean listBucketingEnabled, final boolean isAcid, final long txnId, + final boolean hasFollowingStatsTask, final AcidUtils.Operation operation) throws HiveException { - Set<Path> validPartitions = new HashSet<Path>(); - try { - Map<Map<String, String>, Partition> partitionsMap = new - LinkedHashMap<Map<String, String>, Partition>(); + final Map<Map<String, String>, Partition> partitionsMap = + Collections.synchronizedMap(new LinkedHashMap<Map<String, String>, Partition>()); - FileSystem fs = loadPath.getFileSystem(conf); - FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs); - // Check for empty partitions - for (FileStatus s : leafStatus) { - validPartitions.add(s.getPath()); - } + int poolSize = conf.getInt(ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT.varname, 1); + final ExecutorService pool = Executors.newFixedThreadPool(poolSize, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("load-dynamic-partitions-%d") + .build()); - int partsToLoad = validPartitions.size(); - if (partsToLoad == 0) { - LOG.warn("No partition is generated by dynamic partitioning"); - } + // Get all valid partition paths and existing partitions for them (if any) + final Table tbl = getTable(tableName); + final Set<Path> validPartitions = getValidPartitionsInPath(numDP, loadPath); - if (partsToLoad > conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)) { - throw new HiveException("Number of dynamic partitions created is " + partsToLoad - + ", which is more than " - + conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS) - +". To solve this try to set " + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname - + " to at least " + partsToLoad + '.'); - } + final int partsToLoad = validPartitions.size(); + final AtomicInteger partitionsLoaded = new AtomicInteger(0); + + final boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0 + && InPlaceUpdates.inPlaceEligible(conf); + final PrintStream ps = (inPlaceEligible) ? 
SessionState.getConsole().getInfoStream() : null; + final SessionState parentSession = SessionState.get(); - Table tbl = getTable(tableName); + final List<Future<Void>> futures = Lists.newLinkedList(); + try { // for each dynamically created DP directory, construct a full partition spec // and load the partition based on that - Iterator<Path> iter = validPartitions.iterator(); - LOG.info("Going to load " + partsToLoad + " partitions."); - PrintStream ps = null; - boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0 - && InPlaceUpdates.inPlaceEligible(conf); - if(inPlaceEligible) { - ps = SessionState.getConsole().getInfoStream(); - } - int partitionsLoaded = 0; - while (iter.hasNext()) { - // get the dynamically created directory - Path partPath = iter.next(); - assert fs.getFileStatus(partPath).isDir(): - "partitions " + partPath + " is not a directory !"; - + for(final Path partPath : validPartitions) { // generate a full partition specification - LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec); + final LinkedHashMap<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec); Warehouse.makeSpecFromName(fullPartSpec, partPath); - Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace, - true, listBucketingEnabled, false, isAcid, hasFollowingStatsTask); - partitionsMap.put(fullPartSpec, newPartition); - if (inPlaceEligible) { - InPlaceUpdates.rePositionCursor(ps); - InPlaceUpdates.reprintLine(ps, "Loaded : " + ++partitionsLoaded + "/" + partsToLoad +" partitions."); - } - LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec); + futures.add(pool.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + try { + // move file would require session details (needCopy() invokes SessionState.get) + SessionState.setCurrentSessionState(parentSession); + LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec); + + // load the partition + Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, + replace, true, listBucketingEnabled, + false, isAcid, hasFollowingStatsTask); + partitionsMap.put(fullPartSpec, newPartition); + + if (inPlaceEligible) { + synchronized (ps) { + InPlaceUpdates.rePositionCursor(ps); + partitionsLoaded.incrementAndGet(); + InPlaceUpdates.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/" + + partsToLoad + " partitions."); + } + } + return null; + } catch (Exception t) { + LOG.error("Exception when loading partition with parameters " + + " partPath=" + partPath + ", " + + " table=" + tbl.getTableName() + ", " + + " partSpec=" + fullPartSpec + ", " + + " replace=" + replace + ", " + + " listBucketingEnabled=" + listBucketingEnabled + ", " + + " isAcid=" + isAcid + ", " + + " hasFollowingStatsTask=" + hasFollowingStatsTask, t); + throw t; + } + } + })); + } + pool.shutdown(); + LOG.debug("Number of partitions to be added is " + futures.size()); + + for (Future future : futures) { + future.get(); + } + } catch (InterruptedException | ExecutionException e) { + LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks"); + //cancel other futures + for (Future future : futures) { + future.cancel(true); } + throw new HiveException("Exception when loading " + + partsToLoad + " in table " + tbl.getTableName() + + " with loadPath=" + loadPath, e); + } + + try { if (isAcid) { List<String> partNames = new ArrayList<>(partitionsMap.size()); for (Partition p : partitionsMap.values()) { @@ -1797,11 +1866,11 @@ 
private void constructOneLBLocationMap(FileStatus fSta, metaStoreClient.addDynamicPartitions(txnId, tbl.getDbName(), tbl.getTableName(), partNames, AcidUtils.toDataOperationType(operation)); } + LOG.info("Loaded " + partitionsMap.size() + " partitions"); return partitionsMap; - } catch (IOException e) { - throw new HiveException(e); } catch (TException te) { - throw new HiveException(te); + throw new HiveException("Exception updating metastore for acid table " + + tableName + " with partitions " + partitionsMap.values(), te); } } @@ -3395,6 +3464,19 @@ private void constructOneLBLocationMap(FileStatus fSta, } /** + * @return synchronized metastore client + * @throws MetaException + */ + @LimitedPrivate(value = {"Hive"}) + @Unstable + public synchronized SynchronizedMetaStoreClient getSychronizedMSC() throws MetaException { + if (syncMetaStoreClient == null) { + syncMetaStoreClient = new SynchronizedMetaStoreClient(getMSC(true, false)); + } + return syncMetaStoreClient; + } + + /** * @return the metastore client for the current thread * @throws MetaException */
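Since the diff ends with the new getSychronizedMSC() accessor, here is a minimal, self-contained sketch of the pattern it relies on: a lazily created wrapper that serializes every call to a non-thread-safe client, so the concurrent partition-loading tasks can share one metastore connection. MetaClient, addPartition() and the class names below are illustrative stand-ins, not Hive APIs.

public class SynchronizedClientSketch {

  interface MetaClient {            // stand-in for IMetaStoreClient
    void addPartition(String spec);
  }

  static final class SynchronizedClient implements MetaClient {
    private final MetaClient delegate;

    SynchronizedClient(MetaClient delegate) {
      this.delegate = delegate;
    }

    @Override
    public synchronized void addPartition(String spec) {
      delegate.addPartition(spec);  // only one thread talks to the delegate at a time
    }
  }

  private MetaClient syncClient;

  // Mirrors the lazy, synchronized initialization in Hive.getSychronizedMSC().
  public synchronized MetaClient getSynchronizedClient() {
    if (syncClient == null) {
      syncClient = new SynchronizedClient(spec -> System.out.println("added " + spec));
    }
    return syncClient;
  }

  public static void main(String[] args) {
    new SynchronizedClientSketch().getSynchronizedClient().addPartition("ds=2016-07-11");
  }
}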

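As a closing illustration, a hedged sketch (plain java.util.concurrent, no Hive dependencies) of the control flow the rewritten loadDynamicPartitions() follows: submit one Callable per partition directory, wait on every Future, and cancel outstanding work if any load fails. loadOnePartition() and the directory names are invented for the example.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelLoadSketch {

  public static void main(String[] args) {
    List<String> partitionDirs = Arrays.asList("ds=2016-07-01", "ds=2016-07-02", "ds=2016-07-03");
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<Future<Void>> futures = new ArrayList<>();
    try {
      for (final String dir : partitionDirs) {
        futures.add(pool.submit(new Callable<Void>() {
          @Override
          public Void call() {
            loadOnePartition(dir);      // stand-in for Hive.loadPartition(...)
            return null;
          }
        }));
      }
      pool.shutdown();
      for (Future<Void> future : futures) {
        future.get();                   // surfaces the first failure, like the patch
      }
    } catch (InterruptedException | ExecutionException e) {
      for (Future<Void> future : futures) {
        future.cancel(true);            // cancel the remaining loads on error
      }
      throw new RuntimeException("dynamic partition load failed", e);
    }
  }

  private static void loadOnePartition(String dir) {
    System.out.println("loaded " + dir);
  }
}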