Repository: hive
Updated Branches:
  refs/heads/master 519306439 -> d297b5108


HIVE-14204 : Optimize loading dynamic partitions (Rajesh Balamohan via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d297b510
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d297b510
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d297b510

Branch: refs/heads/master
Commit: d297b51087da908c81aa7a04263a00b3420c4d70
Parents: 5193064
Author: Rajesh Balamohan <rbalamohan at apache dot org>
Authored: Mon Jul 11 06:20:00 2016 -0800
Committer: Ashutosh Chauhan <[email protected]>
Committed: Thu Aug 4 14:00:39 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../hadoop/hive/metastore/ObjectStore.java      |   3 +-
 .../apache/hadoop/hive/metastore/Warehouse.java |   1 -
 .../hadoop/hive/ql/lockmgr/DbLockManager.java   |   7 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |  52 +----
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 210 +++++++++++++------
 6 files changed, 155 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d297b510/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9f5f619..7cc15e2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2482,6 +2482,9 @@ public class HiveConf extends Configuration {
     HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 15, new  SizeValidator(0L, true, 1024L, true), "Number of threads"
          + " used to move files in move task. Set it to 0 to disable multi-threaded file moves. This parameter is also used by"
          + " MSCK to check tables."),
+    HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT("hive.load.dynamic.partitions.thread", 15,
+        new  SizeValidator(1L, true, 1024L, true),
+        "Number of threads used to load dynamic partitions."),
     // If this is set all move tasks at the end of a multi-insert query will only begin once all
     // outputs are ready
     HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES(
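
The new hive.load.dynamic.partitions.thread property above defaults to 15, and its SizeValidator declares 1..1024 as the accepted range. A minimal sketch of how a client might override and read it, assuming this patch is applied (the class name and the value 25 are illustrative only, not part of the patch):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class LoadDynamicPartitionsThreadCountExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Override the default of 15; per the SizeValidator above, valid values are 1..1024.
        conf.setInt(HiveConf.ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT.varname, 25);
        // Hive.loadDynamicPartitions() reads the knob the same way later in this patch.
        int poolSize = conf.getInt(
            HiveConf.ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT.varname, 1);
        System.out.println("load-dynamic-partitions pool size = " + poolSize);
      }
    }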

http://git-wip-us.apache.org/repos/asf/hive/blob/d297b510/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 5adfa02..83a3e39 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -2270,7 +2270,8 @@ public class ObjectStore implements RawStore, Configurable {
     List<FieldSchema> partCols = table.getPartitionKeys();
     int numPartKeys = partCols.size();
     if (part_vals.size() > numPartKeys) {
-      throw new MetaException("Incorrect number of partition values");
+      throw new MetaException("Incorrect number of partition values."
+          + " numPartKeys=" + numPartKeys + ", part_val=" + part_vals.size());
     }
     partCols = partCols.subList(0, part_vals.size());
     // Construct a pattern of the form: partKey=partVal/partKey2=partVal2/...

http://git-wip-us.apache.org/repos/asf/hive/blob/d297b510/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
index d624d1b..6aca1b7 100755
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -32,7 +32,6 @@ import java.util.Map.Entry;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/hive/blob/d297b510/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index b4ae1d1..45ead16 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.lockmgr;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient;
 import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,8 +26,6 @@ import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.thrift.TException;
@@ -54,11 +53,11 @@ public class DbLockManager implements HiveLockManager{
   private long MAX_SLEEP;
   //longer term we should always have a txn id and then we won't need to track locks here at all
   private Set<DbHiveLock> locks;
-  private DbTxnManager.SynchronizedMetaStoreClient client;
+  private SynchronizedMetaStoreClient client;
   private long nextSleep = 50;
   private final HiveConf conf;
 
-  DbLockManager(DbTxnManager.SynchronizedMetaStoreClient client, HiveConf conf) {
+  DbLockManager(SynchronizedMetaStoreClient client, HiveConf conf) {
     locks = new HashSet<>();
     this.client = client;
     this.conf = conf;

http://git-wip-us.apache.org/repos/asf/hive/blob/d297b510/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 02c17b5..a446999 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.lockmgr;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
@@ -26,7 +27,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.LockComponentBuilder;
 import org.apache.hadoop.hive.metastore.LockRequestBuilder;
 import org.apache.hadoop.hive.metastore.api.*;
@@ -711,54 +711,4 @@ public class DbTxnManager extends HiveTxnManagerImpl {
       }
     }
   }
-
-  /**
-   * Synchronized MetaStoreClient wrapper
-   */
-  final class SynchronizedMetaStoreClient {
-    private final IMetaStoreClient client;
-    SynchronizedMetaStoreClient(IMetaStoreClient client) {
-      this.client = client;
-    }
-
-    synchronized long openTxn(String user) throws TException {
-      return client.openTxn(user);
-    }
-
-    synchronized void commitTxn(long txnid) throws TException {
-      client.commitTxn(txnid);
-    }
-
-    synchronized void rollbackTxn(long txnid) throws TException {
-      client.rollbackTxn(txnid);
-    }
-
-    synchronized void heartbeat(long txnid, long lockid) throws TException {
-      client.heartbeat(txnid, lockid);
-    }
-
-    synchronized ValidTxnList getValidTxns(long currentTxn) throws TException {
-      return client.getValidTxns(currentTxn);
-    }
-
-    synchronized LockResponse lock(LockRequest request) throws TException {
-      return client.lock(request);
-    }
-
-    synchronized LockResponse checkLock(long lockid) throws TException {
-      return client.checkLock(lockid);
-    }
-
-    synchronized void unlock(long lockid) throws TException {
-      client.unlock(lockid);
-    }
-
-    synchronized ShowLocksResponse showLocks(ShowLocksRequest showLocksRequest) throws TException {
-      return client.showLocks(showLocksRequest);
-    }
-
-    synchronized void close() {
-      client.close();
-    }
-  }
 }
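
The wrapper removed above is not dropped outright: the new imports in DbLockManager, DbTxnManager and Hive point at a standalone org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient. That relocated class is not shown in this diff; a rough sketch of its likely shape, with the method set inferred only from the call sites visible in this patch (lock in DbLockManager, add_partition/alter_partition in Hive.java), might look like:

    package org.apache.hadoop.hive.metastore;

    import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
    import org.apache.hadoop.hive.metastore.api.LockRequest;
    import org.apache.hadoop.hive.metastore.api.LockResponse;
    import org.apache.hadoop.hive.metastore.api.Partition;
    import org.apache.thrift.TException;

    // Sketch only: delegates to a single IMetaStoreClient under one lock so the
    // instance can be shared safely across the partition-loading threads.
    public final class SynchronizedMetaStoreClient {
      private final IMetaStoreClient client;

      public SynchronizedMetaStoreClient(IMetaStoreClient client) {
        this.client = client;
      }

      public synchronized LockResponse lock(LockRequest request) throws TException {
        return client.lock(request);
      }

      public synchronized Partition add_partition(Partition partition) throws TException {
        return client.add_partition(partition);
      }

      public synchronized void alter_partition(String dbName, String tblName,
          Partition newPart, EnvironmentContext environmentContext) throws TException {
        client.alter_partition(dbName, tblName, newPart, environmentContext);
      }

      public synchronized void close() {
        client.close();
      }
    }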

http://git-wip-us.apache.org/repos/asf/hive/blob/d297b510/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index deaaac4..8cb5e8a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -55,6 +55,8 @@ import com.google.common.collect.ImmutableMap;
 
 import javax.jdo.JDODataStoreException;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -80,7 +82,6 @@ import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -126,6 +127,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.tez.InPlaceUpdates;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient;
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.DropTableDesc;
@@ -163,6 +165,7 @@ public class Hive {
 
   private HiveConf conf = null;
   private IMetaStoreClient metaStoreClient;
+  private SynchronizedMetaStoreClient syncMetaStoreClient;
   private UserGroupInformation owner;
 
   // metastore calls timing information
@@ -1499,8 +1502,10 @@ public class Hive {
       Map<String, String> partSpec, boolean replace,
       boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
       boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) throws HiveException {
+
     Path tblDataLocationPath =  tbl.getDataLocation();
     try {
+      Partition oldPart = getPartition(tbl, partSpec, false);
       /**
        * Move files before creating the partition since down stream processes
       * check for existence of partition in metadata before accessing the data.
@@ -1508,12 +1513,7 @@ public class Hive {
        * processes might move forward with partial data
        */
 
-      Partition oldPart = getPartition(tbl, partSpec, false);
-      Path oldPartPath = null;
-      if(oldPart != null) {
-        oldPartPath = oldPart.getDataLocation();
-      }
-
+      Path oldPartPath = (oldPart != null) ? oldPart.getDataLocation() : null;
       Path newPartPath = null;
 
       if (inheritTableSpecs) {
@@ -1585,7 +1585,8 @@ public class Hive {
         }
         MetaStoreUtils.populateQuickStats(HiveStatsUtils.getFileStatusRecurse(newPartPath, -1, newPartPath.getFileSystem(conf)), newTPart.getParameters());
         try {
-          getMSC().add_partition(newTPart.getTPartition());
+          LOG.debug("Adding new partition " + newTPart.getSpec());
+          getSychronizedMSC().add_partition(newTPart.getTPartition());
         } catch (AlreadyExistsException aee) {
           // With multiple users concurrently issuing insert statements on the same partition has
           // a side effect that some queries may not see a partition at the time when they're issued,
@@ -1620,14 +1621,15 @@ public class Hive {
   }
 
   private void setStatsPropAndAlterPartition(boolean hasFollowingStatsTask, Table tbl,
-      Partition newTPart) throws HiveException, InvalidOperationException {
+      Partition newTPart) throws MetaException, TException {
     EnvironmentContext environmentContext = null;
     if (hasFollowingStatsTask) {
       environmentContext = new EnvironmentContext();
       environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
     }
-    alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newTPart.getTPartition()),
-        environmentContext);
+    LOG.debug("Altering existing partition " + newTPart.getSpec());
+    getSychronizedMSC().alter_partition(tbl.getDbName(), tbl.getTableName(),
+      newTPart.getTPartition(), environmentContext);
   }
 
   /**
@@ -1712,6 +1714,43 @@ private void constructOneLBLocationMap(FileStatus fSta,
     return skewedColValueLocationMaps;
   }
 
+  /**
+   * Get the valid partitions from the path
+   * @param numDP number of dynamic partitions
+   * @param loadPath
+   * @return Set of valid partitions
+   * @throws HiveException
+   */
+  private Set<Path> getValidPartitionsInPath(int numDP, Path loadPath) throws HiveException {
+    Set<Path> validPartitions = new HashSet<Path>();
+    try {
+      FileSystem fs = loadPath.getFileSystem(conf);
+      FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs);
+      // Check for empty partitions
+      for (FileStatus s : leafStatus) {
+        if (!s.isDirectory()) {
+          throw new HiveException("partition " + s.getPath() + " is not a directory!");
+        }
+        validPartitions.add(s.getPath());
+      }
+    } catch (IOException e) {
+      throw new HiveException(e);
+    }
+
+    int partsToLoad = validPartitions.size();
+    if (partsToLoad == 0) {
+      LOG.warn("No partition is generated by dynamic partitioning");
+    }
+
+    if (partsToLoad > conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)) {
+      throw new HiveException("Number of dynamic partitions created is " + partsToLoad
+          + ", which is more than "
+          + conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)
+          +". To solve this try to set " + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+          + " to at least " + partsToLoad + '.');
+    }
+    return validPartitions;
+  }
 
   /**
   * Given a source directory name of the load path, load all dynamically generated partitions
@@ -1728,67 +1767,97 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @return partition map details (PartitionSpec and Partition)
    * @throws HiveException
    */
-  public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath,
-      String tableName, Map<String, String> partSpec, boolean replace,
-      int numDP, boolean listBucketingEnabled, boolean isAcid, long txnId, boolean hasFollowingStatsTask,
-      AcidUtils.Operation operation)
+  public Map<Map<String, String>, Partition> loadDynamicPartitions(final Path loadPath,
+      final String tableName, final Map<String, String> partSpec, final boolean replace,
+      final int numDP, final boolean listBucketingEnabled, final boolean isAcid, final long txnId,
+      final boolean hasFollowingStatsTask, final AcidUtils.Operation operation)
       throws HiveException {
 
-    Set<Path> validPartitions = new HashSet<Path>();
-    try {
-      Map<Map<String, String>, Partition> partitionsMap = new
-          LinkedHashMap<Map<String, String>, Partition>();
+    final Map<Map<String, String>, Partition> partitionsMap =
+        Collections.synchronizedMap(new LinkedHashMap<Map<String, String>, Partition>());
 
-      FileSystem fs = loadPath.getFileSystem(conf);
-      FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs);
-      // Check for empty partitions
-      for (FileStatus s : leafStatus) {
-        validPartitions.add(s.getPath());
-      }
+    int poolSize = conf.getInt(ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT.varname, 1);
+    final ExecutorService pool = Executors.newFixedThreadPool(poolSize,
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("load-dynamic-partitions-%d")
+                .build());
 
-      int partsToLoad = validPartitions.size();
-      if (partsToLoad == 0) {
-        LOG.warn("No partition is generated by dynamic partitioning");
-      }
+    // Get all valid partition paths and existing partitions for them (if any)
+    final Table tbl = getTable(tableName);
+    final Set<Path> validPartitions = getValidPartitionsInPath(numDP, loadPath);
 
-      if (partsToLoad > conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)) {
-        throw new HiveException("Number of dynamic partitions created is " + partsToLoad
-            + ", which is more than "
-            + conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)
-            +". To solve this try to set " + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
-            + " to at least " + partsToLoad + '.');
-      }
+    final int partsToLoad = validPartitions.size();
+    final AtomicInteger partitionsLoaded = new AtomicInteger(0);
+
+    final boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
+        && InPlaceUpdates.inPlaceEligible(conf);
+    final PrintStream ps = (inPlaceEligible) ? SessionState.getConsole().getInfoStream() : null;
+    final SessionState parentSession = SessionState.get();
 
-      Table tbl = getTable(tableName);
+    final List<Future<Void>> futures = Lists.newLinkedList();
+    try {
       // for each dynamically created DP directory, construct a full partition spec
       // and load the partition based on that
-      Iterator<Path> iter = validPartitions.iterator();
-      LOG.info("Going to load " + partsToLoad + " partitions.");
-      PrintStream ps = null;
-      boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
-          && InPlaceUpdates.inPlaceEligible(conf);
-      if(inPlaceEligible) {
-        ps = SessionState.getConsole().getInfoStream();
-      }
-      int partitionsLoaded = 0;
-      while (iter.hasNext()) {
-        // get the dynamically created directory
-        Path partPath = iter.next();
-        assert fs.getFileStatus(partPath).isDir():
-          "partitions " + partPath + " is not a directory !";
-
+      for(final Path partPath : validPartitions) {
         // generate a full partition specification
-        LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec);
+        final LinkedHashMap<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec);
         Warehouse.makeSpecFromName(fullPartSpec, partPath);
-        Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace,
-            true, listBucketingEnabled, false, isAcid, hasFollowingStatsTask);
-        partitionsMap.put(fullPartSpec, newPartition);
-        if (inPlaceEligible) {
-          InPlaceUpdates.rePositionCursor(ps);
-          InPlaceUpdates.reprintLine(ps, "Loaded : " + ++partitionsLoaded + "/" + partsToLoad +" partitions.");
-        }
-        LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
+        futures.add(pool.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            try {
+              // move file would require session details (needCopy() invokes SessionState.get)
+              SessionState.setCurrentSessionState(parentSession);
+              LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
+
+              // load the partition
+              Partition newPartition = loadPartition(partPath, tbl, fullPartSpec,
+                  replace, true, listBucketingEnabled,
+                  false, isAcid, hasFollowingStatsTask);
+              partitionsMap.put(fullPartSpec, newPartition);
+
+              if (inPlaceEligible) {
+                synchronized (ps) {
+                  InPlaceUpdates.rePositionCursor(ps);
+                  partitionsLoaded.incrementAndGet();
+                  InPlaceUpdates.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
+                      + partsToLoad + " partitions.");
+                }
+              }
+              return null;
+            } catch (Exception t) {
+              LOG.error("Exception when loading partition with parameters "
+                  + " partPath=" + partPath + ", "
+                  + " table=" + tbl.getTableName() + ", "
+                  + " partSpec=" + fullPartSpec + ", "
+                  + " replace=" + replace + ", "
+                  + " listBucketingEnabled=" + listBucketingEnabled + ", "
+                  + " isAcid=" + isAcid + ", "
+                  + " hasFollowingStatsTask=" + hasFollowingStatsTask, t);
+              throw t;
+            }
+          }
+        }));
+      }
+      pool.shutdown();
+      LOG.debug("Number of partitions to be added is " + futures.size());
+
+      for (Future future : futures) {
+        future.get();
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks");
+      //cancel other futures
+      for (Future future : futures) {
+        future.cancel(true);
       }
+      throw new HiveException("Exception when loading "
+          + partsToLoad + " in table " + tbl.getTableName()
+          + " with loadPath=" + loadPath, e);
+    }
+
+    try {
       if (isAcid) {
         List<String> partNames = new ArrayList<>(partitionsMap.size());
         for (Partition p : partitionsMap.values()) {
@@ -1797,11 +1866,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
         metaStoreClient.addDynamicPartitions(txnId, tbl.getDbName(), tbl.getTableName(),
           partNames, AcidUtils.toDataOperationType(operation));
       }
+      LOG.info("Loaded " + partitionsMap.size() + " partitions");
       return partitionsMap;
-    } catch (IOException e) {
-      throw new HiveException(e);
     } catch (TException te) {
-      throw new HiveException(te);
+      throw new HiveException("Exception updating metastore for acid table "
+          + tableName + " with partitions " + partitionsMap.values(), te);
     }
   }
 
@@ -3395,6 +3464,19 @@ private void constructOneLBLocationMap(FileStatus fSta,
   }
 
   /**
+   * @return synchronized metastore client
+   * @throws MetaException
+   */
+  @LimitedPrivate(value = {"Hive"})
+  @Unstable
+  public synchronized SynchronizedMetaStoreClient getSychronizedMSC() throws MetaException {
+    if (syncMetaStoreClient == null) {
+      syncMetaStoreClient = new SynchronizedMetaStoreClient(getMSC(true, false));
+    }
+    return syncMetaStoreClient;
+  }
+
+  /**
    * @return the metastore client for the current thread
    * @throws MetaException
    */
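
For reference, the loadDynamicPartitions() rewrite above follows a plain fan-out/fan-in pattern: one fixed pool, one Callable per partition directory, fail fast on the first error and cancel the remaining futures. A stripped-down, self-contained sketch of that pattern (class and variable names and the println stand-in are illustrative, not Hive APIs):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class FanOutFanInSketch {
      public static void main(String[] args) throws Exception {
        List<String> partitionDirs = Arrays.asList("ds=2016-07-01", "ds=2016-07-02");
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Future<Void>> futures = new ArrayList<Future<Void>>();
        try {
          for (final String dir : partitionDirs) {
            futures.add(pool.submit(new Callable<Void>() {
              @Override
              public Void call() throws Exception {
                System.out.println("loading " + dir);  // stand-in for loadPartition()
                return null;
              }
            }));
          }
          pool.shutdown();
          for (Future<Void> future : futures) {
            future.get();                               // surfaces the first failure
          }
        } catch (InterruptedException | ExecutionException e) {
          for (Future<Void> future : futures) {
            future.cancel(true);                        // give up on the remaining work
          }
          throw new RuntimeException("partition load failed", e);
        } finally {
          pool.shutdownNow();                           // make sure worker threads exit
        }
      }
    }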
