Updated Branches: refs/heads/master bd41633f1 -> 47985bad0
Added JSONScanPOP and JSONScanBatch Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/73fad99a Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/73fad99a Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/73fad99a Branch: refs/heads/master Commit: 73fad99a752f3f37944e082353aaf790154953a1 Parents: bd41633 Author: Timothy Chen <[email protected]> Authored: Wed Jun 19 23:21:01 2013 -0700 Committer: Timothy Chen <[email protected]> Committed: Thu Aug 8 20:46:55 2013 -0700 ---------------------------------------------------------------------- .../templates/RepeatedValueVectors.java | 7 +- .../physical/config/JSONScanBatchCreator.java | 46 +++++ .../drill/exec/physical/config/JSONScanPOP.java | 114 ++++++++++++ .../drill/exec/physical/impl/ImplCreator.java | 14 +- .../apache/drill/exec/schema/DiffSchema.java | 20 +-- .../drill/exec/store/JSONRecordReader.java | 175 +++++++++++-------- .../apache/drill/exec/store/VectorHolder.java | 96 +++++----- .../drill/exec/vector/FixedWidthVector.java | 1 - .../drill/exec/vector/RepeatedMutator.java | 23 +++ .../physical/impl/TestSimpleFragmentRun.java | 13 +- .../resources/physical_json_scan_test1.json | 23 +++ 11 files changed, 388 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java index 1afe84b..c629a1d 100644 --- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java +++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java @@ -249,7 +249,7 @@ import com.google.common.collect.Lists; } } - public final class Mutator implements ValueVector.Mutator{ + public final class Mutator implements RepeatedMutator { private Mutator(){ @@ -262,10 +262,7 @@ import com.google.common.collect.Lists; * @param index record of the element to add * @param value value to add to the given row */ - public void add(int index, <#if (type.width > 4)> ${minor.javaType!type.javaType} - <#elseif type.major == "VarLen"> byte[] - <#else> int - </#if> value) { + public void add(int index, <#if type.major == "VarLen">byte[]<#elseif (type.width < 4)>int<#else>${minor.javaType!type.javaType}</#if> value) { int nextOffset = offsets.getAccessor().get(index+1); values.getMutator().set(nextOffset, value); offsets.getMutator().set(index+1, nextOffset+1); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanBatchCreator.java new file mode 100644 index 0000000..f93f03b --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanBatchCreator.java @@ -0,0 +1,46 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +package org.apache.drill.exec.physical.config; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.physical.impl.ScanBatch; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.store.JSONRecordReader; +import org.apache.drill.exec.store.RecordReader; + +import java.util.List; + +public class JSONScanBatchCreator implements BatchCreator<JSONScanPOP> { + + @Override + public RecordBatch getBatch(FragmentContext context, JSONScanPOP config, List<RecordBatch> children) throws ExecutionSetupException { + Preconditions.checkArgument(children.isEmpty()); + List<JSONScanPOP.ScanEntry> entries = config.getReadEntries(); + List<RecordReader> readers = Lists.newArrayList(); + for (JSONScanPOP.ScanEntry e : entries) { + readers.add(new JSONRecordReader(context, e.getUrl())); + } + + return new ScanBatch(context, readers.iterator()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanPOP.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanPOP.java new file mode 100644 index 0000000..1dcf5e1 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanPOP.java @@ -0,0 +1,114 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +package org.apache.drill.exec.physical.config; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.exec.physical.EndpointAffinity; +import org.apache.drill.exec.physical.OperatorCost; +import org.apache.drill.exec.physical.ReadEntry; +import org.apache.drill.exec.physical.base.AbstractScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.Scan; +import org.apache.drill.exec.physical.base.Size; +import org.apache.drill.exec.proto.CoordinationProtos; + +import java.io.File; +import java.net.URI; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; + +@JsonTypeName("json-scan") +public class JSONScanPOP extends AbstractScan<JSONScanPOP.ScanEntry> { + private static int ESTIMATED_RECORD_SIZE = 1024; // 1kb + + private LinkedList[] mappings; + + @JsonCreator + public JSONScanPOP(@JsonProperty("entries") List<JSONScanPOP.ScanEntry> readEntries) { + super(readEntries); + } + + @SuppressWarnings("unchecked") + @Override + public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) { + checkArgument(endpoints.size() <= getReadEntries().size()); + + mappings = new LinkedList[endpoints.size()]; + + int i = 0; + for (ScanEntry e : this.getReadEntries()) { + if (i == endpoints.size()) i = 0; + LinkedList entries = mappings[i]; + if (entries == null) { + entries = new LinkedList<>(); + mappings[i] = entries; + } + entries.add(e); + i++; + } + } + + @SuppressWarnings("unchecked") + @Override + public Scan<?> getSpecificScan(int minorFragmentId) { + checkArgument(minorFragmentId < mappings.length, "Mappings length [%s] should be longer than minor fragment id [%s] but it isn't.", mappings.length, minorFragmentId); + return new JSONScanPOP(mappings[minorFragmentId]); + } + + @Override + public List<EndpointAffinity> getOperatorAffinity() { + return Collections.emptyList(); + } + + @Override + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { + return new JSONScanPOP(readEntries); + } + + public static class ScanEntry implements ReadEntry { + private final String url; + private Size size; + + @JsonCreator + public ScanEntry(@JsonProperty("url") String url) { + this.url = url; + long fileLength = new File(URI.create(url)).length(); + size = new Size(fileLength / ESTIMATED_RECORD_SIZE, ESTIMATED_RECORD_SIZE); + } + + @Override + public OperatorCost getCost() { + return new OperatorCost(1, 1, 2, 2); + } + + @Override + public Size getSize() { + return size; + } + + public String getUrl() { + return url; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java index c31e9e4..1c15289 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java @@ -26,15 +26,7 @@ import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Scan; -import org.apache.drill.exec.physical.config.Filter; -import org.apache.drill.exec.physical.config.MockScanBatchCreator; -import org.apache.drill.exec.physical.config.MockScanPOP; -import org.apache.drill.exec.physical.config.Project; -import org.apache.drill.exec.physical.config.RandomReceiver; -import org.apache.drill.exec.physical.config.Screen; -import org.apache.drill.exec.physical.config.SelectionVectorRemover; -import org.apache.drill.exec.physical.config.SingleSender; -import org.apache.drill.exec.physical.config.Sort; +import org.apache.drill.exec.physical.config.*; import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator; import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator; import org.apache.drill.exec.physical.impl.sort.SortBatchCreator; @@ -74,7 +66,9 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo Preconditions.checkNotNull(context); if(scan instanceof MockScanPOP){ - return msc.getBatch(context, (MockScanPOP) scan, Collections.<RecordBatch> emptyList()); + return msc.getBatch(context, (MockScanPOP) scan, Collections.<RecordBatch>emptyList()); + } else if(scan instanceof JSONScanPOP) { + return new JSONScanBatchCreator().getBatch(context, (JSONScanPOP)scan, Collections.<RecordBatch>emptyList()); }else{ return super.visitScan(scan, context); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java index b654a92..68c3e12 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java @@ -26,6 +26,7 @@ import java.util.List; public class DiffSchema { List<Field> addedFields; List<Field> removedFields; + boolean hasChanged = false; public DiffSchema() { this.addedFields = Lists.newArrayList(); @@ -34,27 +35,22 @@ public class DiffSchema { public void recordNewField(Field field) { addedFields.add(field); - } - - public boolean hasDiffFields() { - return !addedFields.isEmpty() || !removedFields.isEmpty(); - } - - public List<Field> getAddedFields() { - return addedFields; - } - - public List<Field> getRemovedFields() { - return removedFields; + hasChanged = true; } public void reset() { addedFields.clear(); removedFields.clear(); + hasChanged = false; } public void addRemovedField(Field field) { removedFields.add(field); + hasChanged = true; + } + + public boolean isHasChanged() { + return hasChanged; } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java index f72b519..ff7d315 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java @@ -1,16 +1,15 @@ package org.apache.drill.exec.store; -import static com.fasterxml.jackson.core.JsonToken.END_ARRAY; -import static com.fasterxml.jackson.core.JsonToken.END_OBJECT; -import static com.fasterxml.jackson.core.JsonToken.FIELD_NAME; - -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.Charset; -import java.util.List; -import java.util.Map; - +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.google.common.base.Charsets; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Files; +import com.google.common.io.InputSupplier; +import com.google.common.io.Resources; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.ExpressionPosition; @@ -22,27 +21,19 @@ import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.schema.DiffSchema; -import org.apache.drill.exec.schema.Field; -import org.apache.drill.exec.schema.ListSchema; -import org.apache.drill.exec.schema.NamedField; -import org.apache.drill.exec.schema.ObjectSchema; -import org.apache.drill.exec.schema.OrderedField; -import org.apache.drill.exec.schema.RecordSchema; -import org.apache.drill.exec.schema.SchemaIdGenerator; +import org.apache.drill.exec.schema.*; import org.apache.drill.exec.schema.json.jackson.JacksonHelper; import org.apache.drill.exec.vector.*; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.google.common.base.Charsets; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.io.Files; -import com.google.common.io.InputSupplier; -import com.google.common.io.Resources; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URI; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; + +import static com.fasterxml.jackson.core.JsonToken.*; public class JSONRecordReader implements RecordReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class); @@ -89,7 +80,7 @@ public class JSONRecordReader implements RecordReader { if (inputPath.startsWith("resource:")) { input = Resources.newReaderSupplier(Resources.getResource(inputPath.substring(9)), Charsets.UTF_8); } else { - input = Files.newReaderSupplier(new File(inputPath), Charsets.UTF_8); + input = Files.newReaderSupplier(new File(URI.create(inputPath)), Charsets.UTF_8); } JsonFactory factory = new JsonFactory(); @@ -112,7 +103,7 @@ public class JSONRecordReader implements RecordReader { int nextRowIndex = 0; try { - while (ReadType.OBJECT.readRecord(null, this, null, nextRowIndex++)) { + while (ReadType.OBJECT.readRecord(null, this, null, nextRowIndex++, 0)) { parser.nextToken(); // Read to START_OBJECT token if (!parser.hasCurrentToken()) { @@ -133,9 +124,19 @@ public class JSONRecordReader implements RecordReader { outputMutator.removeField(field.getAsMaterializedField()); } + if (diffSchema.isHasChanged()) { + outputMutator.setNewSchema(); + } + + } catch (IOException | SchemaChangeException e) { logger.error("Error reading next in Json reader", e); } + + for (VectorHolder holder : valueVectorMap.values()) { + holder.populateVectorLength(); + } + return nextRowIndex; } @@ -171,18 +172,10 @@ public class JSONRecordReader implements RecordReader { return removedFields; } - private DiffSchema getDiffSchema() { - return diffSchema; - } - public BufferAllocator getAllocator() { return allocator; } - public OutputMutator getOutputMutator() { - return outputMutator; - } - public static enum ReadType { ARRAY(END_ARRAY) { @Override @@ -221,10 +214,12 @@ public class JSONRecordReader implements RecordReader { return endObject; } + @SuppressWarnings("ConstantConditions") public boolean readRecord(Field parentField, JSONRecordReader reader, String prefixFieldName, - int rowIndex) throws IOException, SchemaChangeException { + int rowIndex, + int groupCount) throws IOException, SchemaChangeException { JsonParser parser = reader.getParser(); JsonToken token = parser.nextToken(); JsonToken endObject = getEndObject(); @@ -242,23 +237,26 @@ public class JSONRecordReader implements RecordReader { switch (token) { case START_ARRAY: readType = ReadType.ARRAY; + groupCount++; break; case START_OBJECT: readType = ReadType.OBJECT; + groupCount = 0; break; } - if (fieldType != null) { // Including nulls - boolean currentFieldNotFull = recordData( - parentField, - readType, - reader, - fieldType, - prefixFieldName, - fieldName, - rowIndex, colIndex); - - isFull = isFull || !currentFieldNotFull; + if (fieldType != null) { // Including nulls + isFull = isFull || + !recordData( + parentField, + readType, + reader, + fieldType, + prefixFieldName, + fieldName, + rowIndex, + colIndex, + groupCount); } token = parser.nextToken(); colIndex += 1; @@ -286,7 +284,8 @@ public class JSONRecordReader implements RecordReader { String prefixFieldName, String fieldName, int rowIndex, - int colIndex) throws IOException, SchemaChangeException { + int colIndex, + int groupCount) throws IOException, SchemaChangeException { RecordSchema currentSchema = reader.getCurrentSchema(); Field field = currentSchema.getField(fieldName, colIndex); boolean isFieldFound = field != null; @@ -323,10 +322,14 @@ public class JSONRecordReader implements RecordReader { field.assignSchemaIfNull(newSchema); if (fieldSchema == null) reader.setCurrentSchema(newSchema); - readType.readRecord(field, reader, field.getFullFieldName(), rowIndex); + if(readType == ReadType.ARRAY) { + readType.readRecord(field, reader, field.getFullFieldName(), rowIndex, groupCount); + } else { + readType.readRecord(field, reader, field.getFullFieldName(), rowIndex, groupCount); + } reader.setCurrentSchema(currentSchema); - } else if (holder != null) { + } else { return addValueToVector( rowIndex, holder, @@ -335,32 +338,54 @@ public class JSONRecordReader implements RecordReader { reader.getParser(), fieldType.getMinorType() ), - fieldType.getMinorType() + fieldType.getMinorType(), + groupCount ); } return true; } - private static <T> boolean addValueToVector(int index, VectorHolder holder, BufferAllocator allocator, T val, MinorType minorType) { + private static <T> boolean addValueToVector(int index, VectorHolder holder, BufferAllocator allocator, T val, MinorType minorType, int groupCount) { switch (minorType) { case INT: { - holder.incAndCheckLength(32 + 1); - NullableIntVector int4 = (NullableIntVector) holder.getValueVector(); - NullableIntVector.Mutator m = int4.getMutator(); - if (val != null) { - m.set(index, (Integer) val); + holder.incAndCheckLength(32); + if (groupCount == 0) { + if (val != null) { + NullableIntVector int4 = (NullableIntVector) holder.getValueVector(); + NullableIntVector.Mutator m = int4.getMutator(); + m.set(index, (Integer) val); + } + } else { + if (val == null) { + throw new UnsupportedOperationException("Nullable repeated int is not supported."); + } + + RepeatedIntVector repeatedInt4 = (RepeatedIntVector) holder.getValueVector(); + RepeatedIntVector.Mutator m = repeatedInt4.getMutator(); + m.add(index, (Integer) val); } - return holder.hasEnoughSpace(32 + 1); + + return holder.hasEnoughSpace(32); } case FLOAT4: { - holder.incAndCheckLength(32 + 1); - NullableFloat4Vector float4 = (NullableFloat4Vector) holder.getValueVector(); - NullableFloat4Vector.Mutator m = float4.getMutator(); - if (val != null) { - m.set(index, (Float) val); + holder.incAndCheckLength(32); + if (groupCount == 0) { + if (val != null) { + NullableFloat4Vector float4 = (NullableFloat4Vector) holder.getValueVector(); + NullableFloat4Vector.Mutator m = float4.getMutator(); + m.set(index, (Float) val); + } + } else { + if (val == null) { + throw new UnsupportedOperationException("Nullable repeated float is not supported."); + } + + RepeatedFloat4Vector repeatedFloat4 = (RepeatedFloat4Vector) holder.getValueVector(); + RepeatedFloat4Vector.Mutator m = repeatedFloat4.getMutator(); + m.add(groupCount, (Float) val); } - return holder.hasEnoughSpace(32 + 1); + return holder.hasEnoughSpace(32); } case VARCHAR: { if (val == null) { @@ -369,10 +394,16 @@ public class JSONRecordReader implements RecordReader { byte[] bytes = ((String) val).getBytes(UTF_8); int length = bytes.length; holder.incAndCheckLength(length); - NullableVarCharVector varLen4 = (NullableVarCharVector) holder.getValueVector(); - NullableVarCharVector.Mutator m = varLen4.getMutator(); - m.set(index, bytes); - return holder.hasEnoughSpace(length + 4 + 1); + if (groupCount == 0) { + NullableVarCharVector varLen4 = (NullableVarCharVector) holder.getValueVector(); + NullableVarCharVector.Mutator m = varLen4.getMutator(); + m.set(index, bytes); + } else { + RepeatedVarCharVector repeatedVarLen4 = (RepeatedVarCharVector) holder.getValueVector(); + RepeatedVarCharVector.Mutator m = repeatedVarLen4.getMutator(); + m.add(index, bytes); + } + return holder.hasEnoughSpace(length); } } case BIT: { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java index d594b9e..43d3cd9 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java @@ -18,58 +18,68 @@ package org.apache.drill.exec.store; -import org.apache.drill.exec.vector.FixedWidthVector; -import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.VariableWidthVector; +import org.apache.drill.exec.vector.*; public class VectorHolder { - private int length; - private ValueVector vector; - private ValueVector.Mutator mutator; - private int currentLength; + private int count; + private int groupCount; + private int length; + private ValueVector vector; + private int currentLength; - VectorHolder(int length, ValueVector vector) { - this.length = length; - this.vector = vector; - this.mutator = vector.getMutator(); - } + VectorHolder(int length, ValueVector vector) { + this.length = length; + this.vector = vector; + } - public ValueVector getValueVector() { - return vector; - } + public ValueVector getValueVector() { + return vector; + } - public void incAndCheckLength(int newLength) { - if (!hasEnoughSpace(newLength)) { - throw new BatchExceededException(length, currentLength + newLength); - } - currentLength += newLength; + public void incAndCheckLength(int newLength) { + if (!hasEnoughSpace(newLength)) { + throw new BatchExceededException(length, currentLength + newLength); } + count += 1; + currentLength += newLength; + } - public boolean hasEnoughSpace(int newLength) { - return length >= currentLength + newLength; - } + public void setGroupCount(int groupCount) { + this.groupCount = groupCount; + } - public int getLength() { - return length; - } + public boolean hasEnoughSpace(int newLength) { + return length >= currentLength + newLength; + } - public void reset() { - currentLength = 0; - allocateNew(length); - - } - - public void allocateNew(int valueLength){ - if(vector instanceof FixedWidthVector){ - ((FixedWidthVector)vector).allocateNew(valueLength); - }else if(vector instanceof VariableWidthVector){ - ((VariableWidthVector)vector).allocateNew(valueLength * 10, valueLength); - }else{ - throw new UnsupportedOperationException(); - } + public int getLength() { + return length; + } + + public void reset() { + currentLength = 0; + count = 0; + allocateNew(length); + } + + public void populateVectorLength() { + ValueVector.Mutator mutator = vector.getMutator(); + if(mutator instanceof NonRepeatedMutator) { + ((NonRepeatedMutator)mutator).setValueCount(count); + } else if(mutator instanceof RepeatedMutator) { + ((RepeatedMutator)mutator).setGroupAndValueCount(groupCount, count); + } else { + throw new UnsupportedOperationException("Mutator not supported: " + mutator.getClass().getName()); } - - public ValueVector.Mutator getMutator(){ - return mutator; + } + + public void allocateNew(int valueLength) { + if (vector instanceof FixedWidthVector) { + ((FixedWidthVector) vector).allocateNew(valueLength); + } else if (vector instanceof VariableWidthVector) { + ((VariableWidthVector) vector).allocateNew(valueLength * 10, valueLength); + } else { + throw new UnsupportedOperationException(); } + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java index 17e072b..e5d7a30 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java @@ -7,7 +7,6 @@ public interface FixedWidthVector extends ValueVector{ /** * Allocate a new memory space for this vector. Must be called prior to using the ValueVector. * - * @param totalBytes Desired size of the underlying data buffer. * @param valueCount Number of values in the vector. */ public void allocateNew(int valueCount); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java new file mode 100644 index 0000000..1227d02 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java @@ -0,0 +1,23 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +package org.apache.drill.exec.vector; + +public interface RepeatedMutator extends ValueVector.Mutator { + public void setGroupAndValueCount(int groupCount, int valueCount); +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java index e21289c..5d4e700 100644 --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java @@ -17,13 +17,19 @@ ******************************************************************************/ package org.apache.drill.exec.physical.impl; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.nio.charset.Charset; import java.util.List; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.util.FileUtils; import org.apache.drill.exec.client.DrillClient; import org.apache.drill.exec.pop.PopUnitTestBase; +import org.apache.drill.exec.proto.SchemaDefProtos; import org.apache.drill.exec.proto.UserProtos.QueryType; import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.VectorWrapper; @@ -31,14 +37,19 @@ import org.apache.drill.exec.rpc.user.QueryResultBatch; import org.apache.drill.exec.server.Drillbit; import org.apache.drill.exec.server.RemoteServiceSet; import org.apache.drill.exec.vector.ValueVector; +import org.junit.Assert; import org.junit.Test; +import com.carrotsearch.hppc.cursors.IntObjectCursor; import com.google.common.base.Charsets; import com.google.common.io.Files; public class TestSimpleFragmentRun extends PopUnitTestBase { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSimpleFragmentRun.class); + private static final Charset UTF_8 = Charset.forName("UTF-8"); + + @Test public void runNoExchangeFragment() throws Exception { try(RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73fad99a/sandbox/prototype/exec/java-exec/src/test/resources/physical_json_scan_test1.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/physical_json_scan_test1.json b/sandbox/prototype/exec/java-exec/src/test/resources/physical_json_scan_test1.json new file mode 100644 index 0000000..91eb80c --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/test/resources/physical_json_scan_test1.json @@ -0,0 +1,23 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + { + @id:1, + pop:"json-scan", + entries:[ + {url: "file:////home/tnachen/src/incubator-drill/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_1.json"} + ] + }, + { + @id: 2, + child: 1, + pop: "screen" + } + ] +} \ No newline at end of file
