Repository: drill Updated Branches: refs/heads/master 401119f3b -> 30ba97c17
DRILL-3884: Fix lower parallelization issues with Hive's native scan. This closes #185 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/30ba97c1 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/30ba97c1 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/30ba97c1 Branch: refs/heads/master Commit: 30ba97c17b533b8ed980f80deadbf9a52a87a659 Parents: 401119f Author: vkorukanti <venki.koruka...@gmail.com> Authored: Thu Oct 1 15:15:12 2015 -0700 Committer: vkorukanti <venki.koruka...@gmail.com> Committed: Fri Oct 2 00:09:28 2015 -0700 ---------------------------------------------------------------------- .../store/hive/HiveDrillNativeParquetScan.java | 9 ++++--- .../apache/drill/exec/store/hive/HiveScan.java | 27 +++++++++++++------- .../drill/exec/planner/physical/ScanPrel.java | 7 +++++ 3 files changed, 30 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/30ba97c1/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java index 4d495da..768e29d 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java @@ -60,12 +60,13 @@ public class HiveDrillNativeParquetScan extends HiveScan { public ScanStats getScanStats() { final ScanStats nativeHiveScanStats = super.getScanStats(); - // As Drill's native parquet record reader is faster and memory efficient divide the costs by a factor. 
+ // Drill's native parquet record reader is faster and more memory efficient, so divide the CPU + // cost by a factor to let the planner choose HiveDrillNativeParquetScan over HiveScan with SerDes. return new ScanStats( nativeHiveScanStats.getGroupScanProperty(), - nativeHiveScanStats.getRecordCount()/HIVE_SERDE_SCAN_OVERHEAD_FACTOR, - nativeHiveScanStats.getCpuCost()/HIVE_SERDE_SCAN_OVERHEAD_FACTOR, - nativeHiveScanStats.getDiskCost()/HIVE_SERDE_SCAN_OVERHEAD_FACTOR); + nativeHiveScanStats.getRecordCount(), + nativeHiveScanStats.getCpuCost()/getSerDeOverheadFactor(), + nativeHiveScanStats.getDiskCost()); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/30ba97c1/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java index de800a7..d52cc73 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java @@ -39,6 +39,7 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; import org.apache.drill.exec.physical.base.SubScan; import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.hive.HiveTable.HivePartition; import org.apache.drill.exec.util.ImpersonationUtil; @@ -69,7 +70,7 @@ import org.apache.hadoop.security.UserGroupInformation; public class HiveScan extends AbstractGroupScan { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveScan.class); - protected static int 
HIVE_SERDE_SCAN_OVERHEAD_FACTOR = 100; + private static int HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN = 20; @JsonProperty("hive-table") public HiveReadEntry hiveReadEntry; @@ -319,21 +320,29 @@ public class HiveScan extends AbstractGroupScan { estRowCount = data/1024; } - // Hive's native reader is neither memory efficient nor fast. If the rowcount is below - // HIVE_SERDE_SCAN_OVERHEAD_FACTOR, make sure it is at least HIVE_SERDE_SCAN_OVERHEAD_FACTOR to enable the planner - // to choose HiveDrillNativeParquetScan. Due to the project on top of HiveDrillNativeParquetScan, we end up - // choosing the HiveScan instead of HiveDrillNativeParquetScan if the cost is too low. - if (estRowCount <= HIVE_SERDE_SCAN_OVERHEAD_FACTOR) { - estRowCount = HIVE_SERDE_SCAN_OVERHEAD_FACTOR; - } + // Hive's native reader is neither memory efficient nor fast. Increase the CPU cost + // by a factor to let the planner choose HiveDrillNativeScan over HiveScan with SerDes. + float cpuCost = 1 * getSerDeOverheadFactor(); logger.debug("estimated row count = {}, stats row count = {}", estRowCount, rowCount); - return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, 1, data); + return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, cpuCost, data); } catch (final IOException e) { throw new DrillRuntimeException(e); } } + protected int getSerDeOverheadFactor() { + final int projectedColumnCount; + if (AbstractRecordReader.isStarQuery(columns)) { + Table hiveTable = hiveReadEntry.getTable(); + projectedColumnCount = hiveTable.getSd().getColsSize() + hiveTable.getPartitionKeysSize(); + } else { + projectedColumnCount = columns.size(); + } + + return projectedColumnCount * HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN; + } + @Override public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException { return new HiveScan(this); 
http://git-wip-us.apache.org/repos/asf/drill/blob/30ba97c1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java index d48e7cc..f69ee4c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java @@ -118,7 +118,14 @@ public class ScanPrel extends AbstractRelNode implements DrillScanPrel { // double rowCount = RelMetadataQuery.getRowCount(this); double rowCount = stats.getRecordCount(); + double cpuCost = rowCount * columnCount; // for now, assume cpu cost is proportional to row count. + + // If the scan stats report a positive CPU cost, multiply the default CPU cost by it. + if (stats.getCpuCost() > 0) { + cpuCost *= stats.getCpuCost(); + } + // Even though scan is reading from disk, in the currently generated plans all plans will // need to read the same amount of data, so keeping the disk io cost 0 is ok for now. // In the future we might consider alternative scans that go against projections or