Copilot commented on code in PR #6198:
URL: https://github.com/apache/hive/pull/6198#discussion_r2985857346
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java:
##########
@@ -4612,7 +4615,7 @@ public boolean
delete_column_statistics_req(DeleteColumnStatisticsRequest req) t
if (ret) {
eventType = EventType.DELETE_TABLE_COLUMN_STAT;
for (String colName :
- colNames == null ?
table.getSd().getCols().stream().map(FieldSchema::getName).collect(Collectors.toList())
: colNames) {
+ colNames == null ?
table.getSd().getCols().stream().map(FieldSchema::getName).toList() : colNames)
{
Review Comment:
`colNames` can be an empty list (e.g., if a client explicitly sets
`col_names` to `[]`). In that case this logic will emit **no**
DELETE_*_COLUMN_STAT events even though the store layer interprets an empty
list as “delete all columns”. Consider using `colNames == null ||
colNames.isEmpty()` when deciding whether to expand to all table columns for
event emission.
```suggestion
(colNames == null || colNames.isEmpty())
?
table.getSd().getCols().stream().map(FieldSchema::getName).toList()
: colNames) {
```
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/directsql/DirectSqlDeleteStats.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.directsql;
+
+import javax.jdo.PersistenceManager;
+import javax.jdo.datastore.JDOConnection;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.Batchable;
+import org.apache.hadoop.hive.metastore.QueryWrapper;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static
org.apache.hadoop.hive.metastore.directsql.MetaStoreDirectSql.getIdListForIn;
+import static
org.apache.hadoop.hive.metastore.directsql.MetastoreDirectSqlUtils.closeDbConn;
+import static
org.apache.hadoop.hive.metastore.directsql.MetastoreDirectSqlUtils.executeWithArray;
+import static
org.apache.hadoop.hive.metastore.directsql.MetastoreDirectSqlUtils.extractSqlClob;
+import static
org.apache.hadoop.hive.metastore.directsql.MetastoreDirectSqlUtils.makeParams;
+
+public class DirectSqlDeleteStats {
+ private static final Logger LOG =
LoggerFactory.getLogger(DirectSqlDeleteStats.class);
+ private final MetaStoreDirectSql directSql;
+ private final PersistenceManager pm;
+ private final int batchSize;
+
+ public DirectSqlDeleteStats(MetaStoreDirectSql directSql, PersistenceManager
pm) {
+ this.directSql = directSql;
+ this.pm = pm;
+ this.batchSize = directSql.getDirectSqlBatchSize();
+ }
+
+ public boolean deletePartitionColumnStats(String catName, String dbName,
String tblName,
+ List<String> partNames, List<String> colNames, String engine) throws
MetaException {
+ List<Long> partIds = Batchable.runBatched(batchSize, partNames, new
Batchable<String, Long>() {
+ @Override
+ public List<Long> run(List<String> input) throws Exception {
+ String sqlFilter = "\"PARTITIONS\".\"PART_NAME\" in (" +
makeParams(input.size()) + ")";
+ List<Long> partitionIds =
directSql.getPartitionIdsViaSqlFilter(catName, dbName, tblName, sqlFilter,
+ input, Collections.emptyList(), -1);
+ if (!partitionIds.isEmpty()) {
+ String deleteSql = "delete from \"PART_COL_STATS\" where \"PART_ID\"
in ( " + getIdListForIn(partitionIds) + ")";
+ List<Object> params = new ArrayList<>(colNames == null ? 1 :
colNames.size() + 1);
+
+ if (colNames != null && !colNames.isEmpty()) {
+ deleteSql += " and \"COLUMN_NAME\" in (" +
makeParams(colNames.size()) + ")";
+ params.addAll(colNames);
+ }
+
+ if (engine != null) {
+ deleteSql += " and \"ENGINE\" = ?";
+ params.add(engine);
+ }
+ try (QueryWrapper queryParams = new
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", deleteSql))) {
+ executeWithArray(queryParams.getInnerQuery(), params.toArray(),
deleteSql);
+ }
+ }
+ return partitionIds;
+ }
+ });
+ try {
+ return updateColumnStatsAccurateForPartitions(partIds, colNames);
+ } catch (SQLException e) {
+ String errorMsg = String.format(
+ "Failed to update partitions' COLUMN_STATS_ACCURATE for catalog
'%s', database '%s', table '%s' "
+ + "(partitionCount=%d, columnCount=%d)",
+ catName, dbName, tblName,
+ partIds == null ? 0 : partIds.size(),
+ colNames == null ? 0 : colNames.size());
+ LOG.error(errorMsg, e);
+ MetaException metaException = new MetaException(errorMsg + ": " +
e.getMessage());
+ metaException.initCause(e);
+ throw metaException;
+ }
+ }
+
+ /**
+ * A helper function which will get the current COLUMN_STATS_ACCURATE
parameter on table level
+ * and update the COLUMN_STATS_ACCURATE parameter with the new value on
table level using directSql
+ */
+ private long updateColumnStatsAccurateForTable(Table table, List<String>
droppedCols) throws MetaException {
+ Map<String, String> params = table.getParameters();
+ // get the current COLUMN_STATS_ACCURATE
+ String currentValue;
+ if (params == null || (currentValue =
params.get(StatsSetupConst.COLUMN_STATS_ACCURATE)) == null) {
+ return 0;
+ }
+ // if the dropping columns is empty, that means we delete all the columns
+ if (droppedCols == null || droppedCols.isEmpty()) {
+ StatsSetupConst.clearColumnStatsState(params);
+ } else {
+ StatsSetupConst.removeColumnStatsState(params, droppedCols);
+ }
+
+ String updatedValue = params.get(StatsSetupConst.COLUMN_STATS_ACCURATE);
+ // if the COL_STATS_ACCURATE has changed, then update it using directSql
+ if (currentValue.equals(updatedValue)) {
+ return 0;
+ }
+ return directSql.updateTableParam(table,
StatsSetupConst.COLUMN_STATS_ACCURATE, currentValue, updatedValue);
+ }
+
+ private boolean updateColumnStatsAccurateForPartitions(List<Long> partIds,
List<String> colNames)
+ throws MetaException, SQLException {
+ // Get the list of params that need to be updated
+ List<Pair<Long, String>> updates = getPartColAccuToUpdate(partIds,
colNames);
+ if (updates.isEmpty()) {
+ // Nothing to update: treat as successful completion
+ return true;
+ }
+ JDOConnection jdoConn = null;
+ try {
+ jdoConn = pm.getDataStoreConnection();
+ Connection dbConn = (Connection) jdoConn.getNativeConnection();
+ String update = "UPDATE \"PARTITION_PARAMS\" SET " + " \"PARAM_VALUE\" =
?" +
+ " WHERE \"PART_ID\" = ? AND \"PARAM_KEY\" = ?";
+ try (PreparedStatement pst = dbConn.prepareStatement(update)) {
+ List<Long> updated = new ArrayList<>();
+ for (Pair<Long, String> accurate : updates) {
+ pst.setString(1, accurate.getRight());
+ pst.setLong(2, accurate.getLeft());
+ pst.setString(3, StatsSetupConst.COLUMN_STATS_ACCURATE);
+ pst.addBatch();
+ updated.add(accurate.getLeft());
+ if (updated.size() == batchSize) {
+ LOG.debug("Execute updates on part: {}", updated);
+ verifyUpdates(pst.executeBatch(), updated);
+ updated = new ArrayList<>();
+ }
Review Comment:
`batchSize` can be `NO_BATCHING` (-1) — see MetaStoreDirectSql — in which
case `updated.size() == batchSize` will never be true, so this loop will
accumulate an unbounded JDBC batch until the end. For large partition lists
this can cause high memory usage or exceed driver batch limits. Consider
treating `batchSize <= 0` as a safe flush threshold (e.g., execute batches at
a fixed maximum size), or executing each update immediately when batching is
disabled.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]