This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new be89c4600fa HIVE-29030: Alter partition change column cannot use the direct sql (#5890)
be89c4600fa is described below

commit be89c4600faaa845e28856c6b03bf07b8e64d926
Author: dengzh <dengzhhu...@gmail.com>
AuthorDate: Tue Jun 24 22:42:54 2025 +0800

    HIVE-29030: Alter partition change column cannot use the direct sql (#5890)
---
 .../hadoop/hive/metastore/DirectSqlInsertPart.java |  30 ++---
 .../hadoop/hive/metastore/DirectSqlUpdatePart.java | 135 +++------------------
 .../hadoop/hive/metastore/MetaStoreDirectSql.java  |   8 +-
 .../hive/metastore/MetastoreDirectSqlUtils.java    |  24 ++++
 .../apache/hadoop/hive/metastore/ObjectStore.java  | 109 ++++++++++-------
 .../hive/metastore/VerifyingObjectStore.java       |  43 +++++++
 6 files changed, 161 insertions(+), 188 deletions(-)

diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java
index 5ac6e082a4e..28dbc04c2da 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.metastore;
 
 import static org.apache.commons.lang3.StringUtils.repeat;
+import static 
org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.getModelIdentity;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -41,11 +42,6 @@
 import org.apache.hadoop.hive.metastore.model.MSerDeInfo;
 import org.apache.hadoop.hive.metastore.model.MStorageDescriptor;
 import org.apache.hadoop.hive.metastore.model.MStringList;
-import org.datanucleus.ExecutionContext;
-import org.datanucleus.api.jdo.JDOPersistenceManager;
-import org.datanucleus.metadata.AbstractClassMetaData;
-import org.datanucleus.metadata.AbstractMemberMetaData;
-import org.datanucleus.metadata.IdentityType;
 
 /**
  * This class contains the methods to insert into tables on the underlying 
database using direct SQL
@@ -68,16 +64,6 @@ interface BatchExecutionContext {
     void execute(String batchQueryText, int batchRowCount) throws 
MetaException;
   }
 
-  private Long getDataStoreId(Class<?> modelClass) throws MetaException {
-    ExecutionContext ec = ((JDOPersistenceManager) pm).getExecutionContext();
-    AbstractClassMetaData cmd = 
ec.getMetaDataManager().getMetaDataForClass(modelClass, 
ec.getClassLoaderResolver());
-    if (cmd.getIdentityType() == IdentityType.DATASTORE) {
-      return (Long) ec.getStoreManager().getValueGenerationStrategyValue(ec, 
cmd, null);
-    } else {
-      throw new MetaException("Identity type is not datastore.");
-    }
-  }
-
   private void insertInBatch(String tableName, String columns, int 
columnCount, int rowCount,
       BatchExecutionContext batchExecutionContext) throws MetaException {
     if (rowCount == 0 || columnCount == 0) {
@@ -751,24 +737,24 @@ public void addPartitions(List<MPartition> parts, 
List<List<MPartitionPrivilege>
           || sd.getCD().getCols() == null) {
         throw new MetaException("Invalid partition");
       }
-      Long serDeId = getDataStoreId(MSerDeInfo.class);
+      Long serDeId = getModelIdentity(pm, MSerDeInfo.class);
       serdeIdToSerDeInfo.put(serDeId, sd.getSerDeInfo());
 
       Long cdId;
       LongIdentity storeId = (LongIdentity) pm.getObjectId(sd.getCD());
       if (storeId == null) {
-        cdId = getDataStoreId(MColumnDescriptor.class);
+        cdId = getModelIdentity(pm, MColumnDescriptor.class);
         cdIdToColumnDescriptor.put(cdId, sd.getCD());
       } else {
         cdId = (Long) storeId.getKeyAsObject();
       }
 
-      Long sdId = getDataStoreId(MStorageDescriptor.class);
+      Long sdId = getModelIdentity(pm, MStorageDescriptor.class);
       sdIdToStorageDescriptor.put(sdId, sd);
       sdIdToSerdeId.put(sdId, serDeId);
       sdIdToCdId.put(sdId, cdId);
 
-      Long partId = getDataStoreId(MPartition.class);
+      Long partId = getModelIdentity(pm, MPartition.class);
       partIdToPartition.put(partId, part);
       partIdToSdId.put(partId, sdId);
 
@@ -781,7 +767,7 @@ public void addPartitions(List<MPartition> parts, 
List<List<MPartitionPrivilege>
       if (CollectionUtils.isNotEmpty(sd.getSkewedColValues())) {
         int skewedValCount = sd.getSkewedColValues().size();
         for (int i = 0; i < skewedValCount; i++) {
-          Long stringListId = getDataStoreId(MStringList.class);
+          Long stringListId = getModelIdentity(pm, MStringList.class);
           stringListIds.add(stringListId);
           stringListIdToSdId.put(stringListId, sdId);
           List<String> stringList = 
sd.getSkewedColValues().get(i).getInternalList();
@@ -795,13 +781,13 @@ public void addPartitions(List<MPartition> parts, 
List<List<MPartitionPrivilege>
 
       List<MPartitionPrivilege> partPrivileges = partPrivilegesList.get(index);
       for (MPartitionPrivilege partPrivilege : partPrivileges) {
-        Long partGrantId = getDataStoreId(MPartitionPrivilege.class);
+        Long partGrantId = getModelIdentity(pm, MPartitionPrivilege.class);
         partGrantIdToPrivilege.put(partGrantId, partPrivilege);
         partGrantIdToPartId.put(partGrantId, partId);
       }
       List<MPartitionColumnPrivilege> partColPrivileges = 
partColPrivilegesList.get(index);
       for (MPartitionColumnPrivilege partColPrivilege : partColPrivileges) {
-        Long partColumnGrantId = 
getDataStoreId(MPartitionColumnPrivilege.class);
+        Long partColumnGrantId = getModelIdentity(pm, 
MPartitionColumnPrivilege.class);
         partColumnGrantIdToPrivilege.put(partColumnGrantId, partColPrivilege);
         partColumnGrantIdToPartId.put(partColumnGrantId, partId);
       }
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
index 237f3153bf4..dacc30da84a 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
@@ -42,16 +42,11 @@
 import org.apache.hadoop.hive.metastore.model.MStringList;
 import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.datanucleus.ExecutionContext;
-import org.datanucleus.api.jdo.JDOPersistenceManager;
-import org.datanucleus.metadata.AbstractClassMetaData;
-import org.datanucleus.metadata.IdentityType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 
 import javax.jdo.PersistenceManager;
-import javax.jdo.Transaction;
 import javax.jdo.datastore.JDOConnection;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -76,6 +71,7 @@
 import static 
org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.extractSqlClob;
 import static 
org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.extractSqlInt;
 import static 
org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.extractSqlLong;
+import static 
org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.getModelIdentity;
 
 /**
  * This class contains the optimizations for MetaStore that rely on direct SQL 
access to
@@ -213,7 +209,6 @@ private void verifyUpdates(int[] numUpdates, List<Long> 
partIds) throws MetaExce
   }
 
   private void insertIntoPartColStatTable(Map<PartColNameInfo, 
MPartitionColumnStatistics> insertMap,
-                                          long maxCsId,
                                           Connection dbConn) throws 
SQLException, MetaException, NoSuchObjectException {
     int numRows = 0;
     String insert = "INSERT INTO \"PART_COL_STATS\" (\"CS_ID\", 
\"COLUMN_NAME\", \"COLUMN_TYPE\", \"PART_ID\","
@@ -228,7 +223,7 @@ private void 
insertIntoPartColStatTable(Map<PartColNameInfo, MPartitionColumnSta
         Long partId = partColNameInfo.partitionId;
         MPartitionColumnStatistics mPartitionColumnStatistics = 
(MPartitionColumnStatistics) entry.getValue();
 
-        preparedStatement.setLong(1, maxCsId);
+        preparedStatement.setLong(1, getModelIdentity(pm, 
MPartitionColumnStatistics.class));
         preparedStatement.setString(2, 
mPartitionColumnStatistics.getColName());
         preparedStatement.setString(3, 
mPartitionColumnStatistics.getColType());
         preparedStatement.setLong(4, partId);
@@ -249,7 +244,6 @@ private void 
insertIntoPartColStatTable(Map<PartColNameInfo, MPartitionColumnSta
         preparedStatement.setLong(19, 
mPartitionColumnStatistics.getLastAnalyzed());
         preparedStatement.setString(20, 
mPartitionColumnStatistics.getEngine());
 
-        maxCsId++;
         numRows++;
         preparedStatement.addBatch();
         if (numRows == maxBatchSize) {
@@ -473,15 +467,13 @@ private void setAnsiQuotes(Connection dbConn) throws 
SQLException {
    * @return map of partition key to column stats if successful, null 
otherwise.
    */
   public Map<String, Map<String, String>> 
updatePartitionColumnStatistics(Map<String, ColumnStatistics> partColStatsMap,
-                                                      Table tbl, long csId,
+                                                      Table tbl,
                                                       String validWriteIds, 
long writeId,
                                                       
List<TransactionalMetaStoreEventListener> transactionalListeners)
           throws MetaException {
 
-    Transaction tx = pm.currentTransaction();
     try {
       dbType.lockInternal();
-      tx.begin();
       JDOConnection jdoConn = null;
       Map<String, Map<String, String>> result;
       try {
@@ -500,11 +492,11 @@ public Map<String, Map<String, String>> 
updatePartitionColumnStatistics(Map<Stri
 
         LOG.info("Number of stats to insert  " + insertMap.size() + " update " 
+ updateMap.size());
 
-        if (insertMap.size() != 0) {
-          insertIntoPartColStatTable(insertMap, csId, dbConn);
+        if (!insertMap.isEmpty()) {
+          insertIntoPartColStatTable(insertMap, dbConn);
         }
 
-        if (updateMap.size() != 0) {
+        if (!updateMap.isEmpty()) {
           updatePartColStatTable(updateMap, dbConn);
         }
 
@@ -524,89 +516,12 @@ public Map<String, Map<String, String>> 
updatePartitionColumnStatistics(Map<Stri
       } finally {
         closeDbConn(jdoConn);
       }
-      tx.commit();
       return result;
     } catch (Exception e) {
       LOG.error("Unable to update Column stats for  " + tbl.getTableName(), e);
       throw new MetaException("Unable to update Column stats for  " + 
tbl.getTableName()
               + " due to: "  + e.getMessage());
     } finally {
-      if (tx.isActive()) {
-        tx.rollback();
-      }
-      dbType.unlockInternal();
-    }
-  }
-
-  /**
-   * Gets the next CS id from sequence MPartitionColumnStatistics and 
increment the CS id by numStats.
-   * @return The CD id before update.
-   */
-  public long getNextCSIdForMPartitionColumnStatistics(long numStats) throws 
MetaException {
-    long maxCsId = 0;
-    Transaction tx = pm.currentTransaction();
-    try {
-      dbType.lockInternal();
-      tx.begin();
-      JDOConnection jdoConn = null;
-      try {
-        jdoConn = pm.getDataStoreConnection();
-        Connection dbConn = (Connection) jdoConn.getNativeConnection();
-
-        setAnsiQuotes(dbConn);
-
-        // This loop will be iterated at max twice. If there is no records, it 
will first insert and then do a select.
-        // We are not using any upsert operations as select for update and 
then update is required to make sure that
-        // the caller gets a reserved range for CSId not used by any other 
thread.
-        boolean insertDone = false;
-        while (maxCsId == 0) {
-          String query = sqlGenerator.addForUpdateClause(
-              "SELECT \"NEXT_VAL\" FROM \"SEQUENCE_TABLE\" " + "WHERE 
\"SEQUENCE_NAME\"= " + quoteString(
-                  
"org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics"));
-          LOG.debug("Execute query: " + query);
-          try (Statement statement = dbConn.createStatement(); ResultSet rs = 
statement.executeQuery(query)) {
-            if (rs.next()) {
-              maxCsId = rs.getLong(1);
-            } else if (insertDone) {
-              throw new MetaException("Invalid state of SEQUENCE_TABLE for 
MPartitionColumnStatistics");
-            } else {
-              insertDone = true;
-              query = "INSERT INTO \"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\", 
\"NEXT_VAL\")  VALUES ( " + quoteString(
-                  
"org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics") + "," + 1 
+ ")";
-              try {
-                statement.executeUpdate(query);
-              } catch (SQLException e) {
-                // If the record is already inserted by some other thread 
continue to select.
-                if (dbType.isDuplicateKeyError(e)) {
-                  continue;
-                }
-                LOG.error("Unable to insert into SEQUENCE_TABLE for 
MPartitionColumnStatistics.", e);
-                throw e;
-              }
-            }
-          }
-        }
-
-        long nextMaxCsId = maxCsId + numStats + 1;
-        String query = "UPDATE \"SEQUENCE_TABLE\" SET \"NEXT_VAL\" = " + 
nextMaxCsId + " WHERE \"SEQUENCE_NAME\" = " + quoteString(
-            
"org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics");
-
-        try (Statement statement = dbConn.createStatement()) {
-          statement.executeUpdate(query);
-        }
-      } finally {
-        closeDbConn(jdoConn);
-      }
-      tx.commit();
-      return maxCsId;
-    } catch (Exception e) {
-      LOG.error("Unable to getNextCSIdForMPartitionColumnStatistics", e);
-      throw new MetaException("Unable to 
getNextCSIdForMPartitionColumnStatistics  "
-              + " due to: " + e.getMessage());
-    } finally {
-      if (tx.isActive()) {
-        tx.rollback();
-      }
       dbType.unlockInternal();
     }
   }
@@ -1064,7 +979,7 @@ public List<Void> run(List<Long> input) throws 
MetaException {
       List<List<String>> skewedColValues = skewedInfo.getSkewedColValues();
       if (skewedColValues != null) {
         for (List<String> colValues : skewedColValues) {
-          Long nextStringListId = getDataStoreId(MStringList.class);
+          Long nextStringListId = getModelIdentity(pm, MStringList.class);
           newStringListId.add(nextStringListId);
           sdIdToNewStringListId.computeIfAbsent(sdId, k -> new 
ArrayList<>()).add(nextStringListId);
           stringListIdToValues.put(nextStringListId, colValues);
@@ -1075,7 +990,7 @@ public List<Void> run(List<Long> input) throws 
MetaException {
         for (Map.Entry<List<String>, String> entry : 
skewedColValueLocationMaps.entrySet()) {
           List<String> colValues = entry.getKey();
           String location = entry.getValue();
-          Long nextStringListId = getDataStoreId(MStringList.class);
+          Long nextStringListId = getModelIdentity(pm, MStringList.class);
           newStringListId.add(nextStringListId);
           stringListIdToValues.put(nextStringListId, colValues);
           sdIdToValueLoc.computeIfAbsent(sdId, k -> new 
ArrayList<>()).add(Pair.of(nextStringListId, location));
@@ -1090,16 +1005,6 @@ public List<Void> run(List<Long> input) throws 
MetaException {
     insertSkewColValueLocInBatch(sdIdToValueLoc, sdIds);
   }
 
-  private Long getDataStoreId(Class<?> modelClass) throws MetaException {
-    ExecutionContext ec = ((JDOPersistenceManager) pm).getExecutionContext();
-    AbstractClassMetaData cmd = 
ec.getMetaDataManager().getMetaDataForClass(modelClass, 
ec.getClassLoaderResolver());
-    if (cmd.getIdentityType() == IdentityType.DATASTORE) {
-      return (Long) ec.getStoreManager().getValueGenerationStrategyValue(ec, 
cmd, null);
-    } else {
-      throw new MetaException("Identity type is not datastore.");
-    }
-  }
-
   private void insertSkewedColNamesInBatch(Map<Long, List<String>> 
sdIdToSkewedColNames,
                                            List<Long> sdIds) throws 
MetaException {
     List<String> columns = Arrays.asList("\"SD_ID\"", "\"INTEGER_IDX\"", 
"\"SKEWED_COL_NAME\"");
@@ -1260,19 +1165,17 @@ public List<Void> run(List<Long> input) throws 
Exception {
 
       List<FieldSchema> newCols = sdIdToNewColumns.get(sdId);
       // Use the new column descriptor only if the old column descriptor 
differs from the new one.
-      if (oldCols == null || !oldCols.equals(newCols)) {
-        if (oldCols != null && newCols != null) {
-          Long newCdId = getDataStoreId(MColumnDescriptor.class);
-          newCdIds.add(newCdId);
-          newCdIdToCols.put(newCdId, newCols);
-          oldCdIdToNewCdId.put(cdId, newCdId);
-          sdIdToNewCdId.put(sdId, newCdId);
-          for (int i = 0; i < oldCols.size(); i++) {
-            FieldSchema oldCol = oldCols.get(i);
-            int newIdx = newCols.indexOf(oldCol);
-            if (newIdx != -1) {
-              oldCdIdToColIdxPairs.computeIfAbsent(cdId, k -> new 
ArrayList<>()).add(Pair.of(i, newIdx));
-            }
+      if (!oldCols.equals(newCols) && newCols != null) {
+        Long newCdId = getModelIdentity(pm, MColumnDescriptor.class);
+        newCdIds.add(newCdId);
+        newCdIdToCols.put(newCdId, newCols);
+        oldCdIdToNewCdId.put(cdId, newCdId);
+        sdIdToNewCdId.put(sdId, newCdId);
+        for (int i = 0; i < oldCols.size(); i++) {
+          FieldSchema oldCol = oldCols.get(i);
+          int newIdx = newCols.indexOf(oldCol);
+          if (newIdx != -1) {
+            oldCdIdToColIdxPairs.computeIfAbsent(cdId, k -> new 
ArrayList<>()).add(Pair.of(i, newIdx));
           }
         }
       }
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index 0f82e495bf8..12c78c347e4 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -3294,13 +3294,7 @@ public Map<String, Map<String, String>> 
updatePartitionColumnStatisticsBatch(
                                                       
List<TransactionalMetaStoreEventListener> listeners,
                                                       String validWriteIds, 
long writeId)
           throws MetaException {
-    long numStats = 0;
-    for (Map.Entry entry : partColStatsMap.entrySet()) {
-      ColumnStatistics colStats = (ColumnStatistics) entry.getValue();
-      numStats += colStats.getStatsObjSize();
-    }
-    long csId = 
directSqlUpdatePart.getNextCSIdForMPartitionColumnStatistics(numStats);
-    return 
directSqlUpdatePart.updatePartitionColumnStatistics(partColStatsMap, tbl, csId, 
validWriteIds, writeId, listeners);
+    return 
directSqlUpdatePart.updatePartitionColumnStatistics(partColStatsMap, tbl, 
validWriteIds, writeId, listeners);
   }
 
   public List<Function> getFunctions(String catName) throws MetaException {
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
index 500a4ece93b..45e89ab40df 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
@@ -32,6 +32,10 @@
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.datanucleus.ExecutionContext;
+import org.datanucleus.api.jdo.JDOPersistenceManager;
+import org.datanucleus.metadata.AbstractClassMetaData;
+import org.datanucleus.metadata.AbstractMemberMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -608,6 +612,26 @@ else if (value instanceof byte[]) {
     }
   }
 
+  static Long getModelIdentity(PersistenceManager pm, Class<?> modelClass)
+      throws MetaException {
+    ExecutionContext ec = ((JDOPersistenceManager) pm).getExecutionContext();
+    AbstractClassMetaData cmd = 
ec.getMetaDataManager().getMetaDataForClass(modelClass, 
ec.getClassLoaderResolver());
+    switch (cmd.getIdentityType()) {
+      case DATASTORE :
+        return (Long) ec.getStoreManager().getValueGenerationStrategyValue(ec, 
cmd, null);
+      case APPLICATION :
+        if (cmd.usesSingleFieldIdentityClass()) {
+          int[] valueGenMemberPositions = 
cmd.getValueGenerationMemberPositions();
+          AbstractMemberMetaData mmd = 
cmd.getMetaDataForManagedMemberAtAbsolutePosition(valueGenMemberPositions[0]);
+          return (Long) 
ec.getStoreManager().getValueGenerationStrategyValue(ec, cmd, mmd);
+        }
+        throw new MetaException("Multiple key fields found in class: " + 
modelClass.getSimpleName());
+    default:
+      throw new MetaException(
+          "Identity type is not datastore or application, model: " + 
modelClass.getSimpleName());
+    }
+  }
+
   @FunctionalInterface
   static interface ApplyFunc<Target> {
     void apply(Target t, Object[] fields) throws MetaException;
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 5e5b91d642a..c1ca07678fb 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -2384,7 +2384,7 @@ private List<MFieldSchema> 
convertToMFieldSchemas(List<FieldSchema> keys) {
     return mkeys;
   }
 
-  private List<FieldSchema> convertToFieldSchemas(List<MFieldSchema> mkeys) {
+  protected List<FieldSchema> convertToFieldSchemas(List<MFieldSchema> mkeys) {
     List<FieldSchema> keys = null;
     if (mkeys != null) {
       keys = new ArrayList<>();
@@ -4454,9 +4454,6 @@ private void start(boolean initTable) throws 
MetaException, NoSuchObjectExceptio
     }
 
     private void handleDirectSqlError(Exception ex, String savePoint) throws 
MetaException, NoSuchObjectException {
-      if (!allowJdo || !DatabaseProduct.isRecoverableException(ex)) {
-        throw ExceptionHandler.newMetaException(ex);
-      }
       String message = null;
       try {
         message = generateShorterMessage(ex);
@@ -4465,6 +4462,11 @@ private void handleDirectSqlError(Exception ex, String 
savePoint) throws MetaExc
       }
       LOG.warn(message); // Don't log the exception, people just get confused.
       LOG.debug("Full DirectSQL callstack for debugging (not an error)", ex);
+
+      if (!allowJdo || !DatabaseProduct.isRecoverableException(ex)) {
+        throw ExceptionHandler.newMetaException(ex);
+      }
+      
       if (!isInTxn) {
         JDOException rollbackEx = null;
         try {
@@ -5270,9 +5272,9 @@ public Partition alterPartition(String catName, String 
dbname, String name, List
 
   @Override
   public List<Partition> alterPartitions(String catName, String dbName, String 
tblName,
-                              List<List<String>> part_vals, List<Partition> 
newParts,
-                              long writeId, String queryWriteIdList)
-                                  throws InvalidObjectException, MetaException 
{
+      List<List<String>> part_vals, List<Partition> newParts,
+      long writeId, String queryWriteIdList)
+      throws InvalidObjectException, MetaException {
     List<Partition> results = new ArrayList<>(newParts.size());
     if (newParts.isEmpty()) {
       return results;
@@ -5284,50 +5286,16 @@ public List<Partition> alterPartitions(String catName, 
String dbName, String tbl
     boolean success = false;
     try {
       openTransaction();
-
       MTable table = ensureGetMTable(catName, dbName, tblName);
-      // Validate new parts: StorageDescriptor and SerDeInfo must be set in 
Partition.
-      if (!TableType.VIRTUAL_VIEW.name().equals(table.getTableType())) {
-        for (Partition newPart : newParts) {
-          if (!newPart.isSetSd() || !newPart.getSd().isSetSerdeInfo()) {
-            throw new InvalidObjectException("Partition does not set 
storageDescriptor or serdeInfo.");
-          }
-        }
-      }
       if (writeId > 0) {
         newParts.forEach(newPart -> newPart.setWriteId(writeId));
       }
-
       List<FieldSchema> partCols = 
convertToFieldSchemas(table.getPartitionKeys());
       List<String> partNames = new ArrayList<>();
       for (List<String> partVal : part_vals) {
         partNames.add(Warehouse.makePartName(partCols, partVal));
       }
-
-      for (Partition tmpPart : newParts) {
-        if 
(!tmpPart.getDbName().equalsIgnoreCase(table.getDatabase().getName())) {
-          throw new MetaException("Invalid DB name : " + tmpPart.getDbName());
-        }
-
-        if (!tmpPart.getTableName().equalsIgnoreCase(table.getTableName())) {
-          throw new MetaException("Invalid table name : " + 
tmpPart.getDbName());
-        }
-      }
-
-      results = new GetListHelper<Partition>(catName, dbName, tblName, true, 
true) {
-        @Override
-        protected List<Partition> getSqlResult(GetHelper<List<Partition>> ctx)
-            throws MetaException {
-          return directSql.alterPartitions(table, partNames, newParts, 
queryWriteIdList);
-        }
-
-        @Override
-        protected List<Partition> getJdoResult(GetHelper<List<Partition>> ctx)
-            throws MetaException, InvalidObjectException {
-          return alterPartitionsViaJdo(table, partNames, newParts, 
queryWriteIdList);
-        }
-      }.run(false);
-
+      results = alterPartitionsInternal(table, partNames, newParts, 
queryWriteIdList, true, true);
       // commit the changes
       success = commitTransaction();
     } catch (Exception exception) {
@@ -5339,6 +5307,44 @@ protected List<Partition> 
getJdoResult(GetHelper<List<Partition>> ctx)
     return results;
   }
 
+  protected List<Partition> alterPartitionsInternal(MTable table,
+      List<String> partNames, List<Partition> newParts, String 
queryWriteIdList,
+      boolean allowSql, boolean allowJdo)
+      throws InvalidObjectException, MetaException, NoSuchObjectException {
+    // Validate new parts: StorageDescriptor and SerDeInfo must be set in 
Partition.
+    if (!TableType.VIRTUAL_VIEW.name().equals(table.getTableType())) {
+      for (Partition newPart : newParts) {
+        if (!newPart.isSetSd() || !newPart.getSd().isSetSerdeInfo()) {
+          throw new InvalidObjectException("Partition does not set 
storageDescriptor or serdeInfo.");
+        }
+      }
+    }
+    String catName = table.getDatabase().getCatalogName();
+    String dbName = table.getDatabase().getName();
+    String tblName = table.getTableName();
+    for (Partition tmpPart : newParts) {
+      if (!tmpPart.getDbName().equalsIgnoreCase(dbName)) {
+        throw new MetaException("Invalid DB name : " + tmpPart.getDbName());
+      }
+      if (!tmpPart.getTableName().equalsIgnoreCase(tblName)) {
+        throw new MetaException("Invalid table name : " + tmpPart.getDbName());
+      }
+    }
+    return new GetListHelper<Partition>(catName, dbName, tblName, allowSql, 
allowJdo) {
+      @Override
+      protected List<Partition> getSqlResult(GetHelper<List<Partition>> ctx)
+          throws MetaException {
+        return directSql.alterPartitions(table, partNames, newParts, 
queryWriteIdList);
+      }
+
+      @Override
+      protected List<Partition> getJdoResult(GetHelper<List<Partition>> ctx)
+          throws MetaException, InvalidObjectException {
+        return alterPartitionsViaJdo(table, partNames, newParts, 
queryWriteIdList);
+      }
+    }.run(false);
+  }
+
   private List<Partition> alterPartitionsViaJdo(MTable table, List<String> 
partNames,
                                                 List<Partition> newParts, 
String queryWriteIdList)
       throws MetaException, InvalidObjectException {
@@ -9672,8 +9678,25 @@ public Map<String, Map<String, String>> 
updatePartitionColumnStatisticsInBatch(
                                                       
List<TransactionalMetaStoreEventListener> listeners,
                                                       String validWriteIds, 
long writeId)
           throws NoSuchObjectException, MetaException, InvalidObjectException, 
InvalidInputException {
-    return directSql.updatePartitionColumnStatisticsBatch(partColStatsMap, tbl,
+
+    return new GetHelper<Map<String, Map<String, String>>>(tbl.getCatName(),
+        tbl.getDbName(), tbl.getTableName(), true, false) {
+      @Override
+      protected String describeResult() {
+        return "Map of partition key to column stats if successful";
+      }
+      @Override
+      protected Map<String, Map<String, String>> 
getSqlResult(GetHelper<Map<String, Map<String, String>>> ctx)
+          throws MetaException {
+        return directSql.updatePartitionColumnStatisticsBatch(partColStatsMap, 
tbl,
             listeners, validWriteIds, writeId);
+      }
+      @Override
+      protected Map<String, Map<String, String>> 
getJdoResult(GetHelper<Map<String, Map<String, String>>> ctx)
+          throws MetaException, NoSuchObjectException, InvalidObjectException, 
InvalidInputException {
+        throw new UnsupportedOperationException("Cannot update partition 
column statistics with JDO, make sure direct SQL is enabled");
+      }
+    }.run(false);
   }
 
   private List<MTableColumnStatistics> getMTableColumnStatistics(Table table, 
List<String> colNames, String engine)
diff --git 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/VerifyingObjectStore.java
 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/VerifyingObjectStore.java
index 8d209679c15..9575bd44b4c 100644
--- 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/VerifyingObjectStore.java
+++ 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/VerifyingObjectStore.java
@@ -19,11 +19,13 @@
 package org.apache.hadoop.hive.metastore;
 
 import static org.apache.commons.lang3.StringUtils.repeat;
+import static 
org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
 
 import java.lang.reflect.AccessibleObject;
 import java.lang.reflect.Array;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -33,7 +35,10 @@
 
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.client.builder.GetPartitionsArgs;
+import org.apache.hadoop.hive.metastore.model.MTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
@@ -113,6 +118,44 @@ public ColumnStatistics getTableColumnStatistics(String 
catName, String dbName,
     return sqlResult;
   }
 
+  @Override
+  public List<Partition> alterPartitions(String catName, String dbName, String 
tblName, List<List<String>> part_vals,
+      List<Partition> newParts, long writeId, String queryWriteIdList) throws 
InvalidObjectException, MetaException {
+    List<Partition> results = new ArrayList<>(newParts.size());
+    catName = normalizeIdentifier(catName);
+    dbName = normalizeIdentifier(dbName);
+    tblName = normalizeIdentifier(tblName);
+    boolean success = false;
+    try {
+      openTransaction();
+      MTable table = ensureGetMTable(catName, dbName, tblName);
+      if (writeId > 0) {
+        newParts.forEach(newPart -> newPart.setWriteId(writeId));
+      }
+      List<FieldSchema> partCols = 
convertToFieldSchemas(table.getPartitionKeys());
+      List<String> partNames = new ArrayList<>();
+      for (List<String> partVal : part_vals) {
+        partNames.add(Warehouse.makePartName(partCols, partVal));
+      }
+      List<Partition> oldParts = getPartitionsByNames(catName, dbName, 
tblName, partNames);
+      if (oldParts.size() != partNames.size()) {
+        throw new MetaException("Some partitions to be altered are missing");
+      }
+      List<Partition> tmpNewParts = new ArrayList<>(newParts);
+      alterPartitionsInternal(table, partNames, newParts, queryWriteIdList, 
true, false);
+      alterPartitionsInternal(table, partNames, oldParts, queryWriteIdList, 
false, true);
+      results = alterPartitionsInternal(table, partNames, tmpNewParts, 
queryWriteIdList, true, false);
+      // commit the changes
+      success = commitTransaction();
+    } catch (Exception exception) {
+      LOG.error("Alter failed", exception);
+      throw new MetaException(exception.getMessage());
+    } finally {
+      rollbackAndCleanup(success, null);
+    }
+    return results;
+  }
+
   @Override
   public List<ColumnStatistics> getPartitionColumnStatistics(String catName, 
String dbName,
       String tableName, List<String> partNames, List<String> colNames, String 
engine)

Reply via email to