This is an automated email from the ASF dual-hosted git repository.
krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 5a4160e5de2 HIVE-24313: Optimise stats collection for file sizes on cloud storage (Dmitriy Fingerman, reviewed by Krisztian Kasa)
5a4160e5de2 is described below
commit 5a4160e5de2314d77472b72a6846f36ce08bec6e
Author: Dmitriy Fingerman <[email protected]>
AuthorDate: Thu Oct 6 08:03:37 2022 -0400
    HIVE-24313: Optimise stats collection for file sizes on cloud storage (Dmitriy Fingerman, reviewed by Krisztian Kasa)
Co-authored-by: Dmitriy Fingerman <[email protected]>
---
.../apache/hadoop/hive/ql/stats/StatsUtils.java | 31 +++++++++++++++++-----
.../ql/util/NamedForkJoinWorkerThreadFactory.java | 2 +-
2 files changed, 25 insertions(+), 8 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
index f493bfebc6c..babb5dbc330 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
@@ -35,9 +35,10 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -83,6 +84,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
import org.apache.hadoop.hive.ql.udf.generic.NDV;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.ql.util.NamedForkJoinWorkerThreadFactory;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -120,7 +122,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hive.common.util.AnnotationUtils;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -145,6 +146,17 @@ public class StatsUtils {
   // Range upper limit for timestamp type when not defined (seconds, heuristic): '2024-12-31 23:59:59'
private static final long TIMESTAMP_RANGE_UPPER_LIMIT = 1735689599L;
+ private static final ForkJoinPool statsForkJoinPool = new ForkJoinPool(
+ Runtime.getRuntime().availableProcessors(),
+ new NamedForkJoinWorkerThreadFactory("basic-stats-"),
+ getUncaughtExceptionHandler(),
+ false
+ );
+
+  private static Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
+    return (t, e) -> LOG.error(String.format("Thread %s exited with error", t.getName()), e);
+  }
+
/**
* Collect table, partition and column level statistics
* @param conf
@@ -317,12 +329,17 @@ public class StatsUtils {
       basicStatsFactory.addEnhancer(new BasicStats.RowNumEstimator(estimateRowSizeFromSchema(conf, schema)));
- List<BasicStats> partStats = new ArrayList<>();
-
- for (Partition p : partList.getNotDeniedPartns()) {
-      BasicStats basicStats = basicStatsFactory.build(Partish.buildFor(table, p));
- partStats.add(basicStats);
+ List<BasicStats> partStats = null;
+ try {
+ partStats = statsForkJoinPool.submit(() ->
+ partList.getNotDeniedPartns().parallelStream().
+              map(p -> basicStatsFactory.build(Partish.buildFor(table, p))).
+ collect(Collectors.toList())
+ ).get();
+ } catch (Exception e) {
+ throw new HiveException(e);
}
+
BasicStats bbs = BasicStats.buildFrom(partStats);
long nr = bbs.getNumRows();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/NamedForkJoinWorkerThreadFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/util/NamedForkJoinWorkerThreadFactory.java
index 5b6eecc946e..7b982913743 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/util/NamedForkJoinWorkerThreadFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/util/NamedForkJoinWorkerThreadFactory.java
@@ -25,7 +25,7 @@ import java.util.concurrent.ForkJoinWorkerThread;
*/
 public class NamedForkJoinWorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
- NamedForkJoinWorkerThreadFactory(String namePrefix) {
+ public NamedForkJoinWorkerThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}