This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new be89c4600fa HIVE-29030: Alter partition change column cannot use the direct sql (#5890)
be89c4600fa is described below

commit be89c4600faaa845e28856c6b03bf07b8e64d926
Author: dengzh <dengzhhu...@gmail.com>
AuthorDate: Tue Jun 24 22:42:54 2025 +0800

    HIVE-29030: Alter partition change column cannot use the direct sql (#5890)
---
 .../hadoop/hive/metastore/DirectSqlInsertPart.java |  30 ++---
 .../hadoop/hive/metastore/DirectSqlUpdatePart.java | 135 +++-------------------
 .../hadoop/hive/metastore/MetaStoreDirectSql.java  |   8 +-
 .../hive/metastore/MetastoreDirectSqlUtils.java    |  24 ++++
 .../apache/hadoop/hive/metastore/ObjectStore.java  | 109 ++++++++++-------
 .../hive/metastore/VerifyingObjectStore.java       |  43 +++++++
 6 files changed, 161 insertions(+), 188 deletions(-)

diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java
index 5ac6e082a4e..28dbc04c2da 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.metastore;
 
 import static org.apache.commons.lang3.StringUtils.repeat;
+import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.getModelIdentity;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -41,11 +42,6 @@
 import org.apache.hadoop.hive.metastore.model.MSerDeInfo;
 import org.apache.hadoop.hive.metastore.model.MStorageDescriptor;
 import org.apache.hadoop.hive.metastore.model.MStringList;
-import org.datanucleus.ExecutionContext;
-import org.datanucleus.api.jdo.JDOPersistenceManager;
-import org.datanucleus.metadata.AbstractClassMetaData;
-import org.datanucleus.metadata.AbstractMemberMetaData;
-import org.datanucleus.metadata.IdentityType;
 
 /**
  * This class contains the methods to insert into tables on the underlying database using direct SQL
@@ -68,16 +64,6 @@ interface BatchExecutionContext {
     void execute(String batchQueryText, int batchRowCount) throws MetaException;
   }
 
-  private Long getDataStoreId(Class<?> modelClass) throws MetaException {
-    ExecutionContext ec = ((JDOPersistenceManager) pm).getExecutionContext();
-    AbstractClassMetaData cmd = ec.getMetaDataManager().getMetaDataForClass(modelClass, ec.getClassLoaderResolver());
-    if (cmd.getIdentityType() == IdentityType.DATASTORE) {
-      return (Long) ec.getStoreManager().getValueGenerationStrategyValue(ec, cmd, null);
-    } else {
-      throw new MetaException("Identity type is not datastore.");
-    }
-  }
-
   private void insertInBatch(String tableName, String columns, int columnCount, int rowCount,
       BatchExecutionContext batchExecutionContext) throws MetaException {
     if (rowCount == 0 || columnCount == 0) {
@@ -751,24 +737,24 @@ public void addPartitions(List<MPartition> parts, List<List<MPartitionPrivilege>
           || sd.getCD().getCols() == null) {
         throw new MetaException("Invalid partition");
       }
-      Long serDeId = getDataStoreId(MSerDeInfo.class);
+      Long serDeId = getModelIdentity(pm, MSerDeInfo.class);
       serdeIdToSerDeInfo.put(serDeId, sd.getSerDeInfo());
 
       Long cdId;
       LongIdentity storeId = (LongIdentity) pm.getObjectId(sd.getCD());
       if (storeId == null) {
-        cdId = getDataStoreId(MColumnDescriptor.class);
+        cdId = getModelIdentity(pm, MColumnDescriptor.class);
         cdIdToColumnDescriptor.put(cdId, sd.getCD());
       } else {
         cdId = (Long) storeId.getKeyAsObject();
       }
 
-      Long sdId = getDataStoreId(MStorageDescriptor.class);
+      Long sdId = getModelIdentity(pm, MStorageDescriptor.class);
       sdIdToStorageDescriptor.put(sdId, sd);
       sdIdToSerdeId.put(sdId, serDeId);
       sdIdToCdId.put(sdId, cdId);
 
-      Long partId = getDataStoreId(MPartition.class);
+      Long partId = getModelIdentity(pm, MPartition.class);
       partIdToPartition.put(partId, part);
       partIdToSdId.put(partId, sdId);
 
@@ -781,7 +767,7 @@
       if (CollectionUtils.isNotEmpty(sd.getSkewedColValues())) {
         int skewedValCount = sd.getSkewedColValues().size();
         for (int i = 0; i < skewedValCount; i++) {
-          Long stringListId = getDataStoreId(MStringList.class);
+          Long stringListId = getModelIdentity(pm, MStringList.class);
           stringListIds.add(stringListId);
           stringListIdToSdId.put(stringListId, sdId);
           List<String> stringList = sd.getSkewedColValues().get(i).getInternalList();
@@ -795,13 +781,13 @@
       List<MPartitionPrivilege> partPrivileges = partPrivilegesList.get(index);
       for (MPartitionPrivilege partPrivilege : partPrivileges) {
-        Long partGrantId = getDataStoreId(MPartitionPrivilege.class);
+        Long partGrantId = getModelIdentity(pm, MPartitionPrivilege.class);
         partGrantIdToPrivilege.put(partGrantId, partPrivilege);
         partGrantIdToPartId.put(partGrantId, partId);
       }
 
       List<MPartitionColumnPrivilege> partColPrivileges = partColPrivilegesList.get(index);
       for (MPartitionColumnPrivilege partColPrivilege : partColPrivileges) {
-        Long partColumnGrantId = getDataStoreId(MPartitionColumnPrivilege.class);
+        Long partColumnGrantId = getModelIdentity(pm, MPartitionColumnPrivilege.class);
         partColumnGrantIdToPrivilege.put(partColumnGrantId, partColPrivilege);
         partColumnGrantIdToPartId.put(partColumnGrantId, partId);
       }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
index 237f3153bf4..dacc30da84a 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
@@ -42,16 +42,11 @@
 import org.apache.hadoop.hive.metastore.model.MStringList;
 import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.datanucleus.ExecutionContext;
-import org.datanucleus.api.jdo.JDOPersistenceManager;
-import org.datanucleus.metadata.AbstractClassMetaData;
-import org.datanucleus.metadata.IdentityType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 
 import javax.jdo.PersistenceManager;
-import javax.jdo.Transaction;
 import javax.jdo.datastore.JDOConnection;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -76,6 +71,7 @@
 import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.extractSqlClob;
 import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.extractSqlInt;
 import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.extractSqlLong;
+import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.getModelIdentity;
 
 /**
  * This class contains the optimizations for MetaStore that rely on direct SQL access to
@@ -213,7 +209,6 @@ private void verifyUpdates(int[] numUpdates, List<Long> partIds) throws MetaExce
   }
 
   private void insertIntoPartColStatTable(Map<PartColNameInfo, MPartitionColumnStatistics> insertMap,
-      long maxCsId,
       Connection dbConn) throws SQLException, MetaException, NoSuchObjectException {
     int numRows = 0;
     String insert = "INSERT INTO \"PART_COL_STATS\" (\"CS_ID\", \"COLUMN_NAME\", \"COLUMN_TYPE\", \"PART_ID\","
@@ -228,7 +223,7 @@ private void insertIntoPartColStatTable(Map<PartColNameInfo, MPartitionColumnSta
       Long partId = partColNameInfo.partitionId;
       MPartitionColumnStatistics mPartitionColumnStatistics = (MPartitionColumnStatistics) entry.getValue();
 
-      preparedStatement.setLong(1, maxCsId);
+      preparedStatement.setLong(1, getModelIdentity(pm, MPartitionColumnStatistics.class));
       preparedStatement.setString(2, mPartitionColumnStatistics.getColName());
       preparedStatement.setString(3, mPartitionColumnStatistics.getColType());
       preparedStatement.setLong(4, partId);
@@ -249,7 +244,6 @@ private void insertIntoPartColStatTable(Map<PartColNameInfo, MPartitionColumnSta
       preparedStatement.setLong(19, mPartitionColumnStatistics.getLastAnalyzed());
       preparedStatement.setString(20, mPartitionColumnStatistics.getEngine());
 
-      maxCsId++;
       numRows++;
       preparedStatement.addBatch();
       if (numRows == maxBatchSize) {
@@ -473,15 +467,13 @@ private void setAnsiQuotes(Connection dbConn) throws SQLException {
    * @return map of partition key to column stats if successful, null otherwise.
    */
   public Map<String, Map<String, String>> updatePartitionColumnStatistics(Map<String, ColumnStatistics> partColStatsMap,
-      Table tbl, long csId,
+      Table tbl,
       String validWriteIds, long writeId,
       List<TransactionalMetaStoreEventListener> transactionalListeners)
       throws MetaException {
-    Transaction tx = pm.currentTransaction();
     try {
       dbType.lockInternal();
-      tx.begin();
       JDOConnection jdoConn = null;
       Map<String, Map<String, String>> result;
       try {
@@ -500,11 +492,11 @@ public Map<String, Map<String, String>> updatePartitionColumnStatistics(Map<Stri
 
         LOG.info("Number of stats to insert " + insertMap.size() + " update " + updateMap.size());
 
-        if (insertMap.size() != 0) {
-          insertIntoPartColStatTable(insertMap, csId, dbConn);
+        if (!insertMap.isEmpty()) {
+          insertIntoPartColStatTable(insertMap, dbConn);
         }
 
-        if (updateMap.size() != 0) {
+        if (!updateMap.isEmpty()) {
           updatePartColStatTable(updateMap, dbConn);
         }
 
@@ -524,89 +516,12 @@ public Map<String, Map<String, String>> updatePartitionColumnStatistics(Map<Stri
       } finally {
         closeDbConn(jdoConn);
       }
-      tx.commit();
       return result;
     } catch (Exception e) {
       LOG.error("Unable to update Column stats for " + tbl.getTableName(), e);
       throw new MetaException("Unable to update Column stats for " + tbl.getTableName()
          + " due to: " + e.getMessage());
     } finally {
-      if (tx.isActive()) {
-        tx.rollback();
-      }
       dbType.unlockInternal();
     }
   }
-
-  /**
-   * Gets the next CS id from sequence MPartitionColumnStatistics and increment the CS id by numStats.
-   * @return The CD id before update.
-   */
-  public long getNextCSIdForMPartitionColumnStatistics(long numStats) throws MetaException {
-    long maxCsId = 0;
-    Transaction tx = pm.currentTransaction();
-    try {
-      dbType.lockInternal();
-      tx.begin();
-      JDOConnection jdoConn = null;
-      try {
-        jdoConn = pm.getDataStoreConnection();
-        Connection dbConn = (Connection) jdoConn.getNativeConnection();
-
-        setAnsiQuotes(dbConn);
-
-        // This loop will be iterated at max twice. If there is no records, it will first insert and then do a select.
-        // We are not using any upsert operations as select for update and then update is required to make sure that
-        // the caller gets a reserved range for CSId not used by any other thread.
-        boolean insertDone = false;
-        while (maxCsId == 0) {
-          String query = sqlGenerator.addForUpdateClause(
-              "SELECT \"NEXT_VAL\" FROM \"SEQUENCE_TABLE\" " + "WHERE \"SEQUENCE_NAME\"= " + quoteString(
-                  "org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics"));
-          LOG.debug("Execute query: " + query);
-          try (Statement statement = dbConn.createStatement(); ResultSet rs = statement.executeQuery(query)) {
-            if (rs.next()) {
-              maxCsId = rs.getLong(1);
-            } else if (insertDone) {
-              throw new MetaException("Invalid state of SEQUENCE_TABLE for MPartitionColumnStatistics");
-            } else {
-              insertDone = true;
-              query = "INSERT INTO \"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\", \"NEXT_VAL\") VALUES ( " + quoteString(
-                  "org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics") + "," + 1 + ")";
-              try {
-                statement.executeUpdate(query);
-              } catch (SQLException e) {
-                // If the record is already inserted by some other thread continue to select.
-                if (dbType.isDuplicateKeyError(e)) {
-                  continue;
-                }
-                LOG.error("Unable to insert into SEQUENCE_TABLE for MPartitionColumnStatistics.", e);
-                throw e;
-              }
-            }
-          }
-        }
-
-        long nextMaxCsId = maxCsId + numStats + 1;
-        String query = "UPDATE \"SEQUENCE_TABLE\" SET \"NEXT_VAL\" = " + nextMaxCsId + " WHERE \"SEQUENCE_NAME\" = " + quoteString(
-            "org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics");
-
-        try (Statement statement = dbConn.createStatement()) {
-          statement.executeUpdate(query);
-        }
-      } finally {
-        closeDbConn(jdoConn);
-      }
-      tx.commit();
-      return maxCsId;
-    } catch (Exception e) {
-      LOG.error("Unable to getNextCSIdForMPartitionColumnStatistics", e);
-      throw new MetaException("Unable to getNextCSIdForMPartitionColumnStatistics "
-          + " due to: " + e.getMessage());
-    } finally {
-      if (tx.isActive()) {
-        tx.rollback();
-      }
-      dbType.unlockInternal();
-    }
-  }
@@ -1064,7 +979,7 @@ public List<Void> run(List<Long> input) throws MetaException {
         List<List<String>> skewedColValues = skewedInfo.getSkewedColValues();
         if (skewedColValues != null) {
           for (List<String> colValues : skewedColValues) {
-            Long nextStringListId = getDataStoreId(MStringList.class);
+            Long nextStringListId = getModelIdentity(pm, MStringList.class);
             newStringListId.add(nextStringListId);
             sdIdToNewStringListId.computeIfAbsent(sdId, k -> new ArrayList<>()).add(nextStringListId);
             stringListIdToValues.put(nextStringListId, colValues);
@@ -1075,7 +990,7 @@ public List<Void> run(List<Long> input) throws MetaException {
       for (Map.Entry<List<String>, String> entry : skewedColValueLocationMaps.entrySet()) {
         List<String> colValues = entry.getKey();
         String location = entry.getValue();
-        Long nextStringListId = getDataStoreId(MStringList.class);
+        Long nextStringListId = getModelIdentity(pm, MStringList.class);
         newStringListId.add(nextStringListId);
         stringListIdToValues.put(nextStringListId, colValues);
         sdIdToValueLoc.computeIfAbsent(sdId, k -> new ArrayList<>()).add(Pair.of(nextStringListId, location));
@@ -1090,16 +1005,6 @@ public List<Void> run(List<Long> input) throws MetaException {
     insertSkewColValueLocInBatch(sdIdToValueLoc, sdIds);
   }
 
-  private Long getDataStoreId(Class<?> modelClass) throws MetaException {
-    ExecutionContext ec = ((JDOPersistenceManager) pm).getExecutionContext();
-    AbstractClassMetaData cmd = ec.getMetaDataManager().getMetaDataForClass(modelClass, ec.getClassLoaderResolver());
-    if (cmd.getIdentityType() == IdentityType.DATASTORE) {
-      return (Long) ec.getStoreManager().getValueGenerationStrategyValue(ec, cmd, null);
-    } else {
-      throw new MetaException("Identity type is not datastore.");
-    }
-  }
-
   private void insertSkewedColNamesInBatch(Map<Long, List<String>> sdIdToSkewedColNames, List<Long> sdIds)
       throws MetaException {
     List<String> columns = Arrays.asList("\"SD_ID\"", "\"INTEGER_IDX\"", "\"SKEWED_COL_NAME\"");
@@ -1260,19 +1165,17 @@ public List<Void> run(List<Long> input) throws Exception {
         List<FieldSchema> newCols = sdIdToNewColumns.get(sdId);
         // Use the new column descriptor only if the old column descriptor differs from the new one.
-        if (oldCols == null || !oldCols.equals(newCols)) {
-          if (oldCols != null && newCols != null) {
-            Long newCdId = getDataStoreId(MColumnDescriptor.class);
-            newCdIds.add(newCdId);
-            newCdIdToCols.put(newCdId, newCols);
-            oldCdIdToNewCdId.put(cdId, newCdId);
-            sdIdToNewCdId.put(sdId, newCdId);
-            for (int i = 0; i < oldCols.size(); i++) {
-              FieldSchema oldCol = oldCols.get(i);
-              int newIdx = newCols.indexOf(oldCol);
-              if (newIdx != -1) {
-                oldCdIdToColIdxPairs.computeIfAbsent(cdId, k -> new ArrayList<>()).add(Pair.of(i, newIdx));
-              }
+        if (!oldCols.equals(newCols) && newCols != null) {
+          Long newCdId = getModelIdentity(pm, MColumnDescriptor.class);
+          newCdIds.add(newCdId);
+          newCdIdToCols.put(newCdId, newCols);
+          oldCdIdToNewCdId.put(cdId, newCdId);
+          sdIdToNewCdId.put(sdId, newCdId);
+          for (int i = 0; i < oldCols.size(); i++) {
+            FieldSchema oldCol = oldCols.get(i);
+            int newIdx = newCols.indexOf(oldCol);
+            if (newIdx != -1) {
+              oldCdIdToColIdxPairs.computeIfAbsent(cdId, k -> new ArrayList<>()).add(Pair.of(i, newIdx));
             }
           }
         }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index 0f82e495bf8..12c78c347e4 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -3294,13 +3294,7 @@ public Map<String, Map<String, String>> updatePartitionColumnStatisticsBatch(
       List<TransactionalMetaStoreEventListener> listeners, String validWriteIds, long writeId)
       throws MetaException {
-    long numStats = 0;
-    for (Map.Entry entry : partColStatsMap.entrySet()) {
-      ColumnStatistics colStats = (ColumnStatistics) entry.getValue();
-      numStats += colStats.getStatsObjSize();
-    }
-    long csId = directSqlUpdatePart.getNextCSIdForMPartitionColumnStatistics(numStats);
-    return directSqlUpdatePart.updatePartitionColumnStatistics(partColStatsMap, tbl, csId, validWriteIds, writeId, listeners);
+    return directSqlUpdatePart.updatePartitionColumnStatistics(partColStatsMap, tbl, validWriteIds, writeId, listeners);
   }
 
   public List<Function> getFunctions(String catName) throws MetaException {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
index 500a4ece93b..45e89ab40df 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java
@@ -32,6 +32,10 @@
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.datanucleus.ExecutionContext;
+import org.datanucleus.api.jdo.JDOPersistenceManager;
+import org.datanucleus.metadata.AbstractClassMetaData;
+import org.datanucleus.metadata.AbstractMemberMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -608,6 +612,26 @@ else if (value instanceof byte[]) {
     }
   }
 
+  static Long getModelIdentity(PersistenceManager pm, Class<?> modelClass)
+      throws MetaException {
+    ExecutionContext ec = ((JDOPersistenceManager) pm).getExecutionContext();
+    AbstractClassMetaData cmd = ec.getMetaDataManager().getMetaDataForClass(modelClass, ec.getClassLoaderResolver());
+    switch (cmd.getIdentityType()) {
+    case DATASTORE :
+      return (Long) ec.getStoreManager().getValueGenerationStrategyValue(ec, cmd, null);
+    case APPLICATION :
+      if (cmd.usesSingleFieldIdentityClass()) {
+        int[] valueGenMemberPositions = cmd.getValueGenerationMemberPositions();
+        AbstractMemberMetaData mmd = cmd.getMetaDataForManagedMemberAtAbsolutePosition(valueGenMemberPositions[0]);
+        return (Long) ec.getStoreManager().getValueGenerationStrategyValue(ec, cmd, mmd);
+      }
+      throw new MetaException("Multiple key fields found in class: " + modelClass.getSimpleName());
+    default:
+      throw new MetaException(
+          "Identity type is not datastore or application, model: " + modelClass.getSimpleName());
+    }
+  }
+
   @FunctionalInterface
   static interface ApplyFunc<Target> {
     void apply(Target t, Object[] fields) throws MetaException;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 5e5b91d642a..c1ca07678fb 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -2384,7 +2384,7 @@ private List<MFieldSchema> convertToMFieldSchemas(List<FieldSchema> keys) {
     return mkeys;
   }
 
-  private List<FieldSchema> convertToFieldSchemas(List<MFieldSchema> mkeys) {
+  protected List<FieldSchema> convertToFieldSchemas(List<MFieldSchema> mkeys) {
     List<FieldSchema> keys = null;
     if (mkeys != null) {
       keys = new ArrayList<>();
@@ -4454,9 +4454,6 @@ private void start(boolean initTable) throws MetaException, NoSuchObjectExceptio
   }
 
   private void handleDirectSqlError(Exception ex, String savePoint) throws MetaException, NoSuchObjectException {
-    if (!allowJdo || !DatabaseProduct.isRecoverableException(ex)) {
-      throw ExceptionHandler.newMetaException(ex);
-    }
     String message = null;
     try {
       message = generateShorterMessage(ex);
@@ -4465,6 +4462,11 @@ private void handleDirectSqlError(Exception ex, String savePoint) throws MetaExc
     }
     LOG.warn(message); // Don't log the exception, people just get confused.
LOG.debug("Full DirectSQL callstack for debugging (not an error)", ex); + + if (!allowJdo || !DatabaseProduct.isRecoverableException(ex)) { + throw ExceptionHandler.newMetaException(ex); + } + if (!isInTxn) { JDOException rollbackEx = null; try { @@ -5270,9 +5272,9 @@ public Partition alterPartition(String catName, String dbname, String name, List @Override public List<Partition> alterPartitions(String catName, String dbName, String tblName, - List<List<String>> part_vals, List<Partition> newParts, - long writeId, String queryWriteIdList) - throws InvalidObjectException, MetaException { + List<List<String>> part_vals, List<Partition> newParts, + long writeId, String queryWriteIdList) + throws InvalidObjectException, MetaException { List<Partition> results = new ArrayList<>(newParts.size()); if (newParts.isEmpty()) { return results; @@ -5284,50 +5286,16 @@ public List<Partition> alterPartitions(String catName, String dbName, String tbl boolean success = false; try { openTransaction(); - MTable table = ensureGetMTable(catName, dbName, tblName); - // Validate new parts: StorageDescriptor and SerDeInfo must be set in Partition. - if (!TableType.VIRTUAL_VIEW.name().equals(table.getTableType())) { - for (Partition newPart : newParts) { - if (!newPart.isSetSd() || !newPart.getSd().isSetSerdeInfo()) { - throw new InvalidObjectException("Partition does not set storageDescriptor or serdeInfo."); - } - } - } if (writeId > 0) { newParts.forEach(newPart -> newPart.setWriteId(writeId)); } - List<FieldSchema> partCols = convertToFieldSchemas(table.getPartitionKeys()); List<String> partNames = new ArrayList<>(); for (List<String> partVal : part_vals) { partNames.add(Warehouse.makePartName(partCols, partVal)); } - - for (Partition tmpPart : newParts) { - if (!tmpPart.getDbName().equalsIgnoreCase(table.getDatabase().getName())) { - throw new MetaException("Invalid DB name : " + tmpPart.getDbName()); - } - - if (!tmpPart.getTableName().equalsIgnoreCase(table.getTableName())) { - throw new MetaException("Invalid table name : " + tmpPart.getDbName()); - } - } - - results = new GetListHelper<Partition>(catName, dbName, tblName, true, true) { - @Override - protected List<Partition> getSqlResult(GetHelper<List<Partition>> ctx) - throws MetaException { - return directSql.alterPartitions(table, partNames, newParts, queryWriteIdList); - } - - @Override - protected List<Partition> getJdoResult(GetHelper<List<Partition>> ctx) - throws MetaException, InvalidObjectException { - return alterPartitionsViaJdo(table, partNames, newParts, queryWriteIdList); - } - }.run(false); - + results = alterPartitionsInternal(table, partNames, newParts, queryWriteIdList, true, true); // commit the changes success = commitTransaction(); } catch (Exception exception) { @@ -5339,6 +5307,44 @@ protected List<Partition> getJdoResult(GetHelper<List<Partition>> ctx) return results; } + protected List<Partition> alterPartitionsInternal(MTable table, + List<String> partNames, List<Partition> newParts, String queryWriteIdList, + boolean allowSql, boolean allowJdo) + throws InvalidObjectException, MetaException, NoSuchObjectException { + // Validate new parts: StorageDescriptor and SerDeInfo must be set in Partition. 
+    if (!TableType.VIRTUAL_VIEW.name().equals(table.getTableType())) {
+      for (Partition newPart : newParts) {
+        if (!newPart.isSetSd() || !newPart.getSd().isSetSerdeInfo()) {
+          throw new InvalidObjectException("Partition does not set storageDescriptor or serdeInfo.");
+        }
+      }
+    }
+    String catName = table.getDatabase().getCatalogName();
+    String dbName = table.getDatabase().getName();
+    String tblName = table.getTableName();
+    for (Partition tmpPart : newParts) {
+      if (!tmpPart.getDbName().equalsIgnoreCase(dbName)) {
+        throw new MetaException("Invalid DB name : " + tmpPart.getDbName());
+      }
+      if (!tmpPart.getTableName().equalsIgnoreCase(tblName)) {
+        throw new MetaException("Invalid table name : " + tmpPart.getDbName());
+      }
+    }
+    return new GetListHelper<Partition>(catName, dbName, tblName, allowSql, allowJdo) {
+      @Override
+      protected List<Partition> getSqlResult(GetHelper<List<Partition>> ctx)
+          throws MetaException {
+        return directSql.alterPartitions(table, partNames, newParts, queryWriteIdList);
+      }
+
+      @Override
+      protected List<Partition> getJdoResult(GetHelper<List<Partition>> ctx)
+          throws MetaException, InvalidObjectException {
+        return alterPartitionsViaJdo(table, partNames, newParts, queryWriteIdList);
+      }
+    }.run(false);
+  }
+
   private List<Partition> alterPartitionsViaJdo(MTable table, List<String> partNames,
       List<Partition> newParts, String queryWriteIdList)
       throws MetaException, InvalidObjectException {
@@ -9672,8 +9678,25 @@ public Map<String, Map<String, String>> updatePartitionColumnStatisticsInBatch(
       List<TransactionalMetaStoreEventListener> listeners, String validWriteIds, long writeId)
       throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
-    return directSql.updatePartitionColumnStatisticsBatch(partColStatsMap, tbl,
+
+    return new GetHelper<Map<String, Map<String, String>>>(tbl.getCatName(),
+        tbl.getDbName(), tbl.getTableName(), true, false) {
+      @Override
+      protected String describeResult() {
+        return "Map of partition key to column stats if successful";
+      }
+      @Override
+      protected Map<String, Map<String, String>> getSqlResult(GetHelper<Map<String, Map<String, String>>> ctx)
+          throws MetaException {
+        return directSql.updatePartitionColumnStatisticsBatch(partColStatsMap, tbl,
         listeners, validWriteIds, writeId);
+      }
+      @Override
+      protected Map<String, Map<String, String>> getJdoResult(GetHelper<Map<String, Map<String, String>>> ctx)
+          throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
+        throw new UnsupportedOperationException("Cannot update partition column statistics with JDO, make sure direct SQL is enabled");
+      }
+    }.run(false);
   }
 
   private List<MTableColumnStatistics> getMTableColumnStatistics(Table table, List<String> colNames, String engine)
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/VerifyingObjectStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/VerifyingObjectStore.java
index 8d209679c15..9575bd44b4c 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/VerifyingObjectStore.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/VerifyingObjectStore.java
@@ -19,11 +19,13 @@
 package org.apache.hadoop.hive.metastore;
 
 import static org.apache.commons.lang3.StringUtils.repeat;
+import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
 
 import java.lang.reflect.AccessibleObject;
 import java.lang.reflect.Array;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -33,7 +35,10 @@
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.client.builder.GetPartitionsArgs;
+import org.apache.hadoop.hive.metastore.model.MTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
@@ -113,6 +118,44 @@ public ColumnStatistics getTableColumnStatistics(String catName, String dbName,
     return sqlResult;
   }
 
+  @Override
+  public List<Partition> alterPartitions(String catName, String dbName, String tblName, List<List<String>> part_vals,
+      List<Partition> newParts, long writeId, String queryWriteIdList) throws InvalidObjectException, MetaException {
+    List<Partition> results = new ArrayList<>(newParts.size());
+    catName = normalizeIdentifier(catName);
+    dbName = normalizeIdentifier(dbName);
+    tblName = normalizeIdentifier(tblName);
+    boolean success = false;
+    try {
+      openTransaction();
+      MTable table = ensureGetMTable(catName, dbName, tblName);
+      if (writeId > 0) {
+        newParts.forEach(newPart -> newPart.setWriteId(writeId));
+      }
+      List<FieldSchema> partCols = convertToFieldSchemas(table.getPartitionKeys());
+      List<String> partNames = new ArrayList<>();
+      for (List<String> partVal : part_vals) {
+        partNames.add(Warehouse.makePartName(partCols, partVal));
+      }
+      List<Partition> oldParts = getPartitionsByNames(catName, dbName, tblName, partNames);
+      if (oldParts.size() != partNames.size()) {
+        throw new MetaException("Some partitions to be altered are missing");
+      }
+      List<Partition> tmpNewParts = new ArrayList<>(newParts);
+      alterPartitionsInternal(table, partNames, newParts, queryWriteIdList, true, false);
+      alterPartitionsInternal(table, partNames, oldParts, queryWriteIdList, false, true);
+      results = alterPartitionsInternal(table, partNames, tmpNewParts, queryWriteIdList, true, false);
+      // commit the changes
+      success = commitTransaction();
+    } catch (Exception exception) {
+      LOG.error("Alter failed", exception);
+      throw new MetaException(exception.getMessage());
+    } finally {
+      rollbackAndCleanup(success, null);
+    }
+    return results;
+  }
+
   @Override
   public List<ColumnStatistics> getPartitionColumnStatistics(String catName, String dbName, String tableName,
       List<String> partNames, List<String> colNames, String engine)
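
The core of this change is visible in MetastoreDirectSqlUtils.getModelIdentity above: instead of reserving a block of CS_IDs by hand with a SELECT ... FOR UPDATE loop over SEQUENCE_TABLE, each direct-SQL insert now asks DataNucleus' value-generation strategy for the next identity of the model class, so direct-SQL rows and JDO-persisted rows draw IDs from the same generator. A minimal sketch of a caller, assuming a DataNucleus-backed JDO PersistenceManager as in MetaStoreDirectSql (the class below is hypothetical, not part of the commit, and sits in the same package because the helper is package-private):

    package org.apache.hadoop.hive.metastore;

    import javax.jdo.PersistenceManager;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;

    // Hypothetical illustration only.
    class ModelIdentitySketch {
      // Reserves one CS_ID per row about to be written with direct SQL,
      // the way insertIntoPartColStatTable now does for each batched row.
      // Each call delegates to the store's value-generation strategy, the
      // same mechanism JDO would use when persisting the object through the ORM.
      static long nextStatId(PersistenceManager pm) throws MetaException {
        return MetastoreDirectSqlUtils.getModelIdentity(pm, MPartitionColumnStatistics.class);
      }
    }

Because every ID comes from the shared generator rather than a pre-reserved range, updatePartitionColumnStatistics no longer needs its csId parameter or its own Transaction bookkeeping, and getNextCSIdForMPartitionColumnStatistics with its duplicate-key retry loop could be removed outright.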