asfgit closed pull request #1357: DRILL-6557: Use size in bytes during Hive 
statistics calculation if present
URL: https://github.com/apache/drill/pull/1357
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
index 6da6c40cc9..de45dc6eb4 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
@@ -19,8 +19,6 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.TreeMultimap;
@@ -50,12 +48,14 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * Class which provides methods to get metadata of given Hive table selection. 
It tries to use the stats stored in
@@ -79,50 +79,52 @@
   public HiveMetadataProvider(final String userName, final HiveReadEntry 
hiveReadEntry, final HiveConf hiveConf) {
     this.hiveReadEntry = hiveReadEntry;
     this.ugi = ImpersonationUtil.createProxyUgi(userName);
-    isPartitionedTable = hiveReadEntry.getTable().getPartitionKeysSize() > 0;
-    partitionInputSplitMap = Maps.newHashMap();
+    this.isPartitionedTable = hiveReadEntry.getTable().getPartitionKeysSize() 
> 0;
+    this.partitionInputSplitMap = new HashMap<>();
     this.hiveConf = hiveConf;
   }
 
   /**
-   * Return stats for table/partitions in given {@link HiveReadEntry}. If 
valid stats are available in MetaStore,
-   * return it. Otherwise estimate using the size of the input data.
+   * Return stats for table/partitions in given {@link HiveReadEntry}.
+   * If valid stats are available in MetaStore, return it.
+   * Otherwise estimate using the size of the input data.
    *
    * @param hiveReadEntry Subset of the {@link HiveReadEntry} used when 
creating this cache object.
-   * @return
-   * @throws IOException
+   * @return hive statistics holder
+   * @throws IOException if unable to retrieve table statistics
    */
   public HiveStats getStats(final HiveReadEntry hiveReadEntry) throws 
