DRILL-825: MaterializedField is mutable and is not suitable as a KEY in a MAP
+ Minor optimization/cleanup in HBaseRecordReader

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/cd7aeebe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/cd7aeebe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/cd7aeebe

Branch: refs/heads/master
Commit: cd7aeebe65ccb4e8829282aa6e6cae6ed867796b
Parents: 84390e8
Author: Aditya Kishore <[email protected]>
Authored: Thu May 22 18:37:59 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Thu May 22 19:22:21 2014 -0700

----------------------------------------------------------------------
 .../exec/store/hbase/DrillHBaseConstants.java |   9 ++
 .../exec/store/hbase/HBaseRecordReader.java   |  17 ++-
 .../drill/exec/physical/impl/ScanBatch.java   |  36 +++---
 .../drill/exec/record/MaterializedField.java  | 110 +++++++++++++------
 4 files changed, 113 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
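The root cause, visible in the MaterializedField.java hunks below, is that MaterializedField.hashCode() and equals() fold in the mutable children list, so a field that gains a child via addChild() after being inserted into a HashMap can no longer be looked up. A minimal, self-contained sketch of the failure mode (illustrative only, not Drill code; MutableKey is a hypothetical stand-in for MaterializedField):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Stand-in for MaterializedField: hashCode()/equals() depend on mutable state.
class MutableKey {
  final String path;
  final List<String> children = new ArrayList<>();

  MutableKey(String path) { this.path = path; }

  @Override public int hashCode() { return Objects.hash(path, children); }
  @Override public boolean equals(Object o) {
    return o instanceof MutableKey
        && path.equals(((MutableKey) o).path)
        && children.equals(((MutableKey) o).children);
  }
}

public class MutableKeyDemo {
  public static void main(String[] args) {
    Map<MutableKey, String> map = new HashMap<>();
    MutableKey k = new MutableKey("row_key");
    map.put(k, "vector");
    k.children.add("f");              // hash code changes while k is a live map key
    System.out.println(map.get(k));   // prints "null": the entry is stranded in its old bucket
  }
}

The patch sidesteps this by introducing an inner Key view of the field whose hashCode() and equals() cover only the path and type, never the children, and by keying maps with field.key().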
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cd7aeebe/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
index a86797b..5ba9979 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
@@ -18,6 +18,9 @@
 package org.apache.drill.exec.store.hbase;
 
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
 
 public interface DrillHBaseConstants {
   static final String ROW_KEY = "row_key";
@@ -26,4 +29,10 @@ public interface DrillHBaseConstants {
 
   static final String HBASE_ZOOKEEPER_PORT = "hbase.zookeeper.property.clientPort";
 
+  static final MajorType ROW_KEY_TYPE = Types.required(MinorType.VARBINARY);
+
+  static final MajorType COLUMN_FAMILY_TYPE = Types.required(MinorType.MAP);
+
+  static final MajorType COLUMN_TYPE = Types.optional(MinorType.VARBINARY);
+
 }
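The three new constants are prebuilt MajorType values shared by the reader instead of being rebuilt on every call. As a sketch of what they hold (an assumption based on Types.required()/optional() being thin wrappers over the generated protobuf builder in TypeProtos):

import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;

public class HBaseTypeSketch {
  // Types.required(MinorType.VARBINARY): row keys are raw bytes, always present.
  static final MajorType ROW_KEY_TYPE = MajorType.newBuilder()
      .setMinorType(MinorType.VARBINARY)
      .setMode(DataMode.REQUIRED)
      .build();

  // Types.optional(MinorType.VARBINARY): a qualifier may be absent in a given row.
  static final MajorType COLUMN_TYPE = MajorType.newBuilder()
      .setMinorType(MinorType.VARBINARY)
      .setMode(DataMode.OPTIONAL)
      .build();
}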
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cd7aeebe/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 439f97f..caee8ed 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -58,13 +58,12 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class);
 
   private static final int TARGET_RECORD_COUNT = 4000;
-
+
   private LinkedHashSet<SchemaPath> columns;
   private OutputMutator outputMutator;
 
   private Map<String, MapVector> familyVectorMap;
   private VarBinaryVector rowKeyVector;
-  private SchemaPath rowKeySchemaPath;
 
   private HTable hTable;
   private ResultScanner resultScanner;
@@ -86,8 +85,7 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
       while(columnIterator.hasNext()) {
         SchemaPath column = columnIterator.next();
         if (column.getRootSegment().getPath().equalsIgnoreCase(ROW_KEY)) {
-          rowKeySchemaPath = ROW_KEY_PATH;
-          this.columns.add(rowKeySchemaPath);
+          this.columns.add(ROW_KEY_PATH);
           continue;
         }
         rowKeyOnly = false;
@@ -104,8 +102,7 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
       }
     } else {
       rowKeyOnly = false;
-      rowKeySchemaPath = ROW_KEY_PATH;
-      this.columns.add(rowKeySchemaPath);
+      this.columns.add(ROW_KEY_PATH);
     }
     hbaseScan.setFilter(subScanSpec.getScanFilter());
@@ -130,8 +127,8 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
     try {
       // Add Vectors to output in the order specified when creating reader
       for (SchemaPath column : columns) {
-        if (column.equals(rowKeySchemaPath)) {
-          MaterializedField field = MaterializedField.create(column, Types.required(TypeProtos.MinorType.VARBINARY));
+        if (column.equals(ROW_KEY_PATH)) {
+          MaterializedField field = MaterializedField.create(column, ROW_KEY_TYPE);
           rowKeyVector = outputMutator.addField(field, VarBinaryVector.class);
         } else {
           getOrCreateFamilyVector(column.getRootSegment().getPath(), false);
@@ -216,7 +213,7 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
     MapVector v = familyVectorMap.get(familyName);
     if(v == null) {
       SchemaPath column = SchemaPath.getSimplePath(familyName);
-      MaterializedField field = MaterializedField.create(column, Types.required(TypeProtos.MinorType.MAP));
+      MaterializedField field = MaterializedField.create(column, COLUMN_FAMILY_TYPE);
       v = outputMutator.addField(field, MapVector.class);
       if (allocateOnCreate) {
         v.allocateNew();
@@ -232,7 +229,7 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
   private NullableVarBinaryVector getOrCreateColumnVector(MapVector mv, String qualifier) {
     int oldSize = mv.size();
-    NullableVarBinaryVector v = mv.addOrGet(qualifier, Types.optional(TypeProtos.MinorType.VARBINARY), NullableVarBinaryVector.class);
+    NullableVarBinaryVector v = mv.addOrGet(qualifier, COLUMN_TYPE, NullableVarBinaryVector.class);
     if (oldSize != mv.size()) {
       v.allocateNew();
     }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cd7aeebe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index c0810c6..7febb10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -22,48 +22,49 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.collect.Lists;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.util.BatchPrinter;
-import org.apache.drill.exec.util.VectorUtil;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarCharVector;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.drill.exec.vector.VarCharVector;
-import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
 /**
  * Record batch used for a particular scan. Operators against one or more
  */
 public class ScanBatch implements RecordBatch {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
 
   private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
   private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
-  final Map<MaterializedField, ValueVector> fieldVectorMap = Maps.newHashMap();
-  final Map<MaterializedField, Class<?>> fieldVectorClassMap = Maps.newHashMap();
   private static final int MAX_RECORD_CNT = Character.MAX_VALUE;
 
+  private final Map<MaterializedField.Key, ValueVector> fieldVectorMap = Maps.newHashMap();
+  private final VectorContainer container = new VectorContainer();
+
   private int recordCount;
   private final FragmentContext context;
@@ -74,8 +75,8 @@ public class ScanBatch implements RecordBatch {
   private final Mutator mutator = new Mutator();
   private Iterator<String[]> partitionColumns;
   private String[] partitionValues;
-  List<ValueVector> partitionVectors;
-  List<Integer> selectedPartitionColumns;
+  private List<ValueVector> partitionVectors;
+  private List<Integer> selectedPartitionColumns;
   private String partitionColumnDesignator;
 
   public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
@@ -95,7 +96,7 @@ public class ScanBatch implements RecordBatch {
   }
 
   public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException {
-    this(subScanConfig, context, readers, Collections.EMPTY_LIST, Collections.EMPTY_LIST);
+    this(subScanConfig, context, readers, Collections.<String[]> emptyList(), Collections.<Integer> emptyList());
   }
 
   @Override
@@ -224,14 +225,14 @@ public class ScanBatch implements RecordBatch {
     @Override
     public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
       // Check if the field exists
-      ValueVector v = fieldVectorMap.get(field);
+      ValueVector v = fieldVectorMap.get(field.key());
       if (v == null || v.getClass() != clazz) {
         // Field does not exist add it to the map and the output container
         v = TypeHelper.getNewVector(field, oContext.getAllocator());
         if(!clazz.isAssignableFrom(v.getClass())) throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
         container.add(v);
-        fieldVectorMap.put(field, v);
+        fieldVectorMap.put(field.key(), v);
 
         // Adding new vectors to the container mark that the schema has changed
         schemaChange = true;
@@ -243,7 +244,7 @@ public class ScanBatch implements RecordBatch {
     @Override
     public void addFields(List<ValueVector> vvList) {
       for (ValueVector v : vvList) {
-        fieldVectorMap.put(v.getField(), v);
+        fieldVectorMap.put(v.getField().key(), v);
         container.add(v);
       }
       schemaChange = true;
@@ -278,6 +279,7 @@ public class ScanBatch implements RecordBatch {
 
   public void cleanup(){
     container.clear();
+    fieldVectorMap.clear();
     oContext.close();
   }
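The substantive ScanBatch change: fieldVectorMap is now keyed by MaterializedField.Key rather than by the field itself, and the unused fieldVectorClassMap is dropped. Reduced to its essentials, the Mutator.addField() pattern after this patch looks roughly like the following (a simplified sketch; the assignability check and exception message from the real method are elided):

// Inside ScanBatch.Mutator (sketch):
private final Map<MaterializedField.Key, ValueVector> fieldVectorMap = Maps.newHashMap();

public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz)
    throws SchemaChangeException {
  ValueVector v = fieldVectorMap.get(field.key());  // stable lookup: path + type only
  if (v == null || v.getClass() != clazz) {
    v = TypeHelper.getNewVector(field, oContext.getAllocator());
    container.add(v);
    fieldVectorMap.put(field.key(), v);
    schemaChange = true;  // a new vector in the container means the schema changed
  }
  return clazz.cast(v);
}

Clearing fieldVectorMap in cleanup() also ensures stale vector references are dropped when the batch is torn down.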
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cd7aeebe/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index 3d749d6..48073c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.record;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.PathSegment;
@@ -30,15 +31,13 @@ import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 
 import com.google.hive12.common.collect.Lists;
 
-public class MaterializedField{
-  private SchemaPath path;
-  private MajorType type;
+public class MaterializedField {
+  private Key key;
   private List<MaterializedField> children = Lists.newArrayList();
 
   private MaterializedField(SchemaPath path, MajorType type) {
     super();
-    this.path = path;
-    this.type = type;
+    key = new Key(path, type);
   }
 
   public static MaterializedField create(SerializedField serField){
@@ -47,8 +46,8 @@ public class MaterializedField{
 
   public SerializedField.Builder getAsBuilder(){
     return SerializedField.newBuilder() //
-        .setMajorType(type) //
-        .setNamePart(path.getAsNamePart());
+        .setMajorType(key.type) //
+        .setNamePart(key.path.getAsNamePart());
   }
 
   public void addChild(MaterializedField field){
@@ -56,11 +55,11 @@ public class MaterializedField{
   }
 
   public MaterializedField clone(FieldReference ref){
-    return create(ref, type);
+    return create(ref, key.type);
   }
 
   public String getLastName(){
-    PathSegment seg = path.getRootSegment();
+    PathSegment seg = key.path.getRootSegment();
     while(seg.getChild() != null) seg = seg.getChild();
     return seg.getNameSegment().getPath();
   }
@@ -82,7 +81,7 @@ public class MaterializedField{
   }
 
   public SchemaPath getPath(){
-    return path;
+    return key.path;
   }
 
   /**
@@ -91,7 +90,7 @@ public class MaterializedField{
    */
   @Deprecated
   public SchemaPath getAsSchemaPath(){
-    return path;
+    return getPath();
   }
 
 //  public String getName(){
@@ -116,29 +115,29 @@ public class MaterializedField{
 //  }
 
   public int getWidth() {
-    return type.getWidth();
+    return key.type.getWidth();
   }
 
   public MajorType getType() {
-    return type;
+    return key.type;
   }
 
   public int getScale() {
-      return type.getScale();
+      return key.type.getScale();
   }
   public int getPrecision() {
-      return type.getPrecision();
+      return key.type.getPrecision();
   }
   public boolean isNullable() {
-    return type.getMode() == DataMode.OPTIONAL;
+    return key.type.getMode() == DataMode.OPTIONAL;
   }
 
   public DataMode getDataMode() {
-    return type.getMode();
+    return key.type.getMode();
  }
 
   public MaterializedField getOtherNullableVersion(){
-    MajorType mt = type;
+    MajorType mt = key.type;
     DataMode newDataMode = null;
     switch(mt.getMode()){
     case OPTIONAL:
@@ -150,7 +149,7 @@ public class MaterializedField{
     default:
       throw new UnsupportedOperationException();
     }
-    return new MaterializedField(path, mt.toBuilder().setMode(newDataMode).build());
+    return new MaterializedField(key.path, mt.toBuilder().setMode(newDataMode).build());
   }
 
   public Class<?> getValueClass() {
@@ -160,7 +159,7 @@ public class MaterializedField{
 
   public boolean matches(SchemaPath path) {
     if(!path.isSimplePath()) return false;
-    return this.path.equals(path);
+    return key.path.equals(path);
   }
 
@@ -169,8 +168,7 @@ public class MaterializedField{
     final int prime = 31;
     int result = 1;
     result = prime * result + ((children == null) ? 0 : children.hashCode());
-    result = prime * result + ((path == null) ? 0 : path.hashCode());
-    result = prime * result + ((type == null) ? 0 : type.hashCode());
+    result = prime * result + ((key == null) ? 0 : key.hashCode());
     return result;
   }
 
@@ -188,25 +186,73 @@ public class MaterializedField{
       return false;
     } else if (!children.equals(other.children))
       return false;
-    if (path == null) {
-      if (other.path != null)
+    if (key == null) {
+      if (other.key != null)
         return false;
-    } else if (!path.equals(other.path))
-      return false;
-    if (type == null) {
-      if (other.type != null)
-        return false;
-    } else if (!type.equals(other.type))
+    } else if (!key.equals(other.key))
       return false;
     return true;
   }
 
   @Override
   public String toString() {
-    return "MaterializedField [path=" + path + ", type=" + Types.toString(type) + "]";
+    return "MaterializedField [path=" + key.path + ", type=" + Types.toString(key.type) + "]";
+  }
+
+  public Key key() {
+    return key;
   }
 
   public String toExpr(){
-    return path.toExpr();
+    return key.path.toExpr();
+  }
+
+  /**
+   * Since the {@code MaterializedField} itself is mutable, in certain cases, it is not suitable
+   * as a key of a {@link Map}. This inner class allows the {@link MaterializedField} object to be
+   * used for this purpose.
+   */
+  public class Key {
+
+    private SchemaPath path;
+    private MajorType type;
+
+    private Key(SchemaPath path, MajorType type) {
+      this.path = path;
+      this.type = type;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((path == null) ? 0 : path.hashCode());
+      result = prime * result + ((type == null) ? 0 : type.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      Key other = (Key) obj;
+      if (path == null) {
+        if (other.path != null)
+          return false;
+      } else if (!path.equals(other.path))
+        return false;
+      if (type == null) {
+        if (other.type != null)
+          return false;
+      } else if (!type.equals(other.type))
+        return false;
+      return true;
+    }
+  }
 }
\ No newline at end of file
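Because the Key is built once in the field's constructor and its equals()/hashCode() cover only path and type, a Key-based map entry survives later mutation of the field. A hypothetical usage sketch (familyVector and childField are placeholder variables, not from the patch):

// All calls below appear in this commit: create(), getSimplePath(), addChild(), key().
Map<MaterializedField.Key, ValueVector> vectors = Maps.newHashMap();

MaterializedField family = MaterializedField.create(
    SchemaPath.getSimplePath("f"), Types.required(MinorType.MAP));
vectors.put(family.key(), familyVector);

family.addChild(childField);               // changes family.hashCode() ...
assert vectors.get(family.key()) != null;  // ... but the Key lookup still succeeds

Note that Key is a non-static inner class with a private constructor, so the only way to obtain one is via key(); two fields with equal path and type still yield equal Keys, since equals() compares only those two members.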
