This is an automated email from the ASF dual-hosted git repository.

krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a4160e5de2 HIVE-24313: Optimise stats collection for file sizes on cloud storage (Dmitriy Fingerman, reviewed by Krisztian Kasa)
5a4160e5de2 is described below

commit 5a4160e5de2314d77472b72a6846f36ce08bec6e
Author: Dmitriy Fingerman <[email protected]>
AuthorDate: Thu Oct 6 08:03:37 2022 -0400

    HIVE-24313: Optimise stats collection for file sizes on cloud storage (Dmitriy Fingerman, reviewed by Krisztian Kasa)
    
    Co-authored-by: Dmitriy Fingerman <[email protected]>
---
 .../apache/hadoop/hive/ql/stats/StatsUtils.java    | 31 +++++++++++++++++-----
 .../ql/util/NamedForkJoinWorkerThreadFactory.java  |  2 +-
 2 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
index f493bfebc6c..babb5dbc330 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
@@ -35,9 +35,10 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -83,6 +84,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
 import org.apache.hadoop.hive.ql.udf.generic.NDV;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.ql.util.NamedForkJoinWorkerThreadFactory;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -120,7 +122,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hive.common.util.AnnotationUtils;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -145,6 +146,17 @@ public class StatsUtils {
   // Range upper limit for timestamp type when not defined (seconds, heuristic): '2024-12-31 23:59:59'
   private static final long TIMESTAMP_RANGE_UPPER_LIMIT = 1735689599L;
 
+  private static final ForkJoinPool statsForkJoinPool = new ForkJoinPool(
+          Runtime.getRuntime().availableProcessors(),
+          new NamedForkJoinWorkerThreadFactory("basic-stats-"),
+          getUncaughtExceptionHandler(),
+          false
+  );
+
+  private static Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
+    return (t, e) -> LOG.error(String.format("Thread %s exited with error", t.getName()), e);
+  }
+
   /**
    * Collect table, partition and column level statistics
    * @param conf
@@ -317,12 +329,17 @@ public class StatsUtils {
 
       basicStatsFactory.addEnhancer(new BasicStats.RowNumEstimator(estimateRowSizeFromSchema(conf, schema)));
 
-      List<BasicStats> partStats = new ArrayList<>();
-
-      for (Partition p : partList.getNotDeniedPartns()) {
-        BasicStats basicStats = basicStatsFactory.build(Partish.buildFor(table, p));
-        partStats.add(basicStats);
+      List<BasicStats> partStats = null;
+      try {
+        partStats = statsForkJoinPool.submit(() ->
+          partList.getNotDeniedPartns().parallelStream().
+                  map(p -> basicStatsFactory.build(Partish.buildFor(table, p))).
+                  collect(Collectors.toList())
+        ).get();
+      } catch (Exception e) {
+        throw new HiveException(e);
       }
+
       BasicStats bbs = BasicStats.buildFrom(partStats);
 
       long nr = bbs.getNumRows();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/NamedForkJoinWorkerThreadFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/util/NamedForkJoinWorkerThreadFactory.java
index 5b6eecc946e..7b982913743 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/util/NamedForkJoinWorkerThreadFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/util/NamedForkJoinWorkerThreadFactory.java
@@ -25,7 +25,7 @@ import java.util.concurrent.ForkJoinWorkerThread;
  */
 public class NamedForkJoinWorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
 
-  NamedForkJoinWorkerThreadFactory(String namePrefix) {
+  public NamedForkJoinWorkerThreadFactory(String namePrefix) {
     this.namePrefix = namePrefix;
   }
 

Reply via email to the mailing list.