IOException {
-    final Stopwatch timeGetStats = Stopwatch.createStarted();
+    Stopwatch timeGetStats = Stopwatch.createStarted();
 
-    final HiveTableWithColumnCache table = hiveReadEntry.getTable();
+    HiveTableWithColumnCache table = hiveReadEntry.getTable();
     try {
       if (!isPartitionedTable) {
-        final Properties properties = MetaStoreUtils.getTableMetadata(table);
-        final HiveStats stats = getStatsFromProps(properties);
+        Properties properties = MetaStoreUtils.getTableMetadata(table);
+        HiveStats stats = HiveStats.getStatsFromProps(properties);
         if (stats.valid()) {
           return stats;
         }
 
-        // estimate the stats from the InputSplits.
-        return getStatsEstimateFromInputSplits(getTableInputSplits());
+        return stats.getSizeInBytes() > 0 ? 
estimateStatsFromBytes(stats.getSizeInBytes()) :
+            estimateStatsFromInputSplits(getTableInputSplits());
+
       } else {
-        final HiveStats aggStats = new HiveStats(0, 0);
-        for(HivePartition partition : hiveReadEntry.getPartitions()) {
-          final Properties properties = 
HiveUtilities.getPartitionMetadata(partition, table);
-          HiveStats stats = getStatsFromProps(properties);
+        HiveStats aggStats = new HiveStats(0, 0);
+        for (HivePartition partition : hiveReadEntry.getPartitions()) {
+          Properties properties = 
HiveUtilities.getPartitionMetadata(partition, table);
+          HiveStats stats = HiveStats.getStatsFromProps(properties);
 
           if (!stats.valid()) {
-            // estimate the stats from InputSplits
-            stats = 
getStatsEstimateFromInputSplits(getPartitionInputSplits(partition));
+            stats = stats.getSizeInBytes() > 0 ? 
estimateStatsFromBytes(stats.getSizeInBytes()) :
+                
estimateStatsFromInputSplits(getPartitionInputSplits(partition));
           }
           aggStats.add(stats);
         }
 
         return aggStats;
       }
-    } catch (final Exception e) {
-      throw new IOException("Failed to get numRows from HiveTable", e);
+    } catch (Exception e) {
+      throw new IOException("Failed to get number of rows and total size from 
HiveTable", e);
     } finally {
       logger.debug("Took {} µs to get stats from {}.{}", 
timeGetStats.elapsed(TimeUnit.NANOSECONDS) / 1000,
           table.getDbName(), table.getTableName());
@@ -130,7 +132,7 @@ public HiveStats getStats(final HiveReadEntry 
hiveReadEntry) throws IOException
   }
 
   /** Helper method which return InputSplits for non-partitioned table */
-  private List<LogicalInputSplit> getTableInputSplits() throws Exception {
+  private List<LogicalInputSplit> getTableInputSplits() {
     Preconditions.checkState(!isPartitionedTable, "Works only for 
non-partitioned tables");
     if (tableInputSplits != null) {
       return tableInputSplits;
@@ -145,7 +147,7 @@ public HiveStats getStats(final HiveReadEntry 
hiveReadEntry) throws IOException
   /** Helper method which returns the InputSplits for given partition. 
InputSplits are cached to speed up subsequent
    * metadata cache requests for the same partition(s).
    */
-  private List<LogicalInputSplit> getPartitionInputSplits(final HivePartition 
partition) throws Exception {
+  private List<LogicalInputSplit> getPartitionInputSplits(final HivePartition 
partition) {
     if (partitionInputSplitMap.containsKey(partition)) {
       return partitionInputSplitMap.get(partition);
     }
@@ -164,18 +166,17 @@ public HiveStats getStats(final HiveReadEntry 
hiveReadEntry) throws IOException
    * @param hiveReadEntry Subset of the {@link HiveReadEntry} used when 
creating this object.
    * @return list of logically grouped input splits
    */
-  public List<LogicalInputSplit> getInputSplits(final HiveReadEntry 
hiveReadEntry) {
-    final Stopwatch timeGetSplits = Stopwatch.createStarted();
+  public List<LogicalInputSplit> getInputSplits(HiveReadEntry hiveReadEntry) {
+    Stopwatch timeGetSplits = Stopwatch.createStarted();
     try {
       if (!isPartitionedTable) {
         return getTableInputSplits();
       }
 
-      final List<LogicalInputSplit> splits = Lists.newArrayList();
-      for (HivePartition p : hiveReadEntry.getPartitions()) {
-        splits.addAll(getPartitionInputSplits(p));
-      }
-      return splits;
+      return hiveReadEntry.getPartitions().stream()
+          .flatMap(p -> getPartitionInputSplits(p).stream())
+          .collect(Collectors.toList());
+
     } catch (final Exception e) {
       logger.error("Failed to get InputSplits", e);
       throw new DrillRuntimeException("Failed to get InputSplits", e);
@@ -190,63 +191,44 @@ public HiveStats getStats(final HiveReadEntry 
hiveReadEntry) throws IOException
    *
    * @param hiveReadEntry {@link HiveReadEntry} containing the input table 
and/or partitions.
    */
-  protected List<String> getInputDirectories(final HiveReadEntry 
hiveReadEntry) {
+  protected List<String> getInputDirectories(HiveReadEntry hiveReadEntry) {
     if (isPartitionedTable) {
-      final List<String> inputs = Lists.newArrayList();
-      for(Partition p : hiveReadEntry.getPartitions()) {
-        inputs.add(p.getSd().getLocation());
-      }
-      return inputs;
+      return hiveReadEntry.getPartitions().stream()
+          .map(p -> p.getSd().getLocation())
+          .collect(Collectors.toList());
     }
 
     return 
Collections.singletonList(hiveReadEntry.getTable().getSd().getLocation());
   }
 
   /**
-   * Get the stats from table properties. If not found -1 is returned for each 
stats field.
-   * CAUTION: stats may not be up-to-date with the underlying data. It is 
always good to run the ANALYZE command on
-   * Hive table to have up-to-date stats.
+   * Estimate the stats from the given list of logically grouped input splits.
    *
-   * @param properties the source of table stats
-   * @return {@link HiveStats} instance with rows number and size in bytes 
from specified properties
+   * @param inputSplits list of logically grouped input splits
+   * @return hive stats with numRows and totalSizeInBytes
    */
-  private HiveStats getStatsFromProps(final Properties properties) {
-    long numRows = -1;
-    long sizeInBytes = -1;
-    try {
-      final String numRowsProp = 
properties.getProperty(StatsSetupConst.ROW_COUNT);
-      if (numRowsProp != null) {
-          numRows = Long.valueOf(numRowsProp);
-      }
-
-      final String sizeInBytesProp = 
properties.getProperty(StatsSetupConst.TOTAL_SIZE);
-      if (sizeInBytesProp != null) {
-        sizeInBytes = Long.valueOf(sizeInBytesProp);
-      }
-    } catch (final NumberFormatException e) {
-      logger.error("Failed to parse Hive stats in metastore.", e);
-      // continue with the defaults.
+  private HiveStats estimateStatsFromInputSplits(List<LogicalInputSplit> 
inputSplits) throws IOException {
+    logger.trace("Collecting stats based on input splits size. " +
+        "It means that we might have fetched all input splits before applying 
any possible optimizations (ex: partition pruning). " +
+        "Consider using ANALYZE command on Hive table to collect statistics 
before running queries.");
+    long sizeInBytes = 0;
+    for (LogicalInputSplit split : inputSplits) {
+      sizeInBytes += split.getLength();
     }
-
-    return new HiveStats(numRows, sizeInBytes);
+    return estimateStatsFromBytes(sizeInBytes);
   }
 
   /**
-   * Estimate the stats from the given list of logically grouped input splits.
+   * Estimates Hive stats based on given size in bytes.
    *
-   * @param inputSplits list of logically grouped input splits
-   * @return hive stats usually numRows and totalSizeInBytes which used
+   * @param sizeInBytes size in bytes
+   * @return hive stats with numRows and totalSizeInBytes
    */
-  private HiveStats getStatsEstimateFromInputSplits(final 
List<LogicalInputSplit> inputSplits) throws IOException {
-    long data = 0;
-    for (final LogicalInputSplit split : inputSplits) {
-      data += split.getLength();
-    }
-
-    long numRows = data / RECORD_SIZE;
+  private HiveStats estimateStatsFromBytes(long sizeInBytes) {
+    long numRows = sizeInBytes / RECORD_SIZE;
     // if the result of division is zero and data size > 0, estimate to one row
-    numRows = numRows == 0 && data > 0 ? 1 : numRows;
-    return new HiveStats(numRows, data);
+    numRows = numRows == 0 && sizeInBytes > 0 ? 1 : numRows;
+    return new HiveStats(numRows, sizeInBytes);
   }
 
   /**
@@ -262,36 +244,34 @@ private HiveStats getStatsEstimateFromInputSplits(final 
List<LogicalInputSplit>
   private List<LogicalInputSplit> splitInputWithUGI(final Properties 
properties, final StorageDescriptor sd, final Partition partition) {
     watch.start();
     try {
-      return ugi.doAs(new PrivilegedExceptionAction<List<LogicalInputSplit>>() 
{
-        public List<LogicalInputSplit> run() throws Exception {
-          final List<LogicalInputSplit> splits = Lists.newArrayList();
-          final JobConf job = new JobConf(hiveConf);
-          HiveUtilities.addConfToJob(job, properties);
-          HiveUtilities.verifyAndAddTransactionalProperties(job, sd);
-          job.setInputFormat(HiveUtilities.getInputFormatClass(job, sd, 
hiveReadEntry.getTable()));
-          final Path path = new Path(sd.getLocation());
-          final FileSystem fs = path.getFileSystem(job);
-          if (fs.exists(path)) {
-            FileInputFormat.addInputPath(job, path);
-            final InputFormat<?, ?> format = job.getInputFormat();
-            InputSplit[] inputSplits = format.getSplits(job, 1);
-
-            // if current table with text input format and has header / footer,
-            // we need to make sure that splits of the same file are grouped 
together
-            if 
(TextInputFormat.class.getCanonicalName().equals(sd.getInputFormat()) &&
-                HiveUtilities.hasHeaderOrFooter(hiveReadEntry.getTable())) {
-              Multimap<Path, FileSplit> inputSplitMultimap = 
transformFileSplits(inputSplits);
-              for (Collection<FileSplit> logicalInputSplit : 
inputSplitMultimap.asMap().values()) {
-                splits.add(new LogicalInputSplit(logicalInputSplit, 
partition));
-              }
-            } else {
-              for (final InputSplit split : inputSplits) {
-                splits.add(new LogicalInputSplit(split, partition));
-              }
+      return ugi.doAs((PrivilegedExceptionAction<List<LogicalInputSplit>>) () 
-> {
+        final List<LogicalInputSplit> splits = new ArrayList<>();
+        final JobConf job = new JobConf(hiveConf);
+        HiveUtilities.addConfToJob(job, properties);
+        HiveUtilities.verifyAndAddTransactionalProperties(job, sd);
+        job.setInputFormat(HiveUtilities.getInputFormatClass(job, sd, 
hiveReadEntry.getTable()));
+        final Path path = new Path(sd.getLocation());
+        final FileSystem fs = path.getFileSystem(job);
+        if (fs.exists(path)) {
+          FileInputFormat.addInputPath(job, path);
+          final InputFormat<?, ?> format = job.getInputFormat();
+          InputSplit[] inputSplits = format.getSplits(job, 1);
+
+          // if current table with text input format and has header / footer,
+          // we need to make sure that splits of the same file are grouped 
together
+          if 
(TextInputFormat.class.getCanonicalName().equals(sd.getInputFormat()) &&
+              HiveUtilities.hasHeaderOrFooter(hiveReadEntry.getTable())) {
+            Multimap<Path, FileSplit> inputSplitMultimap = 
transformFileSplits(inputSplits);
+            for (Collection<FileSplit> logicalInputSplit : 
inputSplitMultimap.asMap().values()) {
+              splits.add(new LogicalInputSplit(logicalInputSplit, partition));
+            }
+          } else {
+            for (final InputSplit split : inputSplits) {
+              splits.add(new LogicalInputSplit(split, partition));
             }
           }
-          return splits;
         }
+        return splits;
       });
     } catch (final InterruptedException | IOException e) {
       final String errMsg = String.format("Failed to create input splits: %s", 
e.getMessage());
@@ -320,13 +300,8 @@ private HiveStats getStatsEstimateFromInputSplits(final 
List<LogicalInputSplit>
    * @return multimap where key is file path and value is group of ordered 
file splits
    */
   private Multimap<Path, FileSplit> transformFileSplits(InputSplit[] 
inputSplits) {
-    Multimap<Path, FileSplit> inputSplitGroups = 
TreeMultimap.create(Ordering.<Path>natural(),
-        new Comparator<FileSplit>() {
-      @Override
-      public int compare(FileSplit f1, FileSplit f2) {
-        return Long.compare(f1.getStart(), f2.getStart());
-      }
-    });
+    Multimap<Path, FileSplit> inputSplitGroups = TreeMultimap.create(
+        Ordering.natural(), Comparator.comparingLong(FileSplit::getStart));
 
     for (InputSplit inputSplit : inputSplits) {
       FileSplit fileSplit = (FileSplit) inputSplit;
@@ -413,16 +388,53 @@ public String getType() {
     }
   }
 
-  /** Contains stats. Currently only numRows and totalSizeInBytes are used. */
+  /**
+   * Contains stats. Currently only numRows and totalSizeInBytes are used.
+   */
   public static class HiveStats {
+
+    private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HiveStats.class);
+
     private long numRows;
     private long sizeInBytes;
 
-    public HiveStats(final long numRows, final long sizeInBytes) {
+    public HiveStats(long numRows, long sizeInBytes) {
       this.numRows = numRows;
       this.sizeInBytes = sizeInBytes;
     }
 
+    /**
+     * Get the stats from table properties. If not found -1 is returned for 
each stats field.
+     * CAUTION: stats may not be up-to-date with the underlying data. It is 
always good to run the ANALYZE command on
+     * Hive table to have up-to-date stats.
+     *
+     * @param properties the source of table stats
+     * @return {@link HiveStats} instance with rows number and size in bytes 
from specified properties
+     */
+    public static HiveStats getStatsFromProps(Properties properties) {
+      long numRows = -1;
+      long sizeInBytes = -1;
+      try {
+        String sizeInBytesProp = 
properties.getProperty(StatsSetupConst.TOTAL_SIZE);
+        if (sizeInBytesProp != null) {
+          sizeInBytes = Long.valueOf(sizeInBytesProp);
+        }
+
+        String numRowsProp = properties.getProperty(StatsSetupConst.ROW_COUNT);
+        if (numRowsProp != null) {
+          numRows = Long.valueOf(numRowsProp);
+        }
+      } catch (NumberFormatException e) {
+        logger.error("Failed to parse Hive stats from metastore.", e);
+        // continue with the defaults.
+      }
+
+      HiveStats hiveStats = new HiveStats(numRows, sizeInBytes);
+      logger.trace("Obtained Hive stats from properties: {}.", hiveStats);
+      return hiveStats;
+    }
+
+
     public long getNumRows() {
       return numRows;
     }
@@ -431,7 +443,9 @@ public long getSizeInBytes() {
       return sizeInBytes;
     }
 
-    /** Both numRows and sizeInBytes are expected to be greater than 0 for 
stats to be valid */
+    /**
+     * Both numRows and sizeInBytes are expected to be greater than 0 for 
stats to be valid
+     */
     public boolean valid() {
       return numRows > 0 && sizeInBytes > 0;
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to