http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java new file mode 100644 index 0000000..fe16b3a --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java @@ -0,0 +1,86 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ ******************************************************************************/
+
+package org.apache.drill.exec.store.json;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.physical.base.SubScan;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+
+/**
+ * Leaf scan operator covering a list of JSON read entries. Its cost and size are the sums of the
+ * per-entry costs and sizes, computed once in the constructor.
+ */
+@JsonTypeName("json-sub-scan")
+public class JSONSubScan extends AbstractBase implements SubScan{
+
+  protected final List<JSONGroupScan.ScanEntry> readEntries;
+  private final OperatorCost cost;
+  private final Size size;
+
+  /**
+   * @param readEntries the entries this sub-scan reads; must not be null
+   * @throws NullPointerException if {@code readEntries} is null
+   */
+  @JsonCreator
+  public JSONSubScan(@JsonProperty("entries") List<JSONGroupScan.ScanEntry> readEntries) {
+    this.readEntries = Preconditions.checkNotNull(readEntries, "readEntries");
+    OperatorCost cost = new OperatorCost(0,0,0,0);
+    Size size = new Size(0,0);
+    for(JSONGroupScan.ScanEntry r : readEntries){
+      cost = cost.add(r.getCost());
+      size = size.add(r.getSize());
+    }
+    this.cost = cost;
+    this.size = size;
+  }
+
+  /**
+   * Published under the same "entries" name that the {@link JsonCreator} constructor reads.
+   * Without the annotation, Jackson would derive the property name "readEntries" from this getter,
+   * which the constructor does not accept, so a serialized sub-scan could not be read back.
+   */
+  @JsonProperty("entries")
+  public List<JSONGroupScan.ScanEntry> getReadEntries() {
+    return readEntries;
+  }
+
+  /**
+   * @throws IllegalArgumentException if {@code children} is non-empty; this operator is a leaf
+   *     (its {@link #iterator()} yields nothing)
+   */
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty(), "JSONSubScan is a leaf operator and cannot have children.");
+    return new JSONSubScan(readEntries);
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return cost;
+  }
+
+  @Override
+  public Size getSize() {
+    return size;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.emptyIterator();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java new file mode 100644 index 0000000..d5f1d8f --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java @@ -0,0 +1,221 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ ******************************************************************************/
+package org.apache.drill.exec.store.mock;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.vector.TypeHelper;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+/**
+ * Group scan over a list of mock read entries. {@link #applyAssignments(List)} spreads the entries
+ * round-robin across endpoints, and {@link #getSpecificScan(int)} hands each minor fragment its
+ * share as a {@link MockSubScanPOP}.
+ */
+@JsonTypeName("mock-scan")
+public class MockGroupScanPOP extends AbstractGroupScan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockGroupScanPOP.class);
+
+  private final String url;
+  protected final List<MockScanEntry> readEntries;
+  private final OperatorCost cost;
+  private final Size size;
+  // Per-endpoint entry lists; null until applyAssignments() has run.
+  private LinkedList<MockScanEntry>[] mappings;
+
+  @JsonCreator
+  public MockGroupScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockScanEntry> readEntries) {
+    this.readEntries = readEntries;
+    OperatorCost cost = new OperatorCost(0,0,0,0);
+    Size size = new Size(0,0);
+    for(MockScanEntry r : readEntries){
+      cost = cost.add(r.getCost());
+      size = size.add(r.getSize());
+    }
+    this.cost = cost;
+    this.size = size;
+    this.url = url;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+
+  @JsonProperty("entries")
+  public List<MockScanEntry> getReadEntries() {
+    return readEntries;
+  }
+
+  /**
+   * One mock read: a record count plus the column layout. The record size is the sum of
+   * TypeHelper.getSize() over the column types.
+   */
+  public static class MockScanEntry implements ReadEntry {
+
+    private final int records;
+    private final MockColumn[] types;
+    private final int recordSize;
+
+
+    @JsonCreator
+    public MockScanEntry(@JsonProperty("records") int records, @JsonProperty("types") MockColumn[] types) {
+      this.records = records;
+      this.types = types;
+      int size = 0;
+      for(MockColumn dt : types){
+        size += TypeHelper.getSize(dt.getMajorType());
+      }
+      this.recordSize = size;
+    }
+
+    @Override
+    public OperatorCost getCost() {
+      return new OperatorCost(1, 2, 1, 1);
+    }
+
+    public int getRecords() {
+      return records;
+    }
+
+    public MockColumn[] getTypes() {
+      return types;
+    }
+
+    @Override
+    public Size getSize() {
+      return new Size(records, recordSize);
+    }
+  }
+
+  /** JSON description of one mock column; null optional attributes are omitted when serialized. */
+  @JsonInclude(Include.NON_NULL)
+  public static class MockColumn{
+    @JsonProperty("type") public MinorType minorType;
+    public String name;
+    public DataMode mode;
+    public Integer width;
+    public Integer precision;
+    public Integer scale;
+
+
+    @JsonCreator
+    public MockColumn(@JsonProperty("name") String name, @JsonProperty("type") MinorType minorType, @JsonProperty("mode") DataMode mode, @JsonProperty("width") Integer width, @JsonProperty("precision") Integer precision, @JsonProperty("scale") Integer scale) {
+      this.name = name;
+      this.minorType = minorType;
+      this.mode = mode;
+      this.width = width;
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @JsonProperty("type")
+    public MinorType getMinorType() {
+      return minorType;
+    }
+    public String getName() {
+      return name;
+    }
+    public DataMode getMode() {
+      return mode;
+    }
+    public Integer getWidth() {
+      return width;
+    }
+    public Integer getPrecision() {
+      return precision;
+    }
+    public Integer getScale() {
+      return scale;
+    }
+
+    /** Builds the MajorType; precision, width and scale are set only when non-null. */
+    @JsonIgnore
+    public MajorType getMajorType(){
+      MajorType.Builder b = MajorType.newBuilder();
+      b.setMode(mode);
+      b.setMinorType(minorType);
+      if(precision != null) b.setPrecision(precision);
+      if(width != null) b.setWidth(width);
+      if(scale != null) b.setScale(scale);
+      return b.build();
+    }
+
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Distributes the read entries across {@code endpoints} round-robin: the k-th entry goes to
+   * endpoint {@code k % endpoints.size()}. Because there are never more endpoints than entries,
+   * every endpoint receives at least one entry.
+   *
+   * @throws IllegalArgumentException if there are more endpoints than read entries, or if there
+   *     are read entries but no endpoints to place them on
+   */
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+    int width = endpoints.size();
+    int entryCount = getReadEntries().size();
+    Preconditions.checkArgument(width <= entryCount,
+        "Cannot assign %s read entries to %s endpoints: there are more endpoints than entries.", entryCount, width);
+    Preconditions.checkArgument(width > 0 || entryCount == 0,
+        "No endpoints were provided to assign %s read entries to.", entryCount);
+
+    // Generic arrays cannot be created directly; the raw array only ever holds LinkedList<MockScanEntry>.
+    @SuppressWarnings("unchecked")
+    LinkedList<MockScanEntry>[] assigned = new LinkedList[width];
+
+    int i = 0;
+    for (MockScanEntry e : getReadEntries()) {
+      int slot = i % width;
+      if (assigned[slot] == null) {
+        assigned[slot] = new LinkedList<MockScanEntry>();
+      }
+      assigned[slot].add(e);
+      i++;
+    }
+    mappings = assigned;
+  }
+
+  /**
+   * @throws IllegalStateException if {@link #applyAssignments(List)} has not been called yet
+   */
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId) {
+    Preconditions.checkState(mappings != null, "applyAssignments() must be called before getSpecificScan().");
+    assert minorFragmentId < mappings.length : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.length, minorFragmentId);
+    return new MockSubScanPOP(url, mappings[minorFragmentId]);
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return readEntries.size();
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return cost;
+  }
+
+  @Override
+  public Size getSize() {
+    return size;
+  }
+
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new MockGroupScanPOP(url, readEntries);
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java new file mode 100644 index 0000000..024aa21 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java @@ -0,0 +1,118 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ ******************************************************************************/
+package org.apache.drill.exec.store.mock;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockColumn;
+import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.TypeHelper;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Record reader that fills one value vector per mock column with generated test data, producing
+ * {@code config.getRecords()} records in total across calls to {@link #next()}.
+ */
+public class MockRecordReader implements RecordReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
+
+  // Target amount of data per batch, in the units returned by TypeHelper.getSize()
+  // (presumably bytes -- TODO confirm).
+  private static final int TARGET_BATCH_SIZE = 250000;
+
+  private OutputMutator output;
+  private MockScanEntry config;
+  private FragmentContext context;
+  private ValueVector[] valueVectors;
+  private int recordsRead;
+  private int batchRecordCount;
+
+  public MockRecordReader(FragmentContext context, MockScanEntry config) {
+    this.context = context;
+    this.config = config;
+  }
+
+  /** Sums TypeHelper.getSize() over the given column types. */
+  private int getEstimatedRecordSize(MockColumn[] types) {
+    int x = 0;
+    for (int i = 0; i < types.length; i++) {
+      x += TypeHelper.getSize(types[i].getMajorType());
+    }
+    return x;
+  }
+
+  /** Creates a vector for the named column and pre-allocates it for {@code length} values. */
+  private ValueVector getVector(String name, MajorType type, int length) {
+    assert context != null : "Context shouldn't be null.";
+    MaterializedField f = MaterializedField.create(new SchemaPath(name, ExpressionPosition.UNKNOWN), type);
+    ValueVector v = TypeHelper.getNewVector(f, context.getAllocator());
+    AllocationHelper.allocate(v, length, 50, 4);
+    return v;
+  }
+
+  /**
+   * Creates and registers one vector per configured column, then publishes the new schema.
+   *
+   * @throws ExecutionSetupException if a field cannot be added to {@code output}
+   */
+  @Override
+  public void setup(OutputMutator output) throws ExecutionSetupException {
+    try {
+      this.output = output;
+      MockColumn[] columns = config.getTypes();
+      int estimateRowSize = getEstimatedRecordSize(columns);
+      valueVectors = new ValueVector[columns.length];
+      // Clamp both sides of the division: a zero estimated row size would throw ArithmeticException,
+      // and a row wider than TARGET_BATCH_SIZE would give a batch size of 0, making next() return 0
+      // on its very first call even though records remain to be generated.
+      batchRecordCount = Math.max(1, TARGET_BATCH_SIZE / Math.max(1, estimateRowSize));
+
+      for (int i = 0; i < columns.length; i++) {
+        valueVectors[i] = getVector(columns[i].getName(), columns[i].getMajorType(), batchRecordCount);
+        output.addField(valueVectors[i]);
+      }
+      output.setNewSchema();
+    } catch (SchemaChangeException e) {
+      throw new ExecutionSetupException("Failure while setting up fields", e);
+    }
+  }
+
+  /**
+   * Fills every vector with generated data for the next batch.
+   *
+   * @return the number of records in this batch: at most batchRecordCount, and 0 once
+   *     config.getRecords() records have been produced
+   */
+  @Override
+  public int next() {
+    int recordSetSize = Math.min(batchRecordCount, config.getRecords() - recordsRead);
+    recordsRead += recordSetSize;
+    for (ValueVector v : valueVectors) {
+      // NOTE(review): getVector() allocates with (50, 4) but this uses (50, 5); confirm which is intended.
+      AllocationHelper.allocate(v, recordSetSize, 50, 5);
+
+      logger.debug("MockRecordReader: Generating random data for VV of type {}", v.getClass().getName());
+      ValueVector.Mutator m = v.getMutator();
+      m.setValueCount(recordSetSize);
+      m.generateTestData();
+    }
+    return recordSetSize;
+  }
+
+  /**
+   * Removes each vector's field from the output and closes the vector. Safe to call when setup()
+   * never ran or failed part-way: missing vectors are skipped.
+   */
+  @Override
+  public void cleanup() {
+    if (valueVectors == null) {
+      // setup() never created the vector array, so there is nothing to release.
+      return;
+    }
+    for (ValueVector v : valueVectors) {
+      if (v == null) {
+        // setup() failed before this vector was created.
+        continue;
+      }
+      try {
+        output.removeField(v.getField());
+      } catch (SchemaChangeException e) {
+        logger.warn("Failure while trying to remove field.", e);
+      }
+      v.close();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
new file mode 100644
index 0000000..5c91e1c
--- /dev/null
+++
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java @@ -0,0 +1,46 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ ******************************************************************************/
+package org.apache.drill.exec.store.mock;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Turns a {@link MockSubScanPOP} into a {@link ScanBatch} that is driven by one
+ * {@link MockRecordReader} per read entry, in entry order.
+ */
+public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, MockSubScanPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+    // A mock scan sits at the leaf of the plan, so it must not receive any input batches.
+    Preconditions.checkArgument(children.isEmpty());
+
+    List<RecordReader> mockReaders = Lists.newArrayList();
+    for (MockScanEntry entry : config.getReadEntries()) {
+      mockReaders.add(new MockRecordReader(context, entry));
+    }
+    return new ScanBatch(context, mockReaders.iterator());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
index 23ac2b8..1ea6958 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
+++
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java @@ -22,10 +22,9 @@ import java.util.ArrayList; import org.apache.drill.common.logical.data.Scan; import org.apache.drill.exec.physical.base.AbstractGroupScan; -import org.apache.drill.exec.physical.config.MockGroupScanPOP; -import org.apache.drill.exec.physical.config.MockGroupScanPOP.MockScanEntry; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.AbstractStorageEngine; +import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry; import org.apache.drill.storage.MockStorageEngineConfig; import com.fasterxml.jackson.core.type.TypeReference; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java new file mode 100644 index 0000000..4dbcd63 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java @@ -0,0 +1,75 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.mock;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractStore;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * Mock store operator. It reports a maximum width of 1 and no endpoint affinity. Every fragment
+ * gets an identical instance wrapping its own child.
+ */
+@JsonTypeName("mock-store")
+public class MockStorePOP extends AbstractStore {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorePOP.class);
+
+  @JsonCreator
+  public MockStorePOP(@JsonProperty("child") PhysicalOperator child) {
+    super(child);
+  }
+
+  public int getMaxWidth() {
+    return 1;
+  }
+
+  /** No locality preference: an empty affinity list. */
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    return Collections.emptyList();
+  }
+
+  /** Deliberately ignores the assignments; per-fragment stores do not depend on them. */
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+    // intentionally empty
+  }
+
+  /** The fragment id is unused: each fragment receives a fresh store over the supplied child. */
+  @Override
+  public Store getSpecificStore(PhysicalOperator child, int minorFragmentId) {
+    return new MockStorePOP(child);
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new MockStorePOP(child);
+  }
+
+  /** Cost whose second component is the record count times the record size reported by getSize(). */
+  @Override
+  public OperatorCost getCost() {
+    return new OperatorCost(1, getSize().getRecordCount() * getSize().getRecordSize(), 1, 1);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
new file mode 100644
index 0000000..38bf337
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
@@ -0,0 +1,115 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.mock;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.google.common.collect.Iterators;
+import org.apache.drill.common.graph.GraphVisitor;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.vector.TypeHelper;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+/**
+ * Leaf scan over the mock read entries assigned to one minor fragment. Its cost and size are the
+ * sums over those entries, computed once in the constructor.
+ */
+@JsonTypeName("mock-sub-scan")
+public class MockSubScanPOP extends AbstractBase implements SubScan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockSubScanPOP.class);
+
+  private final String url;
+  protected final List<MockGroupScanPOP.MockScanEntry> readEntries;
+  private final OperatorCost cost;
+  private final Size size;
+
+  /**
+   * @param url the scan URL, carried through unchanged
+   * @param readEntries the entries this fragment reads; must not be null
+   * @throws NullPointerException if {@code readEntries} is null
+   */
+  @JsonCreator
+  public MockSubScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockGroupScanPOP.MockScanEntry> readEntries) {
+    this.readEntries = Preconditions.checkNotNull(readEntries, "readEntries");
+    OperatorCost cost = new OperatorCost(0,0,0,0);
+    Size size = new Size(0,0);
+    for(MockGroupScanPOP.MockScanEntry r : readEntries){
+      cost = cost.add(r.getCost());
+      size = size.add(r.getSize());
+    }
+    this.cost = cost;
+    this.size = size;
+    this.url = url;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+
+  @JsonProperty("entries")
+  public List<MockGroupScanPOP.MockScanEntry> getReadEntries() {
+    return readEntries;
+  }
+
+  /** A sub-scan has no children. */
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.emptyIterator();
+  }
+
+  /** Summed cost of this fragment's entries, as computed in the constructor. */
+  @Override
+  public OperatorCost getCost() {
+    return cost;
+  }
+
+  /** Summed size of this fragment's entries, as computed in the constructor. */
+  @Override
+  public Size getSize() {
+    return size;
+  }
+
+  // will want to replace these two methods with an interface above for AbstractSubScan
+  @Override
+  public boolean isExecutable() {
+    return true;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+    return physicalVisitor.visitSubScan(this, value);
+  }
+  // see comment above about replacing this
+
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new MockSubScanPOP(url, readEntries);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
index 8b4f760..99b65e6 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
+++
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java @@ -62,7 +62,7 @@ public abstract class ColumnReader { ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v){ this.parentReader = parentReader; - if (allocateSize > 1) valueVecHolder = new VectorHolder(allocateSize, (BaseDataValueVector) v); + if (allocateSize > 1) valueVecHolder = new VectorHolder(allocateSize, v); else valueVecHolder = new VectorHolder(5000, (BaseDataValueVector) v); columnDescriptor = descriptor; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java index 29d9cc7..0378960 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java @@ -36,7 +36,10 @@ public final class PageReadStatus { // store references to the pages that have been uncompressed, but not copied to ValueVectors yet Page currentPage; // buffer to store bytes of current page, set to max size of parquet page - byte[] pageDataByteArray = new byte[ParquetRecordReader.PARQUET_PAGE_MAX_SIZE]; + // TODO: add this back once toByteArray accepts an input. 
byte[] pageDataByteArray = new byte[ParquetRecordReader.PARQUET_PAGE_MAX_SIZE]; + byte[] pageDataByteArray; + + PageReader pageReader; // read position in the current page, stored in the ByteBuf in ParquetRecordReader called bufferWithAllData long readPosInBytes; @@ -103,11 +106,13 @@ public final class PageReadStatus { } // if the buffer holding each page's data is not large enough to hold the current page, re-allocate, with a little extra space - if (pageHeader.getUncompressed_page_size() > pageDataByteArray.length) { - pageDataByteArray = new byte[pageHeader.getUncompressed_page_size() + 100]; - } +// if (pageHeader.getUncompressed_page_size() > pageDataByteArray.length) { +// pageDataByteArray = new byte[pageHeader.getUncompressed_page_size() + 100]; +// } // TODO - would like to get this into the mainline, hopefully before alpha - currentPage.getBytes().toByteArray(pageDataByteArray, 0, byteLength); + pageDataByteArray = currentPage.getBytes().toByteArray(); + //TODO: Fix once parquet supports buffer work or at least passing in array. 
+ //pageDataByteArray = currentPage.getBytes().toByteArray(pageDataByteArray, 0, byteLength); readPosInBytes = 0; valuesRead = 0; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index f4988a0..66c1550 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; + import org.apache.drill.common.JSONOptions; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.exception.SetupException; @@ -36,15 +37,17 @@ import org.apache.drill.exec.physical.ReadEntryWithPath; import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Size; -import org.apache.drill.exec.physical.config.MockGroupScanPOP; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import com.google.common.base.Preconditions; + import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.StorageEngineRegistry; import org.apache.drill.exec.store.AffinityCalculator; +import org.apache.drill.exec.store.mock.MockGroupScanPOP; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; + import 
parquet.hadoop.ParquetFileReader; import parquet.hadoop.metadata.BlockMetaData; import parquet.hadoop.metadata.ColumnChunkMetaData; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index bd63406..03fb4ec 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -24,17 +24,18 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.physical.config.MockScanBatchCreator; - import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.store.RecordReader; +import org.apache.drill.exec.store.mock.MockScanBatchCreator; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; + import parquet.hadoop.ParquetFileReader; import parquet.hadoop.metadata.ParquetMetadata; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java ---------------------------------------------------------------------- diff --git 
a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java index c9d6967..2d9524d 100644 --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java @@ -11,6 +11,7 @@ import java.util.List; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; + import mockit.Expectations; import mockit.Injectable; @@ -23,6 +24,7 @@ import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.proto.SchemaDefProtos; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.store.json.JSONRecordReader; import org.apache.drill.exec.vector.ValueVector; import org.junit.Ignore; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java index 0e31cdd..5628f50 100644 --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java @@ -27,7 +27,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Vector; import org.apache.drill.common.config.DrillConfig; import 
org.apache.drill.common.types.TypeProtos; @@ -72,40 +71,30 @@ public class ParquetRecordReaderTest { new ParquetRecordReaderTest().testMultipleRowGroupsAndReadsEvent(); } + @Test public void testMultipleRowGroupsAndReadsEvent() throws Exception { String planName = "/parquet_scan_screen.json"; - String fileName = "/tmp/testParquetFile_many_types_3"; + String fileName = "/tmp/parquet_test_file_many_types"; int numberRowGroups = 20; int recordsPerRowGroup = 300000; - //TestFileGenerator.generateParquetFile(fileName, numberRowGroups, recordsPerRowGroup); + File f = new File(fileName); + if(!f.exists()) TestFileGenerator.generateParquetFile(fileName, numberRowGroups, recordsPerRowGroup); testParquetFullEngineLocal(planName, fileName, 2, numberRowGroups, recordsPerRowGroup); } private class ParquetResultListener implements UserResultsListener { - private Vector<QueryResultBatch> results = new Vector<QueryResultBatch>(); private SettableFuture<Void> future = SettableFuture.create(); - int count = 0; RecordBatchLoader batchLoader; - byte[] bytes; - int numberRowGroups; - int numberOfTimesRead; int batchCounter = 1; - int columnValCounter = 0; - int i = 0; - private FieldInfo currentField; private final HashMap<String, Long> valuesChecked = new HashMap<>(); - private final int recordsPerRowGroup; private final Map<String, FieldInfo> fields; private final long totalRecords; ParquetResultListener(int recordsPerRowGroup, RecordBatchLoader batchLoader, int numberRowGroups, int numberOfTimesRead){ this.batchLoader = batchLoader; this.fields = TestFileGenerator.getFieldMap(recordsPerRowGroup); - this.recordsPerRowGroup = recordsPerRowGroup; - this.numberRowGroups = numberRowGroups; - this.numberOfTimesRead = numberOfTimesRead; this.totalRecords = recordsPerRowGroup * numberRowGroups * numberOfTimesRead; } @@ -120,7 +109,7 @@ public class ParquetRecordReaderTest { long columnValCounter = 0; int i = 0; FieldInfo currentField; - count += result.getHeader().getRowCount(); + boolean 
schemaChanged = false; try { schemaChanged = batchLoader.load(result.getHeader().getDef(), result.getData()); @@ -128,12 +117,11 @@ public class ParquetRecordReaderTest { logger.error("Failure while loading batch", e); } - int recordCount = 0; // print headers. if (schemaChanged) { } // do not believe any change is needed for when the schema changes, with the current mock scan use case - for (VectorWrapper vw : batchLoader) { + for (VectorWrapper<?> vw : batchLoader) { ValueVector vv = vw.getValueVector(); currentField = fields.get(vv.getField().getName()); if (VERBOSE_DEBUG){ @@ -163,7 +151,6 @@ public class ParquetRecordReaderTest { if (VERBOSE_DEBUG){ for (i = 0; i < batchLoader.getRecordCount(); i++) { - recordCount++; if (i % 50 == 0){ System.out.println(); for (VectorWrapper<?> vw : batchLoader) { @@ -298,11 +285,6 @@ public class ParquetRecordReaderTest { @SuppressWarnings("unchecked") private <T> void assertField(ValueVector valueVector, int index, TypeProtos.MinorType expectedMinorType, T value, String name, int parentFieldId) { -// UserBitShared.FieldMetadata metadata = valueVector.getMetadata(); -// SchemaDefProtos.FieldDef def = metadata.getDef(); -// assertEquals(expectedMinorType, def.getMajorType().getMinorType()); -// assertEquals(name, def.getNameList().get(0).getName()); -// assertEquals(parentFieldId, def.getParentId()); if (expectedMinorType == TypeProtos.MinorType.MAP) { return; @@ -339,9 +321,6 @@ public class ParquetRecordReaderTest { assertArrayEquals(bytes.toByteArray(), page.getBytes().toByteArray()); } - private String getResource(String resourceName) { - return "resource:" + resourceName; - } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json 
b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json index 15d3936..29cab68 100644 --- a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json +++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json @@ -22,10 +22,10 @@ storageengine:"parquet", selection: [ { - path: "/tmp/testParquetFile_many_types_3" + path: "/tmp/parquet_test_file_many_types" }, { - path: "/tmp/testParquetFile_many_types_3" + path: "/tmp/parquet_test_file_many_types" } ] }, http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java index 79b8ef8..35a2414 100644 --- a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java +++ b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java @@ -3,6 +3,7 @@ package org.apache.drill.jdbc.test; import java.io.IOException; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -12,6 +13,7 @@ import org.junit.rules.Timeout; import com.google.common.base.Charsets; import com.google.common.io.Resources; +@Ignore public class FullEngineTest { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FullEngineTest.class);
