DRILL-626: Project push down into HBase scan
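
This change adds a project-pushdown contract to GroupScan -- canPushdownProjects(columns) together with clone(columns), replacing checkProjPush() -- rewrites DrillPushProjIntoScan to derive the referenced columns directly from the project expressions (PrelUtil.getColumns now walks RexInputRef and ITEM calls), and teaches the HBase group scan and record reader to request only the projected column families/qualifiers. The sketch below condenses the planner-side flow for illustration only; the wrapper class and method name are invented here, and trait handling, the trivial-project check, and DrillScanRel construction are omitted.

import java.io.IOException;
import java.util.List;

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.eigenbase.rel.ProjectRel;

import net.hydromatic.optiq.rules.java.JavaRules.EnumerableTableAccessRel;

/** Illustrative only -- condenses the decision made in DrillPushProjIntoScan.onMatch(). */
class ProjectPushdownSketch {

  /** Returns the columns to push into the scan, or null when pushdown should be skipped. */
  static List<SchemaPath> columnsToPush(ProjectRel proj, EnumerableTableAccessRel scan) throws IOException {
    // Collect the columns referenced by the project expressions, e.g. `row_key`, `f2`.`c1`.
    // An empty list signals a star column (or no usable reference), so nothing is pushed.
    List<SchemaPath> columns = PrelUtil.getColumns(scan.getRowType(), proj.getProjects());
    if (columns.isEmpty()) {
      return null;
    }
    // Ask the storage plugin's group scan whether it supports this column list; HBase,
    // Parquet and the "easy" formats now return true, the default implementation returns false.
    GroupScan groupScan = scan.getTable().unwrap(DrillTable.class).getGroupScan();
    return groupScan.canPushdownProjects(columns) ? columns : null;
  }
}

AbstractGroupScan keeps the conservative default (false), so existing storage plugins are unaffected until they opt in.
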
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/612527bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/612527bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/612527bd

Branch: refs/heads/master
Commit: 612527bd22c27aa92363d2297a9c2b4a05475fd0
Parents: 42763b6
Author: Aditya Kishore <adi...@maprtech.com>
Authored: Sat May 3 16:39:21 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Fri May 9 16:49:19 2014 -0700

----------------------------------------------------------------------
 .../exceptions/DrillRuntimeException.java       |  11 +-
 .../drill/exec/store/hbase/HBaseGroupScan.java  |  90 +++++---
 .../exec/store/hbase/HBaseRecordReader.java     | 221 ++++++++++---------
 .../exec/store/hbase/HBaseSchemaFactory.java    |   6 +-
 .../exec/store/hbase/HBaseStoragePlugin.java    |   6 +-
 .../drill/exec/store/hbase/HBaseSubScan.java    |  10 +
 .../org/apache/drill/hbase/BaseHBaseTest.java   |  72 ++++++
 .../drill/hbase/TestHBaseFilterPushDown.java    |  52 +----
 .../drill/hbase/TestHBaseProjectPushDown.java   |  35 +++
 ...base_scan_screen_physical_column_select.json |   2 +-
 .../exec/physical/base/AbstractGroupScan.java   |  13 +-
 .../drill/exec/physical/base/GroupScan.java     |   8 +-
 .../planner/logical/DrillPushProjIntoScan.java  | 118 ++--------
 .../drill/exec/planner/physical/PrelUtil.java   |  85 ++++++-
 .../exec/store/dfs/easy/EasyGroupScan.java      |  10 +-
 .../apache/drill/exec/store/hive/HiveScan.java  |   4 +-
 .../exec/store/ischema/InfoSchemaGroupScan.java |   4 +-
 .../exec/store/parquet/ParquetGroupScan.java    |  11 +-
 18 files changed, 415 insertions(+), 343 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java b/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java
index 9266cdd..abc7065 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java
@@ -19,6 +19,7 @@ package org.apache.drill.common.exceptions;
 
 public class DrillRuntimeException extends RuntimeException {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRuntimeException.class);
+  private static final long serialVersionUID = -3796081521525479249L;
 
   public DrillRuntimeException() {
     super();
@@ -39,6 +40,12 @@ public class DrillRuntimeException extends RuntimeException {
   public DrillRuntimeException(Throwable cause) {
     super(cause);
   }
-
-
+
+  public static void format(String format, Object...args) {
+    format(null, format, args);
+  }
+
+  public static void format(Throwable cause, String format, Object...args) {
+    throw new DrillRuntimeException(String.format(format, args), cause);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 
bcdebc3..de60741 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java @@ -38,8 +38,8 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Size; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.store.StoragePluginRegistry; -import org.apache.drill.exec.store.dfs.easy.EasyGroupScan; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.util.Bytes; @@ -61,44 +61,25 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseGroupScan.class); private HBaseStoragePluginConfig storagePluginConfig; - @JsonProperty("storage") - public HBaseStoragePluginConfig getStorageConfig() { - return this.storagePluginConfig; - } private List<SchemaPath> columns; - @JsonProperty - public List<SchemaPath> getColumns() { - return columns; - } private HBaseScanSpec hbaseScanSpec; - @JsonProperty - public HBaseScanSpec getHBaseScanSpec() { - return hbaseScanSpec; - } - @JsonIgnore - public HBaseStoragePlugin getStoragePlugin() { - return storagePlugin; - } + private HBaseStoragePlugin storagePlugin; private Stopwatch watch = new Stopwatch(); private ArrayListMultimap<Integer, HBaseSubScan.HBaseSubScanSpec> mappings; - private HBaseStoragePlugin storagePlugin; private List<EndpointAffinity> endpointAffinities; private NavigableMap<HRegionInfo,ServerName> regionsToScan; + private HTableDescriptor hTableDesc; @JsonCreator public HBaseGroupScan(@JsonProperty("hbaseScanSpec") HBaseScanSpec hbaseScanSpec, @JsonProperty("storage") HBaseStoragePluginConfig storagePluginConfig, @JsonProperty("columns") List<SchemaPath> columns, @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException { - this.storagePlugin = (HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig); - this.storagePluginConfig = storagePluginConfig; - this.hbaseScanSpec = hbaseScanSpec; - this.columns = columns; - getRegionInfos(); + this ((HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), hbaseScanSpec, columns); } public HBaseGroupScan(HBaseStoragePlugin storageEngine, HBaseScanSpec scanSpec, List<SchemaPath> columns) { @@ -106,9 +87,13 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst this.storagePluginConfig = storageEngine.getConfig(); this.hbaseScanSpec = scanSpec; this.columns = columns; - getRegionInfos(); + init(); } + /** + * Private constructor, used for cloning. 
+ * @param that The + */ private HBaseGroupScan(HBaseGroupScan that) { this.columns = that.columns; this.endpointAffinities = that.endpointAffinities; @@ -117,12 +102,22 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst this.regionsToScan = that.regionsToScan; this.storagePlugin = that.storagePlugin; this.storagePluginConfig = that.storagePluginConfig; + this.hTableDesc = that.hTableDesc; + } + + @Override + public GroupScan clone(List<SchemaPath> columns) { + HBaseGroupScan newScan = new HBaseGroupScan(this); + newScan.columns = columns; + newScan.verifyColumns(); + return newScan; } - private void getRegionInfos() { + private void init() { logger.debug("Getting region locations"); try { HTable table = new HTable(storagePluginConfig.getHBaseConf(), hbaseScanSpec.getTableName()); + this.hTableDesc = table.getTableDescriptor(); NavigableMap<HRegionInfo, ServerName> regionsMap = table.getRegionLocations(); table.close(); @@ -142,6 +137,18 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst } catch (IOException e) { throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e); } + verifyColumns(); + } + + private void verifyColumns() { + if (columns != null) { + for (SchemaPath column : columns) { + if (!(column.equals(ROW_KEY_PATH) || hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) { + DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .", + column.getRootSegment().getPath(), hTableDesc.getNameAsString()); + } + } + } } @Override @@ -232,8 +239,10 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst @Override public Size getSize() { // TODO - this is wrong, need to populate correctly - int size = (hbaseScanSpec.getFilter() != null ? 5 : 10) * regionsToScan.size(); - return new Size(size, size); + int rowCount = (hbaseScanSpec.getFilter() != null ? 5 : 10) * regionsToScan.size(); + int avgColumnSize = 10; + int numColumns = (columns == null || columns.isEmpty()) ? 
100 : columns.size(); + return new Size(rowCount, numColumns*avgColumnSize); } @Override @@ -244,6 +253,11 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst return this; } + @JsonIgnore + public HBaseStoragePlugin getStoragePlugin() { + return storagePlugin; + } + @Override public String getDigest() { return toString(); @@ -256,16 +270,24 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst + columns + "]"; } - @Override - public GroupScan clone(List<SchemaPath> columns) { - HBaseGroupScan newScan = new HBaseGroupScan(this); - newScan.columns = columns; - return newScan; + @JsonProperty("storage") + public HBaseStoragePluginConfig getStorageConfig() { + return this.storagePluginConfig; } - @Override - public List<SchemaPath> checkProjPush(List<SchemaPath> columns) { + @JsonProperty + public List<SchemaPath> getColumns() { return columns; } + @JsonProperty + public HBaseScanSpec getHBaseScanSpec() { + return hbaseScanSpec; + } + + @JsonIgnore + public boolean canPushdownProjects(List<SchemaPath> columns) { + return true; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java index aa5743f..af059f5 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java @@ -19,8 +19,11 @@ package org.apache.drill.exec.store.hbase; import java.io.IOException; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableSet; +import java.util.TreeMap; import java.util.concurrent.TimeUnit; import org.apache.drill.common.exceptions.DrillRuntimeException; @@ -31,7 +34,6 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.OutputMutator; @@ -40,22 +42,20 @@ import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.vector.NullableVarBinaryVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VarBinaryVector; -import org.apache.drill.exec.vector.allocator.VectorAllocator; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.util.Bytes; -import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import 
com.google.common.collect.Lists; -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hasher; -import com.google.common.hash.Hashing; public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class); @@ -64,59 +64,89 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { private List<SchemaPath> columns; private OutputMutator outputMutator; - private Scan scan; + private ResultScanner resultScanner; - private FragmentContext context; Map<FamilyQualifierWrapper, NullableVarBinaryVector> vvMap; private Result leftOver; private VarBinaryVector rowKeyVector; private SchemaPath rowKeySchemaPath; private HTable table; - public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec e, List<SchemaPath> columns, FragmentContext context) throws OutOfMemoryException { - this.columns = columns; - this.scan = new Scan(e.getStartRow(), e.getStopRow()); - this.scan.setFilter(e.getScanFilter()); - this.context = context; - if (columns != null && columns.size() != 0) { - for (SchemaPath column : columns) { + public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec subScanSpec, + List<SchemaPath> projectedColumns, FragmentContext context) throws OutOfMemoryException { + Scan scan= new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow()); + boolean rowKeyOnly = true; + if (projectedColumns != null && projectedColumns.size() != 0) { + /* + * This will change once the non-scaler value vectors are available. + * Then, each column family will have a single top level value vector + * and each column will be an item vector in its corresponding TLV. + */ + this.columns = Lists.newArrayList(projectedColumns); + Iterator<SchemaPath> columnIterator = columns.iterator(); + while(columnIterator.hasNext()) { + SchemaPath column = columnIterator.next(); if (column.getRootSegment().getPath().toString().equalsIgnoreCase(ROW_KEY)) { rowKeySchemaPath = ROW_KEY_PATH; continue; } + rowKeyOnly = false; NameSegment root = column.getRootSegment(); - assert root != null; - PathSegment child = root.getChild(); byte[] family = root.getPath().toString().getBytes(); - if (child != null) { - Preconditions.checkArgument(child.getChild() == null, "Unsupported column name: " + column.toString()); + PathSegment child = root.getChild(); + if (child != null && child.isNamed()) { byte[] qualifier = child.getNameSegment().getPath().toString().getBytes(); scan.addColumn(family, qualifier); } else { + columnIterator.remove(); scan.addFamily(family); } - } } else { - if (this.columns == null) { - this.columns = Lists.newArrayList(); - } + this.columns = Lists.newArrayList(); + rowKeyOnly = false; rowKeySchemaPath = ROW_KEY_PATH; this.columns.add(rowKeySchemaPath); } - Configuration config = HBaseConfiguration.create(conf); try { + if (rowKeySchemaPath != null) { + /* if ROW_KEY was requested, we can not qualify the scan with columns, + * otherwise HBase will omit the entire row of all of the specified columns do + * not exist for that row. Eventually we may want to use Family and/or Qualifier + * Filters in such case but that would mean additional processing at server. + */ + scan.setFamilyMap(new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR)); + } + + Filter scanFilter = subScanSpec.getScanFilter(); + if (rowKeyOnly) { + /* if only the row key was requested, add a FirstKeyOnlyFilter to the scan + * to fetch only one KV from each row. 
If a filter is already part of this + * scan, add the FirstKeyOnlyFilter as the SECOND filter of a MUST_PASS_ALL + * FilterList. + */ + Filter firstKeyFilter = new FirstKeyOnlyFilter(); + scanFilter = (scanFilter == null) + ? firstKeyFilter + : new FilterList(Operator.MUST_PASS_ALL, scanFilter, firstKeyFilter); + } + scan.setFilter(scanFilter); scan.setCaching(TARGET_RECORD_COUNT); - table = new HTable(config, e.getTableName()); + + table = new HTable(conf, subScanSpec.getTableName()); resultScanner = table.getScanner(scan); + try { + table.close(); + } catch (IOException e) { + logger.warn("Failure while closing HBase table", e); + } } catch (IOException e1) { throw new DrillRuntimeException(e1); } } @Override - @SuppressWarnings("deprecation") public void setup(OutputMutator output) throws ExecutionSetupException { this.outputMutator = output; output.removeAllFields(); @@ -127,13 +157,9 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { try { if (column.equals(rowKeySchemaPath)) { MaterializedField field = MaterializedField.create(column, Types.required(TypeProtos.MinorType.VARBINARY)); - rowKeyVector = output.addField(field, VarBinaryVector.class); - } else if (column.getRootSegment().getChild() != null){ - MaterializedField field = MaterializedField.create(column, Types.optional(TypeProtos.MinorType.VARBINARY)); - NullableVarBinaryVector v = output.addField(field, NullableVarBinaryVector.class); - String fullyQualified = column.getRootSegment().getPath() + "." + column.getRootSegment().getChild().getNameSegment().getPath(); - vvMap.put(new FamilyQualifierWrapper(fullyQualified), v); + } else if (column.getRootSegment().getChild() != null) { + getOrCreateColumnVector(new FamilyQualifierWrapper(column), false); } } catch (SchemaChangeException e) { throw new ExecutionSetupException(e); @@ -159,6 +185,7 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { v.clear(); v.allocateNew(); } + for (int count = 0; count < TARGET_RECORD_COUNT; count++) { Result result = null; try { @@ -176,6 +203,8 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), count); return count; } + + // parse the result and populate the value vectors KeyValue[] kvs = result.raw(); byte[] bytes = result.getBytes().get(); if (rowKeyVector != null) { @@ -191,13 +220,10 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { int familyLength = kv.getFamilyLength(); int qualifierOffset = kv.getQualifierOffset(); int qualifierLength = kv.getQualifierLength(); - FamilyQualifierWrapper column = new FamilyQualifierWrapper(bytes, familyOffset, familyLength, qualifierOffset, qualifierLength); - NullableVarBinaryVector v = vvMap.get(column); - if(v == null) { - v = addNewVector(column.toString()); - } int valueOffset = kv.getValueOffset(); int valueLength = kv.getValueLength(); + NullableVarBinaryVector v = getOrCreateColumnVector( + new FamilyQualifierWrapper(bytes, familyOffset, familyLength, qualifierOffset, qualifierLength), true); if (!v.getMutator().setSafe(count, bytes, valueOffset, valueLength)) { setOutputValueCount(count); leftOver = result; @@ -211,13 +237,18 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { return TARGET_RECORD_COUNT; } - private NullableVarBinaryVector addNewVector(String column) { + private NullableVarBinaryVector getOrCreateColumnVector(FamilyQualifierWrapper column, boolean 
allocateOnCreate) { try { - MaterializedField field = MaterializedField.create(SchemaPath.getCompoundPath(column.split("\\.")), Types.optional(TypeProtos.MinorType.VARBINARY)); - NullableVarBinaryVector v = outputMutator.addField(field, NullableVarBinaryVector.class); - v.allocateNew(); - vvMap.put(new FamilyQualifierWrapper(column), v); - outputMutator.setNewSchema(); + NullableVarBinaryVector v = vvMap.get(column); + if(v == null) { + MaterializedField field = MaterializedField.create(column.asSchemaPath(), Types.optional(TypeProtos.MinorType.VARBINARY)); + v = outputMutator.addField(field, NullableVarBinaryVector.class); + if (allocateOnCreate) { + v.allocateNew(); + } + vvMap.put(column, v); + outputMutator.setNewSchema(); + } return v; } catch (SchemaChangeException e) { throw new DrillRuntimeException(e); @@ -226,11 +257,8 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { @Override public void cleanup() { - resultScanner.close(); - try { - table.close(); - } catch (IOException e) { - logger.warn("Failure while closing table", e); + if (resultScanner != null) { + resultScanner.close(); } } @@ -243,47 +271,24 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { } } - private static int compareArrays(byte[] left, int lstart, int llength, byte[] right, int rstart, int rlength) { - int length = Math.min(llength, rlength); - for (int i = 0; i < length; i++) { - if (left[lstart + i] != right[rstart + i]) { - return left[lstart + i] - right[rstart + 1]; - } - } - return llength - rlength; - } - private static class FamilyQualifierWrapper implements Comparable<FamilyQualifierWrapper> { - static final HashFunction hashFunction = Hashing.murmur3_32(); - - protected byte[] bytes; - protected int familyOffset, familyLength, qualifierOffset, qualifierLength; - String string; int hashCode; + protected String stringVal; + protected String family; + protected String qualifier; + + public FamilyQualifierWrapper(SchemaPath column) { + this(column.getRootSegment().getPath(), column.getRootSegment().getChild().getNameSegment().getPath()); + } public FamilyQualifierWrapper(byte[] bytes, int familyOffset, int familyLength, int qualifierOffset, int qualifierLength) { - this.bytes = bytes; - this.familyOffset = familyOffset; - this.familyLength = familyLength; - this.qualifierOffset = qualifierOffset; - this.qualifierLength = qualifierLength; - Hasher hasher = hashFunction.newHasher(); - hasher.putBytes(bytes, familyOffset, familyLength); - hasher.putBytes(bytes, qualifierOffset, qualifierLength); - hashCode = hasher.hash().asInt(); + this(new String(bytes, familyOffset, familyLength), new String(bytes, qualifierOffset, qualifierLength)); } - public FamilyQualifierWrapper(String string) { - String[] strings = string.split("\\."); - this.string = string; - Hasher hasher = hashFunction.newHasher(); - byte[] fBytes = strings[0].getBytes(); - byte[] qBytes = strings[1].getBytes(); - hasher.putBytes(fBytes); - hasher.putBytes(qBytes); - familyLength = fBytes.length; - qualifierLength = qBytes.length; - hashCode = hasher.hash().asInt(); + public FamilyQualifierWrapper(String family, String qualifier) { + this.family = family; + this.qualifier = qualifier; + hashCode = 31*family.hashCode() + qualifier.hashCode(); } @Override @@ -292,46 +297,42 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { } @Override - public boolean equals(Object other) { - return compareTo((FamilyQualifierWrapper) other) == 0; + public boolean 
equals(Object anObject) { + if (this == anObject) { + return true; + } + if (anObject instanceof FamilyQualifierWrapper) { + FamilyQualifierWrapper that = (FamilyQualifierWrapper) anObject; + // we compare qualifier first since many columns will have same family + if (!qualifier.equals(that.qualifier)) { + return false; + } + return family.equals(that.family); + } + return false; } @Override public String toString() { - if (string == null) { - buildString(); + if (stringVal == null) { + stringVal = new StringBuilder().append(new String(family)).append(".").append(new String(qualifier)).toString(); } - return string; - } - - public void buildString() { - StringBuilder builder = new StringBuilder(); - builder.append(new String(bytes, familyOffset, familyLength)); - builder.append("."); - builder.append(new String(bytes, qualifierOffset, qualifierLength)); - string = builder.toString(); + return stringVal; } - public void buildBytes() { - assert string != null; - bytes = string.getBytes(); - familyOffset = 0; - qualifierOffset = familyLength + 1; + public SchemaPath asSchemaPath() { + return SchemaPath.getCompoundPath(family, qualifier); } @Override public int compareTo(FamilyQualifierWrapper o) { - if (bytes == null) { - buildBytes(); - } - if (o.bytes == null) { - o.buildBytes(); - } - int val = Bytes.compareTo(bytes, familyOffset, familyLength, o.bytes, o.familyOffset, o.familyLength); + int val = family.compareTo(o.family); if (val != 0) { return val; } - return Bytes.compareTo(bytes, qualifierOffset, qualifierLength, o.bytes, o.qualifierOffset, o.qualifierLength); + return qualifier.compareTo(o.qualifier); } + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java index c4ac08c..25a5f80 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java @@ -19,15 +19,12 @@ package org.apache.drill.exec.store.hbase; import java.io.IOException; import java.util.Collections; -import java.util.List; import java.util.Set; -import com.google.common.collect.ImmutableList; import net.hydromatic.optiq.Schema; import net.hydromatic.optiq.SchemaPlus; - import net.hydromatic.optiq.Table; -import org.apache.drill.exec.planner.logical.DrillTable; + import org.apache.drill.exec.planner.logical.DynamicDrillTable; import org.apache.drill.exec.rpc.user.UserSession; import org.apache.drill.exec.store.AbstractSchema; @@ -35,6 +32,7 @@ import org.apache.drill.exec.store.SchemaFactory; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; public class HBaseSchemaFactory implements SchemaFactory { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java index ea1550d..7bc7c4b 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java @@ -19,14 +19,10 @@ package org.apache.drill.exec.store.hbase; import java.io.IOException; import java.util.Set; -import java.util.List; import net.hydromatic.optiq.SchemaPlus; import org.apache.drill.common.JSONOptions; -import org.apache.drill.exec.rpc.user.DrillUser; -import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.rpc.user.UserSession; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.AbstractStoragePlugin; @@ -42,6 +38,8 @@ public class HBaseStoragePlugin extends AbstractStoragePlugin { private final DrillbitContext context; private final HBaseStoragePluginConfig engineConfig; private final HBaseSchemaFactory schemaFactory; + + @SuppressWarnings("unused") private final String name; public HBaseStoragePlugin(HBaseStoragePluginConfig configuration, DrillbitContext context, String name) http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java index ceaf23f..6b87817 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java @@ -32,6 +32,7 @@ import org.apache.drill.exec.physical.base.Size; import org.apache.drill.exec.physical.base.SubScan; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.util.Bytes; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; @@ -197,6 +198,15 @@ public class HBaseSubScan extends AbstractBase implements SubScan { return this; } + @Override + public String toString() { + return "HBaseScanSpec [tableName=" + tableName + + ", startRow=" + (startRow == null ? null : Bytes.toStringBinary(startRow)) + + ", stopRow=" + (stopRow == null ? null : Bytes.toStringBinary(stopRow)) + + ", filter=" + (getScanFilter() == null ? null : getScanFilter().toString()) + + "]"; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java new file mode 100644 index 0000000..3037321 --- /dev/null +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.hbase; + +import java.util.List; + +import org.apache.drill.BaseTestQuery; +import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.rpc.user.QueryResultBatch; +import org.apache.drill.exec.util.VectorUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; + +public class BaseHBaseTest extends BaseTestQuery { + protected static final String TEST_TABLE_1 = "TestTable1"; + + protected static HBaseAdmin admin; + protected static Configuration conf = HBaseConfiguration.create(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf.set("hbase.zookeeper.property.clientPort", "2181"); + admin = new HBaseAdmin(conf); + TestTableGenerator.generateHBaseTable(admin, TEST_TABLE_1, 2, 1000); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + admin.disableTable(TEST_TABLE_1); + admin.deleteTable(TEST_TABLE_1); + } + + protected void verify(String sql, int expectedRowCount) throws Exception{ + sql = sql.replace("[TABLE_NAME]", TEST_TABLE_1); + List<QueryResultBatch> results = testSqlWithResults(sql); + + int rowCount = 0; + RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); + for(QueryResultBatch result : results){ + rowCount += result.getHeader().getRowCount(); + loader.load(result.getHeader().getDef(), result.getData()); + if (loader.getRecordCount() <= 0) { + break; + } + VectorUtil.showVectorAccessibleContent(loader, 8); + loader.clear(); + result.release(); + } + System.out.println("Total record count: " + rowCount); + Assert.assertEquals(expectedRowCount, rowCount); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java index 1911078..2d72192 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java @@ -17,41 +17,11 @@ */ package org.apache.drill.hbase; -import java.util.List; - -import org.apache.drill.BaseTestQuery; -import org.apache.drill.exec.record.RecordBatchLoader; -import org.apache.drill.exec.rpc.user.QueryResultBatch; -import org.apache.drill.exec.util.VectorUtil; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; 
import org.junit.Ignore; import org.junit.Test; @Ignore // Need to find a way to pass zookeeper port to HBase storage plugin configuration before enabling this test -public class TestHBaseFilterPushDown extends BaseTestQuery { - private static final String TABLE_NAME = "TestTable1"; - - private static HBaseAdmin admin; - private static Configuration conf = HBaseConfiguration.create(); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - conf.set("hbase.zookeeper.property.clientPort", "2181"); - admin = new HBaseAdmin(conf); - TestTableGenerator.generateHBaseTable(admin, TABLE_NAME, 2, 1000); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - System.out.println("HBaseStorageHandlerTest: tearDownAfterClass()"); - admin.disableTable(TABLE_NAME); - admin.deleteTable(TABLE_NAME); - } +public class TestHBaseFilterPushDown extends BaseHBaseTest { @Test public void testFilterPushDownRowKeyEqual() throws Exception{ @@ -83,24 +53,4 @@ public class TestHBaseFilterPushDown extends BaseTestQuery { , 4); } - protected void verify(String sql, int expectedRowCount) throws Exception{ - sql = sql.replace("[TABLE_NAME]", TABLE_NAME); - List<QueryResultBatch> results = testSqlWithResults(sql); - - int rowCount = 0; - RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); - for(QueryResultBatch result : results){ - rowCount += result.getHeader().getRowCount(); - loader.load(result.getHeader().getDef(), result.getData()); - if (loader.getRecordCount() <= 0) { - break; - } - VectorUtil.showVectorAccessibleContent(loader, 8); - loader.clear(); - result.release(); - } - System.out.println("Total record count: " + rowCount); - Assert.assertEquals(expectedRowCount, rowCount); - } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java new file mode 100644 index 0000000..0600696 --- /dev/null +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.hbase; + +import org.junit.Ignore; +import org.junit.Test; + +@Ignore // Need to find a way to pass zookeeper port to HBase storage plugin configuration before enabling this test +public class TestHBaseProjectPushDown extends BaseHBaseTest { + + @Test + public void testRowKeyPushDown() throws Exception{ + verify("SELECT\n" + + "row_key, substring(row_key, 2, 1)*12\n" + + "FROM\n" + + " hbase.`[TABLE_NAME]` tableName" + , 6); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json index 13a2982..f44f568 100644 --- a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json +++ b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json @@ -19,7 +19,7 @@ "zookeeperPort" : 2181 }, columns: [ - "`f2`.c1", "`f2`.c2", "row_key" + "`f2`.c1", "`f2`.c2" ] }, { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java index cd78bc1..1627137 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java @@ -20,9 +20,9 @@ package org.apache.drill.exec.physical.base; import java.util.Iterator; import java.util.List; -import com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.drill.common.expression.SchemaPath; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.collect.Iterators; public abstract class AbstractGroupScan extends AbstractBase implements GroupScan { @@ -61,14 +61,9 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca return 0; } - /** - * Check if groupscan can support projects-push-down into scan. - * The default implementation assumes groupscan could not support project pushdown, by returning null. - * If one particular group scan can support, it should override this method. 
- */ - @Override - public List<SchemaPath> checkProjPush(List<SchemaPath> columns) { - return null; + @JsonIgnore + public boolean canPushdownProjects(List<SchemaPath> columns) { + return false; } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java index 492dbc1..806b9db 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java @@ -47,17 +47,13 @@ public interface GroupScan extends Scan, HasAffinity{ public abstract String getDigest(); /** - * Returns a clone of Groupscan instance, except that the new GroupScan will use the provided list of columns . - * + * Returns a clone of GroupScan instance, except that the new GroupScan will use the provided list of columns . */ - @JsonIgnore public GroupScan clone(List<SchemaPath> columns); /** * GroupScan should check the list of columns, and see if it could support all the columns in the list. - * If it can not support any of them, return null. Null indicates that this groupscan will not support - * project pushdown for this list of columns. */ - public List<SchemaPath> checkProjPush(List<SchemaPath> columns); + public boolean canPushdownProjects(List<SchemaPath> columns); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java index 0eae1da..0dd9b9e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java @@ -19,33 +19,17 @@ package org.apache.drill.exec.planner.logical; import java.io.IOException; -import java.util.ArrayList; import java.util.List; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; import net.hydromatic.optiq.rules.java.JavaRules.EnumerableTableAccessRel; +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.planner.physical.PrelUtil; import org.eigenbase.rel.ProjectRel; -import org.eigenbase.rel.ProjectRelBase; -import org.eigenbase.rel.RelNode; -import org.eigenbase.rel.rules.PushProjector; import org.eigenbase.rel.rules.RemoveTrivialProjectRule; import org.eigenbase.relopt.RelOptRule; import org.eigenbase.relopt.RelOptRuleCall; -import org.eigenbase.reltype.RelDataType; -import org.eigenbase.reltype.RelDataTypeFactory; -import org.eigenbase.reltype.RelDataTypeField; -import org.eigenbase.rex.RexInputRef; -import org.eigenbase.rex.RexNode; -import org.eigenbase.rex.RexShuttle; - -import com.google.common.base.Objects; -import com.google.hive12.common.collect.Lists; public class DrillPushProjIntoScan extends RelOptRule { public static final 
RelOptRule INSTANCE = new DrillPushProjIntoScan(); @@ -59,101 +43,35 @@ public class DrillPushProjIntoScan extends RelOptRule { final ProjectRel proj = (ProjectRel) call.rel(0); final EnumerableTableAccessRel scan = (EnumerableTableAccessRel) call.rel(1); - List<Integer> columnsIds = getRefColumnIds(proj); - - RelDataType newScanRowType = createStructType(scan.getCluster().getTypeFactory(), getProjectedFields(scan.getRowType(),columnsIds)); - - DrillTable drillTable = scan.getTable().unwrap(DrillTable.class); try { - List<SchemaPath> columns = PrelUtil.getColumns(newScanRowType); - - GroupScan groupScan = drillTable.getGroupScan(); + List<SchemaPath> columns = PrelUtil.getColumns(scan.getRowType(), proj.getProjects()); - //Check if the group scan can support the list of columns. If not support, return without doing any further transformation. - List<SchemaPath> pushedColumns = groupScan.checkProjPush(columns); - - if (pushedColumns == null || pushedColumns.isEmpty()) + if (columns.isEmpty() || !scan.getTable().unwrap(DrillTable.class) + .getGroupScan().canPushdownProjects(columns)) { return; + } - final DrillScanRel newScan = new DrillScanRel(scan.getCluster(), scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL), - scan.getTable(), newScanRowType, columns); - - List<RexNode> convertedExprs = getConvertedProjExp(proj, scan, columnsIds); + final DrillScanRel newScan = + new DrillScanRel(scan.getCluster(), + scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL), + scan.getTable(), + scan.getRowType(), + columns); - final DrillProjectRel newProj = new DrillProjectRel(proj.getCluster(), proj.getTraitSet().plus(DrillRel.DRILL_LOGICAL), - newScan, convertedExprs, proj.getRowType()); + final DrillProjectRel newProj = + new DrillProjectRel(proj.getCluster(), + proj.getTraitSet().plus(DrillRel.DRILL_LOGICAL), + newScan, + proj.getChildExps(), + proj.getRowType()); if (RemoveTrivialProjectRule.isTrivial(newProj)) { call.transformTo(newScan); } else { call.transformTo(newProj); } - } catch (IOException e) { - e.printStackTrace(); - return; - } - - } - - private List<RexNode> getConvertedProjExp(ProjectRel proj, RelNode child, List<Integer> columnsIds) { - PushProjector pushProjector = - new PushProjector( - proj, null, child, PushProjector.ExprCondition.FALSE); - ProjectRel topProject = pushProjector.convertProject(null); - - if (topProject !=null) - return topProject.getProjects(); - else - return proj.getProjects(); - } - - private RelDataType createStructType( - RelDataTypeFactory typeFactory, - final List<RelDataTypeField> fields - ) { - final RelDataTypeFactory.FieldInfoBuilder builder = - typeFactory.builder(); - for (RelDataTypeField field : fields) { - builder.add(field.getName(), field.getType()); - } - return builder.build(); - } - - - private List<Integer> getRefColumnIds(ProjectRelBase proj) { - RefFieldsVisitor v = new RefFieldsVisitor(); - - for (RexNode exp : proj.getProjects()) { - v.apply(exp); - } - return new ArrayList<Integer>(v.getReferencedFieldIndex()); - } - - private List<RelDataTypeField> getProjectedFields(RelDataType rowType, List<Integer> columnIds) { - List<RelDataTypeField> oldFields = rowType.getFieldList(); - List<RelDataTypeField> newFields = Lists.newArrayList(); - - for (Integer id : columnIds) { - newFields.add(oldFields.get(id)); - } - - return newFields; - } - - /** Visitor that finds the set of inputs that are used. 
*/ - public static class RefFieldsVisitor extends RexShuttle { - public final SortedSet<Integer> inputPosReferenced = - new TreeSet<Integer>(); - - @Override - public RexNode visitInputRef(RexInputRef inputRef) { - inputPosReferenced.add(inputRef.getIndex()); - return inputRef; - } - - public Set<Integer> getReferencedFieldIndex() { - return this.inputPosReferenced; + throw new DrillRuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java index d69f8cf..e98b970 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java @@ -18,11 +18,15 @@ package org.apache.drill.exec.planner.physical; import java.util.List; +import java.util.Set; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.FunctionCall; import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.PathSegment; +import org.apache.drill.common.expression.PathSegment.ArraySegment; +import org.apache.drill.common.expression.PathSegment.NameSegment; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.exec.physical.base.PhysicalOperator; @@ -33,9 +37,16 @@ import org.eigenbase.rel.RelCollation; import org.eigenbase.rel.RelFieldCollation; import org.eigenbase.relopt.RelOptCluster; import org.eigenbase.reltype.RelDataType; +import org.eigenbase.rex.RexCall; +import org.eigenbase.rex.RexInputRef; +import org.eigenbase.rex.RexLiteral; +import org.eigenbase.rex.RexNode; +import org.eigenbase.rex.RexOver; +import org.eigenbase.rex.RexVisitorImpl; import com.beust.jcommander.internal.Lists; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; public class PrelUtil { @@ -85,25 +96,75 @@ public class PrelUtil { return new SelectionVectorRemover(child); } - public static List<SchemaPath> getColumns(RelDataType rowType) { - final List<String> fields = rowType.getFieldNames(); + public static List<SchemaPath> getColumns(RelDataType rowType, List<RexNode> projects) { + final List<String> fieldNames = rowType.getFieldNames(); + if (fieldNames.isEmpty()) return ImmutableList.of(); - if (fields.isEmpty()) return null; + RefFieldsVisitor v = new RefFieldsVisitor(fieldNames); + for (RexNode exp : projects) { + PathSegment segment = exp.accept(v); + v.addColumn(segment); + } - List<SchemaPath> columns = Lists.newArrayList(); + List<SchemaPath> columns = v.getColumns(); + for (SchemaPath column : columns) { + if (column.getRootSegment().getPath().startsWith("*")) { + return ImmutableList.of(); + } + } - for (String field : fields) { - //If star column is required, no project pushdown. Just return null, to indicate SCAN should get ALL the columns. - if (field.startsWith("*")) - return null; + return columns; + } + + /** Visitor that finds the set of inputs that are used. 
*/ + private static class RefFieldsVisitor extends RexVisitorImpl<PathSegment> { + final Set<SchemaPath> columns = Sets.newLinkedHashSet(); + final private List<String> fieldNames; + + public RefFieldsVisitor(List<String> fieldNames) { + super(true); + this.fieldNames = fieldNames; + } + + public void addColumn(PathSegment segment) { + if (segment != null && segment instanceof NameSegment) { + columns.add(new SchemaPath((NameSegment)segment)); + } + } - columns.add(SchemaPath.getSimplePath(field)); + public List<SchemaPath> getColumns() { + return ImmutableList.copyOf(columns); + } + @Override + public PathSegment visitInputRef(RexInputRef inputRef) { + return new NameSegment(fieldNames.get(inputRef.getIndex())); } - if (columns.isEmpty()) + @Override + public PathSegment visitCall(RexCall call) { + if ("ITEM".equals(call.getOperator().getName())) { + return call.operands.get(0).accept(this) + .cloneWithNewChild(convertLiteral((RexLiteral) call.operands.get(1))); + } + // else + for (RexNode operand : call.operands) { + addColumn(operand.accept(this)); + } return null; - else - return columns; + } + + private PathSegment convertLiteral(RexLiteral literal) { + switch (literal.getType().getSqlTypeName()) { + case CHAR: + return new NameSegment(RexLiteral.stringValue(literal)); + case INTEGER: + return new ArraySegment(RexLiteral.intValue(literal)); + default: + return null; + } + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java index f94cff8..03e2095 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java @@ -137,7 +137,9 @@ public class EasyGroupScan extends AbstractGroupScan{ @Override public Size getSize() { - return new Size(1024,1024); + int numColumns = (columns == null || columns.isEmpty()) ? 100 : columns.size(); + int avgColumnSize = 10; + return new Size(1024, numColumns*avgColumnSize); } @JsonProperty("files") @@ -227,9 +229,9 @@ public class EasyGroupScan extends AbstractGroupScan{ return newScan; } - @Override - public List<SchemaPath> checkProjPush(List<SchemaPath> columns) { - return columns; + @JsonIgnore + public boolean canPushdownProjects(List<SchemaPath> columns) { + return true; } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java index 2972928..ed5a6cc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java @@ -261,7 +261,9 @@ public class HiveScan extends AbstractGroupScan { @Override public Size getSize() { // TODO - this is wrong, need to populate correctly - return new Size(10,10); + int avgColumnSize = 10; + int numColumns = (columns == null || columns.isEmpty()) ? 
100 : columns.size(); + return new Size(10, avgColumnSize*numColumns); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java index 5202038..5014386 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java @@ -85,7 +85,9 @@ public class InfoSchemaGroupScan extends AbstractGroupScan{ @Override public Size getSize() { - return new Size(1000, 1000); + int avgColumnSize = 10; + int numColumns = (columns == null || columns.isEmpty()) ? 100 : columns.size(); + return new Size(1000, numColumns*avgColumnSize); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index 4d4ec9b..4019ff1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -342,7 +342,9 @@ public class ParquetGroupScan extends AbstractGroupScan { @Override public Size getSize() { // TODO - this is wrong, need to populate correctly - return new Size(10, 10); + int avgColumnSize = 10; + int numColumns = (columns == null || columns.isEmpty()) ? 100 : columns.size(); + return new Size(10, numColumns*avgColumnSize); } @Override @@ -372,8 +374,9 @@ public class ParquetGroupScan extends AbstractGroupScan { return newScan; } - @Override - public List<SchemaPath> checkProjPush(List<SchemaPath> columns) { - return columns; + @JsonIgnore + public boolean canPushdownProjects(List<SchemaPath> columns) { + return true; } + }
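
For reference, a minimal standalone sketch (against the plain HBase client API; the helper class, method and sample filter are invented for illustration) of the row-key-only optimization the new HBaseRecordReader applies: when only `row_key` is projected, a FirstKeyOnlyFilter is attached so each row returns a single KeyValue, and any filter that was already pushed down is preserved through a MUST_PASS_ALL FilterList.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

class RowKeyOnlyScanSketch {

  /** Builds a scan that fetches at most one KeyValue per row while keeping an existing filter. */
  static Scan rowKeyOnlyScan(byte[] startRow, byte[] stopRow, Filter pushedDownFilter) {
    Scan scan = new Scan(startRow, stopRow);
    Filter firstKeyFilter = new FirstKeyOnlyFilter();
    // FirstKeyOnlyFilter is added as the second member of a MUST_PASS_ALL list so the
    // pushed-down filter (e.g. a row-key predicate) is still evaluated as well.
    scan.setFilter(pushedDownFilter == null
        ? firstKeyFilter
        : new FilterList(FilterList.Operator.MUST_PASS_ALL, pushedDownFilter, firstKeyFilter));
    return scan;
  }

  public static void main(String[] args) {
    // Hypothetical usage: only row_key is projected, with a row-key range and prefix filter pushed down.
    Scan scan = rowKeyOnlyScan(Bytes.toBytes("a"), Bytes.toBytes("b"),
        new PrefixFilter(Bytes.toBytes("a1")));
    System.out.println(scan.getFilter());
  }
}

When other columns are projected together with the row key, the reader instead clears the scan's family map so HBase does not drop rows in which none of the requested columns exist.
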
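
And a small sketch (names invented; simplified from HBaseGroupScan.verifyColumns()) of how the new DrillRuntimeException.format() helper is used to fail fast at planning time when a projected column names a column family the HBase table does not have:

import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.hadoop.hbase.HTableDescriptor;

class ColumnFamilyCheckSketch {

  /** Rejects any projected column whose root is neither the row key nor an existing family. */
  static void verifyColumns(Iterable<SchemaPath> columns, SchemaPath rowKeyPath, HTableDescriptor tableDesc) {
    for (SchemaPath column : columns) {
      String family = column.getRootSegment().getPath().toString();
      if (!column.equals(rowKeyPath) && !tableDesc.hasFamily(family.getBytes())) {
        // format(...) builds the message and unconditionally throws DrillRuntimeException.
        DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .",
            family, tableDesc.getNameAsString());
      }
    }
  }
}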