This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 202906022b4 HIVE-27530: Implement direct SQL for alter partitions to improve performance (Wechar Yu, reviewed by Denys Kuzmenko, Sai Hemanth Gantasala)
202906022b4 is described below
commit 202906022b450ab2e90928f514f6cad39830ad1e
Author: Wechar Yu <[email protected]>
AuthorDate: Mon Dec 25 19:24:04 2023 +0800
HIVE-27530: Implement direct SQL for alter partitions to improve performance (Wechar Yu, reviewed by Denys Kuzmenko, Sai Hemanth Gantasala)
Closes #4517
---
.../apache/hadoop/hive/metastore/Batchable.java | 30 +-
.../hadoop/hive/metastore/DatabaseProduct.java | 16 +
.../hadoop/hive/metastore/DirectSqlInsertPart.java | 8 +-
.../hadoop/hive/metastore/DirectSqlUpdatePart.java | 1536 ++++++++++++++++++++
.../hadoop/hive/metastore/DirectSqlUpdateStat.java | 727 ---------
.../apache/hadoop/hive/metastore/HMSHandler.java | 18 +-
.../hadoop/hive/metastore/MetaStoreDirectSql.java | 127 +-
.../apache/hadoop/hive/metastore/ObjectStore.java | 145 +-
.../apache/hadoop/hive/metastore/txn/TxnUtils.java | 16 +
.../hadoop/hive/metastore/TestObjectStore.java | 2 +-
.../hadoop/hive/metastore/tools/BenchmarkTool.java | 3 +
.../hadoop/hive/metastore/tools/HMSBenchmarks.java | 30 +
12 files changed, 1817 insertions(+), 841 deletions(-)
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Batchable.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Batchable.java
index e3fd5a4bf14..571d6bdbd1d 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Batchable.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Batchable.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.metastore;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import javax.jdo.Query;
@@ -38,7 +39,7 @@ public abstract class Batchable<I, R> {
public static final int NO_BATCHING = -1;
private List<Query> queries = null;
- public abstract List<R> run(List<I> input) throws MetaException;
+ public abstract List<R> run(List<I> input) throws Exception;
public void addQueryAfterUse(Query query) {
if (queries == null) {
@@ -70,18 +71,25 @@ public abstract class Batchable<I, R> {
final int batchSize,
List<I> input,
Batchable<I, R> runnable) throws MetaException {
- if (batchSize == NO_BATCHING || batchSize >= input.size()) {
- return runnable.run(input);
+ if (input == null || input.isEmpty()) {
+ return Collections.emptyList();
}
- List<R> result = new ArrayList<R>(input.size());
-    for (int fromIndex = 0, toIndex = 0; toIndex < input.size(); fromIndex = toIndex) {
- toIndex = Math.min(fromIndex + batchSize, input.size());
- List<I> batchedInput = input.subList(fromIndex, toIndex);
- List<R> batchedOutput = runnable.run(batchedInput);
- if (batchedOutput != null) {
- result.addAll(batchedOutput);
+ try {
+ if (batchSize == NO_BATCHING || batchSize >= input.size()) {
+ return runnable.run(input);
}
+ List<R> result = new ArrayList<>(input.size());
+      for (int fromIndex = 0, toIndex = 0; toIndex < input.size(); fromIndex = toIndex) {
+ toIndex = Math.min(fromIndex + batchSize, input.size());
+ List<I> batchedInput = input.subList(fromIndex, toIndex);
+ List<R> batchedOutput = runnable.run(batchedInput);
+ if (batchedOutput != null) {
+ result.addAll(batchedOutput);
+ }
+ }
+ return result;
+ } catch (Exception e) {
+ throw ExceptionHandler.newMetaException(e);
}
- return result;
}
}
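A minimal usage sketch of the reworked Batchable contract (illustrative only, not part of this patch; the toy string mapping is made up): run() may now throw any Exception, runBatched() rethrows it as a MetaException, and a null or empty input short-circuits to an empty list.

    List<Long> ids = java.util.Arrays.asList(1L, 2L, 3L);
    List<String> names = Batchable.runBatched(2, ids, new Batchable<Long, String>() {
      @Override
      public List<String> run(List<Long> input) throws Exception {
        // Each call sees at most 2 ids; any exception thrown here surfaces as a MetaException.
        return input.stream().map(String::valueOf).collect(java.util.stream.Collectors.toList());
      }
    });
    // names == ["1", "2", "3"]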
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java
index 6e04bf0d6f7..ea3faf09113 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configurable;
@@ -748,6 +749,21 @@ public class DatabaseProduct implements Configurable {
return val;
}
+  /**
+   * Get the maximum number of rows allowed in a single query, given the number of parameters per row.
+   * @param batch the configured batch size
+   * @param paramSize the number of bound parameters per row in the query statement
+   * @return the maximum allowed number of rows in a query
+   */
+  public int getMaxRows(int batch, int paramSize) {
+    if (isSQLSERVER()) {
+      // SQL Server supports a maximum of 2100 parameters in a request. Adjust the batch size accordingly.
+      int maxAllowedRows = (2100 - paramSize) / paramSize;
+      return Math.min(batch, maxAllowedRows);
+    }
+    return batch;
+  }
+
// This class implements the Configurable interface for the benefit
// of "plugin" instances created via reflection (see invocation of
// ReflectionUtils.newInstance in method determineDatabaseProduct)
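To make the getMaxRows() arithmetic concrete (illustrative figures: the batch size of 1000 is assumed, the per-row parameter count of 8 matches the SDS update introduced later in this patch, and dbType stands for a DatabaseProduct instance):

    // SQL Server: (2100 - 8) / 8 = 261, so the batch is capped at 261 rows per statement.
    // Every other database product returns the configured batch size unchanged.
    int rowsPerStatement = dbType.getMaxRows(1000, 8);  // 261 on SQL Server, 1000 elsewhere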
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java
index be17470edd6..ba205ebe705 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlInsertPart.java
@@ -83,13 +83,7 @@ class DirectSqlInsertPart {
return;
}
int maxRowsInBatch = batchSize > 0 ? batchSize : rowCount;
- if (dbType.isSQLSERVER()) {
-      // SQL Server supports a maximum of 2100 parameters in a request. Adjust the maxRowsInBatch accordingly
- int maxAllowedRows = (2100 - columnCount) / columnCount;
- if (maxRowsInBatch > maxAllowedRows) {
- maxRowsInBatch = maxAllowedRows;
- }
- }
+ maxRowsInBatch = dbType.getMaxRows(maxRowsInBatch, columnCount);
int maxBatches = rowCount / maxRowsInBatch;
int last = rowCount % maxRowsInBatch;
String rowFormat = "(" + repeat(",?", columnCount).substring(1) + ")";
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
new file mode 100644
index 00000000000..f6e41f09094
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
@@ -0,0 +1,1536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEventBatch;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.model.MColumnDescriptor;
+import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
+import org.apache.hadoop.hive.metastore.model.MStringList;
+import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.datanucleus.ExecutionContext;
+import org.datanucleus.api.jdo.JDOPersistenceManager;
+import org.datanucleus.metadata.AbstractClassMetaData;
+import org.datanucleus.metadata.IdentityType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import javax.jdo.PersistenceManager;
+import javax.jdo.datastore.JDOConnection;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.common.StatsSetupConst.COLUMN_STATS_ACCURATE;
+import static org.apache.hadoop.hive.metastore.HMSHandler.getPartValsFromName;
+import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.executeWithArray;
+import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.extractSqlInt;
+import static org.apache.hadoop.hive.metastore.MetastoreDirectSqlUtils.extractSqlLong;
+
+/**
+ * This class contains the optimizations for MetaStore that rely on direct SQL access to
+ * the underlying database. It should use ANSI SQL and be compatible with common databases
+ * such as MySQL (note that MySQL doesn't use full ANSI mode by default), Postgres, etc.
+ *
+ * This class separates the update logic out of the MetaStoreDirectSql class.
+ */
+class DirectSqlUpdatePart {
+  private static final Logger LOG = LoggerFactory.getLogger(DirectSqlUpdatePart.class.getName());
+
+ private final PersistenceManager pm;
+ private final Configuration conf;
+ private final DatabaseProduct dbType;
+ private final int maxBatchSize;
+ private final SQLGenerator sqlGenerator;
+
+ private static final ReentrantLock derbyLock = new ReentrantLock(true);
+
+ public DirectSqlUpdatePart(PersistenceManager pm, Configuration conf,
+ DatabaseProduct dbType, int batchSize) {
+ this.pm = pm;
+ this.conf = conf;
+ this.dbType = dbType;
+ this.maxBatchSize = batchSize;
+ sqlGenerator = new SQLGenerator(dbType, conf);
+ }
+
+  /**
+   * {@link #lockInternal()} and {@link #unlockInternal()} are used to serialize those operations that
+   * require Select ... For Update to sequence operations properly. In practice that means when running
+   * with the Derby database. See more notes at class level.
+   */
+ private void lockInternal() {
+ if(dbType.isDERBY()) {
+ derbyLock.lock();
+ }
+ }
+
+ private void unlockInternal() {
+ if(dbType.isDERBY()) {
+ derbyLock.unlock();
+ }
+ }
+
+ void rollbackDBConn(Connection dbConn) {
+ try {
+ if (dbConn != null && !dbConn.isClosed()) dbConn.rollback();
+ } catch (SQLException e) {
+ LOG.warn("Failed to rollback db connection ", e);
+ }
+ }
+
+ void closeDbConn(JDOConnection jdoConn) {
+ try {
+ if (jdoConn != null) {
+ jdoConn.close();
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to close db connection", e);
+ }
+ }
+
+ void closeStmt(Statement stmt) {
+ try {
+ if (stmt != null && !stmt.isClosed()) stmt.close();
+ } catch (SQLException e) {
+ LOG.warn("Failed to close statement ", e);
+ }
+ }
+
+ void close(ResultSet rs) {
+ try {
+ if (rs != null && !rs.isClosed()) {
+ rs.close();
+ }
+ }
+ catch(SQLException ex) {
+ LOG.warn("Failed to close statement ", ex);
+ }
+ }
+
+ static String quoteString(String input) {
+ return "'" + input + "'";
+ }
+
+ void close(ResultSet rs, Statement stmt, JDOConnection dbConn) {
+ close(rs);
+ closeStmt(stmt);
+ closeDbConn(dbConn);
+ }
+
+  private void populateInsertUpdateMap(Map<PartitionInfo, ColumnStatistics> statsPartInfoMap,
+      Map<PartColNameInfo, MPartitionColumnStatistics> updateMap,
+      Map<PartColNameInfo, MPartitionColumnStatistics> insertMap,
+      Connection dbConn, Table tbl) throws SQLException, MetaException, NoSuchObjectException {
+ StringBuilder prefix = new StringBuilder();
+ StringBuilder suffix = new StringBuilder();
+ Statement statement = null;
+ ResultSet rs = null;
+ List<String> queries = new ArrayList<>();
+ Set<PartColNameInfo> selectedParts = new HashSet<>();
+
+ List<Long> partIdList = statsPartInfoMap.keySet().stream().map(
+ e -> e.partitionId).collect(Collectors.toList()
+ );
+
+ prefix.append("select \"PART_ID\", \"COLUMN_NAME\" from \"PART_COL_STATS\"
WHERE ");
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
+ partIdList, "\"PART_ID\"", true, false);
+
+ for (String query : queries) {
+ try {
+ statement = dbConn.createStatement();
+ LOG.debug("Going to execute query " + query);
+ rs = statement.executeQuery(query);
+ while (rs.next()) {
+ selectedParts.add(new PartColNameInfo(rs.getLong(1),
rs.getString(2)));
+ }
+ } finally {
+ close(rs, statement, null);
+ }
+ }
+
+ for (Map.Entry entry : statsPartInfoMap.entrySet()) {
+ PartitionInfo partitionInfo = (PartitionInfo) entry.getKey();
+ ColumnStatistics colStats = (ColumnStatistics) entry.getValue();
+ long partId = partitionInfo.partitionId;
+ ColumnStatisticsDesc statsDesc = colStats.getStatsDesc();
+ if (!statsDesc.isSetCatName()) {
+ statsDesc.setCatName(tbl.getCatName());
+ }
+ for (ColumnStatisticsObj statisticsObj : colStats.getStatsObj()) {
+ PartColNameInfo temp = new PartColNameInfo(partId,
statisticsObj.getColName());
+ if (selectedParts.contains(temp)) {
+ updateMap.put(temp, StatObjectConverter.
+ convertToMPartitionColumnStatistics(null, statsDesc,
statisticsObj, colStats.getEngine()));
+ } else {
+ insertMap.put(temp, StatObjectConverter.
+ convertToMPartitionColumnStatistics(null, statsDesc,
statisticsObj, colStats.getEngine()));
+ }
+ }
+ }
+ }
+
+ private void updatePartColStatTable(Map<PartColNameInfo,
MPartitionColumnStatistics> updateMap,
+ Connection dbConn) throws
SQLException, MetaException, NoSuchObjectException {
+ PreparedStatement pst = null;
+ for (Map.Entry entry : updateMap.entrySet()) {
+ PartColNameInfo partColNameInfo = (PartColNameInfo) entry.getKey();
+ Long partId = partColNameInfo.partitionId;
+ MPartitionColumnStatistics mPartitionColumnStatistics =
(MPartitionColumnStatistics) entry.getValue();
+ String update = "UPDATE \"PART_COL_STATS\" SET ";
+ update +=
StatObjectConverter.getUpdatedColumnSql(mPartitionColumnStatistics);
+ update += " WHERE \"PART_ID\" = " + partId + " AND "
+ + " \"COLUMN_NAME\" = " +
quoteString(mPartitionColumnStatistics.getColName());
+ try {
+ pst = dbConn.prepareStatement(update);
+
StatObjectConverter.initUpdatedColumnStatement(mPartitionColumnStatistics, pst);
+ LOG.debug("Going to execute update " + update);
+ int numUpdate = pst.executeUpdate();
+ if (numUpdate != 1) {
+ throw new MetaException("Invalid state of PART_COL_STATS for
PART_ID " + partId);
+ }
+ } finally {
+ closeStmt(pst);
+ }
+ }
+ }
+
+ private void insertIntoPartColStatTable(Map<PartColNameInfo,
MPartitionColumnStatistics> insertMap,
+ long maxCsId,
+ Connection dbConn) throws
SQLException, MetaException, NoSuchObjectException {
+ PreparedStatement preparedStatement = null;
+ int numRows = 0;
+ String insert = "INSERT INTO \"PART_COL_STATS\" (\"CS_ID\", \"CAT_NAME\",
\"DB_NAME\","
+ + "\"TABLE_NAME\", \"PARTITION_NAME\", \"COLUMN_NAME\",
\"COLUMN_TYPE\", \"PART_ID\","
+ + " \"LONG_LOW_VALUE\", \"LONG_HIGH_VALUE\",
\"DOUBLE_HIGH_VALUE\", \"DOUBLE_LOW_VALUE\","
+ + " \"BIG_DECIMAL_LOW_VALUE\", \"BIG_DECIMAL_HIGH_VALUE\",
\"NUM_NULLS\", \"NUM_DISTINCTS\", \"BIT_VECTOR\" ,"
+ + " \"HISTOGRAM\", \"AVG_COL_LEN\", \"MAX_COL_LEN\",
\"NUM_TRUES\", \"NUM_FALSES\", \"LAST_ANALYZED\", \"ENGINE\") values "
+ + "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?)";
+
+ try {
+ preparedStatement = dbConn.prepareStatement(insert);
+ for (Map.Entry entry : insertMap.entrySet()) {
+ PartColNameInfo partColNameInfo = (PartColNameInfo) entry.getKey();
+ Long partId = partColNameInfo.partitionId;
+ MPartitionColumnStatistics mPartitionColumnStatistics =
(MPartitionColumnStatistics) entry.getValue();
+
+ preparedStatement.setLong(1, maxCsId);
+ preparedStatement.setString(2,
mPartitionColumnStatistics.getCatName());
+ preparedStatement.setString(3, mPartitionColumnStatistics.getDbName());
+ preparedStatement.setString(4,
mPartitionColumnStatistics.getTableName());
+ preparedStatement.setString(5,
mPartitionColumnStatistics.getPartitionName());
+ preparedStatement.setString(6,
mPartitionColumnStatistics.getColName());
+ preparedStatement.setString(7,
mPartitionColumnStatistics.getColType());
+ preparedStatement.setLong(8, partId);
+ preparedStatement.setObject(9,
mPartitionColumnStatistics.getLongLowValue());
+ preparedStatement.setObject(10,
mPartitionColumnStatistics.getLongHighValue());
+ preparedStatement.setObject(11,
mPartitionColumnStatistics.getDoubleHighValue());
+ preparedStatement.setObject(12,
mPartitionColumnStatistics.getDoubleLowValue());
+ preparedStatement.setString(13,
mPartitionColumnStatistics.getDecimalLowValue());
+ preparedStatement.setString(14,
mPartitionColumnStatistics.getDecimalHighValue());
+ preparedStatement.setObject(15,
mPartitionColumnStatistics.getNumNulls());
+ preparedStatement.setObject(16,
mPartitionColumnStatistics.getNumDVs());
+ preparedStatement.setObject(17,
mPartitionColumnStatistics.getBitVector());
+ preparedStatement.setBytes(18,
mPartitionColumnStatistics.getHistogram());
+ preparedStatement.setObject(19,
mPartitionColumnStatistics.getAvgColLen());
+ preparedStatement.setObject(20,
mPartitionColumnStatistics.getMaxColLen());
+ preparedStatement.setObject(21,
mPartitionColumnStatistics.getNumTrues());
+ preparedStatement.setObject(22,
mPartitionColumnStatistics.getNumFalses());
+ preparedStatement.setLong(23,
mPartitionColumnStatistics.getLastAnalyzed());
+ preparedStatement.setString(24,
mPartitionColumnStatistics.getEngine());
+
+ maxCsId++;
+ numRows++;
+ preparedStatement.addBatch();
+ if (numRows == maxBatchSize) {
+ preparedStatement.executeBatch();
+ numRows = 0;
+ }
+ }
+
+ if (numRows != 0) {
+ preparedStatement.executeBatch();
+ }
+ } finally {
+ closeStmt(preparedStatement);
+ }
+ }
+
+ private Map<Long, String> getParamValues(Connection dbConn, List<Long>
partIdList) throws SQLException {
+ List<String> queries = new ArrayList<>();
+ StringBuilder prefix = new StringBuilder();
+ StringBuilder suffix = new StringBuilder();
+ Statement statement = null;
+ ResultSet rs = null;
+
+ prefix.append("select \"PART_ID\", \"PARAM_VALUE\" "
+ + " from \"PARTITION_PARAMS\" where "
+ + " \"PARAM_KEY\" = 'COLUMN_STATS_ACCURATE' "
+ + " and ");
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
+ partIdList, "\"PART_ID\"", true, false);
+
+ Map<Long, String> partIdToParaMap = new HashMap<>();
+ for (String query : queries) {
+ try {
+ statement = dbConn.createStatement();
+ LOG.debug("Going to execute query " + query);
+ rs = statement.executeQuery(query);
+ while (rs.next()) {
+ partIdToParaMap.put(rs.getLong(1), rs.getString(2));
+ }
+ } finally {
+ close(rs, statement, null);
+ }
+ }
+ return partIdToParaMap;
+ }
+
+ private void updateWriteIdForPartitions(Connection dbConn, long writeId,
List<Long> partIdList) throws SQLException {
+ StringBuilder prefix = new StringBuilder();
+ List<String> queries = new ArrayList<>();
+ StringBuilder suffix = new StringBuilder();
+
+ prefix.append("UPDATE \"PARTITIONS\" set \"WRITE_ID\" = " + writeId + "
where ");
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
+ partIdList, "\"PART_ID\"", false, false);
+
+ Statement statement = null;
+ for (String query : queries) {
+ try {
+ statement = dbConn.createStatement();
+ LOG.debug("Going to execute update " + query);
+ statement.executeUpdate(query);
+ } finally {
+ closeStmt(statement);
+ }
+ }
+ }
+
+ private Map<String, Map<String, String>>
updatePartitionParamTable(Connection dbConn,
+
Map<PartitionInfo, ColumnStatistics> partitionInfoMap,
+ String
validWriteIds,
+ long
writeId,
+ boolean
isAcidTable)
+ throws SQLException, MetaException {
+ Map<String, Map<String, String>> result = new HashMap<>();
+ boolean areTxnStatsSupported = MetastoreConf.getBoolVar(conf,
ConfVars.HIVE_TXN_STATS_ENABLED);
+ String insert = "INSERT INTO \"PARTITION_PARAMS\" (\"PART_ID\",
\"PARAM_KEY\", \"PARAM_VALUE\") "
+ + "VALUES( ? , 'COLUMN_STATS_ACCURATE' , ? )";
+ String delete = "DELETE from \"PARTITION_PARAMS\" "
+ + " where \"PART_ID\" = ? "
+ + " and \"PARAM_KEY\" = 'COLUMN_STATS_ACCURATE'";
+ String update = "UPDATE \"PARTITION_PARAMS\" set \"PARAM_VALUE\" = ? "
+ + " where \"PART_ID\" = ? "
+ + " and \"PARAM_KEY\" = 'COLUMN_STATS_ACCURATE'";
+ int numInsert = 0;
+ int numDelete = 0;
+ int numUpdate = 0;
+
+ List<Long> partIdList = partitionInfoMap.keySet().stream().map(
+ e -> e.partitionId).collect(Collectors.toList()
+ );
+
+ // get the old parameters from PARTITION_PARAMS table.
+ Map<Long, String> partIdToParaMap = getParamValues(dbConn, partIdList);
+
+ try (PreparedStatement statementInsert = dbConn.prepareStatement(insert);
+ PreparedStatement statementDelete = dbConn.prepareStatement(delete);
+ PreparedStatement statementUpdate = dbConn.prepareStatement(update)) {
+ for (Map.Entry entry : partitionInfoMap.entrySet()) {
+ PartitionInfo partitionInfo = (PartitionInfo) entry.getKey();
+ ColumnStatistics colStats = (ColumnStatistics) entry.getValue();
+ List<String> colNames = colStats.getStatsObj().stream().map(e ->
e.getColName()).collect(Collectors.toList());
+ long partWriteId = partitionInfo.writeId;
+ long partId = partitionInfo.partitionId;
+ Map<String, String> newParameter;
+
+ if (!partIdToParaMap.containsKey(partId)) {
+ newParameter = new HashMap<>();
+ newParameter.put(COLUMN_STATS_ACCURATE, "TRUE");
+ StatsSetupConst.setColumnStatsState(newParameter, colNames);
+ statementInsert.setLong(1, partId);
+ statementInsert.setString(2,
newParameter.get(COLUMN_STATS_ACCURATE));
+ numInsert++;
+ statementInsert.addBatch();
+ if (numInsert == maxBatchSize) {
+ LOG.debug(" Executing insert " + insert);
+ statementInsert.executeBatch();
+ numInsert = 0;
+ }
+ } else {
+ String oldStats = partIdToParaMap.get(partId);
+
+ Map<String, String> oldParameter = new HashMap<>();
+ oldParameter.put(COLUMN_STATS_ACCURATE, oldStats);
+
+ newParameter = new HashMap<>();
+ newParameter.put(COLUMN_STATS_ACCURATE, oldStats);
+ StatsSetupConst.setColumnStatsState(newParameter, colNames);
+
+ if (isAcidTable) {
+ String errorMsg = ObjectStore.verifyStatsChangeCtx(
+ colStats.getStatsDesc().getDbName() + "." +
colStats.getStatsDesc().getTableName(),
+ oldParameter, newParameter, writeId, validWriteIds, true);
+ if (errorMsg != null) {
+ throw new MetaException(errorMsg);
+ }
+ }
+
+ if (isAcidTable &&
+ (!areTxnStatsSupported ||
!ObjectStore.isCurrentStatsValidForTheQuery(oldParameter, partWriteId,
+ validWriteIds, true))) {
+ statementDelete.setLong(1, partId);
+ statementDelete.addBatch();
+ numDelete++;
+ if (numDelete == maxBatchSize) {
+ statementDelete.executeBatch();
+ numDelete = 0;
+ LOG.debug("Removed COLUMN_STATS_ACCURATE from the parameters of
the partition "
+ + colStats.getStatsDesc().getDbName() + "." +
colStats.getStatsDesc().getTableName() + "."
+ + colStats.getStatsDesc().getPartName());
+ }
+ } else {
+ statementUpdate.setString(1,
newParameter.get(COLUMN_STATS_ACCURATE));
+ statementUpdate.setLong(2, partId);
+ statementUpdate.addBatch();
+ numUpdate++;
+ if (numUpdate == maxBatchSize) {
+ LOG.debug(" Executing update " + statementUpdate);
+ statementUpdate.executeBatch();
+ numUpdate = 0;
+ }
+ }
+ }
+ result.put(partitionInfo.partitionName, newParameter);
+ }
+
+ if (numInsert != 0) {
+ statementInsert.executeBatch();
+ }
+
+ if (numUpdate != 0) {
+ statementUpdate.executeBatch();
+ }
+
+ if (numDelete != 0) {
+ statementDelete.executeBatch();
+ }
+
+ if (isAcidTable) {
+ updateWriteIdForPartitions(dbConn, writeId, partIdList);
+ }
+ return result;
+ }
+ }
+
+
+ private Map<PartitionInfo, ColumnStatistics> getPartitionInfo(Connection
dbConn, long tblId,
+ Map<String,
ColumnStatistics> partColStatsMap)
+ throws SQLException, MetaException {
+ List<String> queries = new ArrayList<>();
+ StringBuilder prefix = new StringBuilder();
+ StringBuilder suffix = new StringBuilder();
+ Statement statement = null;
+ ResultSet rs = null;
+ Map<PartitionInfo, ColumnStatistics> partitionInfoMap = new HashMap<>();
+
+ List<String> partKeys = partColStatsMap.keySet().stream().map(
+ e -> quoteString(e)).collect(Collectors.toList()
+ );
+
+ prefix.append("select \"PART_ID\", \"WRITE_ID\", \"PART_NAME\" from
\"PARTITIONS\" where ");
+ suffix.append(" and \"TBL_ID\" = " + tblId);
+ TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix,
+ partKeys, "\"PART_NAME\"", true, false);
+
+ for (String query : queries) {
+      // Select for update makes sure that the partitions are not modified while the stats are getting updated.
+ query = sqlGenerator.addForUpdateClause(query);
+ try {
+ statement = dbConn.createStatement();
+ LOG.debug("Going to execute query <" + query + ">");
+ rs = statement.executeQuery(query);
+ while (rs.next()) {
+ PartitionInfo partitionInfo = new PartitionInfo(rs.getLong(1),
+ rs.getLong(2), rs.getString(3));
+ partitionInfoMap.put(partitionInfo,
partColStatsMap.get(rs.getString(3)));
+ }
+ } finally {
+ close(rs, statement, null);
+ }
+ }
+ return partitionInfoMap;
+ }
+
+ private void setAnsiQuotes(Connection dbConn) throws SQLException {
+ if (sqlGenerator.getDbProduct().isMYSQL()) {
+ try (Statement stmt = dbConn.createStatement()) {
+ stmt.execute("SET @@session.sql_mode=ANSI_QUOTES");
+ }
+ }
+ }
+
+  /**
+   * Update the statistics for the given partitions and also add the notification logs.
+   * @return map of partition name to its updated parameters if successful, null otherwise.
+   */
+ public Map<String, Map<String, String>>
updatePartitionColumnStatistics(Map<String, ColumnStatistics> partColStatsMap,
+ Table tbl, long csId,
+ String validWriteIds,
long writeId,
+
List<TransactionalMetaStoreEventListener> transactionalListeners)
+ throws MetaException {
+ JDOConnection jdoConn = null;
+ Connection dbConn = null;
+ boolean committed = false;
+ try {
+ lockInternal();
+ jdoConn = pm.getDataStoreConnection();
+ dbConn = (Connection) (jdoConn.getNativeConnection());
+
+ setAnsiQuotes(dbConn);
+
+ Map<PartitionInfo, ColumnStatistics> partitionInfoMap =
getPartitionInfo(dbConn, tbl.getId(), partColStatsMap);
+
+ Map<String, Map<String, String>> result =
+ updatePartitionParamTable(dbConn, partitionInfoMap,
validWriteIds, writeId, TxnUtils.isAcidTable(tbl));
+
+ Map<PartColNameInfo, MPartitionColumnStatistics> insertMap = new
HashMap<>();
+ Map<PartColNameInfo, MPartitionColumnStatistics> updateMap = new
HashMap<>();
+ populateInsertUpdateMap(partitionInfoMap, updateMap, insertMap, dbConn,
tbl);
+
+ LOG.info("Number of stats to insert " + insertMap.size() + " update " +
updateMap.size());
+
+ if (insertMap.size() != 0) {
+ insertIntoPartColStatTable(insertMap, csId, dbConn);
+ }
+
+ if (updateMap.size() != 0) {
+ updatePartColStatTable(updateMap, dbConn);
+ }
+
+ if (transactionalListeners != null) {
+ UpdatePartitionColumnStatEventBatch eventBatch = new
UpdatePartitionColumnStatEventBatch(null);
+ for (Map.Entry entry : result.entrySet()) {
+ Map<String, String> parameters = (Map<String, String>)
entry.getValue();
+ ColumnStatistics colStats = partColStatsMap.get(entry.getKey());
+ List<String> partVals = getPartValsFromName(tbl,
colStats.getStatsDesc().getPartName());
+ UpdatePartitionColumnStatEvent event = new
UpdatePartitionColumnStatEvent(colStats, partVals, parameters,
+ tbl, writeId, null);
+ eventBatch.addPartColStatEvent(event);
+ }
+
MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+ EventMessage.EventType.UPDATE_PARTITION_COLUMN_STAT_BATCH,
eventBatch, dbConn, sqlGenerator);
+ }
+ dbConn.commit();
+ committed = true;
+ return result;
+ } catch (Exception e) {
+ LOG.error("Unable to update Column stats for " + tbl.getTableName(), e);
+ throw new MetaException("Unable to update Column stats for " +
tbl.getTableName()
+ + " due to: " + e.getMessage());
+ } finally {
+ if (!committed) {
+ rollbackDBConn(dbConn);
+ }
+ closeDbConn(jdoConn);
+ unlockInternal();
+ }
+ }
+
+  /**
+   * Gets the next CS id from the MPartitionColumnStatistics sequence and increments the CS id by numStats.
+   * @return The CS id before the update.
+   */
+ public long getNextCSIdForMPartitionColumnStatistics(long numStats) throws
MetaException {
+ Statement statement = null;
+ ResultSet rs = null;
+ long maxCsId = 0;
+ boolean committed = false;
+ Connection dbConn = null;
+ JDOConnection jdoConn = null;
+
+ try {
+ lockInternal();
+ jdoConn = pm.getDataStoreConnection();
+ dbConn = (Connection) (jdoConn.getNativeConnection());
+
+ setAnsiQuotes(dbConn);
+
+      // This loop iterates at most twice. If there are no records, it first inserts and then selects again.
+      // We do not use an upsert operation, because a select-for-update followed by an update is required to
+      // make sure that the caller gets a range of CS ids that is reserved and not used by any other thread.
+ boolean insertDone = false;
+ while (maxCsId == 0) {
+ String query = sqlGenerator.addForUpdateClause("SELECT \"NEXT_VAL\"
FROM \"SEQUENCE_TABLE\" "
+ + "WHERE \"SEQUENCE_NAME\"= "
+ +
quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics"));
+ LOG.debug("Going to execute query " + query);
+ statement = dbConn.createStatement();
+ rs = statement.executeQuery(query);
+ if (rs.next()) {
+ maxCsId = rs.getLong(1);
+ } else if (insertDone) {
+ throw new MetaException("Invalid state of SEQUENCE_TABLE for
MPartitionColumnStatistics");
+ } else {
+ insertDone = true;
+ closeStmt(statement);
+ statement = dbConn.createStatement();
+ query = "INSERT INTO \"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\",
\"NEXT_VAL\") VALUES ( "
+ +
quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics")
+ "," + 1
+ + ")";
+ try {
+ statement.executeUpdate(query);
+ } catch (SQLException e) {
+            // If the record is already inserted by some other thread, continue to select.
+ if (dbType.isDuplicateKeyError(e)) {
+ continue;
+ }
+ LOG.error("Unable to insert into SEQUENCE_TABLE for
MPartitionColumnStatistics.", e);
+ throw e;
+ } finally {
+ closeStmt(statement);
+ }
+ }
+ }
+
+ long nextMaxCsId = maxCsId + numStats + 1;
+ closeStmt(statement);
+ statement = dbConn.createStatement();
+ String query = "UPDATE \"SEQUENCE_TABLE\" SET \"NEXT_VAL\" = "
+ + nextMaxCsId
+ + " WHERE \"SEQUENCE_NAME\" = "
+ +
quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics");
+ statement.executeUpdate(query);
+ dbConn.commit();
+ committed = true;
+ return maxCsId;
+ } catch (Exception e) {
+ LOG.error("Unable to getNextCSIdForMPartitionColumnStatistics", e);
+ throw new MetaException("Unable to
getNextCSIdForMPartitionColumnStatistics "
+ + " due to: " + e.getMessage());
+ } finally {
+ if (!committed) {
+ rollbackDBConn(dbConn);
+ }
+ close(rs, statement, jdoConn);
+ unlockInternal();
+ }
+ }
+
+ public void alterPartitions(Map<List<String>, Long> partValuesToId,
Map<Long, Long> partIdToSdId,
+ List<Partition> newParts) throws MetaException {
+ List<Long> partIds = new ArrayList<>(newParts.size());
+ Map<Long, Optional<Map<String, String>>> partParamsOpt = new HashMap<>();
+ Map<Long, StorageDescriptor> idToSd = new HashMap<>();
+ for (Partition newPart : newParts) {
+ Long partId = partValuesToId.get(newPart.getValues());
+ Long sdId = partIdToSdId.get(partId);
+ partIds.add(partId);
+ partParamsOpt.put(partId, Optional.ofNullable(newPart.getParameters()));
+ idToSd.put(sdId, newPart.getSd());
+ }
+
+    // Altering partitions does not change the partition values, so PARTITION_KEY_VALS stays untouched;
+    // only PARTITIONS, PARTITION_PARAMS and the storage descriptor tables need to be updated.
+ updatePartitionsInBatch(partValuesToId, newParts);
+ updateParamTableInBatch("\"PARTITION_PARAMS\"", "\"PART_ID\"", partIds,
partParamsOpt);
+ updateStorageDescriptorInBatch(idToSd);
+ }
+
+ private interface ThrowableConsumer<T> {
+ void accept(T t) throws SQLException, MetaException;
+ }
+
+ private <T> List<Long> filterIdsByNonNullValue(List<Long> ids, Map<Long, T>
map) {
+ return ids.stream().filter(id -> map.get(id) !=
null).collect(Collectors.toList());
+ }
+
+ private void updateWithStatement(ThrowableConsumer<PreparedStatement>
consumer, String query)
+ throws MetaException {
+ JDOConnection jdoConn = pm.getDataStoreConnection();
+ boolean doTrace = LOG.isDebugEnabled();
+ long start = doTrace ? System.nanoTime() : 0;
+ try (PreparedStatement statement =
+ ((Connection)
jdoConn.getNativeConnection()).prepareStatement(query)) {
+ consumer.accept(statement);
+ MetastoreDirectSqlUtils.timingTrace(doTrace, query, start, doTrace ?
System.nanoTime() : 0);
+ } catch (SQLException e) {
+ LOG.error("Failed to execute update query: " + query, e);
+ throw new MetaException("Unable to execute update due to: " +
e.getMessage());
+ } finally {
+ closeDbConn(jdoConn);
+ }
+ }
+
+ private void updatePartitionsInBatch(Map<List<String>, Long> partValuesToId,
+ List<Partition> newParts) throws
MetaException {
+ List<String> columns = Arrays.asList("\"CREATE_TIME\"",
"\"LAST_ACCESS_TIME\"", "\"WRITE_ID\"");
+ List<String> conditionKeys = Arrays.asList("\"PART_ID\"");
+ String stmt = TxnUtils.createUpdatePreparedStmt("\"PARTITIONS\"", columns,
conditionKeys);
+ int maxRows = dbType.getMaxRows(maxBatchSize, 4);
+ updateWithStatement(statement -> Batchable.runBatched(maxRows, newParts,
new Batchable<Partition, Void>() {
+ @Override
+ public List<Void> run(List<Partition> input) throws SQLException {
+ for (Partition p : input) {
+ statement.setLong(1, p.getCreateTime());
+ statement.setLong(2, p.getLastAccessTime());
+ statement.setLong(3, p.getWriteId());
+ statement.setLong(4, partValuesToId.get(p.getValues()));
+ statement.addBatch();
+ }
+ statement.executeBatch();
+ return null;
+ }
+ }), stmt);
+ }
+
+  /* Get stringListId from both SKEWED_VALUES and SKEWED_COL_VALUE_LOC_MAP tables. */
+ private List<Long> getStringListId(List<Long> sdIds) throws MetaException {
+ return Batchable.runBatched(maxBatchSize, sdIds, new Batchable<Long,
Long>() {
+ @Override
+ public List<Long> run(List<Long> input) throws Exception {
+ List<Long> result = new ArrayList<>();
+ String idLists = MetaStoreDirectSql.getIdListForIn(input);
+ String queryFromSkewedValues = "select \"STRING_LIST_ID_EID\" " +
+ "from \"SKEWED_VALUES\" where \"SD_ID_OID\" in (" + idLists + ")";
+ try (QueryWrapper query =
+ new QueryWrapper(pm.newQuery("javax.jdo.query.SQL",
queryFromSkewedValues))) {
+ List<Long> sqlResult = executeWithArray(query.getInnerQuery(), null,
queryFromSkewedValues);
+ result.addAll(sqlResult);
+ }
+ String queryFromValueLoc = "select \"STRING_LIST_ID_KID\" " +
+ "from \"SKEWED_COL_VALUE_LOC_MAP\" where \"SD_ID\" in (" + idLists
+ ")";
+ try (QueryWrapper query =
+ new QueryWrapper(pm.newQuery("javax.jdo.query.SQL",
queryFromValueLoc))) {
+ List<Long> sqlResult = executeWithArray(query.getInnerQuery(), null,
queryFromValueLoc);
+ result.addAll(sqlResult);
+ }
+ return result;
+ }
+ });
+ }
+
+ private void updateParamTableInBatch(String paramTable, String idColumn,
List<Long> ids,
+ Map<Long, Optional<Map<String,
String>>> newParamsOpt) throws MetaException {
+ Map<Long, Map<String, String>> oldParams = getParams(paramTable, idColumn,
ids);
+
+ List<Pair<Long, String>> paramsToDelete = new ArrayList<>();
+ List<Pair<Long, Pair<String, String>>> paramsToUpdate = new ArrayList<>();
+ List<Pair<Long, Pair<String, String>>> paramsToAdd = new ArrayList<>();
+
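+    // Diff the old and new parameters for each id: keys missing from the new map are deleted,
+    // keys whose value changed are updated, and keys absent from the old map are inserted.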
+ for (Long id : ids) {
+ Map<String, String> oldParam = oldParams.getOrDefault(id, new
HashMap<>());
+ Map<String, String> newParam =
newParamsOpt.get(id).orElseGet(HashMap::new);
+ for (Map.Entry<String, String> entry : oldParam.entrySet()) {
+ String key = entry.getKey();
+ String oldValue = entry.getValue();
+ if (!newParam.containsKey(key)) {
+ paramsToDelete.add(Pair.of(id, key));
+ } else if (!oldValue.equals(newParam.get(key))) {
+ paramsToUpdate.add(Pair.of(id, Pair.of(key, newParam.get(key))));
+ }
+ }
+ List<Pair<Long, Pair<String, String>>> newlyParams =
newParam.entrySet().stream()
+ .filter(entry -> !oldParam.containsKey(entry.getKey()))
+ .map(entry -> Pair.of(id, Pair.of(entry.getKey(), entry.getValue())))
+ .collect(Collectors.toList());
+ paramsToAdd.addAll(newlyParams);
+ }
+
+ deleteParams(paramTable, idColumn, paramsToDelete);
+ updateParams(paramTable, idColumn, paramsToUpdate);
+ insertParams(paramTable, idColumn, paramsToAdd);
+ }
+
+ private Map<Long, Map<String, String>> getParams(String paramTable, String
idName,
+ List<Long> ids) throws
MetaException {
+ Map<Long, Map<String, String>> idToParams = new HashMap<>();
+ Batchable.runBatched(maxBatchSize, ids, new Batchable<Long, Object>() {
+ @Override
+ public List<Object> run(List<Long> input) throws MetaException {
+ String idLists = MetaStoreDirectSql.getIdListForIn(input);
+ String queryText = "select " + idName + ", \"PARAM_KEY\",
\"PARAM_VALUE\" from " +
+ paramTable + " where " + idName + " in (" + idLists + ")";
+ try (QueryWrapper query = new
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
+ List<Object[]> sqlResult = executeWithArray(query.getInnerQuery(),
null, queryText);
+ for (Object[] row : sqlResult) {
+ Long id = extractSqlLong(row[0]);
+ String paramKey = (String) row[1];
+ String paramVal = (String) row[2];
+ idToParams.computeIfAbsent(id, key -> new
HashMap<>()).put(paramKey, paramVal);
+ }
+ }
+ return null;
+ }
+ });
+ return idToParams;
+ }
+
+ private void deleteParams(String paramTable, String idColumn,
+ List<Pair<Long, String>> deleteIdKeys) throws
MetaException {
+ String deleteStmt = "delete from " + paramTable + " where " + idColumn +
"=? and PARAM_KEY=?";
+ int maxRows = dbType.getMaxRows(maxBatchSize, 2);
+ updateWithStatement(statement -> Batchable.runBatched(maxRows,
deleteIdKeys,
+ new Batchable<Pair<Long, String>, Void>() {
+ @Override
+ public List<Void> run(List<Pair<Long, String>> input) throws
SQLException {
+ for (Pair<Long, String> pair : input) {
+ statement.setLong(1, pair.getLeft());
+ statement.setString(2, pair.getRight());
+ statement.addBatch();
+ }
+ statement.executeBatch();
+ return null;
+ }
+ }
+ ), deleteStmt);
+ }
+
+ private void updateParams(String paramTable, String idColumn,
+ List<Pair<Long, Pair<String, String>>>
updateIdAndParams) throws MetaException {
+ List<String> columns = Arrays.asList("\"PARAM_VALUE\"");
+ List<String> conditionKeys = Arrays.asList(idColumn, "\"PARAM_KEY\"");
+ String stmt = TxnUtils.createUpdatePreparedStmt(paramTable, columns,
conditionKeys);
+ int maxRows = dbType.getMaxRows(maxBatchSize, 3);
+ updateWithStatement(statement -> Batchable.runBatched(maxRows,
updateIdAndParams,
+ new Batchable<Pair<Long, Pair<String, String>>, Object>() {
+ @Override
+ public List<Object> run(List<Pair<Long, Pair<String, String>>>
input) throws SQLException {
+ for (Pair<Long, Pair<String, String>> pair : input) {
+ statement.setString(1, pair.getRight().getRight());
+ statement.setLong(2, pair.getLeft());
+ statement.setString(3, pair.getRight().getLeft());
+ statement.addBatch();
+ }
+ statement.executeBatch();
+ return null;
+ }
+ }
+ ), stmt);
+ }
+
+ private void insertParams(String paramTable, String idColumn,
+ List<Pair<Long, Pair<String, String>>>
addIdAndParams) throws MetaException {
+ List<String> columns = Arrays.asList(idColumn, "\"PARAM_KEY\"",
"\"PARAM_VALUE\"");
+ String query = TxnUtils.createInsertPreparedStmt(paramTable, columns);
+ int maxRows = dbType.getMaxRows(maxBatchSize, 3);
+ updateWithStatement(statement -> Batchable.runBatched(maxRows,
addIdAndParams,
+ new Batchable<Pair<Long, Pair<String, String>>, Void>() {
+ @Override
+ public List<Void> run(List<Pair<Long, Pair<String, String>>> input)
throws SQLException {
+ for (Pair<Long, Pair<String, String>> pair : input) {
+ statement.setLong(1, pair.getLeft());
+ statement.setString(2, pair.getRight().getLeft());
+ statement.setString(3, pair.getRight().getRight());
+ statement.addBatch();
+ }
+ statement.executeBatch();
+ return null;
+ }
+ }
+ ), query);
+ }
+
+ private void updateStorageDescriptorInBatch(Map<Long, StorageDescriptor>
idToSd)
+ throws MetaException {
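+    // Resolve the CD_ID and SERDE_ID for every SD being altered, update the dependent tables
+    // (SD_PARAMS, bucket/sort columns, skewed info, column descriptors, serdes) in batches,
+    // then update SDS itself and drop any column descriptors that are no longer referenced.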
+ Map<Long, Long> sdIdToCdId = new HashMap<>();
+ Map<Long, Long> sdIdToSerdeId = new HashMap<>();
+ List<Long> cdIds = new ArrayList<>();
+ List<Long> validSdIds = filterIdsByNonNullValue(new
ArrayList<>(idToSd.keySet()), idToSd);
+ Batchable.runBatched(maxBatchSize, validSdIds, new Batchable<Long, Void>()
{
+ @Override
+ public List<Void> run(List<Long> input) throws Exception {
+ String idLists = MetaStoreDirectSql.getIdListForIn(input);
+ String queryText = "select \"SD_ID\", \"CD_ID\", \"SERDE_ID\" from
\"SDS\" " +
+ "where \"SD_ID\" in (" + idLists + ")";
+ try (QueryWrapper query = new
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
+ List<Object[]> sqlResult = executeWithArray(query.getInnerQuery(),
null, queryText);
+ for (Object[] row : sqlResult) {
+ Long sdId = extractSqlLong(row[0]);
+ Long cdId = extractSqlLong(row[1]);
+ Long serdeId = extractSqlLong(row[2]);
+ sdIdToCdId.put(sdId, cdId);
+ sdIdToSerdeId.put(sdId, serdeId);
+ cdIds.add(cdId);
+ }
+ }
+ return null;
+ }
+ });
+
+ Map<Long, Optional<Map<String, String>>> sdParamsOpt = new HashMap<>();
+ Map<Long, List<String>> idToBucketCols = new HashMap<>();
+ Map<Long, List<Order>> idToSortCols = new HashMap<>();
+ Map<Long, SkewedInfo> idToSkewedInfo = new HashMap<>();
+ Map<Long, List<FieldSchema>> sdIdToNewColumns = new HashMap<>();
+ List<Long> serdeIds = new ArrayList<>();
+ Map<Long, SerDeInfo> serdeIdToSerde = new HashMap<>();
+ Map<Long, Optional<Map<String, String>>> serdeParamsOpt = new HashMap<>();
+ for (Long sdId : validSdIds) {
+ StorageDescriptor sd = idToSd.get(sdId);
+ sdParamsOpt.put(sdId, Optional.ofNullable(sd.getParameters()));
+ idToBucketCols.put(sdId, sd.getBucketCols());
+ idToSortCols.put(sdId, sd.getSortCols());
+ idToSkewedInfo.put(sdId, sd.getSkewedInfo());
+ sdIdToNewColumns.put(sdId, sd.getCols());
+
+ Long serdeId = sdIdToSerdeId.get(sdId);
+ serdeIds.add(serdeId);
+ serdeIdToSerde.put(serdeId, sd.getSerdeInfo());
+ serdeParamsOpt.put(serdeId,
Optional.ofNullable(sd.getSerdeInfo().getParameters()));
+ }
+
+ updateParamTableInBatch("\"SD_PARAMS\"", "\"SD_ID\"", validSdIds,
sdParamsOpt);
+ updateBucketColsInBatch(idToBucketCols, validSdIds);
+ updateSortColsInBatch(idToSortCols, validSdIds);
+ updateSkewedInfoInBatch(idToSkewedInfo, validSdIds);
+ Map<Long, Long> sdIdToNewCdId = updateCDInBatch(cdIds, validSdIds,
sdIdToCdId, sdIdToNewColumns);
+ updateSerdeInBatch(serdeIds, serdeIdToSerde);
+ updateParamTableInBatch("\"SERDE_PARAMS\"", "\"SERDE_ID\"", serdeIds,
serdeParamsOpt);
+
+ List<Long> cdIdsMayDelete = sdIdToCdId.entrySet().stream()
+ .filter(entry -> sdIdToNewCdId.containsKey(entry.getKey()))
+ .map(entry -> entry.getValue())
+ .collect(Collectors.toList());
+
+ // Update SDS table after CDS to get the freshest CD_ID values.
+ sdIdToCdId.replaceAll((sdId, cdId) ->
+ sdIdToNewCdId.containsKey(sdId) ? sdIdToNewCdId.get(sdId) : cdId);
+ updateSDInBatch(validSdIds, idToSd, sdIdToCdId);
+
+ List<Long> usedIds = Batchable.runBatched(maxBatchSize, cdIdsMayDelete,
+ new Batchable<Long, Long>() {
+ @Override
+ public List<Long> run(List<Long> input) throws Exception {
+ String idLists = MetaStoreDirectSql.getIdListForIn(input);
+ String queryText = "select \"CD_ID\" from \"SDS\" where \"CD_ID\"
in ( " + idLists + ")";
+ try (QueryWrapper query = new
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
+ List<Long> sqlResult = executeWithArray(query.getInnerQuery(),
null, queryText);
+ return new ArrayList<>(sqlResult);
+ }
+ }
+ });
+ List<Long> unusedCdIds = cdIdsMayDelete.stream().filter(id ->
!usedIds.contains(id)).collect(Collectors.toList());
+
+ deleteCDInBatch(unusedCdIds);
+ }
+
+ private void updateSDInBatch(List<Long> ids, Map<Long, StorageDescriptor>
idToSd,
+ Map<Long, Long> idToCdId) throws MetaException {
+ List<String> columns = Arrays.asList("\"CD_ID\"", "\"INPUT_FORMAT\"",
"\"IS_COMPRESSED\"",
+ "\"IS_STOREDASSUBDIRECTORIES\"", "\"LOCATION\"", "\"NUM_BUCKETS\"",
"\"OUTPUT_FORMAT\"");
+ List<String> conditionKeys = Arrays.asList("\"SD_ID\"");
+ String stmt = TxnUtils.createUpdatePreparedStmt("\"SDS\"", columns,
conditionKeys);
+ int maxRows = dbType.getMaxRows(maxBatchSize, 8);
+ updateWithStatement(statement -> Batchable.runBatched(maxRows, ids,
+ new Batchable<Long, Void>() {
+ @Override
+ public List<Void> run(List<Long> input) throws SQLException {
+ for (Long sdId : input) {
+ StorageDescriptor sd = idToSd.get(sdId);
+ statement.setLong(1, idToCdId.get(sdId));
+ statement.setString(2, sd.getInputFormat());
+ statement.setBoolean(3, sd.isCompressed());
+ statement.setBoolean(4, sd.isStoredAsSubDirectories());
+ statement.setString(5, sd.getLocation());
+ statement.setInt(6, sd.getNumBuckets());
+ statement.setString(7, sd.getOutputFormat());
+ statement.setLong(8, sdId);
+ statement.addBatch();
+ }
+ statement.executeBatch();
+ return null;
+ }
+ }
+ ), stmt);
+ }
+
+ private void updateBucketColsInBatch(Map<Long, List<String>>
sdIdToBucketCols,
+ List<Long> sdIds) throws MetaException {
+ Batchable.runBatched(maxBatchSize, sdIds, new Batchable<Long, Void>() {
+ @Override
+ public List<Void> run(List<Long> input) throws MetaException {
+ String idLists = MetaStoreDirectSql.getIdListForIn(input);
+ String queryText = "delete from \"BUCKETING_COLS\" where \"SD_ID\" in
(" + idLists + ")";
+ updateWithStatement(PreparedStatement::executeUpdate, queryText);
+ return null;
+ }
+ });
+ List<String> columns = Arrays.asList("\"SD_ID\"", "\"INTEGER_IDX\"",
"\"BUCKET_COL_NAME\"");
+ String stmt = TxnUtils.createInsertPreparedStmt("\"BUCKETING_COLS\"",
columns);
+ List<Long> idWithBucketCols = filterIdsByNonNullValue(sdIds,
sdIdToBucketCols);
+ int maxRows = dbType.getMaxRows(maxBatchSize, 3);
+ updateWithStatement(statement -> Batchable.runBatched(maxRows,
idWithBucketCols, new Batchable<Long, Object>() {
+ @Override
+ public List<Object> run(List<Long> input) throws SQLException {
+ for (Long id : input) {
+ List<String> bucketCols = sdIdToBucketCols.get(id);
+ for (int i = 0; i < bucketCols.size(); i++) {
+ statement.setLong(1, id);
+ statement.setInt(2, i);
+ statement.setString(3, bucketCols.get(i));
+ statement.addBatch();
+ }
+ }
+ statement.executeBatch();
+ return null;
+ }
+ }), stmt);
+ }
+
+ private void updateSortColsInBatch(Map<Long, List<Order>> sdIdToSortCols,
+ List<Long> sdIds) throws MetaException {
+ Batchable.runBatched(maxBatchSize, sdIds, new Batchable<Long, Void>() {
+ @Override
+ public List<Void> run(List<Long> input) throws MetaException {
+ String idLists = MetaStoreDirectSql.getIdListForIn(input);
+ String queryText = "delete from \"SORT_COLS\" where \"SD_ID\" in (" +
idLists + ")";
+ updateWithStatement(PreparedStatement::executeUpdate, queryText);
+ return null;
+ }
+ });
+
+ List<String> columns = Arrays.asList("\"SD_ID\"", "\"INTEGER_IDX\"",
"\"COLUMN_NAME\"", "\"ORDER\"");
+ String stmt = TxnUtils.createInsertPreparedStmt("\"SORT_COLS\"", columns);
+ List<Long> idWithSortCols = filterIdsByNonNullValue(sdIds, sdIdToSortCols);
+ int maxRows = dbType.getMaxRows(maxBatchSize, 4);
+ updateWithStatement(statement -> Batchable.runBatched(maxRows,
idWithSortCols, new Batchable<Long, Object>() {
+ @Override
+ public List<Object> run(List<Long> input) throws SQLException {
+ for (Long id : input) {
+ List<Order> bucketCols = sdIdToSortCols.get(id);
+ for (int i = 0; i < bucketCols.size(); i++) {
+ statement.setLong(1, id);
+ statement.setInt(2, i);
+ statement.setString(3, bucketCols.get(i).getCol());
+ statement.setInt(4, bucketCols.get(i).getOrder());
+ statement.addBatch();
+ }
+ }
+ statement.executeBatch();
+ return null;
+ }
+ }), stmt);
+ }
+
+ private void updateSkewedInfoInBatch(Map<Long, SkewedInfo> sdIdToSkewedInfo,
+ List<Long> sdIds) throws MetaException {
+    // Delete all of the old string list and skewed value mappings;
+    // the skewed values go first to satisfy the foreign key constraint.
+ List<Long> stringListId = getStringListId(sdIds);
+ if (!stringListId.isEmpty()) {
+ Batchable.runBatched(maxBatchSize, sdIds, new Batchable<Long, Void>() {
+ @Override
+ public List<Void> run(List<Long> input) throws Exception {
+ String idLists = MetaStoreDirectSql.getIdListForIn(input);
+ String deleteSkewValuesQuery =
+ "delete from \"SKEWED_VALUES\" where \"SD_ID_OID\" in (" +
idLists + ")";
+ updateWithStatement(PreparedStatement::executeUpdate,
deleteSkewValuesQuery);
+ String deleteSkewColValueLocMapQuery =
+ "delete from \"SKEWED_COL_VALUE_LOC_MAP\" where \"SD_ID\" in ("
+ idLists + ")";
+ updateWithStatement(PreparedStatement::executeUpdate,
deleteSkewColValueLocMapQuery);
+ String deleteSkewColNamesQuery =
+ "delete from \"SKEWED_COL_NAMES\" where \"SD_ID\" in (" +
idLists + ")";
+ updateWithStatement(PreparedStatement::executeUpdate,
deleteSkewColNamesQuery);
+ return null;
+ }
+ });
+ Batchable.runBatched(maxBatchSize, stringListId, new Batchable<Long,
Void>() {
+ @Override
+ public List<Void> run(List<Long> input) throws MetaException {
+ String idLists = MetaStoreDirectSql.getIdListForIn(input);
+ String deleteStringListValuesQuery =
+ "delete from \"SKEWED_STRING_LIST_VALUES\" where
\"STRING_LIST_ID\" in (" + idLists + ")";
+ updateWithStatement(PreparedStatement::executeUpdate,
deleteStringListValuesQuery);
+ String deleteStringListQuery =
+ "delete from \"SKEWED_STRING_LIST\" where \"STRING_LIST_ID\" in
(" + idLists + ")";
+ updateWithStatement(PreparedStatement::executeUpdate,
deleteStringListQuery);
+ return null;
+ }
+ });
+ }
+
+ // Generate new stringListId for each SdId
+    Map<Long, List<String>> idToSkewedColNames = new HashMap<>();         // used for SKEWED_COL_NAMES
+    List<Long> newStringListId = new ArrayList<>();                       // used for SKEWED_STRING_LIST
+    Map<Long, List<String>> stringListIdToValues = new HashMap<>();       // used for SKEWED_STRING_LIST_VALUES
+    Map<Long, List<Long>> sdIdToNewStringListId = new HashMap<>();        // used for SKEWED_VALUES
+    Map<Long, List<Pair<Long, String>>> sdIdToValueLoc = new HashMap<>(); // used for SKEWED_COL_VALUE_LOC_MAP
+
+ List<Long> idWithSkewedInfo = filterIdsByNonNullValue(sdIds,
sdIdToSkewedInfo);
+ for (Long sdId : idWithSkewedInfo) {
+ SkewedInfo skewedInfo = sdIdToSkewedInfo.get(sdId);
+ idToSkewedColNames.put(sdId, skewedInfo.getSkewedColNames());
+ List<List<String>> skewedColValues = skewedInfo.getSkewedColValues();
+ if (skewedColValues != null) {
+ for (List<String> colValues : skewedColValues) {
+ Long nextStringListId = getDataStoreId(MStringList.class);
+ newStringListId.add(nextStringListId);
+ sdIdToNewStringListId.computeIfAbsent(sdId, k -> new
ArrayList<>()).add(nextStringListId);
+ stringListIdToValues.put(nextStringListId, colValues);
+ }
+ }
+ Map<List<String>, String> skewedColValueLocationMaps =
skewedInfo.getSkewedColValueLocationMaps();
+ if (skewedColValueLocationMaps != null) {
+ for (Map.Entry<List<String>, String> entry :
skewedColValueLocationMaps.entrySet()) {
+ List<String> colValues = entry.getKey();
+ String location = entry.getValue();
+ Long nextStringListId = getDataStoreId(MStringList.class);
+ newStringListId.add(nextStringListId);
+ stringListIdToValues.put(nextStringListId, colValues);
+ sdIdToValueLoc.computeIfAbsent(sdId, k -> new
ArrayList<>()).add(Pair.of(nextStringListId, location));
+ }
+ }
+ }
+
+ insertSkewedColNamesInBatch(idToSkewedColNames, sdIds);
+ insertStringListInBatch(newStringListId);
+ insertStringListValuesInBatch(stringListIdToValues, newStringListId);
+ insertSkewedValuesInBatch(sdIdToNewStringListId, sdIds);
+ insertSkewColValueLocInBatch(sdIdToValueLoc, sdIds);
+ }
+
+ private Long getDataStoreId(Class<?> modelClass) throws MetaException {
+ ExecutionContext ec = ((JDOPersistenceManager) pm).getExecutionContext();
+ AbstractClassMetaData cmd =
ec.getMetaDataManager().getMetaDataForClass(modelClass,
ec.getClassLoaderResolver());
+ if (cmd.getIdentityType() == IdentityType.DATASTORE) {
+ return (Long) ec.getStoreManager().getValueGenerationStrategyValue(ec,
cmd, -1);
+ } else {
+ throw new MetaException("Identity type is not datastore.");
+ }
+ }
+
+ private void insertSkewedColNamesInBatch(Map<Long, List<String>>
sdIdToSkewedColNames,
+ List<Long> sdIds) throws
MetaException {
+ List<String> columns = Arrays.asList("\"SD_ID\"", "\"INTEGER_IDX\"",
"\"SKEWED_COL_NAME\"");
+ String stmt = TxnUtils.createInsertPreparedStmt("\"SKEWED_COL_NAMES\"",
columns);
+ List<Long> idWithSkewedCols = filterIdsByNonNullValue(sdIds,
sdIdToSkewedColNames);
+ int maxRows = dbType.getMaxRows(maxBatchSize, 3);
+ updateWithStatement(statement -> Batchable.runBatched(maxRows,
idWithSkewedCols, new Batchable<Long, Object>() {
+ @Override
+ public List<Object> run(List<Long> input) throws SQLException {
+ for (Long id : input) {
+ List<String> skewedColNames = sdIdToSkewedColNames.get(id);
+ for (int i = 0; i < skewedColNames.size(); i++) {
+ statement.setLong(1, id);
+ statement.setInt(2, i);
+ statement.setString(3, skewedColNames.get(i));
+ statement.addBatch();
+ }
+ }
+ statement.executeBatch();
+ return null;
+ }
+ }), stmt);
+ }
+
+ private void insertStringListInBatch(List<Long> stringListIds) throws
MetaException {
+ List<String> columns = Arrays.asList("\"STRING_LIST_ID\"");
+ String insertQuery =
TxnUtils.createInsertPreparedStmt("\"SKEWED_STRING_LIST\"", columns);
+ int maxRows = dbType.getMaxRows(maxBatchSize, 1);
+ updateWithStatement(statement -> Batchable.runBatched(maxRows,
stringListIds,
+ new Batchable<Long, Void>() {
+ @Override
+ public List<Void> run(List<Long> input) throws SQLException {
+ for (Long id : input) {
+ statement.setLong(1, id);
+ statement.addBatch();
+ }
+ statement.executeBatch();
+ return null;
+ }
+ }
+ ), insertQuery);
+ }
+
+ private void insertStringListValuesInBatch(Map<Long, List<String>>
stringListIdToValues,
+ List<Long> stringListIds) throws
MetaException {
+ List<String> columns = Arrays.asList("\"STRING_LIST_ID\"",
"\"INTEGER_IDX\"", "\"STRING_LIST_VALUE\"");
+ String insertQuery =
TxnUtils.createInsertPreparedStmt("\"SKEWED_STRING_LIST_VALUES\"", columns);
+ List<Long> idWithStringList = filterIdsByNonNullValue(stringListIds,
stringListIdToValues);
+ int maxRows = dbType.getMaxRows(maxBatchSize, 3);
+ updateWithStatement(statement -> Batchable.runBatched(maxRows,
idWithStringList,
+ new Batchable<Long, Void>() {
+ @Override
+ public List<Void> run(List<Long> input) throws SQLException {
+ for (Long stringListId : input) {
+ List<String> values = stringListIdToValues.get(stringListId);
+ for (int i = 0; i < values.size(); i++) {
+ statement.setLong(1, stringListId);
+ statement.setInt(2, i);
+ statement.setString(3, values.get(i));
+ statement.addBatch();
+ }
+ }
+ statement.executeBatch();
+ return null;
+ }
+ }
+ ), insertQuery);
+ }
+
+ private void insertSkewedValuesInBatch(Map<Long, List<Long>>
sdIdToStringListId,
+ List<Long> sdIds) throws MetaException
{
+ List<String> columns = Arrays.asList("\"SD_ID_OID\"", "\"INTEGER_IDX\"",
"\"STRING_LIST_ID_EID\"");
+ String insertQuery =
TxnUtils.createInsertPreparedStmt("\"SKEWED_VALUES\"", columns);
+ List<Long> idWithSkewedValues = filterIdsByNonNullValue(sdIds,
sdIdToStringListId);
+ int maxRows = dbType.getMaxRows(maxBatchSize, 3);
+ updateWithStatement(statement -> Batchable.runBatched(maxRows,
idWithSkewedValues,
+ new Batchable<Long, Void>() {
+ @Override
+ public List<Void> run(List<Long> input) throws Exception {
+ for (Long sdId : input) {
+ List<Long> stringListIds = sdIdToStringListId.get(sdId);
+ for (int i = 0; i < stringListIds.size(); i++) {
+ statement.setLong(1, sdId);
+ statement.setInt(2, i);
+ statement.setLong(3, stringListIds.get(i));
+ statement.addBatch();
+ }
+ }
+ statement.executeBatch();
+ return null;
+ }
+ }
+ ), insertQuery);
+ }
+
+ private void insertSkewColValueLocInBatch(Map<Long, List<Pair<Long,
String>>> sdIdToColValueLoc,
+ List<Long> sdIds) throws
MetaException {
+ List<String> columns = Arrays.asList("\"SD_ID\"",
"\"STRING_LIST_ID_KID\"", "\"LOCATION\"");
+ String insertQuery =
TxnUtils.createInsertPreparedStmt("\"SKEWED_COL_VALUE_LOC_MAP\"", columns);
+ List<Long> idWithColValueLoc = filterIdsByNonNullValue(sdIds,
sdIdToColValueLoc);
+ int maxRows = dbType.getMaxRows(maxBatchSize, 3);
+ updateWithStatement(statement -> Batchable.runBatched(maxRows,
idWithColValueLoc,
+ new Batchable<Long, Void>() {
+ @Override
+ public List<Void> run(List<Long> input) throws Exception {
+ for (Long sdId : input) {
+ List<Pair<Long, String>> stringListIdAndLoc =
sdIdToColValueLoc.get(sdId);
+ for (Pair<Long, String> pair : stringListIdAndLoc) {
+ statement.setLong(1, sdId);
+ statement.setLong(2, pair.getLeft());
+ statement.setString(3, pair.getRight());
+ statement.addBatch();
+ }
+ }
+ statement.executeBatch();
+ return null;
+ }
+ }
+ ), insertQuery);
+ }
+
+ private Map<Long, Long> updateCDInBatch(List<Long> cdIds, List<Long> sdIds,
Map<Long, Long> sdIdToCdId,
+ Map<Long, List<FieldSchema>>
sdIdToNewColumns) throws MetaException {
+ Map<Long, List<Pair<Integer, FieldSchema>>> cdIdToColIdxPair = new
HashMap<>();
+ Batchable.runBatched(maxBatchSize, cdIds, new Batchable<Long, Void>() {
+ @Override
+ public List<Void> run(List<Long> input) throws Exception {
+ String idLists = MetaStoreDirectSql.getIdListForIn(input);
+ String queryText = "select \"CD_ID\", \"COMMENT\", \"COLUMN_NAME\",
\"TYPE_NAME\", " +
+ "\"INTEGER_IDX\" from \"COLUMNS_V2\" where \"CD_ID\" in (" +
idLists + ")";
+ try (QueryWrapper query = new
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
+ List<Object[]> sqlResult = executeWithArray(query.getInnerQuery(),
null, queryText);
+ for (Object[] row : sqlResult) {
+ Long id = extractSqlLong(row[0]);
+ String comment = (String) row[1];
+ String name = (String) row[2];
+ String type = (String) row[3];
+ int index = extractSqlInt(row[4]);
+ FieldSchema field = new FieldSchema(name, type, comment);
+ cdIdToColIdxPair.computeIfAbsent(id, k -> new
ArrayList<>()).add(Pair.of(index, field));
+ }
+ }
+ return null;
+ }
+ });
+ List<Long> newCdIds = new ArrayList<>();
+ Map<Long, List<FieldSchema>> newCdIdToCols = new HashMap<>();
+ Map<Long, Long> oldCdIdToNewCdId = new HashMap<>();
+ Map<Long, Long> sdIdToNewCdId = new HashMap<>();
+ // oldCdId -> [(oldIdx, newIdx)], used to update KEY_CONSTRAINTS
+ Map<Long, List<Pair<Integer, Integer>>> oldCdIdToColIdxPairs = new
HashMap<>();
+ for (Long sdId : sdIds) {
+ Long cdId = sdIdToCdId.get(sdId);
+ List<Pair<Integer, FieldSchema>> cols = cdIdToColIdxPair.get(cdId);
+ // Placeholder to avoid IndexOutOfBoundsException.
+ List<FieldSchema> oldCols = new
ArrayList<>(Collections.nCopies(cols.size(), null));
+ cols.forEach(pair -> oldCols.set(pair.getLeft(), pair.getRight()));
+
+ List<FieldSchema> newCols = sdIdToNewColumns.get(sdId);
+ // Use the new column descriptor only if the old column descriptor
differs from the new one.
+ if (oldCols == null || !oldCols.equals(newCols)) {
+ if (oldCols != null && newCols != null) {
+ Long newCdId = getDataStoreId(MColumnDescriptor.class);
+ newCdIds.add(newCdId);
+ newCdIdToCols.put(newCdId, newCols);
+ oldCdIdToNewCdId.put(cdId, newCdId);
+ sdIdToNewCdId.put(sdId, newCdId);
+ for (int i = 0; i < oldCols.size(); i++) {
+ FieldSchema oldCol = oldCols.get(i);
+ int newIdx = newCols.indexOf(oldCol);
+ if (newIdx != -1) {
+ oldCdIdToColIdxPairs.computeIfAbsent(cdId, k -> new
ArrayList<>()).add(Pair.of(i, newIdx));
+ }
+ }
+ }
+ }
+ }
+
+ insertCDInBatch(newCdIds, newCdIdToCols);
+ // TODO: this follows the JDO implementation for now, but it should be an
+ // error in this case: when partitions use the default table CD, changing a
+ // partition CD that has constraint key mappings updates the constraints unexpectedly.
+ updateKeyConstraintsInBatch(oldCdIdToNewCdId, oldCdIdToColIdxPairs);
+
+ return sdIdToNewCdId;
+ }
+
+ private void insertCDInBatch(List<Long> ids, Map<Long, List<FieldSchema>>
idToCols)
+ throws MetaException {
+ String insertCds = TxnUtils.createInsertPreparedStmt("\"CDS\"",
Arrays.asList("\"CD_ID\""));
+ int maxRows = dbType.getMaxRows(maxBatchSize, 1);
+ updateWithStatement(statement -> Batchable.runBatched(maxRows, ids,
+ new Batchable<Long, Void>() {
+ @Override
+ public List<Void> run(List<Long> input) throws SQLException {
+ for (Long id : input) {
+ statement.setLong(1, id);
+ statement.addBatch();
+ }
+ statement.executeBatch();
+ return null;
+ }
+ }), insertCds);
+
+ List<String> columns = Arrays.asList("\"CD_ID\"",
+ "\"COMMENT\"", "\"COLUMN_NAME\"", "\"TYPE_NAME\"", "\"INTEGER_IDX\"");
+ String insertColumns = TxnUtils.createInsertPreparedStmt("\"COLUMNS_V2\"",
columns);
+ int maxRowsForCDs = dbType.getMaxRows(maxBatchSize, 5);
+ updateWithStatement(statement -> Batchable.runBatched(maxRowsForCDs, ids,
+ new Batchable<Long, Void>() {
+ @Override
+ public List<Void> run(List<Long> input) throws Exception {
+ for (Long id : input) {
+ List<FieldSchema> cols = idToCols.get(id);
+ for (int i = 0; i < cols.size(); i++) {
+ FieldSchema col = cols.get(i);
+ statement.setLong(1, id);
+ statement.setString(2, col.getComment());
+ statement.setString(3, col.getName());
+ statement.setString(4, col.getType());
+ statement.setInt(5, i);
+ statement.addBatch();
+ }
+ }
+ statement.executeBatch();
+ return null;
+ }
+ }), insertColumns);
+ }
+
+ private void updateKeyConstraintsInBatch(Map<Long, Long> oldCdIdToNewCdId,
+ Map<Long, List<Pair<Integer,
Integer>>> oldCdIdToColIdxPairs) throws MetaException {
+ List<Long> oldCdIds = new ArrayList<>(oldCdIdToNewCdId.keySet());
+ String tableName = "\"KEY_CONSTRAINTS\"";
+ List<String> parentColumns = Arrays.asList("\"PARENT_CD_ID\"",
"\"PARENT_INTEGER_IDX\"");
+ List<String> childColumns = Arrays.asList("\"CHILD_CD_ID\"",
"\"CHILD_INTEGER_IDX\"");
+
+ String updateParent = TxnUtils.createUpdatePreparedStmt(tableName,
parentColumns, parentColumns);
+ String updateChild = TxnUtils.createUpdatePreparedStmt(tableName,
childColumns, childColumns);
+ for (String updateStmt : new String[]{updateParent, updateChild}) {
+ int maxRows = dbType.getMaxRows(maxBatchSize, 4);
+ updateWithStatement(statement -> Batchable.runBatched(maxRows, oldCdIds,
+ new Batchable<Long, Void>() {
+ @Override
+ public List<Void> run(List<Long> input) throws SQLException {
+ for (Long oldId : input) {
+ // Follow the JDO implementation: update only the mapped columns in KEY_CONSTRAINTS.
+ if (!oldCdIdToColIdxPairs.containsKey(oldId)) {
+ continue;
+ }
+ Long newId = oldCdIdToNewCdId.get(oldId);
+ for (Pair<Integer, Integer> idx :
oldCdIdToColIdxPairs.get(oldId)) {
+ statement.setLong(1, newId);
+ statement.setInt(2, idx.getRight());
+ statement.setLong(3, oldId);
+ statement.setInt(4, idx.getLeft());
+ statement.addBatch();
+ }
+ }
+ statement.executeBatch();
+ return null;
+ }
+ }), updateStmt);
+ }
+ }
+
+ private void deleteCDInBatch(List<Long> cdIds) throws MetaException {
+ Batchable.runBatched(maxBatchSize, cdIds, new Batchable<Long, Void>() {
+ @Override
+ public List<Void> run(List<Long> input) throws Exception {
+ String idLists = MetaStoreDirectSql.getIdListForIn(input);
+ // First remove any constraints that may be associated with these CDs
+ String deleteConstraintsByCd = "delete from \"KEY_CONSTRAINTS\" where
\"CHILD_CD_ID\" in ("
+ + idLists + ") or \"PARENT_CD_ID\" in (" + idLists + ")";
+ updateWithStatement(PreparedStatement::executeUpdate,
deleteConstraintsByCd);
+
+ // Then delete COLUMNS_V2 before CDS for foreign constraints.
+ String deleteColumns = "delete from \"COLUMNS_V2\" where \"CD_ID\" in
(" + idLists + ")";
+ updateWithStatement(PreparedStatement::executeUpdate, deleteColumns);
+
+ // Finally delete CDS
+ String deleteCDs = "delete from \"CDS\" where \"CD_ID\" in (" +
idLists + ")";
+ updateWithStatement(PreparedStatement::executeUpdate, deleteCDs);
+ return null;
+ }
+ });
+ }
+
+ private void updateSerdeInBatch(List<Long> ids, Map<Long, SerDeInfo>
idToSerde)
+ throws MetaException {
+ // Follow the JDO implementation: update only NAME and SLIB in SERDES.
+ List<String> columns = Arrays.asList("\"NAME\"", "\"SLIB\"");
+ List<String> condKeys = Arrays.asList("\"SERDE_ID\"");
+ String updateStmt = TxnUtils.createUpdatePreparedStmt("\"SERDES\"",
columns, condKeys);
+ List<Long> idWithSerde = filterIdsByNonNullValue(ids, idToSerde);
+ int maxRows = dbType.getMaxRows(maxBatchSize, 3);
+ updateWithStatement(statement -> Batchable.runBatched(maxRows, idWithSerde,
+ new Batchable<Long, Void>() {
+ @Override
+ public List<Void> run(List<Long> input) throws SQLException {
+ for (Long id : input) {
+ SerDeInfo serde = idToSerde.get(id);
+ statement.setString(1, serde.getName());
+ statement.setString(2, serde.getSerializationLib());
+ statement.setLong(3, id);
+ statement.addBatch();
+ }
+ statement.executeBatch();
+ return null;
+ }
+ }), updateStmt);
+ }
+
+ private static final class PartitionInfo {
+ long partitionId;
+ long writeId;
+ String partitionName;
+ public PartitionInfo(long partitionId, long writeId, String partitionName)
{
+ this.partitionId = partitionId;
+ this.writeId = writeId;
+ this.partitionName = partitionName;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int)partitionId;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null) {
+ return false;
+ }
+ if (!(o instanceof PartitionInfo)) {
+ return false;
+ }
+ PartitionInfo other = (PartitionInfo)o;
+ if (this.partitionId != other.partitionId) {
+ return false;
+ }
+ return true;
+ }
+ }
+
+ private static final class PartColNameInfo {
+ long partitionId;
+ String colName;
+ public PartColNameInfo(long partitionId, String colName) {
+ this.partitionId = partitionId;
+ this.colName = colName;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int)partitionId;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null) {
+ return false;
+ }
+ if (!(o instanceof PartColNameInfo)) {
+ return false;
+ }
+ PartColNameInfo other = (PartColNameInfo)o;
+ if (this.partitionId != other.partitionId) {
+ return false;
+ }
+ if (this.colName.equalsIgnoreCase(other.colName)) {
+ return true;
+ }
+ return false;
+ }
+ }
+}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdateStat.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdateStat.java
deleted file mode 100644
index df1f77f2064..00000000000
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdateStat.java
+++ /dev/null
@@ -1,727 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.metastore;
-
-import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent;
-import
org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEventBatch;
-import org.apache.hadoop.hive.metastore.messaging.EventMessage;
-import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
-import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-
-import javax.jdo.PersistenceManager;
-import javax.jdo.datastore.JDOConnection;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-
-import static
org.apache.hadoop.hive.common.StatsSetupConst.COLUMN_STATS_ACCURATE;
-import static org.apache.hadoop.hive.metastore.HMSHandler.getPartValsFromName;
-import static
org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
-
-/**
- * This class contains the optimizations for MetaStore that rely on direct SQL
access to
- * the underlying database. It should use ANSI SQL and be compatible with
common databases
- * such as MySQL (note that MySQL doesn't use full ANSI mode by default),
Postgres, etc.
- *
- * This class separates out the statistics update part from MetaStoreDirectSql
class.
- */
-class DirectSqlUpdateStat {
- private static final Logger LOG =
LoggerFactory.getLogger(DirectSqlUpdateStat.class.getName());
- PersistenceManager pm;
- Configuration conf;
- DatabaseProduct dbType;
- int maxBatchSize;
- SQLGenerator sqlGenerator;
- private static final ReentrantLock derbyLock = new ReentrantLock(true);
-
- public DirectSqlUpdateStat(PersistenceManager pm, Configuration conf,
- DatabaseProduct dbType, int
batchSize) {
- this.pm = pm;
- this.conf = conf;
- this.dbType = dbType;
- this.maxBatchSize = batchSize;
- sqlGenerator = new SQLGenerator(dbType, conf);
- }
-
- /**
- * {@link #lockInternal()} and {@link #unlockInternal()} are used to
serialize those operations that require
- * Select ... For Update to sequence operations properly. In practice that
means when running
- * with Derby database. See more notes at class level.
- */
- private void lockInternal() {
- if(dbType.isDERBY()) {
- derbyLock.lock();
- }
- }
-
- private void unlockInternal() {
- if(dbType.isDERBY()) {
- derbyLock.unlock();
- }
- }
-
- void rollbackDBConn(Connection dbConn) {
- try {
- if (dbConn != null && !dbConn.isClosed()) dbConn.rollback();
- } catch (SQLException e) {
- LOG.warn("Failed to rollback db connection ", e);
- }
- }
-
- void closeDbConn(JDOConnection jdoConn) {
- try {
- if (jdoConn != null) {
- jdoConn.close();
- }
- } catch (Exception e) {
- LOG.warn("Failed to close db connection", e);
- }
- }
-
- void closeStmt(Statement stmt) {
- try {
- if (stmt != null && !stmt.isClosed()) stmt.close();
- } catch (SQLException e) {
- LOG.warn("Failed to close statement ", e);
- }
- }
-
- void close(ResultSet rs) {
- try {
- if (rs != null && !rs.isClosed()) {
- rs.close();
- }
- }
- catch(SQLException ex) {
- LOG.warn("Failed to close statement ", ex);
- }
- }
-
- static String quoteString(String input) {
- return "'" + input + "'";
- }
-
- void close(ResultSet rs, Statement stmt, JDOConnection dbConn) {
- close(rs);
- closeStmt(stmt);
- closeDbConn(dbConn);
- }
-
- private void populateInsertUpdateMap(Map<PartitionInfo, ColumnStatistics>
statsPartInfoMap,
- Map<PartColNameInfo,
MPartitionColumnStatistics> updateMap,
- Map<PartColNameInfo,
MPartitionColumnStatistics>insertMap,
- Connection dbConn, Table tbl) throws
SQLException, MetaException, NoSuchObjectException {
- StringBuilder prefix = new StringBuilder();
- StringBuilder suffix = new StringBuilder();
- Statement statement = null;
- ResultSet rs = null;
- List<String> queries = new ArrayList<>();
- Set<PartColNameInfo> selectedParts = new HashSet<>();
-
- List<Long> partIdList = statsPartInfoMap.keySet().stream().map(
- e -> e.partitionId).collect(Collectors.toList()
- );
-
- prefix.append("select \"PART_ID\", \"COLUMN_NAME\" from \"PART_COL_STATS\"
WHERE ");
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
- partIdList, "\"PART_ID\"", true, false);
-
- for (String query : queries) {
- try {
- statement = dbConn.createStatement();
- LOG.debug("Going to execute query " + query);
- rs = statement.executeQuery(query);
- while (rs.next()) {
- selectedParts.add(new PartColNameInfo(rs.getLong(1),
rs.getString(2)));
- }
- } finally {
- close(rs, statement, null);
- }
- }
-
- for (Map.Entry entry : statsPartInfoMap.entrySet()) {
- PartitionInfo partitionInfo = (PartitionInfo) entry.getKey();
- ColumnStatistics colStats = (ColumnStatistics) entry.getValue();
- long partId = partitionInfo.partitionId;
- ColumnStatisticsDesc statsDesc = colStats.getStatsDesc();
- if (!statsDesc.isSetCatName()) {
- statsDesc.setCatName(tbl.getCatName());
- }
- for (ColumnStatisticsObj statisticsObj : colStats.getStatsObj()) {
- PartColNameInfo temp = new PartColNameInfo(partId,
statisticsObj.getColName());
- if (selectedParts.contains(temp)) {
- updateMap.put(temp, StatObjectConverter.
- convertToMPartitionColumnStatistics(null, statsDesc,
statisticsObj, colStats.getEngine()));
- } else {
- insertMap.put(temp, StatObjectConverter.
- convertToMPartitionColumnStatistics(null, statsDesc,
statisticsObj, colStats.getEngine()));
- }
- }
- }
- }
-
- private void updatePartColStatTable(Map<PartColNameInfo,
MPartitionColumnStatistics> updateMap,
- Connection dbConn) throws
SQLException, MetaException, NoSuchObjectException {
- PreparedStatement pst = null;
- for (Map.Entry entry : updateMap.entrySet()) {
- PartColNameInfo partColNameInfo = (PartColNameInfo) entry.getKey();
- Long partId = partColNameInfo.partitionId;
- MPartitionColumnStatistics mPartitionColumnStatistics =
(MPartitionColumnStatistics) entry.getValue();
- String update = "UPDATE \"PART_COL_STATS\" SET ";
- update +=
StatObjectConverter.getUpdatedColumnSql(mPartitionColumnStatistics);
- update += " WHERE \"PART_ID\" = " + partId + " AND "
- + " \"COLUMN_NAME\" = " +
quoteString(mPartitionColumnStatistics.getColName());
- try {
- pst = dbConn.prepareStatement(update);
-
StatObjectConverter.initUpdatedColumnStatement(mPartitionColumnStatistics, pst);
- LOG.debug("Going to execute update " + update);
- int numUpdate = pst.executeUpdate();
- if (numUpdate != 1) {
- throw new MetaException("Invalid state of PART_COL_STATS for
PART_ID " + partId);
- }
- } finally {
- closeStmt(pst);
- }
- }
- }
-
- private void insertIntoPartColStatTable(Map<PartColNameInfo,
MPartitionColumnStatistics> insertMap,
- long maxCsId,
- Connection dbConn) throws
SQLException, MetaException, NoSuchObjectException {
- PreparedStatement preparedStatement = null;
- int numRows = 0;
- String insert = "INSERT INTO \"PART_COL_STATS\" (\"CS_ID\", \"CAT_NAME\",
\"DB_NAME\","
- + "\"TABLE_NAME\", \"PARTITION_NAME\", \"COLUMN_NAME\",
\"COLUMN_TYPE\", \"PART_ID\","
- + " \"LONG_LOW_VALUE\", \"LONG_HIGH_VALUE\",
\"DOUBLE_HIGH_VALUE\", \"DOUBLE_LOW_VALUE\","
- + " \"BIG_DECIMAL_LOW_VALUE\", \"BIG_DECIMAL_HIGH_VALUE\",
\"NUM_NULLS\", \"NUM_DISTINCTS\", \"BIT_VECTOR\" ,"
- + " \"HISTOGRAM\", \"AVG_COL_LEN\", \"MAX_COL_LEN\",
\"NUM_TRUES\", \"NUM_FALSES\", \"LAST_ANALYZED\", \"ENGINE\") values "
- + "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?)";
-
- try {
- preparedStatement = dbConn.prepareStatement(insert);
- for (Map.Entry entry : insertMap.entrySet()) {
- PartColNameInfo partColNameInfo = (PartColNameInfo) entry.getKey();
- Long partId = partColNameInfo.partitionId;
- MPartitionColumnStatistics mPartitionColumnStatistics =
(MPartitionColumnStatistics) entry.getValue();
-
- preparedStatement.setLong(1, maxCsId);
- preparedStatement.setString(2,
mPartitionColumnStatistics.getCatName());
- preparedStatement.setString(3, mPartitionColumnStatistics.getDbName());
- preparedStatement.setString(4,
mPartitionColumnStatistics.getTableName());
- preparedStatement.setString(5,
mPartitionColumnStatistics.getPartitionName());
- preparedStatement.setString(6,
mPartitionColumnStatistics.getColName());
- preparedStatement.setString(7,
mPartitionColumnStatistics.getColType());
- preparedStatement.setLong(8, partId);
- preparedStatement.setObject(9,
mPartitionColumnStatistics.getLongLowValue());
- preparedStatement.setObject(10,
mPartitionColumnStatistics.getLongHighValue());
- preparedStatement.setObject(11,
mPartitionColumnStatistics.getDoubleHighValue());
- preparedStatement.setObject(12,
mPartitionColumnStatistics.getDoubleLowValue());
- preparedStatement.setString(13,
mPartitionColumnStatistics.getDecimalLowValue());
- preparedStatement.setString(14,
mPartitionColumnStatistics.getDecimalHighValue());
- preparedStatement.setObject(15,
mPartitionColumnStatistics.getNumNulls());
- preparedStatement.setObject(16,
mPartitionColumnStatistics.getNumDVs());
- preparedStatement.setObject(17,
mPartitionColumnStatistics.getBitVector());
- preparedStatement.setBytes(18,
mPartitionColumnStatistics.getHistogram());
- preparedStatement.setObject(19,
mPartitionColumnStatistics.getAvgColLen());
- preparedStatement.setObject(20,
mPartitionColumnStatistics.getMaxColLen());
- preparedStatement.setObject(21,
mPartitionColumnStatistics.getNumTrues());
- preparedStatement.setObject(22,
mPartitionColumnStatistics.getNumFalses());
- preparedStatement.setLong(23,
mPartitionColumnStatistics.getLastAnalyzed());
- preparedStatement.setString(24,
mPartitionColumnStatistics.getEngine());
-
- maxCsId++;
- numRows++;
- preparedStatement.addBatch();
- if (numRows == maxBatchSize) {
- preparedStatement.executeBatch();
- numRows = 0;
- }
- }
-
- if (numRows != 0) {
- preparedStatement.executeBatch();
- }
- } finally {
- closeStmt(preparedStatement);
- }
- }
-
- private Map<Long, String> getParamValues(Connection dbConn, List<Long>
partIdList) throws SQLException {
- List<String> queries = new ArrayList<>();
- StringBuilder prefix = new StringBuilder();
- StringBuilder suffix = new StringBuilder();
- Statement statement = null;
- ResultSet rs = null;
-
- prefix.append("select \"PART_ID\", \"PARAM_VALUE\" "
- + " from \"PARTITION_PARAMS\" where "
- + " \"PARAM_KEY\" = 'COLUMN_STATS_ACCURATE' "
- + " and ");
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
- partIdList, "\"PART_ID\"", true, false);
-
- Map<Long, String> partIdToParaMap = new HashMap<>();
- for (String query : queries) {
- try {
- statement = dbConn.createStatement();
- LOG.debug("Going to execute query " + query);
- rs = statement.executeQuery(query);
- while (rs.next()) {
- partIdToParaMap.put(rs.getLong(1), rs.getString(2));
- }
- } finally {
- close(rs, statement, null);
- }
- }
- return partIdToParaMap;
- }
-
- private void updateWriteIdForPartitions(Connection dbConn, long writeId,
List<Long> partIdList) throws SQLException {
- StringBuilder prefix = new StringBuilder();
- List<String> queries = new ArrayList<>();
- StringBuilder suffix = new StringBuilder();
-
- prefix.append("UPDATE \"PARTITIONS\" set \"WRITE_ID\" = " + writeId + "
where ");
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
- partIdList, "\"PART_ID\"", false, false);
-
- Statement statement = null;
- for (String query : queries) {
- try {
- statement = dbConn.createStatement();
- LOG.debug("Going to execute update " + query);
- statement.executeUpdate(query);
- } finally {
- closeStmt(statement);
- }
- }
- }
-
- private Map<String, Map<String, String>>
updatePartitionParamTable(Connection dbConn,
-
Map<PartitionInfo, ColumnStatistics> partitionInfoMap,
- String
validWriteIds,
- long
writeId,
- boolean
isAcidTable)
- throws SQLException, MetaException {
- Map<String, Map<String, String>> result = new HashMap<>();
- boolean areTxnStatsSupported = MetastoreConf.getBoolVar(conf,
ConfVars.HIVE_TXN_STATS_ENABLED);
- PreparedStatement statementInsert = null;
- PreparedStatement statementDelete = null;
- PreparedStatement statementUpdate = null;
- String insert = "INSERT INTO \"PARTITION_PARAMS\" (\"PART_ID\",
\"PARAM_KEY\", \"PARAM_VALUE\") "
- + "VALUES( ? , 'COLUMN_STATS_ACCURATE' , ? )";
- String delete = "DELETE from \"PARTITION_PARAMS\" "
- + " where \"PART_ID\" = ? "
- + " and \"PARAM_KEY\" = 'COLUMN_STATS_ACCURATE'";
- String update = "UPDATE \"PARTITION_PARAMS\" set \"PARAM_VALUE\" = ? "
- + " where \"PART_ID\" = ? "
- + " and \"PARAM_KEY\" = 'COLUMN_STATS_ACCURATE'";
- int numInsert = 0;
- int numDelete = 0;
- int numUpdate = 0;
-
- List<Long> partIdList = partitionInfoMap.keySet().stream().map(
- e -> e.partitionId).collect(Collectors.toList()
- );
-
- // get the old parameters from PARTITION_PARAMS table.
- Map<Long, String> partIdToParaMap = getParamValues(dbConn, partIdList);
-
- try {
- statementInsert = dbConn.prepareStatement(insert);
- statementDelete = dbConn.prepareStatement(delete);
- statementUpdate = dbConn.prepareStatement(update);
- for (Map.Entry entry : partitionInfoMap.entrySet()) {
- PartitionInfo partitionInfo = (PartitionInfo) entry.getKey();
- ColumnStatistics colStats = (ColumnStatistics) entry.getValue();
- List<String> colNames = colStats.getStatsObj().stream().map(e ->
e.getColName()).collect(Collectors.toList());
- long partWriteId = partitionInfo.writeId;
- long partId = partitionInfo.partitionId;
- Map<String, String> newParameter;
-
- if (!partIdToParaMap.containsKey(partId)) {
- newParameter = new HashMap<>();
- newParameter.put(COLUMN_STATS_ACCURATE, "TRUE");
- StatsSetupConst.setColumnStatsState(newParameter, colNames);
- statementInsert.setLong(1, partId);
- statementInsert.setString(2,
newParameter.get(COLUMN_STATS_ACCURATE));
- numInsert++;
- statementInsert.addBatch();
- if (numInsert == maxBatchSize) {
- LOG.debug(" Executing insert " + insert);
- statementInsert.executeBatch();
- numInsert = 0;
- }
- } else {
- String oldStats = partIdToParaMap.get(partId);
-
- Map<String, String> oldParameter = new HashMap<>();
- oldParameter.put(COLUMN_STATS_ACCURATE, oldStats);
-
- newParameter = new HashMap<>();
- newParameter.put(COLUMN_STATS_ACCURATE, oldStats);
- StatsSetupConst.setColumnStatsState(newParameter, colNames);
-
- if (isAcidTable) {
- String errorMsg = ObjectStore.verifyStatsChangeCtx(
- colStats.getStatsDesc().getDbName() + "." +
colStats.getStatsDesc().getTableName(),
- oldParameter, newParameter, writeId, validWriteIds, true);
- if (errorMsg != null) {
- throw new MetaException(errorMsg);
- }
- }
-
- if (isAcidTable &&
- (!areTxnStatsSupported ||
!ObjectStore.isCurrentStatsValidForTheQuery(oldParameter, partWriteId,
- validWriteIds, true))) {
- statementDelete.setLong(1, partId);
- statementDelete.addBatch();
- numDelete++;
- if (numDelete == maxBatchSize) {
- statementDelete.executeBatch();
- numDelete = 0;
- LOG.debug("Removed COLUMN_STATS_ACCURATE from the parameters of
the partition "
- + colStats.getStatsDesc().getDbName() + "." +
colStats.getStatsDesc().getTableName() + "."
- + colStats.getStatsDesc().getPartName());
- }
- } else {
- statementUpdate.setString(1,
newParameter.get(COLUMN_STATS_ACCURATE));
- statementUpdate.setLong(2, partId);
- statementUpdate.addBatch();
- numUpdate++;
- if (numUpdate == maxBatchSize) {
- LOG.debug(" Executing update " + statementUpdate);
- statementUpdate.executeBatch();
- numUpdate = 0;
- }
- }
- }
- result.put(partitionInfo.partitionName, newParameter);
- }
-
- if (numInsert != 0) {
- statementInsert.executeBatch();
- }
-
- if (numUpdate != 0) {
- statementUpdate.executeBatch();
- }
-
- if (numDelete != 0) {
- statementDelete.executeBatch();
- }
-
- if (isAcidTable) {
- updateWriteIdForPartitions(dbConn, writeId, partIdList);
- }
- return result;
- } finally {
- closeStmt(statementInsert);
- closeStmt(statementUpdate);
- closeStmt(statementDelete);
- }
- }
-
- private static class PartitionInfo {
- long partitionId;
- long writeId;
- String partitionName;
- public PartitionInfo(long partitionId, long writeId, String partitionName)
{
- this.partitionId = partitionId;
- this.writeId = writeId;
- this.partitionName = partitionName;
- }
-
- @Override
- public int hashCode() {
- return (int)partitionId;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) {
- return true;
- }
- if (o == null) {
- return false;
- }
- if (!(o instanceof PartitionInfo)) {
- return false;
- }
- PartitionInfo other = (PartitionInfo)o;
- if (this.partitionId != other.partitionId) {
- return false;
- }
- return true;
- }
- }
-
- private static class PartColNameInfo {
- long partitionId;
- String colName;
- public PartColNameInfo(long partitionId, String colName) {
- this.partitionId = partitionId;
- this.colName = colName;
- }
-
- @Override
- public int hashCode() {
- return (int)partitionId;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) {
- return true;
- }
- if (o == null) {
- return false;
- }
- if (!(o instanceof PartColNameInfo)) {
- return false;
- }
- PartColNameInfo other = (PartColNameInfo)o;
- if (this.partitionId != other.partitionId) {
- return false;
- }
- if (this.colName.equalsIgnoreCase(other.colName)) {
- return true;
- }
- return false;
- }
- }
-
- private Map<PartitionInfo, ColumnStatistics> getPartitionInfo(Connection
dbConn, long tblId,
- Map<String,
ColumnStatistics> partColStatsMap)
- throws SQLException, MetaException {
- List<String> queries = new ArrayList<>();
- StringBuilder prefix = new StringBuilder();
- StringBuilder suffix = new StringBuilder();
- Statement statement = null;
- ResultSet rs = null;
- Map<PartitionInfo, ColumnStatistics> partitionInfoMap = new HashMap<>();
-
- List<String> partKeys = partColStatsMap.keySet().stream().map(
- e -> quoteString(e)).collect(Collectors.toList()
- );
-
- prefix.append("select \"PART_ID\", \"WRITE_ID\", \"PART_NAME\" from
\"PARTITIONS\" where ");
- suffix.append(" and \"TBL_ID\" = " + tblId);
- TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix,
- partKeys, "\"PART_NAME\"", true, false);
-
- for (String query : queries) {
- // Select for update makes sure that the partitions are not modified
while the stats are getting updated.
- query = sqlGenerator.addForUpdateClause(query);
- try {
- statement = dbConn.createStatement();
- LOG.debug("Going to execute query <" + query + ">");
- rs = statement.executeQuery(query);
- while (rs.next()) {
- PartitionInfo partitionInfo = new PartitionInfo(rs.getLong(1),
- rs.getLong(2), rs.getString(3));
- partitionInfoMap.put(partitionInfo,
partColStatsMap.get(rs.getString(3)));
- }
- } finally {
- close(rs, statement, null);
- }
- }
- return partitionInfoMap;
- }
-
- private void setAnsiQuotes(Connection dbConn) throws SQLException {
- if (sqlGenerator.getDbProduct().isMYSQL()) {
- try (Statement stmt = dbConn.createStatement()) {
- stmt.execute("SET @@session.sql_mode=ANSI_QUOTES");
- }
- }
- }
-
- /**
- * Update the statistics for the given partitions. Add the notification logs
also.
- * @return map of partition key to column stats if successful, null
otherwise.
- */
- public Map<String, Map<String, String>>
updatePartitionColumnStatistics(Map<String, ColumnStatistics> partColStatsMap,
- Table tbl, long csId,
- String validWriteIds,
long writeId,
-
List<TransactionalMetaStoreEventListener> transactionalListeners)
- throws MetaException {
- JDOConnection jdoConn = null;
- Connection dbConn = null;
- boolean committed = false;
- try {
- lockInternal();
- jdoConn = pm.getDataStoreConnection();
- dbConn = (Connection) (jdoConn.getNativeConnection());
-
- setAnsiQuotes(dbConn);
-
- Map<PartitionInfo, ColumnStatistics> partitionInfoMap =
getPartitionInfo(dbConn, tbl.getId(), partColStatsMap);
-
- Map<String, Map<String, String>> result =
- updatePartitionParamTable(dbConn, partitionInfoMap,
validWriteIds, writeId, TxnUtils.isAcidTable(tbl));
-
- Map<PartColNameInfo, MPartitionColumnStatistics> insertMap = new
HashMap<>();
- Map<PartColNameInfo, MPartitionColumnStatistics> updateMap = new
HashMap<>();
- populateInsertUpdateMap(partitionInfoMap, updateMap, insertMap, dbConn,
tbl);
-
- LOG.info("Number of stats to insert " + insertMap.size() + " update " +
updateMap.size());
-
- if (insertMap.size() != 0) {
- insertIntoPartColStatTable(insertMap, csId, dbConn);
- }
-
- if (updateMap.size() != 0) {
- updatePartColStatTable(updateMap, dbConn);
- }
-
- if (transactionalListeners != null) {
- UpdatePartitionColumnStatEventBatch eventBatch = new
UpdatePartitionColumnStatEventBatch(null);
- for (Map.Entry entry : result.entrySet()) {
- Map<String, String> parameters = (Map<String, String>)
entry.getValue();
- ColumnStatistics colStats = partColStatsMap.get(entry.getKey());
- List<String> partVals = getPartValsFromName(tbl,
colStats.getStatsDesc().getPartName());
- UpdatePartitionColumnStatEvent event = new
UpdatePartitionColumnStatEvent(colStats, partVals, parameters,
- tbl, writeId, null);
- eventBatch.addPartColStatEvent(event);
- }
-
MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
- EventMessage.EventType.UPDATE_PARTITION_COLUMN_STAT_BATCH,
eventBatch, dbConn, sqlGenerator);
- }
- dbConn.commit();
- committed = true;
- return result;
- } catch (Exception e) {
- LOG.error("Unable to update Column stats for " + tbl.getTableName(), e);
- throw new MetaException("Unable to update Column stats for " +
tbl.getTableName()
- + " due to: " + e.getMessage());
- } finally {
- if (!committed) {
- rollbackDBConn(dbConn);
- }
- closeDbConn(jdoConn);
- unlockInternal();
- }
- }
-
- /**
- * Gets the next CS id from sequence MPartitionColumnStatistics and
increment the CS id by numStats.
- * @return The CD id before update.
- */
- public long getNextCSIdForMPartitionColumnStatistics(long numStats) throws
MetaException {
- Statement statement = null;
- ResultSet rs = null;
- long maxCsId = 0;
- boolean committed = false;
- Connection dbConn = null;
- JDOConnection jdoConn = null;
-
- try {
- lockInternal();
- jdoConn = pm.getDataStoreConnection();
- dbConn = (Connection) (jdoConn.getNativeConnection());
-
- setAnsiQuotes(dbConn);
-
- // This loop will be iterated at max twice. If there is no records, it
will first insert and then do a select.
- // We are not using any upsert operations as select for update and then
update is required to make sure that
- // the caller gets a reserved range for CSId not used by any other
thread.
- boolean insertDone = false;
- while (maxCsId == 0) {
- String query = sqlGenerator.addForUpdateClause("SELECT \"NEXT_VAL\"
FROM \"SEQUENCE_TABLE\" "
- + "WHERE \"SEQUENCE_NAME\"= "
- +
quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics"));
- LOG.debug("Going to execute query " + query);
- statement = dbConn.createStatement();
- rs = statement.executeQuery(query);
- if (rs.next()) {
- maxCsId = rs.getLong(1);
- } else if (insertDone) {
- throw new MetaException("Invalid state of SEQUENCE_TABLE for
MPartitionColumnStatistics");
- } else {
- insertDone = true;
- closeStmt(statement);
- statement = dbConn.createStatement();
- query = "INSERT INTO \"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\",
\"NEXT_VAL\") VALUES ( "
- +
quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics")
+ "," + 1
- + ")";
- try {
- statement.executeUpdate(query);
- } catch (SQLException e) {
- // If the record is already inserted by some other thread continue
to select.
- if (dbType.isDuplicateKeyError(e)) {
- continue;
- }
- LOG.error("Unable to insert into SEQUENCE_TABLE for
MPartitionColumnStatistics.", e);
- throw e;
- } finally {
- closeStmt(statement);
- }
- }
- }
-
- long nextMaxCsId = maxCsId + numStats + 1;
- closeStmt(statement);
- statement = dbConn.createStatement();
- String query = "UPDATE \"SEQUENCE_TABLE\" SET \"NEXT_VAL\" = "
- + nextMaxCsId
- + " WHERE \"SEQUENCE_NAME\" = "
- +
quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics");
- statement.executeUpdate(query);
- dbConn.commit();
- committed = true;
- return maxCsId;
- } catch (Exception e) {
- LOG.error("Unable to getNextCSIdForMPartitionColumnStatistics", e);
- throw new MetaException("Unable to
getNextCSIdForMPartitionColumnStatistics "
- + " due to: " + e.getMessage());
- } finally {
- if (!committed) {
- rollbackDBConn(dbConn);
- }
- close(rs, statement, jdoConn);
- unlockInternal();
- }
- }
-}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
index 3ed850efbab..7100bf93ae1 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
@@ -5791,9 +5791,8 @@ public class HMSHandler extends FacebookBase implements
IHMSHandler {
final
EnvironmentContext envContext)
throws TException {
String[] parsedDbName = parseDbName(dbName, conf);
- // TODO: this method name is confusing, it actually does full alter
(sortof)
- rename_partition(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName,
null, newPartition,
- envContext, null);
+ alter_partition_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
tableName, null,
+ newPartition, envContext, null);
}
@Deprecated
@@ -5801,9 +5800,9 @@ public class HMSHandler extends FacebookBase implements
IHMSHandler {
public void rename_partition(final String db_name, final String tbl_name,
final List<String> part_vals, final Partition
new_part)
throws TException {
- // Call rename_partition without an environment context.
+ // Call alter_partition_core without an environment context.
String[] parsedDbName = parseDbName(db_name, conf);
- rename_partition(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name,
part_vals, new_part,
+ alter_partition_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
tbl_name, part_vals, new_part,
null, null);
}
@@ -5813,12 +5812,12 @@ public class HMSHandler extends FacebookBase implements
IHMSHandler {
context.putToProperties(RENAME_PARTITION_MAKE_COPY,
String.valueOf(req.isClonePart()));
context.putToProperties(hive_metastoreConstants.TXN_ID,
String.valueOf(req.getTxnId()));
- rename_partition(req.getCatName(), req.getDbName(), req.getTableName(),
req.getPartVals(),
+ alter_partition_core(req.getCatName(), req.getDbName(),
req.getTableName(), req.getPartVals(),
req.getNewPart(), context, req.getValidWriteIdList());
return new RenamePartitionResponse();
};
- private void rename_partition(String catName, String db_name, String
tbl_name,
+ private void alter_partition_core(String catName, String db_name, String
tbl_name,
List<String> part_vals, Partition new_part,
EnvironmentContext envContext,
String validWriteIds) throws TException {
startTableFunction("alter_partition", catName, db_name, tbl_name);
@@ -5847,8 +5846,7 @@ public class HMSHandler extends FacebookBase implements
IHMSHandler {
Partition oldPart = null;
Exception ex = null;
try {
- Table table = null;
- table = getMS().getTable(catName, db_name, tbl_name, null);
+ Table table = getMS().getTable(catName, db_name, tbl_name, null);
firePreEvent(new PreAlterPartitionEvent(db_name, tbl_name, table,
part_vals, new_part, this));
if (part_vals != null && !part_vals.isEmpty()) {
@@ -5859,8 +5857,6 @@ public class HMSHandler extends FacebookBase implements
IHMSHandler {
oldPart = alterHandler.alterPartition(getMS(), wh, catName, db_name,
tbl_name,
part_vals, new_part, envContext, this, validWriteIds);
- // Only fetch the table if we actually have a listener
-
if (!listeners.isEmpty()) {
MetaStoreListenerNotifier.notifyEvent(listeners,
EventType.ALTER_PARTITION,
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index 97956660791..d5e0db5366c 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -95,13 +95,13 @@ import
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.model.MConstraint;
import org.apache.hadoop.hive.metastore.model.MCreationMetadata;
import org.apache.hadoop.hive.metastore.model.MDatabase;
-import org.apache.hadoop.hive.metastore.model.MFunction;
import org.apache.hadoop.hive.metastore.model.MNotificationLog;
import org.apache.hadoop.hive.metastore.model.MNotificationNextId;
import org.apache.hadoop.hive.metastore.model.MPartition;
import org.apache.hadoop.hive.metastore.model.MPartitionColumnPrivilege;
import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
import org.apache.hadoop.hive.metastore.model.MPartitionPrivilege;
+import org.apache.hadoop.hive.metastore.model.MTable;
import org.apache.hadoop.hive.metastore.model.MTableColumnStatistics;
import org.apache.hadoop.hive.metastore.model.MWMResourcePlan;
import org.apache.hadoop.hive.metastore.parser.ExpressionTree;
@@ -111,6 +111,7 @@ import
org.apache.hadoop.hive.metastore.parser.ExpressionTree.LogicalOperator;
import org.apache.hadoop.hive.metastore.parser.ExpressionTree.Operator;
import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode;
import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
@@ -152,6 +153,7 @@ class MetaStoreDirectSql {
private final int batchSize;
private final boolean convertMapNullsToEmptyStrings;
private final String defaultPartName;
+ private final boolean isTxnStatsEnabled;
/**
* Whether direct SQL can be used with the current datastore backing {@link
#pm}.
@@ -160,7 +162,7 @@ class MetaStoreDirectSql {
private final boolean isAggregateStatsCacheEnabled;
private final ImmutableMap<String, String> fieldnameToTableName;
private AggregateStatsCache aggrStatsCache;
- private DirectSqlUpdateStat updateStat;
+ private DirectSqlUpdatePart directSqlUpdatePart;
private DirectSqlInsertPart directSqlInsertPart;
/**
@@ -203,7 +205,8 @@ class MetaStoreDirectSql {
batchSize = dbType.needsInBatching() ? 1000 : NO_BATCHING;
}
this.batchSize = batchSize;
- this.updateStat = new DirectSqlUpdateStat(pm, conf, dbType, batchSize);
+ this.isTxnStatsEnabled = MetastoreConf.getBoolVar(conf,
ConfVars.HIVE_TXN_STATS_ENABLED);
+ this.directSqlUpdatePart = new DirectSqlUpdatePart(pm, conf, dbType,
batchSize);
ImmutableMap.Builder<String, String> fieldNameToTableNameBuilder =
new ImmutableMap.Builder<>();
@@ -535,6 +538,69 @@ class MetaStoreDirectSql {
directSqlInsertPart.addPartitions(parts, partPrivilegesList,
partColPrivilegesList);
}
+ /**
+ * Alter partitions in batch using direct SQL.
+ * @param table the target table
+ * @param partNames list of partition names
+ * @param newParts list of new partitions
+ * @param queryWriteIdList valid write id list
+ * @return the list of altered partitions
+ * @throws MetaException if the partitions could not be altered
+ */
+ public List<Partition> alterPartitions(MTable table, List<String> partNames,
+ List<Partition> newParts, String
queryWriteIdList) throws MetaException {
+ List<Object[]> rows = Batchable.runBatched(batchSize, partNames, new
Batchable<String, Object[]>() {
+ @Override
+ public List<Object[]> run(List<String> input) throws Exception {
+ String filter = "" + PARTITIONS + ".\"PART_NAME\" in (" +
makeParams(input.size()) + ")";
+ List<String> columns = Arrays.asList("\"PART_ID\"", "\"PART_NAME\"",
"\"SD_ID\"", "\"WRITE_ID\"");
+ return
getPartitionFieldsViaSqlFilter(table.getDatabase().getCatalogName(),
table.getDatabase().getName(),
+ table.getTableName(), columns, filter, input,
Collections.emptyList(), null);
+ }
+ });
+ Map<List<String>, Long> partValuesToId = new HashMap<>();
+ Map<Long, Long> partIdToSdId = new HashMap<>();
+ Map<Long, Long> partIdToWriteId = new HashMap<>();
+ for (Object[] row : rows) {
+ Long partId = MetastoreDirectSqlUtils.extractSqlLong(row[0]);
+ Long sdId = MetastoreDirectSqlUtils.extractSqlLong(row[2]);
+ Long writeId = MetastoreDirectSqlUtils.extractSqlLong(row[3]);
+ partIdToSdId.put(partId, sdId);
+ partIdToWriteId.put(partId, writeId);
+ List<String> partValues = Warehouse.getPartValuesFromPartName((String)
row[1]);
+ partValuesToId.put(partValues, partId);
+ }
+
+ boolean isTxn = TxnUtils.isTransactionalTable(table.getParameters());
+ for (Partition newPart : newParts) {
+ Long partId = partValuesToId.get(newPart.getValues());
+ boolean useOldWriteId = true;
+ // For transactional tables, validate the stats state for the current
+ // updater query before deciding which write id the new partition keeps.
+ if (isTxn) {
+ if (!isTxnStatsEnabled) {
+ StatsSetupConst.setBasicStatsState(newPart.getParameters(),
StatsSetupConst.FALSE);
+ } else if (queryWriteIdList != null && newPart.getWriteId() > 0) {
+ // Check concurrent INSERT case and set false to the flag.
+ if
(!ObjectStore.isCurrentStatsValidForTheQuery(newPart.getParameters(),
+ partIdToWriteId.get(partId), queryWriteIdList, true)) {
+ StatsSetupConst.setBasicStatsState(newPart.getParameters(),
StatsSetupConst.FALSE);
+ LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the
partition " +
+ Warehouse.getQualifiedName(newPart) + " will be made
persistent.");
+ }
+ useOldWriteId = false;
+ }
+ }
+
+ if (useOldWriteId) {
+ newPart.setWriteId(partIdToWriteId.get(partId));
+ }
+ }
+
+ directSqlUpdatePart.alterPartitions(partValuesToId, partIdToSdId,
newParts);
+ return newParts;
+ }
+
/**
* Get partition names by using direct SQL queries.
* @param filter filter to use with direct sql
@@ -901,6 +967,28 @@ class MetaStoreDirectSql {
String catName, String dbName, String tblName, String sqlFilter,
List<? extends Object> paramsForFilter, List<String> joinsForFilter,
Integer max)
throws MetaException {
+ return getPartitionFieldsViaSqlFilter(catName, dbName, tblName,
+ Arrays.asList("\"PART_ID\""), sqlFilter, paramsForFilter,
joinsForFilter, max);
+ }
+
+ /**
+ * Get partition fields for the query using direct SQL queries, to avoid
bazillion
+ * queries created by DN retrieving stuff for each object individually.
+ * @param catName MetaStore catalog name
+ * @param dbName MetaStore db name
+ * @param tblName MetaStore table name
+ * @param partColumns partition fields to retrieve
+ * @param sqlFilter SQL filter to use. Better be SQL92-compliant.
+ * @param paramsForFilter params for ?-s in SQL filter text. Params must be
in order.
+ * @param joinsForFilter if the filter needs additional join statement, they
must be in
+ * this list. Better be SQL92-compliant.
+ * @param max The maximum number of partitions to return.
+ * @return list of rows containing the requested partition fields
+ */
+ public <T> List<T> getPartitionFieldsViaSqlFilter(
+ String catName, String dbName, String tblName, List<String> partColumns,
String sqlFilter,
+ List<? extends Object> paramsForFilter, List<String> joinsForFilter,
Integer max)
+ throws MetaException {
boolean doTrace = LOG.isDebugEnabled();
final String dbNameLcase = dbName.toLowerCase();
final String tblNameLcase = tblName.toLowerCase();
@@ -908,16 +996,17 @@ class MetaStoreDirectSql {
// We have to be mindful of order during filtering if we are not returning
all partitions.
String orderForFilter = (max != null) ? " order by " +
MetastoreConf.getVar(conf, ConfVars.PARTITION_ORDER_EXPR) : "";
+ String columns = partColumns.stream().map(col -> PARTITIONS + "." +
col).collect(Collectors.joining(","));
String queryText =
- "select " + PARTITIONS + ".\"PART_ID\" from " + PARTITIONS + ""
- + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS
+ ".\"TBL_ID\" "
- + " and " + TBLS + ".\"TBL_NAME\" = ? "
- + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS +
".\"DB_ID\" "
- + " and " + DBS + ".\"NAME\" = ? "
- + join(joinsForFilter, ' ')
- + " where " + DBS + ".\"CTLG_NAME\" = ? "
- + (StringUtils.isBlank(sqlFilter) ? "" : (" and " + sqlFilter)) +
orderForFilter;
+ "select " + columns + " from " + PARTITIONS + ""
+ + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = "
+ TBLS + ".\"TBL_ID\" "
+ + " and " + TBLS + ".\"TBL_NAME\" = ? "
+ + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS +
".\"DB_ID\" "
+ + " and " + DBS + ".\"NAME\" = ? "
+ + join(joinsForFilter, ' ')
+ + " where " + DBS + ".\"CTLG_NAME\" = ? "
+ + (StringUtils.isBlank(sqlFilter) ? "" : (" and " + sqlFilter)) +
orderForFilter;
Object[] params = new Object[paramsForFilter.size() + 3];
params[0] = tblNameLcase;
params[1] = dbNameLcase;
@@ -928,19 +1017,11 @@ class MetaStoreDirectSql {
long start = doTrace ? System.nanoTime() : 0;
try (QueryWrapper query = new
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
- List<Object> sqlResult = executeWithArray(query.getInnerQuery(), params,
queryText,
+ List<T> sqlResult = executeWithArray(query.getInnerQuery(), params,
queryText,
((max == null) ? -1 : max.intValue()));
long queryTime = doTrace ? System.nanoTime() : 0;
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start,
queryTime);
- final List<Long> result;
- if (sqlResult.isEmpty()) {
- result = Collections.emptyList(); // no partitions, bail early.
- } else {
- result = new ArrayList<>(sqlResult.size());
- for (Object fields : sqlResult) {
- result.add(MetastoreDirectSqlUtils.extractSqlLong(fields));
- }
- }
+ List<T> result = new ArrayList<>(sqlResult);
return result;
}
}
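
As exercised by alterPartitions above, the new overload fetches several partition fields in one round trip; a minimal sketch of a direct call, assuming catName, dbName, tblName and partNames are in scope:

    // Fetch PART_ID, PART_NAME, SD_ID and WRITE_ID for a batch of partition names.
    String filter = PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")";
    List<Object[]> rows = getPartitionFieldsViaSqlFilter(catName, dbName, tblName,
        Arrays.asList("\"PART_ID\"", "\"PART_NAME\"", "\"SD_ID\"", "\"WRITE_ID\""),
        filter, partNames, Collections.emptyList(), null);
    for (Object[] row : rows) {
      Long partId = MetastoreDirectSqlUtils.extractSqlLong(row[0]);  // columns come back
      String partName = (String) row[1];                             // in the requested order
    }
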
@@ -3056,8 +3137,8 @@ class MetaStoreDirectSql {
ColumnStatistics colStats = (ColumnStatistics) entry.getValue();
numStats += colStats.getStatsObjSize();
}
- long csId = updateStat.getNextCSIdForMPartitionColumnStatistics(numStats);
- return updateStat.updatePartitionColumnStatistics(partColStatsMap, tbl,
csId, validWriteIds, writeId, listeners);
+ long csId =
directSqlUpdatePart.getNextCSIdForMPartitionColumnStatistics(numStats);
+ return
directSqlUpdatePart.updatePartitionColumnStatistics(partColStatsMap, tbl, csId,
validWriteIds, writeId, listeners);
}
public List<Function> getFunctions(String catName) throws MetaException {
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 0ae023bdd7e..de88e482b71 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -4399,7 +4399,7 @@ public class ObjectStore implements RawStore,
Configurable {
protected abstract String describeResult();
protected abstract T getSqlResult(GetHelper<T> ctx) throws MetaException;
protected abstract T getJdoResult(
- GetHelper<T> ctx) throws MetaException, NoSuchObjectException;
+ GetHelper<T> ctx) throws MetaException, NoSuchObjectException,
InvalidObjectException;
public T run(boolean initTable) throws MetaException,
NoSuchObjectException {
try {
@@ -5261,91 +5261,114 @@ public class ObjectStore implements RawStore,
Configurable {
List<List<String>> part_vals, List<Partition>
newParts,
long writeId, String queryWriteIdList)
throws InvalidObjectException, MetaException
{
- boolean success = false;
- Exception e = null;
List<Partition> results = new ArrayList<>(newParts.size());
if (newParts.isEmpty()) {
return results;
}
+ catName = normalizeIdentifier(catName);
+ dbName = normalizeIdentifier(dbName);
+ tblName = normalizeIdentifier(tblName);
+
+ boolean success = false;
try {
openTransaction();
- MTable table = this.getMTable(catName, dbName, tblName);
- if (table == null) {
- throw new NoSuchObjectException(
- TableName.getQualified(catName, dbName, tblName) + " table not
found");
+ MTable table = ensureGetMTable(catName, dbName, tblName);
+ // Validate new parts: StorageDescriptor and SerDeInfo must be set in
Partition.
+ if (!TableType.VIRTUAL_VIEW.name().equals(table.getTableType())) {
+ for (Partition newPart : newParts) {
+ if (!newPart.isSetSd() || !newPart.getSd().isSetSerdeInfo()) {
+ throw new InvalidObjectException("Partition does not set
storageDescriptor or serdeInfo.");
+ }
+ }
}
+ if (writeId > 0) {
+ newParts.forEach(newPart -> newPart.setWriteId(writeId));
+ }
+
+ List<FieldSchema> partCols =
convertToFieldSchemas(table.getPartitionKeys());
List<String> partNames = new ArrayList<>();
for (List<String> partVal : part_vals) {
- partNames.add(
-
Warehouse.makePartName(convertToFieldSchemas(table.getPartitionKeys()), partVal)
- );
+ partNames.add(Warehouse.makePartName(partCols, partVal));
}
- catName = normalizeIdentifier(catName);
- dbName = normalizeIdentifier(dbName);
- tblName = normalizeIdentifier(tblName);
- List<MPartition> mPartitionList;
-
- try (Query query = pm.newQuery(MPartition.class,
- "table.tableName == t1 && table.database.name == t2 &&
t3.contains(partitionName) " +
- " && table.database.catalogName == t4")) {
- query.declareParameters("java.lang.String t1, java.lang.String t2,
java.util.Collection t3, "
- + "java.lang.String t4");
- mPartitionList = (List<MPartition>) query.executeWithArray(tblName,
dbName, partNames, catName);
- pm.retrieveAll(mPartitionList);
-
- if (mPartitionList.size() > newParts.size()) {
- throw new MetaException("Expecting only one partition but more than
one partitions are found.");
+ results = new GetListHelper<Partition>(catName, dbName, tblName, true,
true) {
+ @Override
+ protected List<Partition> getSqlResult(GetHelper<List<Partition>> ctx)
+ throws MetaException {
+ return directSql.alterPartitions(table, partNames, newParts,
queryWriteIdList);
}
- Map<List<String>, MPartition> mPartsMap = new HashMap();
- for (MPartition mPartition : mPartitionList) {
- mPartsMap.put(mPartition.getValues(), mPartition);
+ @Override
+ protected List<Partition> getJdoResult(GetHelper<List<Partition>> ctx)
+ throws MetaException, InvalidObjectException {
+ return alterPartitionsViaJdo(table, partNames, newParts,
queryWriteIdList);
}
+ }.run(false);
- Set<MColumnDescriptor> oldCds = new HashSet<>();
- Ref<MColumnDescriptor> oldCdRef = new Ref<>();
- for (Partition tmpPart : newParts) {
- if (!tmpPart.getDbName().equalsIgnoreCase(dbName)) {
- throw new MetaException("Invalid DB name : " +
tmpPart.getDbName());
- }
-
- if (!tmpPart.getTableName().equalsIgnoreCase(tblName)) {
- throw new MetaException("Invalid table name : " +
tmpPart.getDbName());
- }
-
- if (writeId > 0) {
- tmpPart.setWriteId(writeId);
- }
- oldCdRef.t = null;
- Partition result = alterPartitionNoTxn(catName, dbName, tblName,
mPartsMap.get(tmpPart.getValues()),
- tmpPart, queryWriteIdList, oldCdRef, table);
- results.add(result);
- if (oldCdRef.t != null) {
- oldCds.add(oldCdRef.t);
- }
- }
- for (MColumnDescriptor oldCd : oldCds) {
- removeUnusedColumnDescriptor(oldCd);
- }
- }
// commit the changes
success = commitTransaction();
} catch (Exception exception) {
- e = exception;
- LOG.error("Alter failed", e);
+ LOG.error("Alter failed", exception);
+ throw new MetaException(exception.getMessage());
} finally {
if (!success) {
rollbackTransaction();
- MetaException metaException = new MetaException(
- "The transaction for alter partition did not commit
successfully.");
- if (e != null) {
- metaException.initCause(e);
+ }
+ }
+ return results;
+ }
+
+ private List<Partition> alterPartitionsViaJdo(MTable table, List<String>
partNames,
+ List<Partition> newParts,
String queryWriteIdList)
+ throws MetaException, InvalidObjectException {
+ String catName = table.getDatabase().getCatalogName();
+ String dbName = table.getDatabase().getName();
+ String tblName = table.getTableName();
+ List<Partition> results = new ArrayList<>(newParts.size());
+ List<MPartition> mPartitionList;
+
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery(MPartition.class,
+ "table.tableName == t1 && table.database.name == t2 &&
t3.contains(partitionName) " +
+ " && table.database.catalogName == t4"))) {
+      query.declareParameters("java.lang.String t1, java.lang.String t2, java.util.Collection t3, "
+ + "java.lang.String t4");
+      mPartitionList = (List<MPartition>) query.executeWithArray(tblName, dbName, partNames, catName);
+ pm.retrieveAll(mPartitionList);
+
+ if (mPartitionList.size() > newParts.size()) {
+        throw new MetaException("Expecting only one partition but more than one partitions are found.");
+ }
+
+ Map<List<String>, MPartition> mPartsMap = new HashMap();
+ for (MPartition mPartition : mPartitionList) {
+ mPartsMap.put(mPartition.getValues(), mPartition);
+ }
+
+ Set<MColumnDescriptor> oldCds = new HashSet<>();
+ Ref<MColumnDescriptor> oldCdRef = new Ref<>();
+ for (Partition tmpPart : newParts) {
+ if (!tmpPart.getDbName().equalsIgnoreCase(dbName)) {
+ throw new MetaException("Invalid DB name : " + tmpPart.getDbName());
}
- throw metaException;
+
+ if (!tmpPart.getTableName().equalsIgnoreCase(tblName)) {
+        throw new MetaException("Invalid table name : " + tmpPart.getDbName());
+ }
+
+ oldCdRef.t = null;
+ Partition result = alterPartitionNoTxn(catName, dbName, tblName,
+            mPartsMap.get(tmpPart.getValues()), tmpPart, queryWriteIdList, oldCdRef, table);
+ results.add(result);
+ if (oldCdRef.t != null) {
+ oldCds.add(oldCdRef.t);
+ }
+ }
+ for (MColumnDescriptor oldCd : oldCds) {
+ removeUnusedColumnDescriptor(oldCd);
}
}
+
return results;
}
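
A note on the ObjectStore change above: alterPartitions now runs through the metastore's usual
direct-SQL-first helper, so the batched SQL path in MetaStoreDirectSql is tried first and the
pre-existing per-partition DataNucleus path (alterPartitionsViaJdo) is kept as the fallback.
A minimal sketch of that control flow, with the GetListHelper plumbing and config checks
omitted and only the method names from the diff kept, is:

    // Sketch only: the real code delegates to GetListHelper, which also honours
    // the direct-SQL / fallback configuration flags.
    List<Partition> alterPartitionsInternal(MTable table, List<String> partNames,
        List<Partition> newParts, String writeIdList) throws MetaException {
      try {
        // Fast path: batched UPDATE statements issued by MetaStoreDirectSql.
        return directSql.alterPartitions(table, partNames, newParts, writeIdList);
      } catch (Exception e) {
        // Fallback: the per-partition JDO path shown below in alterPartitionsViaJdo.
        return alterPartitionsViaJdo(table, partNames, newParts, writeIdList);
      }
    }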
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index e7e97b5f23d..f490798be56 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -680,4 +680,20 @@ public class TxnUtils {
return (SQLException)ex;
}
+  public static String createUpdatePreparedStmt(String tableName, List<String> columnNames, List<String> conditionKeys) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("update " + tableName + " set ");
+    sb.append(columnNames.stream().map(col -> col + "=?").collect(Collectors.joining(",")));
+    sb.append(" where " + conditionKeys.stream().map(cond -> cond + "=?").collect(Collectors.joining(" and ")));
+ return sb.toString();
+ }
+
+  public static String createInsertPreparedStmt(String tableName, List<String> columnNames) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("insert into " + tableName + "(");
+ sb.append(columnNames.stream().collect(Collectors.joining(",")));
+    String placeholder = columnNames.stream().map(col -> "?").collect(Collectors.joining(","));
+ sb.append(") values (" + placeholder + ")");
+ return sb.toString();
+ }
}
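
The two TxnUtils helpers added above only assemble parameterised SQL strings. A rough usage
example (the table and column names below are illustrative, not necessarily the ones
DirectSqlUpdatePart passes in; assumes java.util.Arrays is imported):

    // Illustrative only: shows the strings the helpers produce.
    String update = TxnUtils.createUpdatePreparedStmt("PARTITIONS",
        Arrays.asList("LAST_ACCESS_TIME", "SD_ID"), Arrays.asList("PART_ID"));
    // -> "update PARTITIONS set LAST_ACCESS_TIME=?,SD_ID=? where PART_ID=?"

    String insert = TxnUtils.createInsertPreparedStmt("PARTITION_PARAMS",
        Arrays.asList("PART_ID", "PARAM_KEY", "PARAM_VALUE"));
    // -> "insert into PARTITION_PARAMS(PART_ID,PARAM_KEY,PARAM_VALUE) values (?,?,?)"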
diff --git
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
index 940b18d1db4..a6e510e5c4d 100644
---
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
+++
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
@@ -503,7 +503,7 @@ public class TestObjectStore {
      objectStore.alterPartitions(DEFAULT_CATALOG_NAME, DB1, "not_existed_table", part_vals, parts, 0, "");
} catch (MetaException e) {
// expected
- Assert.assertTrue(e.getCause() instanceof NoSuchObjectException);
+      Assert.assertEquals(e.getMessage(), "Specified catalog.database.table does not exist : hive.testobjectstoredb1.not_existed_table");
}
}
diff --git
a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
index 90672bf483d..f19576d6940 100644
---
a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
+++
b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
@@ -45,6 +45,7 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import static org.apache.hadoop.hive.metastore.tools.Constants.HMS_DEFAULT_PORT;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkAlterPartitions;
import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkCreatePartition;
import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkCreatePartitions;
import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkDeleteCreate;
@@ -311,6 +312,8 @@ public class BenchmarkTool implements Runnable {
() -> benchmarkCreatePartitions(bench, bData, howMany))
.add("dropPartitions" + '.' + howMany,
() -> benchmarkDropPartitions(bench, bData, howMany))
+ .add("alterPartitions" + '.' + howMany,
+ () -> benchmarkAlterPartitions(bench, bData, howMany))
.add("renameTable" + '.' + howMany,
() -> benchmarkRenameTable(bench, bData, howMany))
.add("dropDatabase" + '.' + howMany,
diff --git
a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
index 214e9e1cd6b..a2f97eb3170 100644
---
a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
+++
b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.PartitionManagementTask;
import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
@@ -338,6 +339,35 @@ final class HMSBenchmarks {
}
}
+  static DescriptiveStatistics benchmarkAlterPartitions(@NotNull MicroBenchmark bench,
+                                                         @NotNull BenchData data,
+                                                         int count) {
+ final HMSClient client = data.getClient();
+ String dbName = data.dbName;
+ String tableName = data.tableName;
+
+ BenchmarkUtils.createPartitionedTable(client, dbName, tableName);
+ try {
+ return bench.measure(
+ () -> addManyPartitionsNoException(client, dbName, tableName, null,
+ Collections.singletonList("d"), count),
+ () -> throwingSupplierWrapper(() -> {
+          List<Partition> newPartitions = client.getPartitions(dbName, tableName);
+ newPartitions.forEach(p -> {
+ p.getParameters().put("new_param", "param_val");
+            p.getSd().setCols(Arrays.asList(new FieldSchema("new_col", "string", null)));
+ });
+ client.alterPartitions(dbName, tableName, newPartitions);
+ return null;
+ }),
+ () -> throwingSupplierWrapper(() ->
+ client.dropPartitions(dbName, tableName, null))
+ );
+ } finally {
+ throwingSupplierWrapper(() -> client.dropTable(dbName, tableName));
+ }
+ }
+
  static DescriptiveStatistics benchmarkGetPartitionNames(@NotNull MicroBenchmark bench,
                                                           @NotNull BenchData data,
                                                           int count) {