Repository: hive
Updated Branches:
  refs/heads/master 204a32af2 -> 4ee6e8e0c
HIVE-17585 : Improve thread safety when loading dynamic partitions in parallel (Tao Li via Thejas Nair) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4ee6e8e0 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4ee6e8e0 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4ee6e8e0 Branch: refs/heads/master Commit: 4ee6e8e0c4779e3e9c91fd019c7609e4e21d0315 Parents: 204a32a Author: Thejas M Nair <[email protected]> Authored: Sat Sep 23 18:22:06 2017 -0700 Committer: Thejas M Nair <[email protected]> Committed: Sat Sep 23 18:22:06 2017 -0700 ---------------------------------------------------------------------- .../metastore/SynchronizedMetaStoreClient.java | 24 ++++++++++++++++++++ .../apache/hadoop/hive/ql/metadata/Hive.java | 20 ++++++++-------- 2 files changed, 34 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/4ee6e8e0/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java index f5d2c76..c028631 100644 --- a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java +++ b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java @@ -18,13 +18,22 @@ package org.apache.hadoop.hive.metastore; +import java.util.List; + import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; +import org.apache.hadoop.hive.metastore.api.FireEventRequest; +import org.apache.hadoop.hive.metastore.api.FireEventResponse; +import org.apache.hadoop.hive.metastore.api.InvalidObjectException; 
import org.apache.hadoop.hive.metastore.api.LockRequest; import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.UnknownTableException; import org.apache.thrift.TException; @@ -84,6 +93,21 @@ public final class SynchronizedMetaStoreClient { return client.showLocks(showLocksRequest); } + public synchronized Partition getPartitionWithAuthInfo(String dbName, String tableName, + List<String> pvals, String userName, List<String> groupNames) + throws MetaException, UnknownTableException, NoSuchObjectException, TException { + return client.getPartitionWithAuthInfo(dbName, tableName, pvals, userName, groupNames); + } + + public synchronized Partition appendPartition(String db_name, String table_name, List<String> part_vals) + throws InvalidObjectException, AlreadyExistsException, MetaException, TException { + return client.appendPartition(db_name, table_name, part_vals); + } + + public synchronized FireEventResponse fireListenerEvent(FireEventRequest rqst) throws TException { + return client.fireListenerEvent(rqst); + } + public synchronized void close() { client.close(); } http://git-wip-us.apache.org/repos/asf/hive/blob/4ee6e8e0/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 0f59917..e8a0474 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -707,7 +707,7 @@ public class Hive { location = Utilities.getQualifiedPath(conf, new 
Path(location)); newPart.setLocation(location); } - getMSC().alter_partition(dbName, tblName, newPart.getTPartition(), environmentContext); + getSynchronizedMSC().alter_partition(dbName, tblName, newPart.getTPartition(), environmentContext); } catch (MetaException e) { throw new HiveException("Unable to alter partition. " + e.getMessage(), e); @@ -1767,7 +1767,7 @@ public class Hive { MetaStoreUtils.populateQuickStats(HiveStatsUtils.getFileStatusRecurse(newPartPath, -1, newPartPath.getFileSystem(conf)), newTPart.getParameters()); try { LOG.debug("Adding new partition " + newTPart.getSpec()); - getSychronizedMSC().add_partition(newTPart.getTPartition()); + getSynchronizedMSC().add_partition(newTPart.getTPartition()); } catch (AlreadyExistsException aee) { // With multiple users concurrently issuing insert statements on the same partition has // a side effect that some queries may not see a partition at the time when they're issued, @@ -1819,7 +1819,7 @@ public class Hive { environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE); } LOG.debug("Altering existing partition " + newTPart.getSpec()); - getSychronizedMSC().alter_partition(tbl.getDbName(), tbl.getTableName(), + getSynchronizedMSC().alter_partition(tbl.getDbName(), tbl.getTableName(), newTPart.getTPartition(), environmentContext); } @@ -1990,7 +1990,7 @@ private void constructOneLBLocationMap(FileStatus fSta, try { // for each dynamically created DP directory, construct a full partition spec // and load the partition based on that - final Map<Long, RawStore> rawStoreMap = new HashMap<Long, RawStore>(); + final Map<Long, RawStore> rawStoreMap = Collections.synchronizedMap(new HashMap<Long, RawStore>()); for(final Path partPath : validPartitions) { // generate a full partition specification final LinkedHashMap<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec); @@ -2332,7 +2332,7 @@ private void constructOneLBLocationMap(FileStatus fSta, } 
org.apache.hadoop.hive.metastore.api.Partition tpart = null; try { - tpart = getMSC().getPartitionWithAuthInfo(tbl.getDbName(), + tpart = getSynchronizedMSC().getPartitionWithAuthInfo(tbl.getDbName(), tbl.getTableName(), pvals, getUserName(), getGroupNames()); } catch (NoSuchObjectException nsoe) { // this means no partition exists for the given partition @@ -2349,10 +2349,10 @@ private void constructOneLBLocationMap(FileStatus fSta, LOG.debug("creating partition for table " + tbl.getTableName() + " with partition spec : " + partSpec); try { - tpart = getMSC().appendPartition(tbl.getDbName(), tbl.getTableName(), pvals); + tpart = getSynchronizedMSC().appendPartition(tbl.getDbName(), tbl.getTableName(), pvals); } catch (AlreadyExistsException aee) { LOG.debug("Caught already exists exception, trying to alter partition instead"); - tpart = getMSC().getPartitionWithAuthInfo(tbl.getDbName(), + tpart = getSynchronizedMSC().getPartitionWithAuthInfo(tbl.getDbName(), tbl.getTableName(), pvals, getUserName(), getGroupNames()); alterPartitionSpec(tbl, partSpec, tpart, inheritTableSpecs, partPath); } catch (Exception e) { @@ -2361,7 +2361,7 @@ private void constructOneLBLocationMap(FileStatus fSta, // have to be used here. 
This helps avoid adding jdo dependency for // hcatalog client uses LOG.debug("Caught JDO exception, trying to alter partition instead"); - tpart = getMSC().getPartitionWithAuthInfo(tbl.getDbName(), + tpart = getSynchronizedMSC().getPartitionWithAuthInfo(tbl.getDbName(), tbl.getTableName(), pvals, getUserName(), getGroupNames()); if (tpart == null) { // This means the exception was caused by something other than a race condition @@ -2467,7 +2467,7 @@ private void constructOneLBLocationMap(FileStatus fSta, } rqst.setPartitionVals(partVals); } - getMSC().fireListenerEvent(rqst); + getSynchronizedMSC().fireListenerEvent(rqst); } catch (IOException | TException e) { throw new HiveException(e); } @@ -3842,7 +3842,7 @@ private void constructOneLBLocationMap(FileStatus fSta, */ @LimitedPrivate(value = {"Hive"}) @Unstable - public synchronized SynchronizedMetaStoreClient getSychronizedMSC() throws MetaException { + public synchronized SynchronizedMetaStoreClient getSynchronizedMSC() throws MetaException { if (syncMetaStoreClient == null) { syncMetaStoreClient = new SynchronizedMetaStoreClient(getMSC(true, false)); }
