This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 202906022b4 HIVE-27530: Implement direct SQL for alter partitions to improve performance (Wechar Yu, reviewed by Denys Kuzmenko, Sai Hemanth Gantasala)
202906022b4 is described below

commit 202906022b450ab2e90928f514f6cad39830ad1e
Author: Wechar Yu <[email protected]>
AuthorDate: Mon Dec 25 19:24:04 2023 +0800

    HIVE-27530: Implement direct SQL for alter partitions to improve performance (Wechar Yu, reviewed by Denys Kuzmenko, Sai Hemanth Gantasala)
    
    Closes #4517
---
 .../apache/hadoop/hive/metastore/Batchable.java    |   30 +-
 .../hadoop/hive/metastore/DatabaseProduct.java     |   16 +
 .../hadoop/hive/metastore/DirectSqlInsertPart.java |    8 +-
 .../hadoop/hive/metastore/DirectSqlUpdatePart.java | 1536 ++++++++++++++++++++
 .../hadoop/hive/metastore/DirectSqlUpdateStat.java |  727 ---------
 .../apache/hadoop/hive/metastore/HMSHandler.java   |   18 +-
 .../hadoop/hive/metastore/MetaStoreDirectSql.java  |  127 +-
 .../apache/hadoop/hive/metastore/ObjectStore.java  |  145 +-
 .../apache/hadoop/hive/metastore/txn/TxnUtils.java |   16 +
 .../hadoop/hive/metastore/TestObjectStore.java     |    2 +-
 .../hadoop/hive/metastore/tools/BenchmarkTool.java |    3 +
 .../hadoop/hive/metastore/tools/HMSBenchmarks.java |   30 +
 12 files changed, 1817 insertions(+), 841 deletions(-)

diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Batchable.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Batchable.java
index e3fd5a4bf14..571d6bdbd1d 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Batchable.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Batchable.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.metastore;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import javax.jdo.Query;
 
@@ -38,7 +39,7 @@ public abstract class Batchable<I, R> {
   public static final int NO_BATCHING = -1;
 
   private List<Query> queries = null;
-  public abstract List<R> run(List<I> input) throws MetaException;
+  public abstract List<R> run(List<I> input) throws Exception;
 
   public void addQueryAfterUse(Query query) {
     if (queries == null) {
@@ -70,18 +71,25 @@ public abstract class Batchable<I, R> {
       final int batchSize,
       List<I> input,
       Batchable<I, R> runnable) throws MetaException {
-    if (batchSize == NO_BATCHING || batchSize >= input.size()) {
-      return runnable.run(input);
+    if (input == null || input.isEmpty()) {
+      return Collections.emptyList();
     }
-    List<R> result = new ArrayList<R>(input.size());
-    for (int fromIndex = 0, toIndex = 0; toIndex < input.size(); fromIndex = toIndex) {
-      toIndex = Math.min(fromIndex + batchSize, input.size());
-      List<I> batchedInput = input.subList(fromIndex, toIndex);
-      List<R> batchedOutput = runnable.run(batchedInput);
-      if (batchedOutput != null) {
-        result.addAll(batchedOutput);
+    try {
+      if (batchSize == NO_BATCHING || batchSize >= input.size()) {
+        return runnable.run(input);
       }
+      List<R> result = new ArrayList<>(input.size());
+      for (int fromIndex = 0, toIndex = 0; toIndex < input.size(); fromIndex = toIndex) {
+        toIndex = Math.min(fromIndex + batchSize, input.size());
+        List<I> batchedInput = input.subList(fromIndex, toIndex);
+        List<R> batchedOutput = runnable.run(batchedInput);
+        if (batchedOutput != null) {
+          result.addAll(batchedOutput);
+        }
+      }
+      return result;
+    } catch (Exception e) {
+      throw ExceptionHandler.newMetaException(e);
     }
-    return result;
   }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java
index 6e04bf0d6f7..ea3faf09113 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configurable;
@@ -748,6 +749,21 @@ public class DatabaseProduct implements Configurable {
     return val;
   }
 
+  /**
+   * Get the maximum number of rows allowed in a single batched query, given the number of parameters per row.
+   * @param batch the configured batch size
+   * @param paramSize the number of parameters per row in a query statement
+   * @return the maximum allowed rows in a query
+   */
+  public int getMaxRows(int batch, int paramSize) {
+    if (isSQLSERVER()) {
+      // SQL Server supports a maximum of 2100 parameters in a request. Adjust the batch size accordingly.
+      int maxAllowedRows = (2100 - paramSize) / paramSize;
+      return Math.min(batch, maxAllowedRows);
+    }
+    return batch;
+  }
+
   // This class implements the Configurable interface for the benefit
   // of "plugin" instances created via reflection (see invocation of
   // ReflectionUtils.newInstance in method determineDatabaseProduct)
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java
index be17470edd6..ba205ebe705 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java
@@ -83,13 +83,7 @@ class DirectSqlInsertPart {
       return;
     }
     int maxRowsInBatch = batchSize > 0 ? batchSize : rowCount;
-    if (dbType.isSQLSERVER()) {
-      // SQL Server supports a maximum of 2100 parameters in a request. Adjust the maxRowsInBatch accordingly
-      int maxAllowedRows = (2100 - columnCount) / columnCount;
-      if (maxRowsInBatch > maxAllowedRows) {
-        maxRowsInBatch = maxAllowedRows;
-      }
-    }
+    maxRowsInBatch = dbType.getMaxRows(maxRowsInBatch, columnCount);
     int maxBatches = rowCount / maxRowsInBatch;
     int last = rowCount % maxRowsInBatch;
     String rowFormat = "(" + repeat(",?", columnCount).substring(1) + ")";
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
new file mode 100644
index 00000000000..f6e41f09094
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
@@ -0,0 +1,1536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEventBatch;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.model.MColumnDescriptor;
+import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
+import org.apache.hadoop.hive.metastore.model.MStringList;
+import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.datanucleus.ExecutionContext;
+import org.datanucleus.api.jdo.JDOPersistenceManager;
+import org.datanucleus.metadata.AbstractClassMetaData;
+import org.datanucleus.metadata.IdentityType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import javax.jdo.PersistenceManager;
+import javax.jdo.datastore.JDOConnection;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.common.StatsSetupConst.COLUMN_STATS_ACCURATE;
+import static org.apache.hadoop.hive.metastore.HMSHandler.getPartValsFromName;
+import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.executeWithArray;
+import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.extractSqlInt;
+import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.extractSqlLong;
+
+/**
+ * This class contains the optimizations for MetaStore that rely on direct SQL access to
+ * the underlying database. It should use ANSI SQL and be compatible with common databases
+ * such as MySQL (note that MySQL doesn't use full ANSI mode by default), Postgres, etc.
+ *
+ * This class separates out the update logic from the MetaStoreDirectSql class.
+ */
+class DirectSqlUpdatePart {
+  private static final Logger LOG = LoggerFactory.getLogger(DirectSqlUpdatePart.class.getName());
+
+  private final PersistenceManager pm;
+  private final Configuration conf;
+  private final DatabaseProduct dbType;
+  private final int maxBatchSize;
+  private final SQLGenerator sqlGenerator;
+
+  private static final ReentrantLock derbyLock = new ReentrantLock(true);
+
+  public DirectSqlUpdatePart(PersistenceManager pm, Configuration conf,
+                             DatabaseProduct dbType, int batchSize) {
+    this.pm = pm;
+    this.conf = conf;
+    this.dbType = dbType;
+    this.maxBatchSize = batchSize;
+    sqlGenerator = new SQLGenerator(dbType, conf);
+  }
+
+  /**
+   * {@link #lockInternal()} and {@link #unlockInternal()} are used to serialize those operations that require
+   * Select ... For Update to sequence operations properly. In practice that means when running
+   * with the Derby database. See more notes at the class level.
+   */
+  private void lockInternal() {
+    if(dbType.isDERBY()) {
+      derbyLock.lock();
+    }
+  }
+
+  private void unlockInternal() {
+    if(dbType.isDERBY()) {
+      derbyLock.unlock();
+    }
+  }
+
+  void rollbackDBConn(Connection dbConn) {
+    try {
+      if (dbConn != null && !dbConn.isClosed()) dbConn.rollback();
+    } catch (SQLException e) {
+      LOG.warn("Failed to rollback db connection ", e);
+    }
+  }
+
+  void closeDbConn(JDOConnection jdoConn) {
+    try {
+      if (jdoConn != null) {
+        jdoConn.close();
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to close db connection", e);
+    }
+  }
+
+  void closeStmt(Statement stmt) {
+    try {
+      if (stmt != null && !stmt.isClosed()) stmt.close();
+    } catch (SQLException e) {
+      LOG.warn("Failed to close statement ", e);
+    }
+  }
+
+  void close(ResultSet rs) {
+    try {
+      if (rs != null && !rs.isClosed()) {
+        rs.close();
+      }
+    }
+    catch(SQLException ex) {
+      LOG.warn("Failed to close statement ", ex);
+    }
+  }
+
+  static String quoteString(String input) {
+    return "'" + input + "'";
+  }
+
+  void close(ResultSet rs, Statement stmt, JDOConnection dbConn) {
+    close(rs);
+    closeStmt(stmt);
+    closeDbConn(dbConn);
+  }
+
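+  // Split the incoming per-partition column stats into updates (PART_ID/COLUMN_NAME already present
+  // in PART_COL_STATS) and inserts (no existing row for that partition/column).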
+  private void populateInsertUpdateMap(Map<PartitionInfo, ColumnStatistics> statsPartInfoMap,
+                                       Map<PartColNameInfo, MPartitionColumnStatistics> updateMap,
+                                       Map<PartColNameInfo, MPartitionColumnStatistics> insertMap,
+                                       Connection dbConn, Table tbl) throws SQLException, MetaException, NoSuchObjectException {
+    StringBuilder prefix = new StringBuilder();
+    StringBuilder suffix = new StringBuilder();
+    Statement statement = null;
+    ResultSet rs = null;
+    List<String> queries = new ArrayList<>();
+    Set<PartColNameInfo> selectedParts = new HashSet<>();
+
+    List<Long> partIdList = statsPartInfoMap.keySet().stream().map(
+            e -> e.partitionId).collect(Collectors.toList()
+    );
+
+    prefix.append("select \"PART_ID\", \"COLUMN_NAME\" from \"PART_COL_STATS\" 
WHERE ");
+    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
+            partIdList, "\"PART_ID\"", true, false);
+
+    for (String query : queries) {
+      try {
+        statement = dbConn.createStatement();
+        LOG.debug("Going to execute query " + query);
+        rs = statement.executeQuery(query);
+        while (rs.next()) {
+          selectedParts.add(new PartColNameInfo(rs.getLong(1), rs.getString(2)));
+        }
+      } finally {
+        close(rs, statement, null);
+      }
+    }
+
+    for (Map.Entry entry : statsPartInfoMap.entrySet()) {
+      PartitionInfo partitionInfo = (PartitionInfo) entry.getKey();
+      ColumnStatistics colStats = (ColumnStatistics) entry.getValue();
+      long partId = partitionInfo.partitionId;
+      ColumnStatisticsDesc statsDesc = colStats.getStatsDesc();
+      if (!statsDesc.isSetCatName()) {
+        statsDesc.setCatName(tbl.getCatName());
+      }
+      for (ColumnStatisticsObj statisticsObj : colStats.getStatsObj()) {
+        PartColNameInfo temp = new PartColNameInfo(partId, statisticsObj.getColName());
+        if (selectedParts.contains(temp)) {
+          updateMap.put(temp, StatObjectConverter.
+                  convertToMPartitionColumnStatistics(null, statsDesc, statisticsObj, colStats.getEngine()));
+        } else {
+          insertMap.put(temp, StatObjectConverter.
+                  convertToMPartitionColumnStatistics(null, statsDesc, statisticsObj, colStats.getEngine()));
+        }
+      }
+    }
+  }
+
+  private void updatePartColStatTable(Map<PartColNameInfo, MPartitionColumnStatistics> updateMap,
+                                          Connection dbConn) throws SQLException, MetaException, NoSuchObjectException {
+    PreparedStatement pst = null;
+    for (Map.Entry entry : updateMap.entrySet()) {
+      PartColNameInfo partColNameInfo = (PartColNameInfo) entry.getKey();
+      Long partId = partColNameInfo.partitionId;
+      MPartitionColumnStatistics mPartitionColumnStatistics = (MPartitionColumnStatistics) entry.getValue();
+      String update = "UPDATE \"PART_COL_STATS\" SET ";
+      update += StatObjectConverter.getUpdatedColumnSql(mPartitionColumnStatistics);
+      update += " WHERE \"PART_ID\" = " + partId + " AND "
+              + " \"COLUMN_NAME\" = " + quoteString(mPartitionColumnStatistics.getColName());
+      try {
+        pst = dbConn.prepareStatement(update);
+        StatObjectConverter.initUpdatedColumnStatement(mPartitionColumnStatistics, pst);
+        LOG.debug("Going to execute update " + update);
+        int numUpdate = pst.executeUpdate();
+        if (numUpdate != 1) {
+          throw new MetaException("Invalid state of PART_COL_STATS for PART_ID " + partId);
+        }
+      } finally {
+        closeStmt(pst);
+      }
+    }
+  }
+
+  private void insertIntoPartColStatTable(Map<PartColNameInfo, MPartitionColumnStatistics> insertMap,
+                                          long maxCsId,
+                                          Connection dbConn) throws SQLException, MetaException, NoSuchObjectException {
+    PreparedStatement preparedStatement = null;
+    int numRows = 0;
+    String insert = "INSERT INTO \"PART_COL_STATS\" (\"CS_ID\", \"CAT_NAME\", 
\"DB_NAME\","
+            + "\"TABLE_NAME\", \"PARTITION_NAME\", \"COLUMN_NAME\", 
\"COLUMN_TYPE\", \"PART_ID\","
+            + " \"LONG_LOW_VALUE\", \"LONG_HIGH_VALUE\", 
\"DOUBLE_HIGH_VALUE\", \"DOUBLE_LOW_VALUE\","
+            + " \"BIG_DECIMAL_LOW_VALUE\", \"BIG_DECIMAL_HIGH_VALUE\", 
\"NUM_NULLS\", \"NUM_DISTINCTS\", \"BIT_VECTOR\" ,"
+            + " \"HISTOGRAM\", \"AVG_COL_LEN\", \"MAX_COL_LEN\", 
\"NUM_TRUES\", \"NUM_FALSES\", \"LAST_ANALYZED\", \"ENGINE\") values "
+            + "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?)";
+
+    try {
+      preparedStatement = dbConn.prepareStatement(insert);
+      for (Map.Entry entry : insertMap.entrySet()) {
+        PartColNameInfo partColNameInfo = (PartColNameInfo) entry.getKey();
+        Long partId = partColNameInfo.partitionId;
+        MPartitionColumnStatistics mPartitionColumnStatistics = (MPartitionColumnStatistics) entry.getValue();
+
+        preparedStatement.setLong(1, maxCsId);
+        preparedStatement.setString(2, mPartitionColumnStatistics.getCatName());
+        preparedStatement.setString(3, mPartitionColumnStatistics.getDbName());
+        preparedStatement.setString(4, mPartitionColumnStatistics.getTableName());
+        preparedStatement.setString(5, mPartitionColumnStatistics.getPartitionName());
+        preparedStatement.setString(6, mPartitionColumnStatistics.getColName());
+        preparedStatement.setString(7, mPartitionColumnStatistics.getColType());
+        preparedStatement.setLong(8, partId);
+        preparedStatement.setObject(9, mPartitionColumnStatistics.getLongLowValue());
+        preparedStatement.setObject(10, mPartitionColumnStatistics.getLongHighValue());
+        preparedStatement.setObject(11, mPartitionColumnStatistics.getDoubleHighValue());
+        preparedStatement.setObject(12, mPartitionColumnStatistics.getDoubleLowValue());
+        preparedStatement.setString(13, mPartitionColumnStatistics.getDecimalLowValue());
+        preparedStatement.setString(14, mPartitionColumnStatistics.getDecimalHighValue());
+        preparedStatement.setObject(15, mPartitionColumnStatistics.getNumNulls());
+        preparedStatement.setObject(16, mPartitionColumnStatistics.getNumDVs());
+        preparedStatement.setObject(17, mPartitionColumnStatistics.getBitVector());
+        preparedStatement.setBytes(18, mPartitionColumnStatistics.getHistogram());
+        preparedStatement.setObject(19, mPartitionColumnStatistics.getAvgColLen());
+        preparedStatement.setObject(20, mPartitionColumnStatistics.getMaxColLen());
+        preparedStatement.setObject(21, mPartitionColumnStatistics.getNumTrues());
+        preparedStatement.setObject(22, mPartitionColumnStatistics.getNumFalses());
+        preparedStatement.setLong(23, mPartitionColumnStatistics.getLastAnalyzed());
+        preparedStatement.setString(24, mPartitionColumnStatistics.getEngine());
+
+        maxCsId++;
+        numRows++;
+        preparedStatement.addBatch();
+        if (numRows == maxBatchSize) {
+          preparedStatement.executeBatch();
+          numRows = 0;
+        }
+      }
+
+      if (numRows != 0) {
+        preparedStatement.executeBatch();
+      }
+    } finally {
+      closeStmt(preparedStatement);
+    }
+  }
+
+  private Map<Long, String> getParamValues(Connection dbConn, List<Long> partIdList) throws SQLException {
+    List<String> queries = new ArrayList<>();
+    StringBuilder prefix = new StringBuilder();
+    StringBuilder suffix = new StringBuilder();
+    Statement statement = null;
+    ResultSet rs = null;
+
+    prefix.append("select \"PART_ID\", \"PARAM_VALUE\" "
+            + " from \"PARTITION_PARAMS\" where "
+            + " \"PARAM_KEY\" = 'COLUMN_STATS_ACCURATE' "
+            + " and ");
+    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
+            partIdList, "\"PART_ID\"", true, false);
+
+    Map<Long, String> partIdToParaMap = new HashMap<>();
+    for (String query : queries) {
+      try {
+        statement = dbConn.createStatement();
+        LOG.debug("Going to execute query " + query);
+        rs = statement.executeQuery(query);
+        while (rs.next()) {
+          partIdToParaMap.put(rs.getLong(1), rs.getString(2));
+        }
+      } finally {
+        close(rs, statement, null);
+      }
+    }
+    return partIdToParaMap;
+  }
+
+  private void updateWriteIdForPartitions(Connection dbConn, long writeId, List<Long> partIdList) throws SQLException {
+    StringBuilder prefix = new StringBuilder();
+    List<String> queries = new ArrayList<>();
+    StringBuilder suffix = new StringBuilder();
+
+    prefix.append("UPDATE \"PARTITIONS\" set \"WRITE_ID\" = " + writeId + " 
where ");
+    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
+            partIdList, "\"PART_ID\"", false, false);
+
+    Statement statement = null;
+    for (String query : queries) {
+      try {
+        statement = dbConn.createStatement();
+        LOG.debug("Going to execute update " + query);
+        statement.executeUpdate(query);
+      } finally {
+        closeStmt(statement);
+      }
+    }
+  }
+
+  private Map<String, Map<String, String>> updatePartitionParamTable(Connection dbConn,
+                                                                     Map<PartitionInfo, ColumnStatistics> partitionInfoMap,
+                                                                     String validWriteIds,
+                                                                     long writeId,
+                                                                     boolean isAcidTable)
+          throws SQLException, MetaException {
+    Map<String, Map<String, String>> result = new HashMap<>();
+    boolean areTxnStatsSupported = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED);
+    String insert = "INSERT INTO \"PARTITION_PARAMS\" (\"PART_ID\", \"PARAM_KEY\", \"PARAM_VALUE\") "
+            + "VALUES( ? , 'COLUMN_STATS_ACCURATE'  , ? )";
+    String delete = "DELETE from \"PARTITION_PARAMS\" "
+            + " where \"PART_ID\" = ? "
+            + " and \"PARAM_KEY\" = 'COLUMN_STATS_ACCURATE'";
+    String update = "UPDATE \"PARTITION_PARAMS\" set \"PARAM_VALUE\" = ? "
+            + " where \"PART_ID\" = ? "
+            + " and \"PARAM_KEY\" = 'COLUMN_STATS_ACCURATE'";
+    int numInsert = 0;
+    int numDelete = 0;
+    int numUpdate = 0;
+
+    List<Long> partIdList = partitionInfoMap.keySet().stream().map(
+            e -> e.partitionId).collect(Collectors.toList()
+    );
+
+    // get the old parameters from PARTITION_PARAMS table.
+    Map<Long, String> partIdToParaMap = getParamValues(dbConn, partIdList);
+
+    try (PreparedStatement statementInsert = dbConn.prepareStatement(insert);
+         PreparedStatement statementDelete = dbConn.prepareStatement(delete);
+         PreparedStatement statementUpdate = dbConn.prepareStatement(update)) {
+      for (Map.Entry entry : partitionInfoMap.entrySet()) {
+        PartitionInfo partitionInfo = (PartitionInfo) entry.getKey();
+        ColumnStatistics colStats = (ColumnStatistics) entry.getValue();
+        List<String> colNames = colStats.getStatsObj().stream().map(e -> e.getColName()).collect(Collectors.toList());
+        long partWriteId = partitionInfo.writeId;
+        long partId = partitionInfo.partitionId;
+        Map<String, String> newParameter;
+
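+        // No existing COLUMN_STATS_ACCURATE parameter for this partition: insert a fresh row into PARTITION_PARAMS.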
+        if (!partIdToParaMap.containsKey(partId)) {
+          newParameter = new HashMap<>();
+          newParameter.put(COLUMN_STATS_ACCURATE, "TRUE");
+          StatsSetupConst.setColumnStatsState(newParameter, colNames);
+          statementInsert.setLong(1, partId);
+          statementInsert.setString(2, newParameter.get(COLUMN_STATS_ACCURATE));
+          numInsert++;
+          statementInsert.addBatch();
+          if (numInsert == maxBatchSize) {
+            LOG.debug(" Executing insert " + insert);
+            statementInsert.executeBatch();
+            numInsert = 0;
+          }
+        } else {
+          String oldStats = partIdToParaMap.get(partId);
+
+          Map<String, String> oldParameter = new HashMap<>();
+          oldParameter.put(COLUMN_STATS_ACCURATE, oldStats);
+
+          newParameter = new HashMap<>();
+          newParameter.put(COLUMN_STATS_ACCURATE, oldStats);
+          StatsSetupConst.setColumnStatsState(newParameter, colNames);
+
+          if (isAcidTable) {
+            String errorMsg = ObjectStore.verifyStatsChangeCtx(
+                    colStats.getStatsDesc().getDbName() + "." + colStats.getStatsDesc().getTableName(),
+                    oldParameter, newParameter, writeId, validWriteIds, true);
+            if (errorMsg != null) {
+              throw new MetaException(errorMsg);
+            }
+          }
+
+          if (isAcidTable &&
+                  (!areTxnStatsSupported || !ObjectStore.isCurrentStatsValidForTheQuery(oldParameter, partWriteId,
+                          validWriteIds, true))) {
+            statementDelete.setLong(1, partId);
+            statementDelete.addBatch();
+            numDelete++;
+            if (numDelete == maxBatchSize) {
+              statementDelete.executeBatch();
+              numDelete = 0;
+              LOG.debug("Removed COLUMN_STATS_ACCURATE from the parameters of 
the partition "
+                      + colStats.getStatsDesc().getDbName() + "." + 
colStats.getStatsDesc().getTableName() + "."
+                      + colStats.getStatsDesc().getPartName());
+            }
+          } else {
+            statementUpdate.setString(1, newParameter.get(COLUMN_STATS_ACCURATE));
+            statementUpdate.setLong(2, partId);
+            statementUpdate.addBatch();
+            numUpdate++;
+            if (numUpdate == maxBatchSize) {
+              LOG.debug(" Executing update " + statementUpdate);
+              statementUpdate.executeBatch();
+              numUpdate = 0;
+            }
+          }
+        }
+        result.put(partitionInfo.partitionName, newParameter);
+      }
+
+      if (numInsert != 0) {
+        statementInsert.executeBatch();
+      }
+
+      if (numUpdate != 0) {
+        statementUpdate.executeBatch();
+      }
+
+      if (numDelete != 0) {
+        statementDelete.executeBatch();
+      }
+
+      if (isAcidTable) {
+        updateWriteIdForPartitions(dbConn, writeId, partIdList);
+      }
+      return result;
+    }
+  }
+
+
+  private Map<PartitionInfo, ColumnStatistics> getPartitionInfo(Connection dbConn, long tblId,
+                                                                 Map<String, ColumnStatistics> partColStatsMap)
+          throws SQLException, MetaException {
+    List<String> queries = new ArrayList<>();
+    StringBuilder prefix = new StringBuilder();
+    StringBuilder suffix = new StringBuilder();
+    Statement statement = null;
+    ResultSet rs = null;
+    Map<PartitionInfo, ColumnStatistics> partitionInfoMap = new HashMap<>();
+
+    List<String> partKeys = partColStatsMap.keySet().stream().map(
+            e -> quoteString(e)).collect(Collectors.toList()
+    );
+
+    prefix.append("select \"PART_ID\", \"WRITE_ID\", \"PART_NAME\"  from 
\"PARTITIONS\" where ");
+    suffix.append(" and  \"TBL_ID\" = " + tblId);
+    TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix,
+            partKeys, "\"PART_NAME\"", true, false);
+
+    for (String query : queries) {
+      // Select for update makes sure that the partitions are not modified while the stats are getting updated.
+      query = sqlGenerator.addForUpdateClause(query);
+      try {
+        statement = dbConn.createStatement();
+        LOG.debug("Going to execute query <" + query + ">");
+        rs = statement.executeQuery(query);
+        while (rs.next()) {
+          PartitionInfo partitionInfo = new PartitionInfo(rs.getLong(1),
+                  rs.getLong(2), rs.getString(3));
+          partitionInfoMap.put(partitionInfo, partColStatsMap.get(rs.getString(3)));
+        }
+      } finally {
+        close(rs, statement, null);
+      }
+    }
+    return partitionInfoMap;
+  }
+
+  private void setAnsiQuotes(Connection dbConn) throws SQLException {
+    if (sqlGenerator.getDbProduct().isMYSQL()) {
+      try (Statement stmt = dbConn.createStatement()) {
+        stmt.execute("SET @@session.sql_mode=ANSI_QUOTES");
+      }
+    }
+  }
+
+  /**
+   * Update the statistics for the given partitions. Also adds the notification logs.
+   * @return map of partition key to column stats if successful, null otherwise.
+   */
+  public Map<String, Map<String, String>> updatePartitionColumnStatistics(Map<String, ColumnStatistics> partColStatsMap,
+                                                      Table tbl, long csId,
+                                                      String validWriteIds, long writeId,
+                                                      List<TransactionalMetaStoreEventListener> transactionalListeners)
+          throws MetaException {
+    JDOConnection jdoConn = null;
+    Connection dbConn = null;
+    boolean committed = false;
+    try {
+      lockInternal();
+      jdoConn = pm.getDataStoreConnection();
+      dbConn = (Connection) (jdoConn.getNativeConnection());
+
+      setAnsiQuotes(dbConn);
+
+      Map<PartitionInfo, ColumnStatistics> partitionInfoMap = getPartitionInfo(dbConn, tbl.getId(), partColStatsMap);
+
+      Map<String, Map<String, String>> result =
+              updatePartitionParamTable(dbConn, partitionInfoMap, validWriteIds, writeId, TxnUtils.isAcidTable(tbl));
+
+      Map<PartColNameInfo, MPartitionColumnStatistics> insertMap = new HashMap<>();
+      Map<PartColNameInfo, MPartitionColumnStatistics> updateMap = new HashMap<>();
+      populateInsertUpdateMap(partitionInfoMap, updateMap, insertMap, dbConn, tbl);
+
+      LOG.info("Number of stats to insert  " + insertMap.size() + " update " + 
updateMap.size());
+
+      if (insertMap.size() != 0) {
+        insertIntoPartColStatTable(insertMap, csId, dbConn);
+      }
+
+      if (updateMap.size() != 0) {
+        updatePartColStatTable(updateMap, dbConn);
+      }
+
+      if (transactionalListeners != null) {
+        UpdatePartitionColumnStatEventBatch eventBatch = new UpdatePartitionColumnStatEventBatch(null);
+        for (Map.Entry entry : result.entrySet()) {
+          Map<String, String> parameters = (Map<String, String>) entry.getValue();
+          ColumnStatistics colStats = partColStatsMap.get(entry.getKey());
+          List<String> partVals = getPartValsFromName(tbl, colStats.getStatsDesc().getPartName());
+          UpdatePartitionColumnStatEvent event = new UpdatePartitionColumnStatEvent(colStats, partVals, parameters,
+                  tbl, writeId, null);
+          eventBatch.addPartColStatEvent(event);
+        }
+        MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+                EventMessage.EventType.UPDATE_PARTITION_COLUMN_STAT_BATCH, eventBatch, dbConn, sqlGenerator);
+      }
+      dbConn.commit();
+      committed = true;
+      return result;
+    } catch (Exception e) {
+      LOG.error("Unable to update Column stats for  " + tbl.getTableName(), e);
+      throw new MetaException("Unable to update Column stats for  " + 
tbl.getTableName()
+              + " due to: "  + e.getMessage());
+    } finally {
+      if (!committed) {
+        rollbackDBConn(dbConn);
+      }
+      closeDbConn(jdoConn);
+      unlockInternal();
+    }
+  }
+
+  /**
+   * Gets the next CS id from the MPartitionColumnStatistics sequence and increments the CS id by numStats.
+   * @return The CS id before the update.
+   */
+  public long getNextCSIdForMPartitionColumnStatistics(long numStats) throws MetaException {
+    Statement statement = null;
+    ResultSet rs = null;
+    long maxCsId = 0;
+    boolean committed = false;
+    Connection dbConn = null;
+    JDOConnection jdoConn = null;
+
+    try {
+      lockInternal();
+      jdoConn = pm.getDataStoreConnection();
+      dbConn = (Connection) (jdoConn.getNativeConnection());
+
+      setAnsiQuotes(dbConn);
+
+      // This loop iterates at most twice. If there are no records, it will first insert and then do a select.
+      // We are not using any upsert operations, as select for update followed by update is required to make sure that
+      // the caller gets a reserved range of CS ids not used by any other thread.
+      boolean insertDone = false;
+      while (maxCsId == 0) {
+        String query = sqlGenerator.addForUpdateClause("SELECT \"NEXT_VAL\" FROM \"SEQUENCE_TABLE\" "
+                + "WHERE \"SEQUENCE_NAME\"= "
+                + quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics"));
+        LOG.debug("Going to execute query " + query);
+        statement = dbConn.createStatement();
+        rs = statement.executeQuery(query);
+        if (rs.next()) {
+          maxCsId = rs.getLong(1);
+        } else if (insertDone) {
+          throw new MetaException("Invalid state of SEQUENCE_TABLE for 
MPartitionColumnStatistics");
+        } else {
+          insertDone = true;
+          closeStmt(statement);
+          statement = dbConn.createStatement();
+          query = "INSERT INTO \"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\", 
\"NEXT_VAL\")  VALUES ( "
+                  + 
quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics")
 + "," + 1
+                  + ")";
+          try {
+            statement.executeUpdate(query);
+          } catch (SQLException e) {
+            // If the record was already inserted by some other thread, continue to select.
+            if (dbType.isDuplicateKeyError(e)) {
+              continue;
+            }
+            LOG.error("Unable to insert into SEQUENCE_TABLE for 
MPartitionColumnStatistics.", e);
+            throw e;
+          } finally {
+            closeStmt(statement);
+          }
+        }
+      }
+
+      long nextMaxCsId = maxCsId + numStats + 1;
+      closeStmt(statement);
+      statement = dbConn.createStatement();
+      String query = "UPDATE \"SEQUENCE_TABLE\" SET \"NEXT_VAL\" = "
+              + nextMaxCsId
+              + " WHERE \"SEQUENCE_NAME\" = "
+              + quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics");
+      statement.executeUpdate(query);
+      dbConn.commit();
+      committed = true;
+      return maxCsId;
+    } catch (Exception e) {
+      LOG.error("Unable to getNextCSIdForMPartitionColumnStatistics", e);
+      throw new MetaException("Unable to 
getNextCSIdForMPartitionColumnStatistics  "
+              + " due to: " + e.getMessage());
+    } finally {
+      if (!committed) {
+        rollbackDBConn(dbConn);
+      }
+      close(rs, statement, jdoConn);
+      unlockInternal();
+    }
+  }
+
+  public void alterPartitions(Map<List<String>, Long> partValuesToId, Map<Long, Long> partIdToSdId,
+                              List<Partition> newParts) throws MetaException {
+    List<Long> partIds = new ArrayList<>(newParts.size());
+    Map<Long, Optional<Map<String, String>>> partParamsOpt = new HashMap<>();
+    Map<Long, StorageDescriptor> idToSd = new HashMap<>();
+    for (Partition newPart : newParts) {
+      Long partId = partValuesToId.get(newPart.getValues());
+      Long sdId = partIdToSdId.get(partId);
+      partIds.add(partId);
+      partParamsOpt.put(partId, Optional.ofNullable(newPart.getParameters()));
+      idToSd.put(sdId, newPart.getSd());
+    }
+
+    // Altering partitions does not change partition values,
+    // so only PARTITIONS and PARTITION_PARAMS need to be updated.
+    updatePartitionsInBatch(partValuesToId, newParts);
+    updateParamTableInBatch("\"PARTITION_PARAMS\"", "\"PART_ID\"", partIds, partParamsOpt);
+    updateStorageDescriptorInBatch(idToSd);
+  }
+
+  private interface ThrowableConsumer<T> {
+    void accept(T t) throws SQLException, MetaException;
+  }
+
+  private <T> List<Long> filterIdsByNonNullValue(List<Long> ids, Map<Long, T> map) {
+    return ids.stream().filter(id -> map.get(id) != null).collect(Collectors.toList());
+  }
+
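+  // Prepares the given query on the underlying JDBC connection, hands the statement to the consumer,
+  // and records a timing trace for the execution.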
+  private void updateWithStatement(ThrowableConsumer<PreparedStatement> consumer, String query)
+      throws MetaException {
+    JDOConnection jdoConn = pm.getDataStoreConnection();
+    boolean doTrace = LOG.isDebugEnabled();
+    long start = doTrace ? System.nanoTime() : 0;
+    try (PreparedStatement statement =
+             ((Connection) jdoConn.getNativeConnection()).prepareStatement(query)) {
+      consumer.accept(statement);
+      MetastoreDirectSqlUtils.timingTrace(doTrace, query, start, doTrace ? System.nanoTime() : 0);
+    } catch (SQLException e) {
+      LOG.error("Failed to execute update query: " + query, e);
+      throw new MetaException("Unable to execute update due to: " + 
e.getMessage());
+    } finally {
+      closeDbConn(jdoConn);
+    }
+  }
+
+  private void updatePartitionsInBatch(Map<List<String>, Long> partValuesToId,
+                                       List<Partition> newParts) throws MetaException {
+    List<String> columns = Arrays.asList("\"CREATE_TIME\"", "\"LAST_ACCESS_TIME\"", "\"WRITE_ID\"");
+    List<String> conditionKeys = Arrays.asList("\"PART_ID\"");
+    String stmt = TxnUtils.createUpdatePreparedStmt("\"PARTITIONS\"", columns, conditionKeys);
+    int maxRows = dbType.getMaxRows(maxBatchSize, 4);
+    updateWithStatement(statement -> Batchable.runBatched(maxRows, newParts, new Batchable<Partition, Void>() {
+      @Override
+      public List<Void> run(List<Partition> input) throws SQLException {
+        for (Partition p : input) {
+          statement.setLong(1, p.getCreateTime());
+          statement.setLong(2, p.getLastAccessTime());
+          statement.setLong(3, p.getWriteId());
+          statement.setLong(4, partValuesToId.get(p.getValues()));
+          statement.addBatch();
+        }
+        statement.executeBatch();
+        return null;
+      }
+    }), stmt);
+  }
+
+  /* Get stringListId from both SKEWED_VALUES and SKEWED_COL_VALUE_LOC_MAP tables. */
+  private List<Long> getStringListId(List<Long> sdIds) throws MetaException {
+    return Batchable.runBatched(maxBatchSize, sdIds, new Batchable<Long, Long>() {
+      @Override
+      public List<Long> run(List<Long> input) throws Exception {
+        List<Long> result = new ArrayList<>();
+        String idLists = MetaStoreDirectSql.getIdListForIn(input);
+        String queryFromSkewedValues = "select \"STRING_LIST_ID_EID\" " +
+            "from \"SKEWED_VALUES\" where \"SD_ID_OID\" in (" + idLists + ")";
+        try (QueryWrapper query =
+                 new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryFromSkewedValues))) {
+          List<Long> sqlResult = executeWithArray(query.getInnerQuery(), null, queryFromSkewedValues);
+          result.addAll(sqlResult);
+        }
+        String queryFromValueLoc = "select \"STRING_LIST_ID_KID\" " +
+            "from \"SKEWED_COL_VALUE_LOC_MAP\" where \"SD_ID\" in (" + idLists + ")";
+        try (QueryWrapper query =
+                 new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryFromValueLoc))) {
+          List<Long> sqlResult = executeWithArray(query.getInnerQuery(), null, queryFromValueLoc);
+          result.addAll(sqlResult);
+        }
+        return result;
+      }
+    });
+  }
+
+  private void updateParamTableInBatch(String paramTable, String idColumn, List<Long> ids,
+                                       Map<Long, Optional<Map<String, String>>> newParamsOpt) throws MetaException {
+    Map<Long, Map<String, String>> oldParams = getParams(paramTable, idColumn, ids);
+
+    List<Pair<Long, String>> paramsToDelete = new ArrayList<>();
+    List<Pair<Long, Pair<String, String>>> paramsToUpdate = new ArrayList<>();
+    List<Pair<Long, Pair<String, String>>> paramsToAdd = new ArrayList<>();
+
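+    // Diff old vs. new parameters per id: delete keys that disappeared, update changed values, insert brand-new keys.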
+    for (Long id : ids) {
+      Map<String, String> oldParam = oldParams.getOrDefault(id, new HashMap<>());
+      Map<String, String> newParam = newParamsOpt.get(id).orElseGet(HashMap::new);
+      for (Map.Entry<String, String> entry : oldParam.entrySet()) {
+        String key = entry.getKey();
+        String oldValue = entry.getValue();
+        if (!newParam.containsKey(key)) {
+          paramsToDelete.add(Pair.of(id, key));
+        } else if (!oldValue.equals(newParam.get(key))) {
+          paramsToUpdate.add(Pair.of(id, Pair.of(key, newParam.get(key))));
+        }
+      }
+      List<Pair<Long, Pair<String, String>>> newlyParams = newParam.entrySet().stream()
+          .filter(entry -> !oldParam.containsKey(entry.getKey()))
+          .map(entry -> Pair.of(id, Pair.of(entry.getKey(), entry.getValue())))
+          .collect(Collectors.toList());
+      paramsToAdd.addAll(newlyParams);
+    }
+
+    deleteParams(paramTable, idColumn, paramsToDelete);
+    updateParams(paramTable, idColumn, paramsToUpdate);
+    insertParams(paramTable, idColumn, paramsToAdd);
+  }
+
+  private Map<Long, Map<String, String>> getParams(String paramTable, String idName,
+                                                   List<Long> ids) throws MetaException {
+    Map<Long, Map<String, String>> idToParams = new HashMap<>();
+    Batchable.runBatched(maxBatchSize, ids, new Batchable<Long, Object>() {
+      @Override
+      public List<Object> run(List<Long> input) throws MetaException {
+        String idLists = MetaStoreDirectSql.getIdListForIn(input);
+        String queryText = "select " + idName + ", \"PARAM_KEY\", 
\"PARAM_VALUE\" from " +
+            paramTable + " where " + idName +  " in (" + idLists + ")";
+        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
+          List<Object[]> sqlResult = executeWithArray(query.getInnerQuery(), null, queryText);
+          for (Object[] row : sqlResult) {
+            Long id = extractSqlLong(row[0]);
+            String paramKey = (String) row[1];
+            String paramVal = (String) row[2];
+            idToParams.computeIfAbsent(id, key -> new HashMap<>()).put(paramKey, paramVal);
+          }
+        }
+        return null;
+      }
+    });
+    return idToParams;
+  }
+
+  private void deleteParams(String paramTable, String idColumn,
+                            List<Pair<Long, String>> deleteIdKeys) throws MetaException {
+    String deleteStmt = "delete from " + paramTable + " where " + idColumn + "=? and PARAM_KEY=?";
+    int maxRows = dbType.getMaxRows(maxBatchSize, 2);
+    updateWithStatement(statement -> Batchable.runBatched(maxRows, deleteIdKeys,
+        new Batchable<Pair<Long, String>, Void>() {
+          @Override
+          public List<Void> run(List<Pair<Long, String>> input) throws SQLException {
+            for (Pair<Long, String> pair : input) {
+              statement.setLong(1, pair.getLeft());
+              statement.setString(2, pair.getRight());
+              statement.addBatch();
+            }
+            statement.executeBatch();
+            return null;
+          }
+        }
+    ), deleteStmt);
+  }
+
+  private void updateParams(String paramTable, String idColumn,
+                            List<Pair<Long, Pair<String, String>>> updateIdAndParams) throws MetaException {
+    List<String> columns = Arrays.asList("\"PARAM_VALUE\"");
+    List<String> conditionKeys = Arrays.asList(idColumn, "\"PARAM_KEY\"");
+    String stmt = TxnUtils.createUpdatePreparedStmt(paramTable, columns, conditionKeys);
+    int maxRows = dbType.getMaxRows(maxBatchSize, 3);
+    updateWithStatement(statement -> Batchable.runBatched(maxRows, updateIdAndParams,
+        new Batchable<Pair<Long, Pair<String, String>>, Object>() {
+          @Override
+          public List<Object> run(List<Pair<Long, Pair<String, String>>> input) throws SQLException {
+            for (Pair<Long, Pair<String, String>> pair : input) {
+              statement.setString(1, pair.getRight().getRight());
+              statement.setLong(2, pair.getLeft());
+              statement.setString(3, pair.getRight().getLeft());
+              statement.addBatch();
+            }
+            statement.executeBatch();
+            return null;
+          }
+        }
+    ), stmt);
+  }
+
+  private void insertParams(String paramTable, String idColumn,
+                            List<Pair<Long, Pair<String, String>>> addIdAndParams) throws MetaException {
+    List<String> columns = Arrays.asList(idColumn, "\"PARAM_KEY\"", "\"PARAM_VALUE\"");
+    String query = TxnUtils.createInsertPreparedStmt(paramTable, columns);
+    int maxRows = dbType.getMaxRows(maxBatchSize, 3);
+    updateWithStatement(statement -> Batchable.runBatched(maxRows, addIdAndParams,
+        new Batchable<Pair<Long, Pair<String, String>>, Void>() {
+          @Override
+          public List<Void> run(List<Pair<Long, Pair<String, String>>> input) throws SQLException {
+            for (Pair<Long, Pair<String, String>> pair : input) {
+              statement.setLong(1, pair.getLeft());
+              statement.setString(2, pair.getRight().getLeft());
+              statement.setString(3, pair.getRight().getRight());
+              statement.addBatch();
+            }
+            statement.executeBatch();
+            return null;
+          }
+        }
+    ), query);
+  }
+
+  private void updateStorageDescriptorInBatch(Map<Long, StorageDescriptor> idToSd)
+      throws MetaException {
+    Map<Long, Long> sdIdToCdId = new HashMap<>();
+    Map<Long, Long> sdIdToSerdeId = new HashMap<>();
+    List<Long> cdIds = new ArrayList<>();
+    List<Long> validSdIds = filterIdsByNonNullValue(new ArrayList<>(idToSd.keySet()), idToSd);
+    Batchable.runBatched(maxBatchSize, validSdIds, new Batchable<Long, Void>() {
+      @Override
+      public List<Void> run(List<Long> input) throws Exception {
+        String idLists = MetaStoreDirectSql.getIdListForIn(input);
+        String queryText = "select \"SD_ID\", \"CD_ID\", \"SERDE_ID\" from 
\"SDS\" " +
+            "where \"SD_ID\" in (" + idLists + ")";
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
+          List<Object[]> sqlResult = executeWithArray(query.getInnerQuery(), 
null, queryText);
+          for (Object[] row : sqlResult) {
+            Long sdId = extractSqlLong(row[0]);
+            Long cdId = extractSqlLong(row[1]);
+            Long serdeId = extractSqlLong(row[2]);
+            sdIdToCdId.put(sdId, cdId);
+            sdIdToSerdeId.put(sdId, serdeId);
+            cdIds.add(cdId);
+          }
+        }
+        return null;
+      }
+    });
+
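+    // Collect the per-SD sub-objects (params, bucket/sort columns, skewed info, columns, serde)
+    // keyed by their ids so each related table can be updated in batches.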
+    Map<Long, Optional<Map<String, String>>> sdParamsOpt = new HashMap<>();
+    Map<Long, List<String>> idToBucketCols = new HashMap<>();
+    Map<Long, List<Order>> idToSortCols = new HashMap<>();
+    Map<Long, SkewedInfo> idToSkewedInfo = new HashMap<>();
+    Map<Long, List<FieldSchema>> sdIdToNewColumns = new HashMap<>();
+    List<Long> serdeIds = new ArrayList<>();
+    Map<Long, SerDeInfo> serdeIdToSerde = new HashMap<>();
+    Map<Long, Optional<Map<String, String>>> serdeParamsOpt = new HashMap<>();
+    for (Long sdId : validSdIds) {
+      StorageDescriptor sd = idToSd.get(sdId);
+      sdParamsOpt.put(sdId, Optional.ofNullable(sd.getParameters()));
+      idToBucketCols.put(sdId, sd.getBucketCols());
+      idToSortCols.put(sdId, sd.getSortCols());
+      idToSkewedInfo.put(sdId, sd.getSkewedInfo());
+      sdIdToNewColumns.put(sdId, sd.getCols());
+
+      Long serdeId = sdIdToSerdeId.get(sdId);
+      serdeIds.add(serdeId);
+      serdeIdToSerde.put(serdeId, sd.getSerdeInfo());
+      serdeParamsOpt.put(serdeId, Optional.ofNullable(sd.getSerdeInfo().getParameters()));
+    }
+
+    updateParamTableInBatch("\"SD_PARAMS\"", "\"SD_ID\"", validSdIds, sdParamsOpt);
+    updateBucketColsInBatch(idToBucketCols, validSdIds);
+    updateSortColsInBatch(idToSortCols, validSdIds);
+    updateSkewedInfoInBatch(idToSkewedInfo, validSdIds);
+    Map<Long, Long> sdIdToNewCdId = updateCDInBatch(cdIds, validSdIds, sdIdToCdId, sdIdToNewColumns);
+    updateSerdeInBatch(serdeIds, serdeIdToSerde);
+    updateParamTableInBatch("\"SERDE_PARAMS\"", "\"SERDE_ID\"", serdeIds, serdeParamsOpt);
+
+    List<Long> cdIdsMayDelete = sdIdToCdId.entrySet().stream()
+        .filter(entry -> sdIdToNewCdId.containsKey(entry.getKey()))
+        .map(entry -> entry.getValue())
+        .collect(Collectors.toList());
+
+    // Update SDS table after CDS to get the freshest CD_ID values.
+    sdIdToCdId.replaceAll((sdId, cdId) ->
+        sdIdToNewCdId.containsKey(sdId) ? sdIdToNewCdId.get(sdId) : cdId);
+    updateSDInBatch(validSdIds, idToSd, sdIdToCdId);
+
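+    // After repointing SDS rows to their new CD_IDs, old CD_IDs that are no longer referenced can be deleted.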
+    List<Long> usedIds = Batchable.runBatched(maxBatchSize, cdIdsMayDelete,
+        new Batchable<Long, Long>() {
+          @Override
+          public List<Long> run(List<Long> input) throws Exception {
+            String idLists = MetaStoreDirectSql.getIdListForIn(input);
+            String queryText = "select \"CD_ID\" from \"SDS\" where \"CD_ID\" in ( " + idLists + ")";
+            try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
+              List<Long> sqlResult = executeWithArray(query.getInnerQuery(), null, queryText);
+              return new ArrayList<>(sqlResult);
+            }
+          }
+    });
+    List<Long> unusedCdIds = cdIdsMayDelete.stream().filter(id -> !usedIds.contains(id)).collect(Collectors.toList());
+
+    deleteCDInBatch(unusedCdIds);
+  }
+
+  private void updateSDInBatch(List<Long> ids, Map<Long, StorageDescriptor> idToSd,
+                               Map<Long, Long> idToCdId) throws MetaException {
+    List<String> columns = Arrays.asList("\"CD_ID\"", "\"INPUT_FORMAT\"", "\"IS_COMPRESSED\"",
+        "\"IS_STOREDASSUBDIRECTORIES\"", "\"LOCATION\"", "\"NUM_BUCKETS\"", "\"OUTPUT_FORMAT\"");
+    List<String> conditionKeys = Arrays.asList("\"SD_ID\"");
+    String stmt = TxnUtils.createUpdatePreparedStmt("\"SDS\"", columns, conditionKeys);
+    int maxRows = dbType.getMaxRows(maxBatchSize, 8);
+    updateWithStatement(statement -> Batchable.runBatched(maxRows, ids,
+        new Batchable<Long, Void>() {
+          @Override
+          public List<Void> run(List<Long> input) throws SQLException {
+            for (Long sdId : input) {
+              StorageDescriptor sd = idToSd.get(sdId);
+              statement.setLong(1, idToCdId.get(sdId));
+              statement.setString(2, sd.getInputFormat());
+              statement.setBoolean(3, sd.isCompressed());
+              statement.setBoolean(4, sd.isStoredAsSubDirectories());
+              statement.setString(5, sd.getLocation());
+              statement.setInt(6, sd.getNumBuckets());
+              statement.setString(7, sd.getOutputFormat());
+              statement.setLong(8, sdId);
+              statement.addBatch();
+            }
+            statement.executeBatch();
+            return null;
+          }
+        }
+    ), stmt);
+  }
+
+  private void updateBucketColsInBatch(Map<Long, List<String>> sdIdToBucketCols,
+                                       List<Long> sdIds) throws MetaException {
+    Batchable.runBatched(maxBatchSize, sdIds, new Batchable<Long, Void>() {
+      @Override
+      public List<Void> run(List<Long> input) throws MetaException {
+        String idLists = MetaStoreDirectSql.getIdListForIn(input);
+        String queryText = "delete from \"BUCKETING_COLS\" where \"SD_ID\" in 
(" + idLists + ")";
+        updateWithStatement(PreparedStatement::executeUpdate, queryText);
+        return null;
+      }
+    });
+    List<String> columns = Arrays.asList("\"SD_ID\"", "\"INTEGER_IDX\"", "\"BUCKET_COL_NAME\"");
+    String stmt = TxnUtils.createInsertPreparedStmt("\"BUCKETING_COLS\"", columns);
+    List<Long> idWithBucketCols = filterIdsByNonNullValue(sdIds, sdIdToBucketCols);
+    int maxRows = dbType.getMaxRows(maxBatchSize, 3);
+    updateWithStatement(statement -> Batchable.runBatched(maxRows, idWithBucketCols, new Batchable<Long, Object>() {
+      @Override
+      public List<Object> run(List<Long> input) throws SQLException {
+        for (Long id : input) {
+          List<String> bucketCols = sdIdToBucketCols.get(id);
+          for (int i = 0; i < bucketCols.size(); i++) {
+            statement.setLong(1, id);
+            statement.setInt(2, i);
+            statement.setString(3, bucketCols.get(i));
+            statement.addBatch();
+          }
+        }
+        statement.executeBatch();
+        return null;
+      }
+    }), stmt);
+  }
+
+  private void updateSortColsInBatch(Map<Long, List<Order>> sdIdToSortCols,
+                                     List<Long> sdIds) throws MetaException {
+    Batchable.runBatched(maxBatchSize, sdIds, new Batchable<Long, Void>() {
+      @Override
+      public List<Void> run(List<Long> input) throws MetaException {
+        String idLists = MetaStoreDirectSql.getIdListForIn(input);
+        String queryText = "delete from \"SORT_COLS\" where \"SD_ID\" in (" + 
idLists + ")";
+        updateWithStatement(PreparedStatement::executeUpdate, queryText);
+        return null;
+      }
+    });
+
+    List<String> columns = Arrays.asList("\"SD_ID\"", "\"INTEGER_IDX\"", "\"COLUMN_NAME\"", "\"ORDER\"");
+    String stmt = TxnUtils.createInsertPreparedStmt("\"SORT_COLS\"", columns);
+    List<Long> idWithSortCols = filterIdsByNonNullValue(sdIds, sdIdToSortCols);
+    int maxRows = dbType.getMaxRows(maxBatchSize, 4);
+    updateWithStatement(statement -> Batchable.runBatched(maxRows, idWithSortCols, new Batchable<Long, Object>() {
+      @Override
+      public List<Object> run(List<Long> input) throws SQLException {
+        for (Long id : input) {
+          List<Order> sortCols = sdIdToSortCols.get(id);
+          for (int i = 0; i < sortCols.size(); i++) {
+            statement.setLong(1, id);
+            statement.setInt(2, i);
+            statement.setString(3, sortCols.get(i).getCol());
+            statement.setInt(4, sortCols.get(i).getOrder());
+            statement.addBatch();
+          }
+        }
+        statement.executeBatch();
+        return null;
+      }
+    }), stmt);
+  }
+
+  private void updateSkewedInfoInBatch(Map<Long, SkewedInfo> sdIdToSkewedInfo,
+                                       List<Long> sdIds) throws MetaException {
+    // Delete all old stringList and skewedValue mappings;
+    // skewedValues first, due to the foreign key constraint.
+    List<Long> stringListId = getStringListId(sdIds);
+    if (!stringListId.isEmpty()) {
+      Batchable.runBatched(maxBatchSize, sdIds, new Batchable<Long, Void>() {
+        @Override
+        public List<Void> run(List<Long> input) throws Exception {
+          String idLists = MetaStoreDirectSql.getIdListForIn(input);
+          String deleteSkewValuesQuery =
+              "delete from \"SKEWED_VALUES\" where \"SD_ID_OID\" in (" + 
idLists + ")";
+          updateWithStatement(PreparedStatement::executeUpdate, 
deleteSkewValuesQuery);
+          String deleteSkewColValueLocMapQuery =
+              "delete from \"SKEWED_COL_VALUE_LOC_MAP\" where \"SD_ID\" in (" 
+ idLists + ")";
+          updateWithStatement(PreparedStatement::executeUpdate, 
deleteSkewColValueLocMapQuery);
+          String deleteSkewColNamesQuery =
+              "delete from \"SKEWED_COL_NAMES\" where \"SD_ID\" in (" + 
idLists + ")";
+          updateWithStatement(PreparedStatement::executeUpdate, 
deleteSkewColNamesQuery);
+          return null;
+        }
+      });
+      Batchable.runBatched(maxBatchSize, stringListId, new Batchable<Long, 
Void>() {
+        @Override
+        public List<Void> run(List<Long> input) throws MetaException {
+          String idLists = MetaStoreDirectSql.getIdListForIn(input);
+          String deleteStringListValuesQuery =
+              "delete from \"SKEWED_STRING_LIST_VALUES\" where 
\"STRING_LIST_ID\" in (" + idLists + ")";
+          updateWithStatement(PreparedStatement::executeUpdate, 
deleteStringListValuesQuery);
+          String deleteStringListQuery =
+              "delete from \"SKEWED_STRING_LIST\" where \"STRING_LIST_ID\" in 
(" + idLists + ")";
+          updateWithStatement(PreparedStatement::executeUpdate, 
deleteStringListQuery);
+          return null;
+        }
+      });
+    }
+
+    // Generate new stringListId for each SdId
+    Map<Long, List<String>> idToSkewedColNames = new HashMap<>();         // 
used for SKEWED_COL_NAMES
+    List<Long> newStringListId = new ArrayList<>();                       // 
used for SKEWED_STRING_LIST
+    Map<Long, List<String>> stringListIdToValues = new HashMap<>();       // 
used for SKEWED_STRING_LIST_VALUES
+    Map<Long, List<Long>> sdIdToNewStringListId = new HashMap<>();        // 
used for SKEWED_VALUES
+    Map<Long, List<Pair<Long, String>>> sdIdToValueLoc = new HashMap<>(); // 
used for SKEWED_COL_VALUE_LOC_MAP
+
+    List<Long> idWithSkewedInfo = filterIdsByNonNullValue(sdIds, 
sdIdToSkewedInfo);
+    for (Long sdId : idWithSkewedInfo) {
+      SkewedInfo skewedInfo = sdIdToSkewedInfo.get(sdId);
+      idToSkewedColNames.put(sdId, skewedInfo.getSkewedColNames());
+      List<List<String>> skewedColValues = skewedInfo.getSkewedColValues();
+      if (skewedColValues != null) {
+        for (List<String> colValues : skewedColValues) {
+          Long nextStringListId = getDataStoreId(MStringList.class);
+          newStringListId.add(nextStringListId);
+          sdIdToNewStringListId.computeIfAbsent(sdId, k -> new 
ArrayList<>()).add(nextStringListId);
+          stringListIdToValues.put(nextStringListId, colValues);
+        }
+      }
+      Map<List<String>, String> skewedColValueLocationMaps = 
skewedInfo.getSkewedColValueLocationMaps();
+      if (skewedColValueLocationMaps != null) {
+        for (Map.Entry<List<String>, String> entry : 
skewedColValueLocationMaps.entrySet()) {
+          List<String> colValues = entry.getKey();
+          String location = entry.getValue();
+          Long nextStringListId = getDataStoreId(MStringList.class);
+          newStringListId.add(nextStringListId);
+          stringListIdToValues.put(nextStringListId, colValues);
+          sdIdToValueLoc.computeIfAbsent(sdId, k -> new 
ArrayList<>()).add(Pair.of(nextStringListId, location));
+        }
+      }
+    }
+
+    insertSkewedColNamesInBatch(idToSkewedColNames, sdIds);
+    insertStringListInBatch(newStringListId);
+    insertStringListValuesInBatch(stringListIdToValues, newStringListId);
+    insertSkewedValuesInBatch(sdIdToNewStringListId, sdIds);
+    insertSkewColValueLocInBatch(sdIdToValueLoc, sdIds);
+  }
+
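+  /**
+   * Generates a new datastore identity value for the given model class using the
+   * DataNucleus value generation strategy.
+   */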
+  private Long getDataStoreId(Class<?> modelClass) throws MetaException {
+    ExecutionContext ec = ((JDOPersistenceManager) pm).getExecutionContext();
+    AbstractClassMetaData cmd = 
ec.getMetaDataManager().getMetaDataForClass(modelClass, 
ec.getClassLoaderResolver());
+    if (cmd.getIdentityType() == IdentityType.DATASTORE) {
+      return (Long) ec.getStoreManager().getValueGenerationStrategyValue(ec, 
cmd, -1);
+    } else {
+      throw new MetaException("Identity type is not datastore.");
+    }
+  }
+
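+  /**
+   * Batch-inserts the skewed column names (SKEWED_COL_NAMES) for the given SD ids.
+   */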
+  private void insertSkewedColNamesInBatch(Map<Long, List<String>> 
sdIdToSkewedColNames,
+                                           List<Long> sdIds) throws 
MetaException {
+    List<String> columns = Arrays.asList("\"SD_ID\"", "\"INTEGER_IDX\"", 
"\"SKEWED_COL_NAME\"");
+    String stmt = TxnUtils.createInsertPreparedStmt("\"SKEWED_COL_NAMES\"", 
columns);
+    List<Long> idWithSkewedCols = filterIdsByNonNullValue(sdIds, 
sdIdToSkewedColNames);
+    int maxRows = dbType.getMaxRows(maxBatchSize, 3);
+    updateWithStatement(statement -> Batchable.runBatched(maxRows, 
idWithSkewedCols, new Batchable<Long, Object>() {
+      @Override
+      public List<Object> run(List<Long> input) throws SQLException {
+        for (Long id : input) {
+          List<String> skewedColNames = sdIdToSkewedColNames.get(id);
+          for (int i = 0; i < skewedColNames.size(); i++) {
+            statement.setLong(1, id);
+            statement.setInt(2, i);
+            statement.setString(3, skewedColNames.get(i));
+            statement.addBatch();
+          }
+        }
+        statement.executeBatch();
+        return null;
+      }
+    }), stmt);
+  }
+
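+  /**
+   * Batch-inserts the new string list ids into SKEWED_STRING_LIST.
+   */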
+  private void insertStringListInBatch(List<Long> stringListIds) throws 
MetaException {
+    List<String> columns = Arrays.asList("\"STRING_LIST_ID\"");
+    String insertQuery = 
TxnUtils.createInsertPreparedStmt("\"SKEWED_STRING_LIST\"", columns);
+    int maxRows = dbType.getMaxRows(maxBatchSize, 1);
+    updateWithStatement(statement -> Batchable.runBatched(maxRows, 
stringListIds,
+        new Batchable<Long, Void>() {
+          @Override
+          public List<Void> run(List<Long> input) throws SQLException {
+            for (Long id : input) {
+              statement.setLong(1, id);
+              statement.addBatch();
+            }
+            statement.executeBatch();
+            return null;
+          }
+        }
+    ), insertQuery);
+  }
+
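+  /**
+   * Batch-inserts the values of each string list into SKEWED_STRING_LIST_VALUES.
+   */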
+  private void insertStringListValuesInBatch(Map<Long, List<String>> 
stringListIdToValues,
+                                             List<Long> stringListIds) throws 
MetaException {
+    List<String> columns = Arrays.asList("\"STRING_LIST_ID\"", 
"\"INTEGER_IDX\"", "\"STRING_LIST_VALUE\"");
+    String insertQuery = 
TxnUtils.createInsertPreparedStmt("\"SKEWED_STRING_LIST_VALUES\"", columns);
+    List<Long> idWithStringList = filterIdsByNonNullValue(stringListIds, 
stringListIdToValues);
+    int maxRows = dbType.getMaxRows(maxBatchSize, 3);
+    updateWithStatement(statement -> Batchable.runBatched(maxRows, 
idWithStringList,
+        new Batchable<Long, Void>() {
+          @Override
+          public List<Void> run(List<Long> input) throws SQLException {
+            for (Long stringListId : input) {
+              List<String> values = stringListIdToValues.get(stringListId);
+              for (int i = 0; i < values.size(); i++) {
+                statement.setLong(1, stringListId);
+                statement.setInt(2, i);
+                statement.setString(3, values.get(i));
+                statement.addBatch();
+              }
+            }
+            statement.executeBatch();
+            return null;
+          }
+        }
+    ), insertQuery);
+  }
+
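+  /**
+   * Batch-inserts the SD id to string list id mapping into SKEWED_VALUES.
+   */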
+  private void insertSkewedValuesInBatch(Map<Long, List<Long>> 
sdIdToStringListId,
+                                        List<Long> sdIds) throws MetaException 
{
+    List<String> columns = Arrays.asList("\"SD_ID_OID\"", "\"INTEGER_IDX\"", 
"\"STRING_LIST_ID_EID\"");
+    String insertQuery = 
TxnUtils.createInsertPreparedStmt("\"SKEWED_VALUES\"", columns);
+    List<Long> idWithSkewedValues = filterIdsByNonNullValue(sdIds, 
sdIdToStringListId);
+    int maxRows = dbType.getMaxRows(maxBatchSize, 3);
+    updateWithStatement(statement -> Batchable.runBatched(maxRows, 
idWithSkewedValues,
+        new Batchable<Long, Void>() {
+          @Override
+          public List<Void> run(List<Long> input) throws Exception {
+            for (Long sdId : input) {
+              List<Long> stringListIds = sdIdToStringListId.get(sdId);
+              for (int i = 0; i < stringListIds.size(); i++) {
+                statement.setLong(1, sdId);
+                statement.setInt(2, i);
+                statement.setLong(3, stringListIds.get(i));
+                statement.addBatch();
+              }
+            }
+            statement.executeBatch();
+            return null;
+          }
+        }
+    ), insertQuery);
+  }
+
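+  /**
+   * Batch-inserts the skewed value location mapping into SKEWED_COL_VALUE_LOC_MAP.
+   */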
+  private void insertSkewColValueLocInBatch(Map<Long, List<Pair<Long, 
String>>> sdIdToColValueLoc,
+                                            List<Long> sdIds) throws 
MetaException {
+    List<String> columns = Arrays.asList("\"SD_ID\"", 
"\"STRING_LIST_ID_KID\"", "\"LOCATION\"");
+    String insertQuery = 
TxnUtils.createInsertPreparedStmt("\"SKEWED_COL_VALUE_LOC_MAP\"", columns);
+    List<Long> idWithColValueLoc = filterIdsByNonNullValue(sdIds, 
sdIdToColValueLoc);
+    int maxRows = dbType.getMaxRows(maxBatchSize, 3);
+    updateWithStatement(statement -> Batchable.runBatched(maxRows, 
idWithColValueLoc,
+        new Batchable<Long, Void>() {
+          @Override
+          public List<Void> run(List<Long> input) throws Exception {
+            for (Long sdId : input) {
+              List<Pair<Long, String>> stringListIdAndLoc = 
sdIdToColValueLoc.get(sdId);
+              for (Pair<Long, String> pair : stringListIdAndLoc) {
+                statement.setLong(1, sdId);
+                statement.setLong(2, pair.getLeft());
+                statement.setString(3, pair.getRight());
+                statement.addBatch();
+              }
+            }
+            statement.executeBatch();
+            return null;
+          }
+        }
+    ), insertQuery);
+  }
+
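+  /**
+   * Computes new column descriptors for the storage descriptors whose columns have
+   * changed: the existing columns are read from COLUMNS_V2, a new CD is created for
+   * each changed SD, and KEY_CONSTRAINTS is remapped to the new CD ids.
+   * @return mapping from SD id to its newly created CD id
+   */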
+  private Map<Long, Long> updateCDInBatch(List<Long> cdIds, List<Long> sdIds, 
Map<Long, Long> sdIdToCdId,
+                                          Map<Long, List<FieldSchema>> 
sdIdToNewColumns) throws MetaException {
+    Map<Long, List<Pair<Integer, FieldSchema>>> cdIdToColIdxPair = new 
HashMap<>();
+    Batchable.runBatched(maxBatchSize, cdIds, new Batchable<Long, Void>() {
+      @Override
+      public List<Void> run(List<Long> input) throws Exception {
+        String idLists = MetaStoreDirectSql.getIdListForIn(input);
+        String queryText = "select \"CD_ID\", \"COMMENT\", \"COLUMN_NAME\", 
\"TYPE_NAME\", " +
+            "\"INTEGER_IDX\" from \"COLUMNS_V2\" where \"CD_ID\" in (" + 
idLists + ")";
+        try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
+          List<Object[]> sqlResult = executeWithArray(query.getInnerQuery(), 
null, queryText);
+          for (Object[] row : sqlResult) {
+            Long id = extractSqlLong(row[0]);
+            String comment = (String) row[1];
+            String name = (String) row[2];
+            String type = (String) row[3];
+            int index = extractSqlInt(row[4]);
+            FieldSchema field = new FieldSchema(name, type, comment);
+            cdIdToColIdxPair.computeIfAbsent(id, k -> new 
ArrayList<>()).add(Pair.of(index, field));
+          }
+        }
+        return null;
+      }
+    });
+    List<Long> newCdIds = new ArrayList<>();
+    Map<Long, List<FieldSchema>> newCdIdToCols = new HashMap<>();
+    Map<Long, Long> oldCdIdToNewCdId = new HashMap<>();
+    Map<Long, Long> sdIdToNewCdId = new HashMap<>();
+    // oldCdId -> [(oldIdx, newIdx)], used to update KEY_CONSTRAINTS
+    Map<Long, List<Pair<Integer, Integer>>> oldCdIdToColIdxPairs = new 
HashMap<>();
+    for (Long sdId : sdIds) {
+      Long cdId = sdIdToCdId.get(sdId);
+      List<Pair<Integer, FieldSchema>> cols = cdIdToColIdxPair.get(cdId);
+      // Placeholder to avoid IndexOutOfBoundsException.
+      List<FieldSchema> oldCols = new 
ArrayList<>(Collections.nCopies(cols.size(), null));
+      cols.forEach(pair -> oldCols.set(pair.getLeft(), pair.getRight()));
+
+      List<FieldSchema> newCols = sdIdToNewColumns.get(sdId);
+      // Use the new column descriptor only if the old column descriptor 
differs from the new one.
+      if (oldCols == null || !oldCols.equals(newCols)) {
+        if (oldCols != null && newCols != null) {
+          Long newCdId = getDataStoreId(MColumnDescriptor.class);
+          newCdIds.add(newCdId);
+          newCdIdToCols.put(newCdId, newCols);
+          oldCdIdToNewCdId.put(cdId, newCdId);
+          sdIdToNewCdId.put(sdId, newCdId);
+          for (int i = 0; i < oldCols.size(); i++) {
+            FieldSchema oldCol = oldCols.get(i);
+            int newIdx = newCols.indexOf(oldCol);
+            if (newIdx != -1) {
+              oldCdIdToColIdxPairs.computeIfAbsent(cdId, k -> new 
ArrayList<>()).add(Pair.of(i, newIdx));
+            }
+          }
+        }
+      }
+    }
+
+    insertCDInBatch(newCdIds, newCdIdToCols);
+    // TODO: this follows the current JDO implementation, but it should be an error
+    //       in this case: when partitions use the default table CD and the
+    //       partition CD is changed with a constraint key mapping, the constraints
+    //       will be updated unexpectedly.
+    updateKeyConstraintsInBatch(oldCdIdToNewCdId, oldCdIdToColIdxPairs);
+
+    return sdIdToNewCdId;
+  }
+
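+  /**
+   * Batch-inserts the new column descriptors into CDS and their columns into COLUMNS_V2.
+   */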
+  private void insertCDInBatch(List<Long> ids, Map<Long, List<FieldSchema>> 
idToCols)
+      throws MetaException {
+    String insertCds = TxnUtils.createInsertPreparedStmt("\"CDS\"", 
Arrays.asList("\"CD_ID\""));
+    int maxRows = dbType.getMaxRows(maxBatchSize, 1);
+    updateWithStatement(statement -> Batchable.runBatched(maxRows, ids,
+        new Batchable<Long, Void>() {
+          @Override
+          public List<Void> run(List<Long> input) throws SQLException {
+            for (Long id : input) {
+              statement.setLong(1, id);
+              statement.addBatch();
+            }
+            statement.executeBatch();
+            return null;
+          }
+    }), insertCds);
+
+    List<String> columns = Arrays.asList("\"CD_ID\"",
+        "\"COMMENT\"", "\"COLUMN_NAME\"", "\"TYPE_NAME\"", "\"INTEGER_IDX\"");
+    String insertColumns = TxnUtils.createInsertPreparedStmt("\"COLUMNS_V2\"", 
columns);
+    int maxRowsForCDs = dbType.getMaxRows(maxBatchSize, 5);
+    updateWithStatement(statement -> Batchable.runBatched(maxRowsForCDs, ids,
+        new Batchable<Long, Void>() {
+          @Override
+          public List<Void> run(List<Long> input) throws Exception {
+            for (Long id : input) {
+              List<FieldSchema> cols = idToCols.get(id);
+              for (int i = 0; i < cols.size(); i++) {
+                FieldSchema col = cols.get(i);
+                statement.setLong(1, id);
+                statement.setString(2, col.getComment());
+                statement.setString(3, col.getName());
+                statement.setString(4, col.getType());
+                statement.setInt(5, i);
+                statement.addBatch();
+              }
+            }
+            statement.executeBatch();
+            return null;
+          }
+    }), insertColumns);
+  }
+
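+  /**
+   * Re-points KEY_CONSTRAINTS rows from the old CD ids to the new ones, updating the
+   * parent and child sides together with the remapped column indexes.
+   */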
+  private void updateKeyConstraintsInBatch(Map<Long, Long> oldCdIdToNewCdId,
+                                           Map<Long, List<Pair<Integer, 
Integer>>> oldCdIdToColIdxPairs) throws MetaException {
+    List<Long> oldCdIds = new ArrayList<>(oldCdIdToNewCdId.keySet());
+    String tableName = "\"KEY_CONSTRAINTS\"";
+    List<String> parentColumns = Arrays.asList("\"PARENT_CD_ID\"", 
"\"PARENT_INTEGER_IDX\"");
+    List<String> childColumns = Arrays.asList("\"CHILD_CD_ID\"", 
"\"CHILD_INTEGER_IDX\"");
+
+    String updateParent = TxnUtils.createUpdatePreparedStmt(tableName, 
parentColumns, parentColumns);
+    String updateChild = TxnUtils.createUpdatePreparedStmt(tableName, 
childColumns, childColumns);
+    for (String updateStmt : new String[]{updateParent, updateChild}) {
+      int maxRows = dbType.getMaxRows(maxBatchSize, 4);
+      updateWithStatement(statement -> Batchable.runBatched(maxRows, oldCdIds,
+          new Batchable<Long, Void>() {
+            @Override
+            public List<Void> run(List<Long> input) throws SQLException {
+              for (Long oldId : input) {
+                // Follow the JDO implementation: only the columns that map between
+                // the old and new CD are updated in KEY_CONSTRAINTS.
+                if (!oldCdIdToColIdxPairs.containsKey(oldId)) {
+                  continue;
+                }
+                Long newId = oldCdIdToNewCdId.get(oldId);
+                for (Pair<Integer, Integer> idx : 
oldCdIdToColIdxPairs.get(oldId)) {
+                  statement.setLong(1, newId);
+                  statement.setInt(2, idx.getRight());
+                  statement.setLong(3, oldId);
+                  statement.setInt(4, idx.getLeft());
+                  statement.addBatch();
+                }
+              }
+              statement.executeBatch();
+              return null;
+            }
+      }), updateStmt);
+    }
+  }
+
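+  /**
+   * Deletes the given column descriptors along with their columns and any key
+   * constraints that reference them.
+   */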
+  private void deleteCDInBatch(List<Long> cdIds) throws MetaException {
+    Batchable.runBatched(maxBatchSize, cdIds, new Batchable<Long, Void>() {
+      @Override
+      public List<Void> run(List<Long> input) throws Exception {
+        String idLists = MetaStoreDirectSql.getIdListForIn(input);
+        // First remove any constraints that may be associated with these CDs
+        String deleteConstraintsByCd = "delete from \"KEY_CONSTRAINTS\" where 
\"CHILD_CD_ID\" in ("
+            + idLists + ") or \"PARENT_CD_ID\" in (" + idLists + ")";
+        updateWithStatement(PreparedStatement::executeUpdate, 
deleteConstraintsByCd);
+
+        // Then delete COLUMNS_V2 before CDS to satisfy the foreign key constraint.
+        String deleteColumns = "delete from \"COLUMNS_V2\" where \"CD_ID\" in 
(" + idLists + ")";
+        updateWithStatement(PreparedStatement::executeUpdate, deleteColumns);
+
+        // Finally delete CDS
+        String deleteCDs = "delete from \"CDS\" where \"CD_ID\" in (" + 
idLists + ")";
+        updateWithStatement(PreparedStatement::executeUpdate, deleteCDs);
+        return null;
+      }
+    });
+  }
+
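+  /**
+   * Batch-updates the NAME and SLIB columns of SERDES for the given serde ids.
+   */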
+  private void updateSerdeInBatch(List<Long> ids, Map<Long, SerDeInfo> 
idToSerde)
+      throws MetaException {
+    // Follow the JDO implementation: only the NAME and SLIB columns of SERDES are updated.
+    List<String> columns = Arrays.asList("\"NAME\"", "\"SLIB\"");
+    List<String> condKeys = Arrays.asList("\"SERDE_ID\"");
+    String updateStmt = TxnUtils.createUpdatePreparedStmt("\"SERDES\"", 
columns, condKeys);
+    List<Long> idWithSerde = filterIdsByNonNullValue(ids, idToSerde);
+    int maxRows = dbType.getMaxRows(maxBatchSize, 3);
+    updateWithStatement(statement -> Batchable.runBatched(maxRows, idWithSerde,
+        new Batchable<Long, Void>() {
+          @Override
+          public List<Void> run(List<Long> input) throws SQLException {
+            for (Long id : input) {
+              SerDeInfo serde = idToSerde.get(id);
+              statement.setString(1, serde.getName());
+              statement.setString(2, serde.getSerializationLib());
+              statement.setLong(3, id);
+              statement.addBatch();
+            }
+            statement.executeBatch();
+            return null;
+          }
+    }), updateStmt);
+  }
+
+  private static final class PartitionInfo {
+    long partitionId;
+    long writeId;
+    String partitionName;
+    public PartitionInfo(long partitionId, long writeId, String partitionName) 
{
+      this.partitionId = partitionId;
+      this.writeId = writeId;
+      this.partitionName = partitionName;
+    }
+
+    @Override
+    public int hashCode() {
+      return (int)partitionId;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null) {
+        return false;
+      }
+      if (!(o instanceof PartitionInfo)) {
+        return false;
+      }
+      PartitionInfo other = (PartitionInfo)o;
+      if (this.partitionId != other.partitionId) {
+        return false;
+      }
+      return true;
+    }
+  }
+
+  private static final class PartColNameInfo {
+    long partitionId;
+    String colName;
+    public PartColNameInfo(long partitionId, String colName) {
+      this.partitionId = partitionId;
+      this.colName = colName;
+    }
+
+    @Override
+    public int hashCode() {
+      return (int)partitionId;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null) {
+        return false;
+      }
+      if (!(o instanceof PartColNameInfo)) {
+        return false;
+      }
+      PartColNameInfo other = (PartColNameInfo)o;
+      if (this.partitionId != other.partitionId) {
+        return false;
+      }
+      if (this.colName.equalsIgnoreCase(other.colName)) {
+        return true;
+      }
+      return false;
+    }
+  }
+}
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdateStat.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdateStat.java
deleted file mode 100644
index df1f77f2064..00000000000
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdateStat.java
+++ /dev/null
@@ -1,727 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.metastore;
-
-import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent;
-import 
org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEventBatch;
-import org.apache.hadoop.hive.metastore.messaging.EventMessage;
-import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
-import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-
-import javax.jdo.PersistenceManager;
-import javax.jdo.datastore.JDOConnection;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-
-import static 
org.apache.hadoop.hive.common.StatsSetupConst.COLUMN_STATS_ACCURATE;
-import static org.apache.hadoop.hive.metastore.HMSHandler.getPartValsFromName;
-import static 
org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
-
-/**
- * This class contains the optimizations for MetaStore that rely on direct SQL 
access to
- * the underlying database. It should use ANSI SQL and be compatible with 
common databases
- * such as MySQL (note that MySQL doesn't use full ANSI mode by default), 
Postgres, etc.
- *
- * This class separates out the statistics update part from MetaStoreDirectSql 
class.
- */
-class DirectSqlUpdateStat {
-  private static final Logger LOG = 
LoggerFactory.getLogger(DirectSqlUpdateStat.class.getName());
-  PersistenceManager pm;
-  Configuration conf;
-  DatabaseProduct dbType;
-  int maxBatchSize;
-  SQLGenerator sqlGenerator;
-  private static final ReentrantLock derbyLock = new ReentrantLock(true);
-  
-  public DirectSqlUpdateStat(PersistenceManager pm, Configuration conf,
-                                          DatabaseProduct dbType, int 
batchSize) {
-    this.pm = pm;
-    this.conf = conf;
-    this.dbType = dbType;
-    this.maxBatchSize = batchSize;
-    sqlGenerator = new SQLGenerator(dbType, conf);
-  }
-
-  /**
-   * {@link #lockInternal()} and {@link #unlockInternal()} are used to 
serialize those operations that require
-   * Select ... For Update to sequence operations properly.  In practice that 
means when running
-   * with Derby database.  See more notes at class level.
-   */
-  private void lockInternal() {
-    if(dbType.isDERBY()) {
-      derbyLock.lock();
-    }
-  }
-
-  private void unlockInternal() {
-    if(dbType.isDERBY()) {
-      derbyLock.unlock();
-    }
-  }
-
-  void rollbackDBConn(Connection dbConn) {
-    try {
-      if (dbConn != null && !dbConn.isClosed()) dbConn.rollback();
-    } catch (SQLException e) {
-      LOG.warn("Failed to rollback db connection ", e);
-    }
-  }
-
-  void closeDbConn(JDOConnection jdoConn) {
-    try {
-      if (jdoConn != null) {
-        jdoConn.close();
-      }
-    } catch (Exception e) {
-      LOG.warn("Failed to close db connection", e);
-    }
-  }
-
-  void closeStmt(Statement stmt) {
-    try {
-      if (stmt != null && !stmt.isClosed()) stmt.close();
-    } catch (SQLException e) {
-      LOG.warn("Failed to close statement ", e);
-    }
-  }
-
-  void close(ResultSet rs) {
-    try {
-      if (rs != null && !rs.isClosed()) {
-        rs.close();
-      }
-    }
-    catch(SQLException ex) {
-      LOG.warn("Failed to close statement ", ex);
-    }
-  }
-
-  static String quoteString(String input) {
-    return "'" + input + "'";
-  }
-
-  void close(ResultSet rs, Statement stmt, JDOConnection dbConn) {
-    close(rs);
-    closeStmt(stmt);
-    closeDbConn(dbConn);
-  }
-
-  private void populateInsertUpdateMap(Map<PartitionInfo, ColumnStatistics> 
statsPartInfoMap,
-                                       Map<PartColNameInfo, 
MPartitionColumnStatistics> updateMap,
-                                       Map<PartColNameInfo, 
MPartitionColumnStatistics>insertMap,
-                                       Connection dbConn, Table tbl) throws 
SQLException, MetaException, NoSuchObjectException {
-    StringBuilder prefix = new StringBuilder();
-    StringBuilder suffix = new StringBuilder();
-    Statement statement = null;
-    ResultSet rs = null;
-    List<String> queries = new ArrayList<>();
-    Set<PartColNameInfo> selectedParts = new HashSet<>();
-
-    List<Long> partIdList = statsPartInfoMap.keySet().stream().map(
-            e -> e.partitionId).collect(Collectors.toList()
-    );
-
-    prefix.append("select \"PART_ID\", \"COLUMN_NAME\" from \"PART_COL_STATS\" 
WHERE ");
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
-            partIdList, "\"PART_ID\"", true, false);
-
-    for (String query : queries) {
-      try {
-        statement = dbConn.createStatement();
-        LOG.debug("Going to execute query " + query);
-        rs = statement.executeQuery(query);
-        while (rs.next()) {
-          selectedParts.add(new PartColNameInfo(rs.getLong(1), 
rs.getString(2)));
-        }
-      } finally {
-        close(rs, statement, null);
-      }
-    }
-
-    for (Map.Entry entry : statsPartInfoMap.entrySet()) {
-      PartitionInfo partitionInfo = (PartitionInfo) entry.getKey();
-      ColumnStatistics colStats = (ColumnStatistics) entry.getValue();
-      long partId = partitionInfo.partitionId;
-      ColumnStatisticsDesc statsDesc = colStats.getStatsDesc();
-      if (!statsDesc.isSetCatName()) {
-        statsDesc.setCatName(tbl.getCatName());
-      }
-      for (ColumnStatisticsObj statisticsObj : colStats.getStatsObj()) {
-        PartColNameInfo temp = new PartColNameInfo(partId, 
statisticsObj.getColName());
-        if (selectedParts.contains(temp)) {
-          updateMap.put(temp, StatObjectConverter.
-                  convertToMPartitionColumnStatistics(null, statsDesc, 
statisticsObj, colStats.getEngine()));
-        } else {
-          insertMap.put(temp, StatObjectConverter.
-                  convertToMPartitionColumnStatistics(null, statsDesc, 
statisticsObj, colStats.getEngine()));
-        }
-      }
-    }
-  }
-
-  private void updatePartColStatTable(Map<PartColNameInfo, 
MPartitionColumnStatistics> updateMap,
-                                          Connection dbConn) throws 
SQLException, MetaException, NoSuchObjectException {
-    PreparedStatement pst = null;
-    for (Map.Entry entry : updateMap.entrySet()) {
-      PartColNameInfo partColNameInfo = (PartColNameInfo) entry.getKey();
-      Long partId = partColNameInfo.partitionId;
-      MPartitionColumnStatistics mPartitionColumnStatistics = 
(MPartitionColumnStatistics) entry.getValue();
-      String update = "UPDATE \"PART_COL_STATS\" SET ";
-      update += 
StatObjectConverter.getUpdatedColumnSql(mPartitionColumnStatistics);
-      update += " WHERE \"PART_ID\" = " + partId + " AND "
-              + " \"COLUMN_NAME\" = " +  
quoteString(mPartitionColumnStatistics.getColName());
-      try {
-        pst = dbConn.prepareStatement(update);
-        
StatObjectConverter.initUpdatedColumnStatement(mPartitionColumnStatistics, pst);
-        LOG.debug("Going to execute update " + update);
-        int numUpdate = pst.executeUpdate();
-        if (numUpdate != 1) {
-          throw new MetaException("Invalid state of  PART_COL_STATS for 
PART_ID " + partId);
-        }
-      } finally {
-        closeStmt(pst);
-      }
-    }
-  }
-
-  private void insertIntoPartColStatTable(Map<PartColNameInfo, 
MPartitionColumnStatistics> insertMap,
-                                          long maxCsId,
-                                          Connection dbConn) throws 
SQLException, MetaException, NoSuchObjectException {
-    PreparedStatement preparedStatement = null;
-    int numRows = 0;
-    String insert = "INSERT INTO \"PART_COL_STATS\" (\"CS_ID\", \"CAT_NAME\", 
\"DB_NAME\","
-            + "\"TABLE_NAME\", \"PARTITION_NAME\", \"COLUMN_NAME\", 
\"COLUMN_TYPE\", \"PART_ID\","
-            + " \"LONG_LOW_VALUE\", \"LONG_HIGH_VALUE\", 
\"DOUBLE_HIGH_VALUE\", \"DOUBLE_LOW_VALUE\","
-            + " \"BIG_DECIMAL_LOW_VALUE\", \"BIG_DECIMAL_HIGH_VALUE\", 
\"NUM_NULLS\", \"NUM_DISTINCTS\", \"BIT_VECTOR\" ,"
-            + " \"HISTOGRAM\", \"AVG_COL_LEN\", \"MAX_COL_LEN\", 
\"NUM_TRUES\", \"NUM_FALSES\", \"LAST_ANALYZED\", \"ENGINE\") values "
-            + "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?)";
-
-    try {
-      preparedStatement = dbConn.prepareStatement(insert);
-      for (Map.Entry entry : insertMap.entrySet()) {
-        PartColNameInfo partColNameInfo = (PartColNameInfo) entry.getKey();
-        Long partId = partColNameInfo.partitionId;
-        MPartitionColumnStatistics mPartitionColumnStatistics = 
(MPartitionColumnStatistics) entry.getValue();
-
-        preparedStatement.setLong(1, maxCsId);
-        preparedStatement.setString(2, 
mPartitionColumnStatistics.getCatName());
-        preparedStatement.setString(3, mPartitionColumnStatistics.getDbName());
-        preparedStatement.setString(4, 
mPartitionColumnStatistics.getTableName());
-        preparedStatement.setString(5, 
mPartitionColumnStatistics.getPartitionName());
-        preparedStatement.setString(6, 
mPartitionColumnStatistics.getColName());
-        preparedStatement.setString(7, 
mPartitionColumnStatistics.getColType());
-        preparedStatement.setLong(8, partId);
-        preparedStatement.setObject(9, 
mPartitionColumnStatistics.getLongLowValue());
-        preparedStatement.setObject(10, 
mPartitionColumnStatistics.getLongHighValue());
-        preparedStatement.setObject(11, 
mPartitionColumnStatistics.getDoubleHighValue());
-        preparedStatement.setObject(12, 
mPartitionColumnStatistics.getDoubleLowValue());
-        preparedStatement.setString(13, 
mPartitionColumnStatistics.getDecimalLowValue());
-        preparedStatement.setString(14, 
mPartitionColumnStatistics.getDecimalHighValue());
-        preparedStatement.setObject(15, 
mPartitionColumnStatistics.getNumNulls());
-        preparedStatement.setObject(16, 
mPartitionColumnStatistics.getNumDVs());
-        preparedStatement.setObject(17, 
mPartitionColumnStatistics.getBitVector());
-        preparedStatement.setBytes(18, 
mPartitionColumnStatistics.getHistogram());
-        preparedStatement.setObject(19, 
mPartitionColumnStatistics.getAvgColLen());
-        preparedStatement.setObject(20, 
mPartitionColumnStatistics.getMaxColLen());
-        preparedStatement.setObject(21, 
mPartitionColumnStatistics.getNumTrues());
-        preparedStatement.setObject(22, 
mPartitionColumnStatistics.getNumFalses());
-        preparedStatement.setLong(23, 
mPartitionColumnStatistics.getLastAnalyzed());
-        preparedStatement.setString(24, 
mPartitionColumnStatistics.getEngine());
-
-        maxCsId++;
-        numRows++;
-        preparedStatement.addBatch();
-        if (numRows == maxBatchSize) {
-          preparedStatement.executeBatch();
-          numRows = 0;
-        }
-      }
-
-      if (numRows != 0) {
-        preparedStatement.executeBatch();
-      }
-    } finally {
-      closeStmt(preparedStatement);
-    }
-  }
-
-  private Map<Long, String> getParamValues(Connection dbConn, List<Long> 
partIdList) throws SQLException {
-    List<String> queries = new ArrayList<>();
-    StringBuilder prefix = new StringBuilder();
-    StringBuilder suffix = new StringBuilder();
-    Statement statement = null;
-    ResultSet rs = null;
-
-    prefix.append("select \"PART_ID\", \"PARAM_VALUE\" "
-            + " from \"PARTITION_PARAMS\" where "
-            + " \"PARAM_KEY\" = 'COLUMN_STATS_ACCURATE' "
-            + " and ");
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
-            partIdList, "\"PART_ID\"", true, false);
-
-    Map<Long, String> partIdToParaMap = new HashMap<>();
-    for (String query : queries) {
-      try {
-        statement = dbConn.createStatement();
-        LOG.debug("Going to execute query " + query);
-        rs = statement.executeQuery(query);
-        while (rs.next()) {
-          partIdToParaMap.put(rs.getLong(1), rs.getString(2));
-        }
-      } finally {
-        close(rs, statement, null);
-      }
-    }
-    return partIdToParaMap;
-  }
-
-  private void updateWriteIdForPartitions(Connection dbConn, long writeId, 
List<Long> partIdList) throws SQLException {
-    StringBuilder prefix = new StringBuilder();
-    List<String> queries = new ArrayList<>();
-    StringBuilder suffix = new StringBuilder();
-
-    prefix.append("UPDATE \"PARTITIONS\" set \"WRITE_ID\" = " + writeId + " 
where ");
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
-            partIdList, "\"PART_ID\"", false, false);
-
-    Statement statement = null;
-    for (String query : queries) {
-      try {
-        statement = dbConn.createStatement();
-        LOG.debug("Going to execute update " + query);
-        statement.executeUpdate(query);
-      } finally {
-        closeStmt(statement);
-      }
-    }
-  }
-
-  private Map<String, Map<String, String>> 
updatePartitionParamTable(Connection dbConn,
-                                                                     
Map<PartitionInfo, ColumnStatistics> partitionInfoMap,
-                                                                     String 
validWriteIds,
-                                                                     long 
writeId,
-                                                                     boolean 
isAcidTable)
-          throws SQLException, MetaException {
-    Map<String, Map<String, String>> result = new HashMap<>();
-    boolean areTxnStatsSupported = MetastoreConf.getBoolVar(conf, 
ConfVars.HIVE_TXN_STATS_ENABLED);
-    PreparedStatement statementInsert = null;
-    PreparedStatement statementDelete = null;
-    PreparedStatement statementUpdate = null;
-    String insert = "INSERT INTO \"PARTITION_PARAMS\" (\"PART_ID\", 
\"PARAM_KEY\", \"PARAM_VALUE\") "
-            + "VALUES( ? , 'COLUMN_STATS_ACCURATE'  , ? )";
-    String delete = "DELETE from \"PARTITION_PARAMS\" "
-            + " where \"PART_ID\" = ? "
-            + " and \"PARAM_KEY\" = 'COLUMN_STATS_ACCURATE'";
-    String update = "UPDATE \"PARTITION_PARAMS\" set \"PARAM_VALUE\" = ? "
-            + " where \"PART_ID\" = ? "
-            + " and \"PARAM_KEY\" = 'COLUMN_STATS_ACCURATE'";
-    int numInsert = 0;
-    int numDelete = 0;
-    int numUpdate = 0;
-
-    List<Long> partIdList = partitionInfoMap.keySet().stream().map(
-            e -> e.partitionId).collect(Collectors.toList()
-    );
-
-    // get the old parameters from PARTITION_PARAMS table.
-    Map<Long, String> partIdToParaMap = getParamValues(dbConn, partIdList);
-
-    try {
-      statementInsert = dbConn.prepareStatement(insert);
-      statementDelete = dbConn.prepareStatement(delete);
-      statementUpdate = dbConn.prepareStatement(update);
-      for (Map.Entry entry : partitionInfoMap.entrySet()) {
-        PartitionInfo partitionInfo = (PartitionInfo) entry.getKey();
-        ColumnStatistics colStats = (ColumnStatistics) entry.getValue();
-        List<String> colNames = colStats.getStatsObj().stream().map(e -> 
e.getColName()).collect(Collectors.toList());
-        long partWriteId = partitionInfo.writeId;
-        long partId = partitionInfo.partitionId;
-        Map<String, String> newParameter;
-
-        if (!partIdToParaMap.containsKey(partId)) {
-          newParameter = new HashMap<>();
-          newParameter.put(COLUMN_STATS_ACCURATE, "TRUE");
-          StatsSetupConst.setColumnStatsState(newParameter, colNames);
-          statementInsert.setLong(1, partId);
-          statementInsert.setString(2, 
newParameter.get(COLUMN_STATS_ACCURATE));
-          numInsert++;
-          statementInsert.addBatch();
-          if (numInsert == maxBatchSize) {
-            LOG.debug(" Executing insert " + insert);
-            statementInsert.executeBatch();
-            numInsert = 0;
-          }
-        } else {
-          String oldStats = partIdToParaMap.get(partId);
-
-          Map<String, String> oldParameter = new HashMap<>();
-          oldParameter.put(COLUMN_STATS_ACCURATE, oldStats);
-
-          newParameter = new HashMap<>();
-          newParameter.put(COLUMN_STATS_ACCURATE, oldStats);
-          StatsSetupConst.setColumnStatsState(newParameter, colNames);
-
-          if (isAcidTable) {
-            String errorMsg = ObjectStore.verifyStatsChangeCtx(
-                    colStats.getStatsDesc().getDbName() + "." + 
colStats.getStatsDesc().getTableName(),
-                    oldParameter, newParameter, writeId, validWriteIds, true);
-            if (errorMsg != null) {
-              throw new MetaException(errorMsg);
-            }
-          }
-
-          if (isAcidTable &&
-                  (!areTxnStatsSupported || 
!ObjectStore.isCurrentStatsValidForTheQuery(oldParameter, partWriteId,
-                          validWriteIds, true))) {
-            statementDelete.setLong(1, partId);
-            statementDelete.addBatch();
-            numDelete++;
-            if (numDelete == maxBatchSize) {
-              statementDelete.executeBatch();
-              numDelete = 0;
-              LOG.debug("Removed COLUMN_STATS_ACCURATE from the parameters of 
the partition "
-                      + colStats.getStatsDesc().getDbName() + "." + 
colStats.getStatsDesc().getTableName() + "."
-                      + colStats.getStatsDesc().getPartName());
-            }
-          } else {
-            statementUpdate.setString(1, 
newParameter.get(COLUMN_STATS_ACCURATE));
-            statementUpdate.setLong(2, partId);
-            statementUpdate.addBatch();
-            numUpdate++;
-            if (numUpdate == maxBatchSize) {
-              LOG.debug(" Executing update " + statementUpdate);
-              statementUpdate.executeBatch();
-              numUpdate = 0;
-            }
-          }
-        }
-        result.put(partitionInfo.partitionName, newParameter);
-      }
-
-      if (numInsert != 0) {
-        statementInsert.executeBatch();
-      }
-
-      if (numUpdate != 0) {
-        statementUpdate.executeBatch();
-      }
-
-      if (numDelete != 0) {
-        statementDelete.executeBatch();
-      }
-
-      if (isAcidTable) {
-        updateWriteIdForPartitions(dbConn, writeId, partIdList);
-      }
-      return result;
-    } finally {
-      closeStmt(statementInsert);
-      closeStmt(statementUpdate);
-      closeStmt(statementDelete);
-    }
-  }
-
-  private static class PartitionInfo {
-    long partitionId;
-    long writeId;
-    String partitionName;
-    public PartitionInfo(long partitionId, long writeId, String partitionName) 
{
-      this.partitionId = partitionId;
-      this.writeId = writeId;
-      this.partitionName = partitionName;
-    }
-
-    @Override
-    public int hashCode() {
-      return (int)partitionId;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-      if (this == o) {
-        return true;
-      }
-      if (o == null) {
-        return false;
-      }
-      if (!(o instanceof PartitionInfo)) {
-        return false;
-      }
-      PartitionInfo other = (PartitionInfo)o;
-      if (this.partitionId != other.partitionId) {
-        return false;
-      }
-      return true;
-    }
-  }
-
-  private static class PartColNameInfo {
-    long partitionId;
-    String colName;
-    public PartColNameInfo(long partitionId, String colName) {
-      this.partitionId = partitionId;
-      this.colName = colName;
-    }
-
-    @Override
-    public int hashCode() {
-      return (int)partitionId;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-      if (this == o) {
-        return true;
-      }
-      if (o == null) {
-        return false;
-      }
-      if (!(o instanceof PartColNameInfo)) {
-        return false;
-      }
-      PartColNameInfo other = (PartColNameInfo)o;
-      if (this.partitionId != other.partitionId) {
-        return false;
-      }
-      if (this.colName.equalsIgnoreCase(other.colName)) {
-        return true;
-      }
-      return false;
-    }
-  }
-
-  private Map<PartitionInfo, ColumnStatistics> getPartitionInfo(Connection 
dbConn, long tblId,
-                                                                 Map<String, 
ColumnStatistics> partColStatsMap)
-          throws SQLException, MetaException {
-    List<String> queries = new ArrayList<>();
-    StringBuilder prefix = new StringBuilder();
-    StringBuilder suffix = new StringBuilder();
-    Statement statement = null;
-    ResultSet rs = null;
-    Map<PartitionInfo, ColumnStatistics> partitionInfoMap = new HashMap<>();
-
-    List<String> partKeys = partColStatsMap.keySet().stream().map(
-            e -> quoteString(e)).collect(Collectors.toList()
-    );
-
-    prefix.append("select \"PART_ID\", \"WRITE_ID\", \"PART_NAME\"  from 
\"PARTITIONS\" where ");
-    suffix.append(" and  \"TBL_ID\" = " + tblId);
-    TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix,
-            partKeys, "\"PART_NAME\"", true, false);
-
-    for (String query : queries) {
-      // Select for update makes sure that the partitions are not modified 
while the stats are getting updated.
-      query = sqlGenerator.addForUpdateClause(query);
-      try {
-        statement = dbConn.createStatement();
-        LOG.debug("Going to execute query <" + query + ">");
-        rs = statement.executeQuery(query);
-        while (rs.next()) {
-          PartitionInfo partitionInfo = new PartitionInfo(rs.getLong(1),
-                  rs.getLong(2), rs.getString(3));
-          partitionInfoMap.put(partitionInfo, 
partColStatsMap.get(rs.getString(3)));
-        }
-      } finally {
-        close(rs, statement, null);
-      }
-    }
-    return partitionInfoMap;
-  }
-
-  private void setAnsiQuotes(Connection dbConn) throws SQLException {
-    if (sqlGenerator.getDbProduct().isMYSQL()) {
-      try (Statement stmt = dbConn.createStatement()) {
-        stmt.execute("SET @@session.sql_mode=ANSI_QUOTES");
-      }
-    }
-  }
-
-  /**
-   * Update the statistics for the given partitions. Add the notification logs 
also.
-   * @return map of partition key to column stats if successful, null 
otherwise.
-   */
-  public Map<String, Map<String, String>> 
updatePartitionColumnStatistics(Map<String, ColumnStatistics> partColStatsMap,
-                                                      Table tbl, long csId,
-                                                      String validWriteIds, 
long writeId,
-                                                      
List<TransactionalMetaStoreEventListener> transactionalListeners)
-          throws MetaException {
-    JDOConnection jdoConn = null;
-    Connection dbConn = null;
-    boolean committed = false;
-    try {
-      lockInternal();
-      jdoConn = pm.getDataStoreConnection();
-      dbConn = (Connection) (jdoConn.getNativeConnection());
-
-      setAnsiQuotes(dbConn);
-
-      Map<PartitionInfo, ColumnStatistics> partitionInfoMap = 
getPartitionInfo(dbConn, tbl.getId(), partColStatsMap);
-
-      Map<String, Map<String, String>> result =
-              updatePartitionParamTable(dbConn, partitionInfoMap, 
validWriteIds, writeId, TxnUtils.isAcidTable(tbl));
-
-      Map<PartColNameInfo, MPartitionColumnStatistics> insertMap = new 
HashMap<>();
-      Map<PartColNameInfo, MPartitionColumnStatistics> updateMap = new 
HashMap<>();
-      populateInsertUpdateMap(partitionInfoMap, updateMap, insertMap, dbConn, 
tbl);
-
-      LOG.info("Number of stats to insert  " + insertMap.size() + " update " + 
updateMap.size());
-
-      if (insertMap.size() != 0) {
-        insertIntoPartColStatTable(insertMap, csId, dbConn);
-      }
-
-      if (updateMap.size() != 0) {
-        updatePartColStatTable(updateMap, dbConn);
-      }
-
-      if (transactionalListeners != null) {
-        UpdatePartitionColumnStatEventBatch eventBatch = new 
UpdatePartitionColumnStatEventBatch(null);
-        for (Map.Entry entry : result.entrySet()) {
-          Map<String, String> parameters = (Map<String, String>) 
entry.getValue();
-          ColumnStatistics colStats = partColStatsMap.get(entry.getKey());
-          List<String> partVals = getPartValsFromName(tbl, 
colStats.getStatsDesc().getPartName());
-          UpdatePartitionColumnStatEvent event = new 
UpdatePartitionColumnStatEvent(colStats, partVals, parameters,
-                  tbl, writeId, null);
-          eventBatch.addPartColStatEvent(event);
-        }
-        
MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
-                EventMessage.EventType.UPDATE_PARTITION_COLUMN_STAT_BATCH, 
eventBatch, dbConn, sqlGenerator);
-      }
-      dbConn.commit();
-      committed = true;
-      return result;
-    } catch (Exception e) {
-      LOG.error("Unable to update Column stats for  " + tbl.getTableName(), e);
-      throw new MetaException("Unable to update Column stats for  " + 
tbl.getTableName()
-              + " due to: "  + e.getMessage());
-    } finally {
-      if (!committed) {
-        rollbackDBConn(dbConn);
-      }
-      closeDbConn(jdoConn);
-      unlockInternal();
-    }
-  }
-
-  /**
-   * Gets the next CS id from sequence MPartitionColumnStatistics and 
increment the CS id by numStats.
-   * @return The CD id before update.
-   */
-  public long getNextCSIdForMPartitionColumnStatistics(long numStats) throws 
MetaException {
-    Statement statement = null;
-    ResultSet rs = null;
-    long maxCsId = 0;
-    boolean committed = false;
-    Connection dbConn = null;
-    JDOConnection jdoConn = null;
-
-    try {
-      lockInternal();
-      jdoConn = pm.getDataStoreConnection();
-      dbConn = (Connection) (jdoConn.getNativeConnection());
-
-      setAnsiQuotes(dbConn);
-
-      // This loop will be iterated at max twice. If there is no records, it 
will first insert and then do a select.
-      // We are not using any upsert operations as select for update and then 
update is required to make sure that
-      // the caller gets a reserved range for CSId not used by any other 
thread.
-      boolean insertDone = false;
-      while (maxCsId == 0) {
-        String query = sqlGenerator.addForUpdateClause("SELECT \"NEXT_VAL\" 
FROM \"SEQUENCE_TABLE\" "
-                + "WHERE \"SEQUENCE_NAME\"= "
-                + 
quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics"));
-        LOG.debug("Going to execute query " + query);
-        statement = dbConn.createStatement();
-        rs = statement.executeQuery(query);
-        if (rs.next()) {
-          maxCsId = rs.getLong(1);
-        } else if (insertDone) {
-          throw new MetaException("Invalid state of SEQUENCE_TABLE for 
MPartitionColumnStatistics");
-        } else {
-          insertDone = true;
-          closeStmt(statement);
-          statement = dbConn.createStatement();
-          query = "INSERT INTO \"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\", 
\"NEXT_VAL\")  VALUES ( "
-                  + 
quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics")
 + "," + 1
-                  + ")";
-          try {
-            statement.executeUpdate(query);
-          } catch (SQLException e) {
-            // If the record is already inserted by some other thread continue 
to select.
-            if (dbType.isDuplicateKeyError(e)) {
-              continue;
-            }
-            LOG.error("Unable to insert into SEQUENCE_TABLE for 
MPartitionColumnStatistics.", e);
-            throw e;
-          } finally {
-            closeStmt(statement);
-          }
-        }
-      }
-
-      long nextMaxCsId = maxCsId + numStats + 1;
-      closeStmt(statement);
-      statement = dbConn.createStatement();
-      String query = "UPDATE \"SEQUENCE_TABLE\" SET \"NEXT_VAL\" = "
-              + nextMaxCsId
-              + " WHERE \"SEQUENCE_NAME\" = "
-              + 
quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics");
-      statement.executeUpdate(query);
-      dbConn.commit();
-      committed = true;
-      return maxCsId;
-    } catch (Exception e) {
-      LOG.error("Unable to getNextCSIdForMPartitionColumnStatistics", e);
-      throw new MetaException("Unable to 
getNextCSIdForMPartitionColumnStatistics  "
-              + " due to: " + e.getMessage());
-    } finally {
-      if (!committed) {
-        rollbackDBConn(dbConn);
-      }
-      close(rs, statement, jdoConn);
-      unlockInternal();
-    }
-  }
-}
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
index 3ed850efbab..7100bf93ae1 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
@@ -5791,9 +5791,8 @@ public class HMSHandler extends FacebookBase implements 
IHMSHandler {
                                                        final 
EnvironmentContext envContext)
       throws TException {
     String[] parsedDbName = parseDbName(dbName, conf);
-    // TODO: this method name is confusing, it actually does full alter 
(sortof)
-    rename_partition(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName, 
null, newPartition,
-        envContext, null);
+    alter_partition_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], 
tableName, null,
+        newPartition, envContext, null);
   }
 
   @Deprecated
@@ -5801,9 +5800,9 @@ public class HMSHandler extends FacebookBase implements 
IHMSHandler {
   public void rename_partition(final String db_name, final String tbl_name,
                                final List<String> part_vals, final Partition 
new_part)
       throws TException {
-    // Call rename_partition without an environment context.
+    // Call alter_partition_core without an environment context.
     String[] parsedDbName = parseDbName(db_name, conf);
-    rename_partition(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name, 
part_vals, new_part,
+    alter_partition_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], 
tbl_name, part_vals, new_part,
         null, null);
   }
 
@@ -5813,12 +5812,12 @@ public class HMSHandler extends FacebookBase implements 
IHMSHandler {
     context.putToProperties(RENAME_PARTITION_MAKE_COPY, 
String.valueOf(req.isClonePart()));
     context.putToProperties(hive_metastoreConstants.TXN_ID, 
String.valueOf(req.getTxnId()));
     
-    rename_partition(req.getCatName(), req.getDbName(), req.getTableName(), 
req.getPartVals(),
+    alter_partition_core(req.getCatName(), req.getDbName(), 
req.getTableName(), req.getPartVals(),
         req.getNewPart(), context, req.getValidWriteIdList());
     return new RenamePartitionResponse();
   };
 
-  private void rename_partition(String catName, String db_name, String 
tbl_name,
+  private void alter_partition_core(String catName, String db_name, String 
tbl_name,
                                 List<String> part_vals, Partition new_part, 
EnvironmentContext envContext,
                                 String validWriteIds) throws TException {
     startTableFunction("alter_partition", catName, db_name, tbl_name);
@@ -5847,8 +5846,7 @@ public class HMSHandler extends FacebookBase implements 
IHMSHandler {
     Partition oldPart = null;
     Exception ex = null;
     try {
-      Table table = null;
-      table = getMS().getTable(catName, db_name, tbl_name, null);
+      Table table = getMS().getTable(catName, db_name, tbl_name, null);
 
       firePreEvent(new PreAlterPartitionEvent(db_name, tbl_name, table, 
part_vals, new_part, this));
       if (part_vals != null && !part_vals.isEmpty()) {
@@ -5859,8 +5857,6 @@ public class HMSHandler extends FacebookBase implements 
IHMSHandler {
       oldPart = alterHandler.alterPartition(getMS(), wh, catName, db_name, 
tbl_name,
           part_vals, new_part, envContext, this, validWriteIds);
 
-      // Only fetch the table if we actually have a listener
-
       if (!listeners.isEmpty()) {
         MetaStoreListenerNotifier.notifyEvent(listeners,
             EventType.ALTER_PARTITION,
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index 97956660791..d5e0db5366c 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -95,13 +95,13 @@ import 
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.hive.metastore.model.MConstraint;
 import org.apache.hadoop.hive.metastore.model.MCreationMetadata;
 import org.apache.hadoop.hive.metastore.model.MDatabase;
-import org.apache.hadoop.hive.metastore.model.MFunction;
 import org.apache.hadoop.hive.metastore.model.MNotificationLog;
 import org.apache.hadoop.hive.metastore.model.MNotificationNextId;
 import org.apache.hadoop.hive.metastore.model.MPartition;
 import org.apache.hadoop.hive.metastore.model.MPartitionColumnPrivilege;
 import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
 import org.apache.hadoop.hive.metastore.model.MPartitionPrivilege;
+import org.apache.hadoop.hive.metastore.model.MTable;
 import org.apache.hadoop.hive.metastore.model.MTableColumnStatistics;
 import org.apache.hadoop.hive.metastore.model.MWMResourcePlan;
 import org.apache.hadoop.hive.metastore.parser.ExpressionTree;
@@ -111,6 +111,7 @@ import 
org.apache.hadoop.hive.metastore.parser.ExpressionTree.LogicalOperator;
 import org.apache.hadoop.hive.metastore.parser.ExpressionTree.Operator;
 import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode;
 import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
 import 
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
@@ -152,6 +153,7 @@ class MetaStoreDirectSql {
   private final int batchSize;
   private final boolean convertMapNullsToEmptyStrings;
   private final String defaultPartName;
+  private final boolean isTxnStatsEnabled;
 
   /**
    * Whether direct SQL can be used with the current datastore backing {@link 
#pm}.
@@ -160,7 +162,7 @@ class MetaStoreDirectSql {
   private final boolean isAggregateStatsCacheEnabled;
   private final ImmutableMap<String, String> fieldnameToTableName;
   private AggregateStatsCache aggrStatsCache;
-  private DirectSqlUpdateStat updateStat;
+  private DirectSqlUpdatePart directSqlUpdatePart;
   private DirectSqlInsertPart directSqlInsertPart;
 
   /**
@@ -203,7 +205,8 @@ class MetaStoreDirectSql {
       batchSize = dbType.needsInBatching() ? 1000 : NO_BATCHING;
     }
     this.batchSize = batchSize;
-    this.updateStat = new DirectSqlUpdateStat(pm, conf, dbType, batchSize);
+    this.isTxnStatsEnabled = MetastoreConf.getBoolVar(conf, 
ConfVars.HIVE_TXN_STATS_ENABLED);
+    this.directSqlUpdatePart = new DirectSqlUpdatePart(pm, conf, dbType, 
batchSize);
     ImmutableMap.Builder<String, String> fieldNameToTableNameBuilder =
         new ImmutableMap.Builder<>();
 
@@ -535,6 +538,69 @@ class MetaStoreDirectSql {
     directSqlInsertPart.addPartitions(parts, partPrivilegesList, 
partColPrivilegesList);
   }
 
+  /**
+   * Alter partitions in batch using direct SQL
+   * @param table the target table
+   * @param partNames list of partition names
+   * @param newParts list of new partitions
+   * @param queryWriteIdList valid write id list
+   * @return the altered partitions
+   * @throws MetaException if the partitions cannot be resolved or the update fails
+   */
+  public List<Partition> alterPartitions(MTable table, List<String> partNames,
+                                         List<Partition> newParts, String 
queryWriteIdList) throws MetaException {
+    List<Object[]> rows = Batchable.runBatched(batchSize, partNames, new 
Batchable<String, Object[]>() {
+      @Override
+      public List<Object[]> run(List<String> input) throws Exception {
+        String filter = "" + PARTITIONS + ".\"PART_NAME\" in (" + 
makeParams(input.size()) + ")";
+        List<String> columns = Arrays.asList("\"PART_ID\"", "\"PART_NAME\"", 
"\"SD_ID\"", "\"WRITE_ID\"");
+        return 
getPartitionFieldsViaSqlFilter(table.getDatabase().getCatalogName(), 
table.getDatabase().getName(),
+                table.getTableName(), columns, filter, input, 
Collections.emptyList(), null);
+      }
+    });
+    Map<List<String>, Long> partValuesToId = new HashMap<>();
+    Map<Long, Long> partIdToSdId = new HashMap<>();
+    Map<Long, Long> partIdToWriteId = new HashMap<>();
+    for (Object[] row : rows) {
+      Long partId = MetastoreDirectSqlUtils.extractSqlLong(row[0]);
+      Long sdId = MetastoreDirectSqlUtils.extractSqlLong(row[2]);
+      Long writeId = MetastoreDirectSqlUtils.extractSqlLong(row[3]);
+      partIdToSdId.put(partId, sdId);
+      partIdToWriteId.put(partId, writeId);
+      List<String> partValues = Warehouse.getPartValuesFromPartName((String) 
row[1]);
+      partValuesToId.put(partValues, partId);
+    }
+
+    boolean isTxn = TxnUtils.isTransactionalTable(table.getParameters());
+    for (Partition newPart : newParts) {
+      Long partId = partValuesToId.get(newPart.getValues());
+      boolean useOldWriteId = true;
+      // For transactional tables, decide whether the new partition keeps the
+      // stored write id and whether its basic stats remain valid for the query.
+      if (isTxn) {
+        if (!isTxnStatsEnabled) {
+          StatsSetupConst.setBasicStatsState(newPart.getParameters(), 
StatsSetupConst.FALSE);
+        } else if (queryWriteIdList != null && newPart.getWriteId() > 0) {
+          // If a concurrent INSERT made the existing stats stale for this
+          // query, mark the basic stats as inaccurate.
+          if 
(!ObjectStore.isCurrentStatsValidForTheQuery(newPart.getParameters(),
+              partIdToWriteId.get(partId), queryWriteIdList, true)) {
+            StatsSetupConst.setBasicStatsState(newPart.getParameters(), 
StatsSetupConst.FALSE);
+            LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of partition "
+                + Warehouse.getQualifiedName(newPart) + "; the change will be made persistent.");
+          }
+          useOldWriteId = false;
+        }
+      }
+
+      if (useOldWriteId) {
+        newPart.setWriteId(partIdToWriteId.get(partId));
+      }
+    }
+
+    directSqlUpdatePart.alterPartitions(partValuesToId, partIdToSdId, 
newParts);
+    return newParts;
+  }
+
   /**
    * Get partition names by using direct SQL queries.
    * @param filter filter to use with direct sql
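
An aside on the alterPartitions method added above: it keys partValuesToId by the value lists recovered from each stored PART_NAME via Warehouse.getPartValuesFromPartName, while ObjectStore.alterPartitions (further down in this patch) builds those PART_NAME strings from the incoming values with Warehouse.makePartName, so both sides derive the same lookup key. A minimal, self-contained sketch of that round trip; the partition columns and values below are made up for illustration and are not taken from the patch:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.hive.metastore.Warehouse;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.metastore.api.MetaException;

    public class PartNameRoundTrip {
      public static void main(String[] args) throws MetaException {
        // Hypothetical partition schema and values, purely illustrative.
        List<FieldSchema> partCols = Arrays.asList(
            new FieldSchema("ds", "string", null),
            new FieldSchema("hr", "string", null));
        List<String> partVals = Arrays.asList("2023-12-25", "01");

        // ObjectStore.alterPartitions builds PART_NAME strings this way.
        String partName = Warehouse.makePartName(partCols, partVals);
        System.out.println(partName); // ds=2023-12-25/hr=01

        // MetaStoreDirectSql.alterPartitions recovers the values from PART_NAME.
        System.out.println(Warehouse.getPartValuesFromPartName(partName)); // [2023-12-25, 01]
      }
    }

The recovered list matches Partition.getValues(), which is what the loop over newParts relies on when it looks up the stored WRITE_ID for each incoming partition.
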
@@ -901,6 +967,28 @@ class MetaStoreDirectSql {
       String catName, String dbName, String tblName, String sqlFilter,
       List<? extends Object> paramsForFilter, List<String> joinsForFilter, 
Integer max)
       throws MetaException {
+    return getPartitionFieldsViaSqlFilter(catName, dbName, tblName,
+        Arrays.asList("\"PART_ID\""), sqlFilter, paramsForFilter, 
joinsForFilter, max);
+  }
+
+  /**
+   * Get partition fields for the query using direct SQL queries, to avoid 
bazillion
+   * queries created by DN retrieving stuff for each object individually.
+   * @param catName MetaStore catalog name
+   * @param dbName MetaStore db name
+   * @param tblName MetaStore table name
+   * @param partColumns the partition columns (fields) to fetch
+   * @param sqlFilter SQL filter to use. Better be SQL92-compliant.
+   * @param paramsForFilter params for ?-s in SQL filter text. Params must be 
in order.
+   * @param joinsForFilter if the filter needs additional join statement, they 
must be in
+   *                       this list. Better be SQL92-compliant.
+   * @param max The maximum number of partitions to return.
+   * @return a list of rows, one per matching partition, each containing the requested fields
+   */
+  public <T> List<T> getPartitionFieldsViaSqlFilter(
+      String catName, String dbName, String tblName, List<String> partColumns, 
String sqlFilter,
+      List<? extends Object> paramsForFilter, List<String> joinsForFilter, 
Integer max)
+      throws MetaException {
     boolean doTrace = LOG.isDebugEnabled();
     final String dbNameLcase = dbName.toLowerCase();
     final String tblNameLcase = tblName.toLowerCase();
@@ -908,16 +996,17 @@ class MetaStoreDirectSql {
 
     // We have to be mindful of order during filtering if we are not returning 
all partitions.
     String orderForFilter = (max != null) ? " order by " + 
MetastoreConf.getVar(conf, ConfVars.PARTITION_ORDER_EXPR) : "";
+    String columns = partColumns.stream().map(col -> PARTITIONS + "." + 
col).collect(Collectors.joining(","));
 
     String queryText =
-        "select " + PARTITIONS + ".\"PART_ID\" from " + PARTITIONS + ""
-      + "  inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS 
+ ".\"TBL_ID\" "
-      + "    and " + TBLS + ".\"TBL_NAME\" = ? "
-      + "  inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\" "
-      + "     and " + DBS + ".\"NAME\" = ? "
-      + join(joinsForFilter, ' ')
-      + " where " + DBS + ".\"CTLG_NAME\" = ? "
-      + (StringUtils.isBlank(sqlFilter) ? "" : (" and " + sqlFilter)) + 
orderForFilter;
+        "select " + columns + " from " + PARTITIONS + ""
+            + "  inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " 
+ TBLS + ".\"TBL_ID\" "
+            + "    and " + TBLS + ".\"TBL_NAME\" = ? "
+            + "  inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + 
".\"DB_ID\" "
+            + "     and " + DBS + ".\"NAME\" = ? "
+            + join(joinsForFilter, ' ')
+            + " where " + DBS + ".\"CTLG_NAME\" = ? "
+            + (StringUtils.isBlank(sqlFilter) ? "" : (" and " + sqlFilter)) + 
orderForFilter;
     Object[] params = new Object[paramsForFilter.size() + 3];
     params[0] = tblNameLcase;
     params[1] = dbNameLcase;
@@ -928,19 +1017,11 @@ class MetaStoreDirectSql {
 
     long start = doTrace ? System.nanoTime() : 0;
     try (QueryWrapper query = new 
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
-      List<Object> sqlResult = executeWithArray(query.getInnerQuery(), params, 
queryText,
+      List<T> sqlResult = executeWithArray(query.getInnerQuery(), params, 
queryText,
           ((max == null) ? -1 : max.intValue()));
       long queryTime = doTrace ? System.nanoTime() : 0;
       MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, 
queryTime);
-      final List<Long> result;
-      if (sqlResult.isEmpty()) {
-        result = Collections.emptyList(); // no partitions, bail early.
-      } else {
-        result = new ArrayList<>(sqlResult.size());
-        for (Object fields : sqlResult) {
-          result.add(MetastoreDirectSqlUtils.extractSqlLong(fields));
-        }
-      }
+      List<T> result = new ArrayList<>(sqlResult);
       return result;
     }
   }
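
An aside on the refactored lookup above: with the four columns requested by the new alterPartitions path and a batch of two partition names, the assembled queryText comes out roughly as sketched below. The PARTITIONS/TBLS/DBS constants resolve to the configured table identifiers at runtime (quoting and any schema prefix depend on the datastore), so treat the exact spelling as an approximation rather than the literal output of the patch:

    // Illustration only; not emitted verbatim by the patch.
    //
    //   select "PARTITIONS"."PART_ID", "PARTITIONS"."PART_NAME",
    //          "PARTITIONS"."SD_ID", "PARTITIONS"."WRITE_ID"
    //     from "PARTITIONS"
    //     inner join "TBLS" on "PARTITIONS"."TBL_ID" = "TBLS"."TBL_ID"
    //       and "TBLS"."TBL_NAME" = ?
    //     inner join "DBS" on "TBLS"."DB_ID" = "DBS"."DB_ID"
    //       and "DBS"."NAME" = ?
    //    where "DBS"."CTLG_NAME" = ?
    //      and "PARTITIONS"."PART_NAME" in (?,?)
    //
    // Parameters bind in the order they appear in the statement: table name,
    // database name, catalog name, then the partition names of the batch.
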
@@ -3056,8 +3137,8 @@ class MetaStoreDirectSql {
       ColumnStatistics colStats = (ColumnStatistics) entry.getValue();
       numStats += colStats.getStatsObjSize();
     }
-    long csId = updateStat.getNextCSIdForMPartitionColumnStatistics(numStats);
-    return updateStat.updatePartitionColumnStatistics(partColStatsMap, tbl, 
csId, validWriteIds, writeId, listeners);
+    long csId = 
directSqlUpdatePart.getNextCSIdForMPartitionColumnStatistics(numStats);
+    return 
directSqlUpdatePart.updatePartitionColumnStatistics(partColStatsMap, tbl, csId, 
validWriteIds, writeId, listeners);
   }
 
   public List<Function> getFunctions(String catName) throws MetaException {
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 0ae023bdd7e..de88e482b71 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -4399,7 +4399,7 @@ public class ObjectStore implements RawStore, 
Configurable {
     protected abstract String describeResult();
     protected abstract T getSqlResult(GetHelper<T> ctx) throws MetaException;
     protected abstract T getJdoResult(
-        GetHelper<T> ctx) throws MetaException, NoSuchObjectException;
+        GetHelper<T> ctx) throws MetaException, NoSuchObjectException, 
InvalidObjectException;
 
     public T run(boolean initTable) throws MetaException, 
NoSuchObjectException {
       try {
@@ -5261,91 +5261,114 @@ public class ObjectStore implements RawStore, 
Configurable {
                               List<List<String>> part_vals, List<Partition> 
newParts,
                               long writeId, String queryWriteIdList)
                                   throws InvalidObjectException, MetaException 
{
-    boolean success = false;
-    Exception e = null;
     List<Partition> results = new ArrayList<>(newParts.size());
     if (newParts.isEmpty()) {
       return results;
     }
+    catName = normalizeIdentifier(catName);
+    dbName = normalizeIdentifier(dbName);
+    tblName = normalizeIdentifier(tblName);
+
+    boolean success = false;
     try {
       openTransaction();
 
-      MTable table = this.getMTable(catName, dbName, tblName);
-      if (table == null) {
-        throw new NoSuchObjectException(
-            TableName.getQualified(catName, dbName, tblName) + " table not 
found");
+      MTable table = ensureGetMTable(catName, dbName, tblName);
+      // Validate new parts: StorageDescriptor and SerDeInfo must be set in 
Partition.
+      if (!TableType.VIRTUAL_VIEW.name().equals(table.getTableType())) {
+        for (Partition newPart : newParts) {
+          if (!newPart.isSetSd() || !newPart.getSd().isSetSerdeInfo()) {
+            throw new InvalidObjectException(
+                "Partition is missing a storage descriptor or serde info.");
+          }
+        }
       }
+      if (writeId > 0) {
+        newParts.forEach(newPart -> newPart.setWriteId(writeId));
+      }
+
+      List<FieldSchema> partCols = 
convertToFieldSchemas(table.getPartitionKeys());
       List<String> partNames = new ArrayList<>();
       for (List<String> partVal : part_vals) {
-        partNames.add(
-            
Warehouse.makePartName(convertToFieldSchemas(table.getPartitionKeys()), partVal)
-        );
+        partNames.add(Warehouse.makePartName(partCols, partVal));
       }
 
-      catName = normalizeIdentifier(catName);
-      dbName = normalizeIdentifier(dbName);
-      tblName = normalizeIdentifier(tblName);
-      List<MPartition> mPartitionList;
-
-      try (Query query = pm.newQuery(MPartition.class,
-              "table.tableName == t1 && table.database.name == t2 && 
t3.contains(partitionName) " +
-                      " && table.database.catalogName == t4")) {
-        query.declareParameters("java.lang.String t1, java.lang.String t2, 
java.util.Collection t3, "
-                + "java.lang.String t4");
-        mPartitionList = (List<MPartition>) query.executeWithArray(tblName, 
dbName, partNames, catName);
-        pm.retrieveAll(mPartitionList);
-
-        if (mPartitionList.size() > newParts.size()) {
-          throw new MetaException("Expecting only one partition but more than 
one partitions are found.");
+      results = new GetListHelper<Partition>(catName, dbName, tblName, true, 
true) {
+        @Override
+        protected List<Partition> getSqlResult(GetHelper<List<Partition>> ctx)
+            throws MetaException {
+          return directSql.alterPartitions(table, partNames, newParts, 
queryWriteIdList);
         }
 
-        Map<List<String>, MPartition> mPartsMap = new HashMap();
-        for (MPartition mPartition : mPartitionList) {
-          mPartsMap.put(mPartition.getValues(), mPartition);
+        @Override
+        protected List<Partition> getJdoResult(GetHelper<List<Partition>> ctx)
+            throws MetaException, InvalidObjectException {
+          return alterPartitionsViaJdo(table, partNames, newParts, 
queryWriteIdList);
         }
+      }.run(false);
 
-        Set<MColumnDescriptor> oldCds = new HashSet<>();
-        Ref<MColumnDescriptor> oldCdRef = new Ref<>();
-        for (Partition tmpPart : newParts) {
-          if (!tmpPart.getDbName().equalsIgnoreCase(dbName)) {
-            throw new MetaException("Invalid DB name : " + 
tmpPart.getDbName());
-          }
-
-          if (!tmpPart.getTableName().equalsIgnoreCase(tblName)) {
-            throw new MetaException("Invalid table   name : " + 
tmpPart.getDbName());
-          }
-
-          if (writeId > 0) {
-            tmpPart.setWriteId(writeId);
-          }
-          oldCdRef.t = null;
-          Partition result = alterPartitionNoTxn(catName, dbName, tblName, 
mPartsMap.get(tmpPart.getValues()),
-                  tmpPart, queryWriteIdList, oldCdRef, table);
-          results.add(result);
-          if (oldCdRef.t != null) {
-            oldCds.add(oldCdRef.t);
-          }
-        }
-        for (MColumnDescriptor oldCd : oldCds) {
-          removeUnusedColumnDescriptor(oldCd);
-        }
-      }
       // commit the changes
       success = commitTransaction();
     } catch (Exception exception) {
-      e = exception;
-      LOG.error("Alter failed", e);
+      LOG.error("Alter failed", exception);
+      throw new MetaException(exception.getMessage());
     } finally {
       if (!success) {
         rollbackTransaction();
-        MetaException metaException = new MetaException(
-            "The transaction for alter partition did not commit 
successfully.");
-        if (e != null) {
-          metaException.initCause(e);
+      }
+    }
+    return results;
+  }
+
+  private List<Partition> alterPartitionsViaJdo(MTable table, List<String> 
partNames,
+                                                List<Partition> newParts, 
String queryWriteIdList)
+      throws MetaException, InvalidObjectException {
+    String catName = table.getDatabase().getCatalogName();
+    String dbName = table.getDatabase().getName();
+    String tblName = table.getTableName();
+    List<Partition> results = new ArrayList<>(newParts.size());
+    List<MPartition> mPartitionList;
+
+    try (QueryWrapper query = new QueryWrapper(pm.newQuery(MPartition.class,
+        "table.tableName == t1 && table.database.name == t2 && 
t3.contains(partitionName) " +
+            " && table.database.catalogName == t4"))) {
+      query.declareParameters("java.lang.String t1, java.lang.String t2, 
java.util.Collection t3, "
+          + "java.lang.String t4");
+      mPartitionList = (List<MPartition>) query.executeWithArray(tblName, 
dbName, partNames, catName);
+      pm.retrieveAll(mPartitionList);
+
+      if (mPartitionList.size() > newParts.size()) {
+        throw new MetaException("Expected at most " + newParts.size()
+            + " matching partitions but found " + mPartitionList.size() + ".");
+      }
+
+      Map<List<String>, MPartition> mPartsMap = new HashMap<>();
+      for (MPartition mPartition : mPartitionList) {
+        mPartsMap.put(mPartition.getValues(), mPartition);
+      }
+
+      Set<MColumnDescriptor> oldCds = new HashSet<>();
+      Ref<MColumnDescriptor> oldCdRef = new Ref<>();
+      for (Partition tmpPart : newParts) {
+        if (!tmpPart.getDbName().equalsIgnoreCase(dbName)) {
+          throw new MetaException("Invalid DB name : " + tmpPart.getDbName());
         }
-        throw metaException;
+
+        if (!tmpPart.getTableName().equalsIgnoreCase(tblName)) {
+          throw new MetaException("Invalid table name : " + tmpPart.getTableName());
+        }
+
+        oldCdRef.t = null;
+        Partition result = alterPartitionNoTxn(catName, dbName, tblName,
+            mPartsMap.get(tmpPart.getValues()), tmpPart, queryWriteIdList, 
oldCdRef, table);
+        results.add(result);
+        if (oldCdRef.t != null) {
+          oldCds.add(oldCdRef.t);
+        }
+      }
+      for (MColumnDescriptor oldCd : oldCds) {
+        removeUnusedColumnDescriptor(oldCd);
       }
     }
+
     return results;
   }
 
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index e7e97b5f23d..f490798be56 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -680,4 +680,20 @@ public class TxnUtils {
     return (SQLException)ex;
   }
 
+  public static String createUpdatePreparedStmt(String tableName, List<String> 
columnNames, List<String> conditionKeys) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("update " + tableName + " set ");
+    sb.append(columnNames.stream().map(col -> col + 
"=?").collect(Collectors.joining(",")));
+    sb.append(" where " + conditionKeys.stream().map(cond -> cond + 
"=?").collect(Collectors.joining(" and ")));
+    return sb.toString();
+  }
+
+  public static String createInsertPreparedStmt(String tableName, List<String> 
columnNames) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("insert into " + tableName + "(");
+    sb.append(columnNames.stream().collect(Collectors.joining(",")));
+    String placeholder = columnNames.stream().map(col -> 
"?").collect(Collectors.joining(","));
+    sb.append(") values (" + placeholder + ")");
+    return sb.toString();
+  }
 }
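
An aside on the two helpers added above: they only assemble parameterized SQL text; callers still bind the values through a PreparedStatement. A small standalone example of the strings they produce, using hypothetical metastore table and column names:

    import java.util.Arrays;

    import org.apache.hadoop.hive.metastore.txn.TxnUtils;

    public class PreparedStmtExample {
      public static void main(String[] args) {
        // update "PARTITIONS" set "SD_ID"=?,"WRITE_ID"=? where "PART_ID"=?
        System.out.println(TxnUtils.createUpdatePreparedStmt("\"PARTITIONS\"",
            Arrays.asList("\"SD_ID\"", "\"WRITE_ID\""),
            Arrays.asList("\"PART_ID\"")));

        // insert into "PARTITION_PARAMS"("PART_ID","PARAM_KEY","PARAM_VALUE") values (?,?,?)
        System.out.println(TxnUtils.createInsertPreparedStmt("\"PARTITION_PARAMS\"",
            Arrays.asList("\"PART_ID\"", "\"PARAM_KEY\"", "\"PARAM_VALUE\"")));
      }
    }
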
diff --git 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
index 940b18d1db4..a6e510e5c4d 100644
--- 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
+++ 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
@@ -503,7 +503,7 @@ public class TestObjectStore {
       objectStore.alterPartitions(DEFAULT_CATALOG_NAME, DB1, 
"not_existed_table", part_vals, parts, 0, "");
     } catch (MetaException e) {
       // expected
-      Assert.assertTrue(e.getCause() instanceof NoSuchObjectException);
+      Assert.assertEquals("Specified catalog.database.table does not exist : "
+          + "hive.testobjectstoredb1.not_existed_table", e.getMessage());
     }
   }
 
diff --git 
a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
 
b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
index 90672bf483d..f19576d6940 100644
--- 
a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
+++ 
b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
@@ -45,6 +45,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 import static 
org.apache.hadoop.hive.metastore.tools.Constants.HMS_DEFAULT_PORT;
+import static 
org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkAlterPartitions;
 import static 
org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkCreatePartition;
 import static 
org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkCreatePartitions;
 import static 
org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkDeleteCreate;
@@ -311,6 +312,8 @@ public class BenchmarkTool implements Runnable {
               () -> benchmarkCreatePartitions(bench, bData, howMany))
           .add("dropPartitions" + '.' + howMany,
               () -> benchmarkDropPartitions(bench, bData, howMany))
+          .add("alterPartitions" + '.' + howMany,
+              () -> benchmarkAlterPartitions(bench, bData, howMany))
           .add("renameTable" + '.' + howMany,
               () -> benchmarkRenameTable(bench, bData, howMany))
           .add("dropDatabase" + '.' + howMany,
diff --git 
a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
 
b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
index 214e9e1cd6b..a2f97eb3170 100644
--- 
a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
+++ 
b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.PartitionManagementTask;
 import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.thrift.TException;
@@ -338,6 +339,35 @@ final class HMSBenchmarks {
     }
   }
 
+  static DescriptiveStatistics benchmarkAlterPartitions(@NotNull 
MicroBenchmark bench,
+                                                        @NotNull BenchData 
data,
+                                                        int count) {
+    final HMSClient client = data.getClient();
+    String dbName = data.dbName;
+    String tableName = data.tableName;
+
+    BenchmarkUtils.createPartitionedTable(client, dbName, tableName);
+    try {
+      return bench.measure(
+          () -> addManyPartitionsNoException(client, dbName, tableName, null,
+              Collections.singletonList("d"), count),
+          () -> throwingSupplierWrapper(() -> {
+            List<Partition> newPartitions = client.getPartitions(dbName, 
tableName);
+            newPartitions.forEach(p -> {
+              p.getParameters().put("new_param", "param_val");
+              p.getSd().setCols(Arrays.asList(new FieldSchema("new_col", 
"string", null)));
+            });
+            client.alterPartitions(dbName, tableName, newPartitions);
+            return null;
+          }),
+          () -> throwingSupplierWrapper(() ->
+              client.dropPartitions(dbName, tableName, null))
+      );
+    } finally {
+      throwingSupplierWrapper(() -> client.dropTable(dbName, tableName));
+    }
+  }
+
   static DescriptiveStatistics benchmarkGetPartitionNames(@NotNull 
MicroBenchmark bench,
                                                           @NotNull BenchData 
data,
                                                           int count) {

