Repository: drill
Updated Branches:
  refs/heads/master 401119f3b -> 30ba97c17


DRILL-3884: Fix lower parallelization issues with Hive's native scan.

This closes #185


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/30ba97c1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/30ba97c1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/30ba97c1

Branch: refs/heads/master
Commit: 30ba97c17b533b8ed980f80deadbf9a52a87a659
Parents: 401119f
Author: vkorukanti <venki.koruka...@gmail.com>
Authored: Thu Oct 1 15:15:12 2015 -0700
Committer: vkorukanti <venki.koruka...@gmail.com>
Committed: Fri Oct 2 00:09:28 2015 -0700

----------------------------------------------------------------------
 .../store/hive/HiveDrillNativeParquetScan.java  |  9 ++++---
 .../apache/drill/exec/store/hive/HiveScan.java  | 27 +++++++++++++-------
 .../drill/exec/planner/physical/ScanPrel.java   |  7 +++++
 3 files changed, 30 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/30ba97c1/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 4d495da..768e29d 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -60,12 +60,13 @@ public class HiveDrillNativeParquetScan extends HiveScan {
   public ScanStats getScanStats() {
     final ScanStats nativeHiveScanStats = super.getScanStats();
 
-    // As Drill's native parquet record reader is faster and memory efficient divide the costs by a factor.
+    // As Drill's native parquet record reader is faster and more memory efficient, divide the CPU cost
+    // by a factor to let the planner choose HiveDrillNativeScan over HiveScan with SerDes.
     return new ScanStats(
         nativeHiveScanStats.getGroupScanProperty(),
-        nativeHiveScanStats.getRecordCount()/HIVE_SERDE_SCAN_OVERHEAD_FACTOR,
-        nativeHiveScanStats.getCpuCost()/HIVE_SERDE_SCAN_OVERHEAD_FACTOR,
-        nativeHiveScanStats.getDiskCost()/HIVE_SERDE_SCAN_OVERHEAD_FACTOR);
+        nativeHiveScanStats.getRecordCount(),
+        nativeHiveScanStats.getCpuCost()/getSerDeOverheadFactor(),
+        nativeHiveScanStats.getDiskCost());
   }
 
   @Override

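To see why this hunk matters, compare the before/after arithmetic. The old code divided all three stats (record count, CPU cost, disk cost) by a flat factor of 100; the shrunken record count plausibly led the planner to under-parallelize the native scan, which matches the DRILL-3884 symptom in the commit title. Below is a minimal standalone sketch with invented stats values (plain Java, not Drill's real ScanStats API):

// Invented numbers; this only illustrates the stat discounting, not
// Drill's real ScanStats API.
public class ScanStatsBeforeAfterSketch {
  public static void main(String[] args) {
    double records = 1_000_000, cpu = 140_000_000, disk = 512_000_000;
    int serDeOverheadFactor = 140; // e.g. 7 projected columns * 20, see the HiveScan hunk below

    // Before: records, cpu and disk were all divided by a flat 100, so the
    // native scan's estimated row count collapsed.
    System.out.printf("before: records=%.0f cpu=%.0f disk=%.0f%n",
        records / 100, cpu / 100, disk / 100);

    // After: only the CPU cost is discounted; row count and disk cost stay intact.
    System.out.printf("after:  records=%.0f cpu=%.0f disk=%.0f%n",
        records, cpu / serDeOverheadFactor, disk);
  }
}
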
http://git-wip-us.apache.org/repos/asf/drill/blob/30ba97c1/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index de800a7..d52cc73 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
 import org.apache.drill.exec.util.ImpersonationUtil;
@@ -69,7 +70,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 public class HiveScan extends AbstractGroupScan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveScan.class);
 
-  protected static int HIVE_SERDE_SCAN_OVERHEAD_FACTOR = 100;
+  private static int HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN = 20;
 
   @JsonProperty("hive-table")
   public HiveReadEntry hiveReadEntry;
@@ -319,21 +320,29 @@ public class HiveScan extends AbstractGroupScan {
         estRowCount = data/1024;
       }
 
-      // Hive's native reader is neither memory efficient nor fast. If the rowcount is below
-      // HIVE_SERDE_SCAN_OVERHEAD_FACTOR, make sure it is at least HIVE_SERDE_SCAN_OVERHEAD_FACTOR to enable the planner
-      // to choose HiveDrillNativeParquetScan. Due to the project on top of HiveDrillNativeParquetScan, we end up
-      // choosing the HiveScan instead of HiveDrillNativeParquetScan if the cost is too low.
-      if (estRowCount <= HIVE_SERDE_SCAN_OVERHEAD_FACTOR) {
-        estRowCount = HIVE_SERDE_SCAN_OVERHEAD_FACTOR;
-      }
+      // Hive's native reader is neither memory efficient nor fast. Increase the CPU cost
+      // by a factor to let the planner choose HiveDrillNativeScan over HiveScan with SerDes.
+      float cpuCost = 1 * getSerDeOverheadFactor();
 
       logger.debug("estimated row count = {}, stats row count = {}", estRowCount, rowCount);
-      return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, 1, data);
+      return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, cpuCost, data);
     } catch (final IOException e) {
       throw new DrillRuntimeException(e);
     }
   }
 
+  protected int getSerDeOverheadFactor() {
+    final int projectedColumnCount;
+    if (AbstractRecordReader.isStarQuery(columns)) {
+      Table hiveTable = hiveReadEntry.getTable();
+      projectedColumnCount = hiveTable.getSd().getColsSize() + hiveTable.getPartitionKeysSize();
+    } else {
+      projectedColumnCount = columns.size();
+    }
+
+    return projectedColumnCount * HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN;
+  }
+
   @Override
   public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException {
     return new HiveScan(this);

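The new getSerDeOverheadFactor() scales the SerDe penalty with the number of projected columns, so wide reads pay proportionally more than narrow ones. Below is a self-contained sketch of the same arithmetic (column counts are invented; the real method reads them from the Hive table metadata and the projected column list):

// Standalone sketch of the per-column overhead arithmetic in
// getSerDeOverheadFactor(); inputs here are invented for illustration.
public class SerDeOverheadSketch {
  private static final int FACTOR_PER_COLUMN = 20; // mirrors HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN

  static int overhead(boolean isStarQuery, int tableCols, int partitionKeys, int projectedCols) {
    // Star queries read every table column plus the partition keys; otherwise
    // only the projected columns pay the SerDe penalty.
    final int columnCount = isStarQuery ? tableCols + partitionKeys : projectedCols;
    return columnCount * FACTOR_PER_COLUMN;
  }

  public static void main(String[] args) {
    System.out.println(overhead(true, 5, 2, 0));  // SELECT *: (5 + 2) * 20 = 140
    System.out.println(overhead(false, 5, 2, 2)); // SELECT a, b: 2 * 20 = 40
  }
}
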
http://git-wip-us.apache.org/repos/asf/drill/blob/30ba97c1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
index d48e7cc..f69ee4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
@@ -118,7 +118,14 @@ public class ScanPrel extends AbstractRelNode implements DrillScanPrel {
     // double rowCount = RelMetadataQuery.getRowCount(this);
     double rowCount = stats.getRecordCount();
 
+
     double cpuCost = rowCount * columnCount; // for now, assume cpu cost is proportional to row count.
+
+    // If a positive value for CPU cost is given, multiply the default CPU cost by the given CPU cost.
+    if (stats.getCpuCost() > 0) {
+      cpuCost *= stats.getCpuCost();
+    }
+
     // Even though scan is reading from disk, in the currently generated plans all plans will
     // need to read the same amount of data, so keeping the disk io cost 0 is ok for now.
     // In the future we might consider alternative scans that go against projections or

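On the planner side, ScanPrel now treats a positive CPU cost reported by the group scan's stats as a multiplier on its default rows-times-columns estimate. A small sketch of that combination with invented values (statsCpuCost stands in for stats.getCpuCost() from the hunk above):

// Sketch of the ScanPrel cost combination; values are invented and
// statsCpuCost stands in for stats.getCpuCost().
public class ScanPrelCostSketch {
  static double cpuCost(double rowCount, int columnCount, double statsCpuCost) {
    double cpuCost = rowCount * columnCount; // default: proportional to rows * columns
    if (statsCpuCost > 0) {
      cpuCost *= statsCpuCost; // a positive stats CPU cost acts as a multiplier
    }
    return cpuCost;
  }

  public static void main(String[] args) {
    System.out.println(cpuCost(1000, 7, 140)); // HiveScan's SerDe penalty: 1000 * 7 * 140 = 980000
    System.out.println(cpuCost(1000, 7, 0));   // no reported CPU cost keeps the default 7000
  }
}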