This is an automated email from the ASF dual-hosted git repository.
dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 1ebaa68af67 HIVE-28813: MSCK/Analyze commands can show a warning in
console for small files (#5950)
1ebaa68af67 is described below
commit 1ebaa68af67ee094bfd59102bf7aebe4832a9725
Author: Daniel (Hongdan) Zhu <[email protected]>
AuthorDate: Sat Oct 25 21:39:01 2025 -0700
HIVE-28813: MSCK/Analyze commands can show a warning in console for small
files (#5950)
---
.../hive/ql/ddl/misc/msck/MsckOperation.java | 26 +++++-
.../org/apache/hadoop/hive/ql/exec/StatsTask.java | 1 -
.../hadoop/hive/ql/stats/BasicStatsNoJobTask.java | 30 +++++-
.../hadoop/hive/ql/stats/BasicStatsTask.java | 21 +++++
.../apache/hadoop/hive/metastore/CheckResult.java | 12 +++
.../hadoop/hive/metastore/conf/MetastoreConf.java | 4 +
.../metastore/utils/SmallFilesWarningUtil.java | 103 +++++++++++++++++++++
.../hive/metastore/HiveMetaStoreChecker.java | 3 -
.../org/apache/hadoop/hive/metastore/Msck.java | 70 ++++++++++++++
.../org/apache/hadoop/hive/metastore/MsckInfo.java | 11 +++
10 files changed, 275 insertions(+), 6 deletions(-)
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckOperation.java
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckOperation.java
index e7e39021d88..b8ed7bb36c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckOperation.java
@@ -21,6 +21,9 @@
import static org.apache.hadoop.hive.metastore.Msck.getProxyClass;
import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -75,7 +78,27 @@ public int execute() throws HiveException, IOException,
TException, MetastoreExc
MsckInfo msckInfo = new MsckInfo(SessionState.get().getCurrentCatalog(),
tableName.getDb(), tableName.getTable(),
desc.getFilterExp(), desc.getResFile(), desc.isRepairPartitions(),
desc.isAddPartitions(), desc.isDropPartitions(),
partitionExpirySeconds);
- return msck.repair(msckInfo);
+ int result = msck.repair(msckInfo);
+ Map<String, String> smallFilesStats = msckInfo.getSmallFilesStats();
+ if (smallFilesStats != null && !smallFilesStats.isEmpty()) {
+ // keep the small files information in logInfo
+ List<String> logInfo = smallFilesStats.entrySet().stream()
+ .map(entry -> String.format(
+ "Average file size is too small, small files exist. %n
Partition name: %s. %s",
+ entry.getKey(), entry.getValue()))
+ .collect(Collectors.toList());
+ // print out the small files information on console to end users
+ SessionState ss = SessionState.get();
+ if (ss != null && ss.getConsole() != null) {
+ ss.getConsole().printInfo("[MSCK] Small files detected.");
+ ss.getConsole().printInfo(""); // add a blank line for separation
+ logInfo.forEach(line -> ss.getConsole().printInfo("[MSCK] " + line));
+ } else {
+ // if there is no console to print out, keep the small files info in
logs
+ LOG.info("There are small files exist.\n{}", String.join("\n",
logInfo));
+ }
+ }
+ return result;
} catch (MetaException | MetastoreException e) {
LOG.error("Unable to create msck instance.", e);
throw e;
@@ -84,4 +107,5 @@ public int execute() throws HiveException, IOException,
TException, MetastoreExc
return 1;
}
}
+
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 96ae3e0d701..17b9410ea81 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -124,7 +124,6 @@ public int execute() {
return 0;
}
-
private Table getTable(Hive db) throws SemanticException, HiveException {
return db.getTable(work.getFullTableName());
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
index 2ab41981ed6..defcd586098 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.utils.SmallFilesWarningUtil;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.StatsTask;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -54,6 +55,7 @@
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
import org.apache.hadoop.hive.ql.plan.BasicStatsNoJobWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
@@ -140,6 +142,7 @@ abstract static class StatCollector implements Runnable {
protected Partish partish;
protected Object result;
protected LogHelper console;
+ protected HiveConf conf;
public static Function<StatCollector, String> SIMPLE_NAME_FUNCTION =
sc -> String.format("%s#%s", sc.partish.getTable().getCompleteName(),
sc.partish.getPartishType());
@@ -148,6 +151,7 @@ abstract static class StatCollector implements Runnable {
protected void init(HiveConf conf, LogHelper console) throws IOException {
this.console = console;
+ this.conf = conf;
}
protected final boolean isValid() {
@@ -181,6 +185,18 @@ public void run() {
}
parameters.putAll(basicStatistics);
StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
+
+ String who = (partish.getPartition() == null) ? ("table " +
partish.getTable().getFullyQualifiedName())
+ : ("partition " + partish.getPartition().getName());
+ long threshold =
conf.getLongVar(HiveConf.ConfVars.HIVE_MERGE_MAP_FILES_AVG_SIZE);
+ SmallFilesWarningUtil.smallFilesWarnings(parameters, 100L, threshold,
who, "[ANALYZE][NOSCAN]")
+ .ifPresent(msg -> {
+ LOG.warn(msg);
+ SessionState ss = SessionState.get();
+ if (ss != null && ss.getConsole() != null) {
+ ss.getConsole().printInfo(msg);
+ }
+ });
String msg = partish.getSimpleName() + " stats: [" +
toString(parameters) + ']';
LOG.debug(msg);
console.printInfo(msg);
@@ -206,6 +222,7 @@ public FooterStatCollector(JobConf jc, Partish partish) {
@Override
public void init(HiveConf conf, LogHelper console) throws IOException {
this.console = console;
+ this.conf = conf;
dir = new Path(partish.getPartSd().getLocation());
fs = dir.getFileSystem(conf);
}
@@ -293,6 +310,18 @@ public void run() {
parameters.put(StatsSetupConst.NUM_FILES, String.valueOf(numFiles));
parameters.put(StatsSetupConst.NUM_ERASURE_CODED_FILES,
String.valueOf(numErasureCodedFiles));
+ String who = (partish.getPartition() == null) ? ("table " +
partish.getTable().getFullyQualifiedName())
+ : ("partition " + partish.getPartition().getName());
+ long threshold =
conf.getLongVar(HiveConf.ConfVars.HIVE_MERGE_MAP_FILES_AVG_SIZE);
+ SmallFilesWarningUtil.smallFilesWarnings(parameters, 100L, threshold,
who, "[ANALYZE][NOSCAN]")
+ .ifPresent(msg -> {
+ LOG.warn(msg);
+ SessionState ss = SessionState.get();
+ if (ss != null && ss.getConsole() != null) {
+ ss.getConsole().printInfo(msg);
+ }
+ });
+
if (partish.getPartition() != null) {
result = new Partition(partish.getTable(),
partish.getPartition().getTPartition());
} else {
@@ -307,7 +336,6 @@ public void run() {
console.printInfo("[Warning] could not update stats for " +
partish.getSimpleName() + ".", "Failed with exception " + e.getMessage() + "\n"
+ StringUtils.stringifyException(e));
}
}
-
}
private Collection<Partition> getPartitions(Table table) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java
b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java
index 085954a1676..d5f44cbea2e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.UpdateTransactionalStatsRequest;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.apache.hadoop.hive.metastore.utils.SmallFilesWarningUtil;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -55,6 +56,7 @@
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
@@ -128,6 +130,8 @@ private static class BasicStatsProcessor {
private Map<String, String> providedBasicStats;
private boolean skipStatsUpdate = false;
+ private HiveConf conf;
+
public BasicStatsProcessor(Partish partish, BasicStatsWork work, boolean
followedColStats2) {
this.partish = partish;
this.work = work;
@@ -182,6 +186,22 @@ public Object process(StatsAggregator statsAggregator)
throws HiveException, Met
parameters.putAll(providedBasicStats);
}
+
+ final long threshold = (conf != null)
+ ?
conf.getLongVar(HiveConf.ConfVars.HIVE_MERGE_MAP_FILES_AVG_SIZE)
+ : HiveConf.ConfVars.HIVE_MERGE_MAP_FILES_AVG_SIZE.defaultLongVal;
+ final String who = (p.getPartition() == null)
+ ? "table " + p.getTable().getFullyQualifiedName()
+ : "partition " + p.getPartition().getName();
+ SmallFilesWarningUtil.smallFilesWarnings(parameters, 100L, threshold,
who, "[ANALYZE]")
+ .ifPresent(msg -> {
+ LOG.warn(msg);
+ SessionState ss = SessionState.get();
+ if (ss != null && ss.getConsole() != null) {
+ ss.getConsole().printInfo(msg);
+ }
+ });
+
if (statsAggregator != null && !skipStatsUpdate) {
// Update stats for transactional tables (MM, or full ACID with
overwrite), even
// though we are marking stats as not being accurate.
@@ -195,6 +215,7 @@ public Object process(StatsAggregator statsAggregator)
throws HiveException, Met
}
public void collectFileStatus(Warehouse wh, HiveConf conf) throws
MetaException, IOException {
+ this.conf = conf;
if (providedBasicStats == null) {
if (!partish.isTransactionalTable()) {
partfileStatus = wh.getFileStatusesForSD(partish.getPartSd());
diff --git
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/CheckResult.java
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/CheckResult.java
index 9807138c117..90184b64c1b 100644
---
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/CheckResult.java
+++
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/CheckResult.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@@ -36,6 +37,9 @@ public class CheckResult {
private Set<PartitionResult> partitionsNotInMs = new TreeSet<>();
private Set<PartitionResult> expiredPartitions = new TreeSet<>();
private Set<PartitionResult> correctPartitions = new TreeSet<>();
+
+ private Map<String, String> smallFilesStats = new HashMap<>();
+
private long maxWriteId;
private long maxTxnId;
@@ -115,6 +119,14 @@ public void setCorrectPartitions(final
Set<PartitionResult> correctPartitions) {
this.correctPartitions = correctPartitions;
}
+ public Map<String, String> getSmallFilesStats() { // returns the internal map reference itself, not a copy
+ return this.smallFilesStats;
+ }
+
+ public void setSmallFilesStats(Map<String, String> smallFilesStats) { // stores the caller's map reference as-is: no copy, no null check
+ this.smallFilesStats = smallFilesStats;
+ }
+
public long getMaxWriteId() {
return maxWriteId;
}
diff --git
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 63a0d106d27..3de70194477 100644
---
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -1811,6 +1811,10 @@ public enum ConfVars {
"A ZooKeeper instance must be up and running when using zookeeper
Hive lock manager "),
HIVE_TXN_STATS_ENABLED("hive.txn.stats.enabled", "hive.txn.stats.enabled",
true,
"Whether Hive supports transactional stats (accurate stats for
transactional tables)"),
+ MSCK_SMALLFILES_AVG_SIZE("metastore.msck.smallfiles.avgsize",
"metastore.msck.smallfiles.avgsize", (long) (16 * 1000 * 1000),
+ "When the average files size of a table/partition is less than
this number, in msck command process, if total number\n" +
+ "of files is greater than 100, the small files warnings
will be shown to the end users in console, and\n" +
+ "also recorded in the logs."),
// External RDBMS support
USE_CUSTOM_RDBMS("metastore.use.custom.database.product",
diff --git
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SmallFilesWarningUtil.java
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SmallFilesWarningUtil.java
new file mode 100644
index 00000000000..81cc402f9e8
--- /dev/null
+++
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SmallFilesWarningUtil.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.utils;
+
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+
+public final class SmallFilesWarningUtil {
+ private SmallFilesWarningUtil() {}
+
+ /** Default file-count floor used by the Configuration overload: the check passes only when numFiles is greater than this. */
+ public static final long DEFAULT_MIN_FILES = 100L;
+
+ /**
+ * Returns true when {@code parameters} carries parseable TOTAL_SIZE and NUM_FILES stats, the file count is greater than {@code minFiles}, the total size is positive, and floorDiv(totalSize, numFiles) is at most {@code avgSizeThreshold}.
+ * Returns false for a null map, a missing or non-numeric stat, a file count of {@code minFiles} or fewer, or a non-positive total size.
+ */
+ public static boolean smallAverageFilesDetected(long minFiles,
+ long avgSizeThreshold,
+ Map<String, String>
parameters) {
+ if (parameters == null) return false;
+
+ final String ts = parameters.get(StatsSetupConst.TOTAL_SIZE);
+ final String nf = parameters.get(StatsSetupConst.NUM_FILES);
+ if (ts == null || nf == null) return false;
+
+ final long totalSize;
+ final long numFiles;
+ try {
+ totalSize = Long.parseLong(ts);
+ numFiles = Long.parseLong(nf);
+ } catch (NumberFormatException ignore) { // non-numeric stat value: report "no warning" rather than fail
+ return false;
+ }
+
+ if (numFiles <= minFiles || totalSize <= 0) return false; // needs strictly more than minFiles files and a positive total size
+
+ final long avg = Math.floorDiv(totalSize, numFiles);
+ return avg <= avgSizeThreshold;
+ }
+
+ /**
+ * Convenience overload: reads the threshold from MetastoreConf.ConfVars.MSCK_SMALLFILES_AVG_SIZE in {@code conf} and uses {@link #DEFAULT_MIN_FILES}.
+ * NOTE(review): this method makes no null check on {@code conf} before calling MetastoreConf.getLongVar; confirm how a null conf is handled there.
+ */
+ public static boolean smallAverageFilesDetected(Configuration conf,
+ Map<String, String>
parameters) {
+ final long threshold = MetastoreConf.getLongVar(
+ conf, MetastoreConf.ConfVars.MSCK_SMALLFILES_AVG_SIZE);
+ return smallAverageFilesDetected(DEFAULT_MIN_FILES, threshold,
parameters);
+ }
+
+ /**
+ * Builds the "Small files detected" warning text for one table or partition when the three-argument smallAverageFilesDetected check passes.
+ * @param parameters table/partition parameters holding the TOTAL_SIZE and NUM_FILES stats
+ * @param minFiles file-count floor passed to the check, e.g. 100
+ * @param avgSizeThreshold upper bound for the average file size (reported as avgBytes in the message), e.g. from conf
+ * @param tableOrPartName preformatted table or partition name, inserted into the message as-is
+ * @param tagPrefix e.g. "[ANALYZE]" or "[ANALYZE][NOSCAN]"; a null or empty value adds no prefix, otherwise it is followed by one space
+ * @return the formatted message, or Optional.empty() when smallAverageFilesDetected(minFiles, avgSizeThreshold, parameters) is false
+ */
+ public static Optional<String> smallFilesWarnings(Map<String, String>
parameters,
+ long minFiles,
+ long avgSizeThreshold,
+ String tableOrPartName,
+ String tagPrefix) {
+ if (!smallAverageFilesDetected(minFiles, avgSizeThreshold,
parameters)) {
+ return Optional.empty();
+ }
+
+ // At this point we know parsing succeeds; reparse to build the
message.
+ final long totalSize =
Long.parseLong(parameters.get(StatsSetupConst.TOTAL_SIZE));
+ final long numFiles =
Long.parseLong(parameters.get(StatsSetupConst.NUM_FILES));
+ final long avg = Math.floorDiv(totalSize, numFiles);
+
+ final String prefix = (tagPrefix == null || tagPrefix.isEmpty()) ? ""
: (tagPrefix + " ");
+ final String msg = String.format(
+ "%sSmall files detected: %s (avgBytes=%d, files=%d,
totalBytes=%d)",
+ prefix, tableOrPartName, avg, numFiles, totalSize);
+
+ return Optional.of(msg);
+ }
+}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
index e5a85783e0f..11dc51b827a 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
@@ -22,7 +22,6 @@
import static org.apache.hadoop.hive.common.AcidConstants.DELTA_PREFIX;
import static org.apache.hadoop.hive.common.AcidConstants.VISIBILITY_PREFIX;
import static
org.apache.hadoop.hive.metastore.PartFilterExprUtil.createExpressionProxy;
-import static
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getAllPartitionsOf;
import static
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getDataLocation;
import static
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartColNames;
import static
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartCols;
@@ -60,7 +59,6 @@
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -359,7 +357,6 @@ void checkTable(Table table, PartitionIterable parts,
byte[] filterExp, CheckRes
continue;
}
fs = partPath.getFileSystem(conf);
-
CheckResult.PartitionResult prFromMetastore = new
CheckResult.PartitionResult();
prFromMetastore.setPartitionName(getPartitionName(table, partition));
prFromMetastore.setTableName(partition.getTableName());
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java
index 1e9c9d9b134..2bdae8902ce 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java
@@ -26,24 +26,32 @@
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.MetastoreException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -52,6 +60,7 @@
import org.apache.hadoop.hive.metastore.utils.FileUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import org.apache.hadoop.hive.metastore.utils.RetryUtilities;
+import org.apache.hadoop.hive.metastore.utils.SmallFilesWarningUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -240,6 +249,67 @@ public int repair(MsckInfo msckInfo) throws TException,
MetastoreException, IOEx
}
}
+ // Generate small files warnings only for partsNotInMs
+ try {
+ // Collect partition names
+ final List<String> names = partsNotInMs.stream()
+ .map(CheckResult.PartitionResult::getPartitionName)
+ .filter(Objects::nonNull)
+ .filter(s -> !s.isEmpty())
+ .collect(Collectors.toList());
+
+ int partFetchBatch = MetastoreConf.getIntVar(getConf(),
MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
+ if (partFetchBatch <= 0) {
+ partFetchBatch = Batchable.NO_BATCHING;
+ }
+ final Map<String, Partition> byName = new HashMap<>(names.size() *
2);
+ List<Partition> allParts = Batchable.runBatched(
+ partFetchBatch,
+ names,
+ new Batchable<String, Partition>() {
+ @Override
+ public List<Partition> run(List<String> batch) throws Exception {
+ final GetPartitionsByNamesRequest req =
+ new GetPartitionsByNamesRequest(table.getDbName(),
table.getTableName());
+ req.setNames(batch);
+ try {
+ GetPartitionsByNamesResult res =
getMsc().getPartitionsByNames(req);
+ List<Partition> plist = (res != null && res.getPartitions()
!= null) ? res.getPartitions() : Collections.emptyList();
+ return plist;
+ } catch (NoSuchObjectException e) {
+ return Collections.emptyList();
+ }
+ }
+ });
+
+ for (Partition p : allParts) {
+ final String pName =
Warehouse.makePartName(table.getPartitionKeys(), p.getValues());
+ byName.put(pName, p);
+ }
+
+ // Build small-files stats for partitions that have quick HMS stats.
+ final Map<String, String> smallFilesStats = new TreeMap<>();
+
+ for (String pName : names) {
+ final Partition p = byName.get(pName);
+ final Map<String, String> params = p.getParameters();
+ // Use util to decide if this partition should trigger the warning
+ if (SmallFilesWarningUtil.smallAverageFilesDetected(conf, params))
{
+ // We know stats exist and are well-formed; compute numbers for
the value string
+ final long totalSize =
Long.parseLong(params.get(StatsSetupConst.TOTAL_SIZE));
+ final long numFiles =
Long.parseLong(params.get(StatsSetupConst.NUM_FILES));
+ final long avg = Math.floorDiv(totalSize, numFiles);
+ smallFilesStats.put(pName, "avgBytes=" + avg + ", partition
total files=" + numFiles + ", totalBytes=" + totalSize);
+ }
+ }
+ msckInfo.setSmallFilesStats(smallFilesStats);
+ } catch (Exception e) {
+ // MSCK repair should continue regardless of small-files warnings
outcome
+ LOG.warn("MSCK small-files post-add check failed: {}", e.toString());
+ msckInfo.setSmallFilesStats(Collections.emptyMap());
+ }
+
+
if (msckInfo.isDropPartitions() && (!partsNotInFs.isEmpty() ||
!expiredPartitions.isEmpty())) {
// MSCK called to drop stale paritions from metastore and there are
// stale partitions.
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java
index 98cd45e888f..89092fc64e7 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hive.metastore;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* Metadata related to Msck.
*/
@@ -33,6 +36,8 @@ public class MsckInfo {
private final boolean dropPartitions;
private final long partitionExpirySeconds;
+ private Map<String, String> smallFilesStats = new HashMap<>();
+
public MsckInfo(String catalogName, String dbName, String tableName, byte[]
filterExp, String resFile,
boolean repairPartitions, boolean addPartitions,
boolean dropPartitions, long partitionExpirySeconds) {
@@ -82,4 +87,10 @@ public boolean isDropPartitions() {
public long getPartitionExpirySeconds() {
return partitionExpirySeconds;
}
+
+ public Map<String, String> getSmallFilesStats() { return smallFilesStats; } // returns the stored map reference, not a copy
+
+ public void setSmallFilesStats(Map<String, String> smallFilesStats){ // stores the caller's map reference as-is: no copy, no null check
+ this.smallFilesStats = smallFilesStats;
+ }
}