[
https://issues.apache.org/jira/browse/DRILL-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497344#comment-16497344
]
ASF GitHub Bot commented on DRILL-4190:
---------------------------------------
ilooner closed pull request #315: DRILL-4190 Don't hold on to batches from left side of merge join.
URL: https://github.com/apache/drill/pull/315
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 9ef5cde109..10d0f200d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -113,7 +113,7 @@ protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, Record
throw new UnsupportedOperationException("Merge Join currently does not support cartesian join. This join operator was configured with 0 conditions");
}
this.left = left;
- this.leftIterator = new RecordIterator(left, this, oContext, 0);
+ this.leftIterator = new RecordIterator(left, this, oContext, 0, false);
this.right = right;
this.rightIterator = new RecordIterator(right, this, oContext, 1);
this.joinType = popConfig.getJoinType();
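For orientation, here is a minimal sketch (my illustration, not part of the patch; identifiers taken from the hunk above) of what the two constructor forms mean for the merge join's two inputs:
{code}
// Left side: the merge join only ever walks these rows forward, so
// mark()/reset() support is disabled and each batch can be released
// as soon as the iterator advances past it.
RecordIterator leftIterator = new RecordIterator(left, this, oContext, 0, false);

// Right side: the join may rewind to re-match duplicate keys, so the
// original constructor (enableMarkAndReset defaults to true) is kept
// and batches stay cached from the last mark() onward.
RecordIterator rightIterator = new RecordIterator(right, this, oContext, 1);
{code}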
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
index af0a753047..918a8daa49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
@@ -25,11 +25,7 @@
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
import com.google.common.collect.TreeRangeMap;
/**
@@ -54,6 +50,7 @@
private boolean lastBatchRead; // True if all batches are consumed.
private boolean initialized;
private OperatorContext oContext;
+ private final boolean enableMarkAndReset;
private final VectorContainer container; // Holds VectorContainer of current record batch
private final TreeRangeMap<Long, RecordBatchData> batches = TreeRangeMap.create();
@@ -62,6 +59,14 @@ public RecordIterator(RecordBatch incoming,
AbstractRecordBatch<?> outgoing,
OperatorContext oContext,
int inputIndex) {
+ this(incoming, outgoing, oContext, inputIndex, true);
+ }
+
+ public RecordIterator(RecordBatch incoming,
+ AbstractRecordBatch<?> outgoing,
+ OperatorContext oContext,
+ int inputIndex,
+ boolean enableMarkAndReset) {
this.incoming = incoming;
this.outgoing = outgoing;
this.inputIndex = inputIndex;
@@ -70,6 +75,7 @@ public RecordIterator(RecordBatch incoming,
this.oContext = oContext;
resetIndices();
this.initialized = false;
+ this.enableMarkAndReset = enableMarkAndReset;
}
private void resetIndices() {
@@ -88,14 +94,17 @@ private void nextBatch() {
if (lastBatchRead) {
return;
}
- lastOutcome = outgoing.next(inputIndex, incoming);
+ lastOutcome = outgoing != null ? outgoing.next(inputIndex, incoming) : incoming.next();
}
public void mark() {
+ if (!enableMarkAndReset) {
+ throw new UnsupportedOperationException("mark and reset disabled for this RecordIterator");
+ }
// Release all batches before current batch. [0 to startBatchPosition).
final Map<Range<Long>,RecordBatchData> oldBatches = batches.subRangeMap(Range.closedOpen(0l, startBatchPosition)).asMapOfRanges();
- for (Range<Long> range : oldBatches.keySet()) {
- oldBatches.get(range).clear();
+ for (RecordBatchData rbd : oldBatches.values()) {
+ rbd.clear();
}
batches.remove(Range.closedOpen(0l, startBatchPosition));
markedInnerPosition = innerPosition;
@@ -103,12 +112,15 @@ public void mark() {
}
public void reset() {
+ if (!enableMarkAndReset) {
+ throw new UnsupportedOperationException("mark and reset disabled for this RecordIterator");
+ }
if (markedOuterPosition >= 0) {
// Move to rbd for markedOuterPosition.
final RecordBatchData rbdNew = batches.get(markedOuterPosition);
final RecordBatchData rbdOld = batches.get(startBatchPosition);
- Preconditions.checkArgument(rbdOld != null);
- Preconditions.checkArgument(rbdNew != null);
+ assert rbdOld != null;
+ assert rbdNew != null;
if (rbdNew != rbdOld) {
container.transferOut(rbdOld.getContainer());
container.transferIn(rbdNew.getContainer());
@@ -125,13 +137,16 @@ public void reset() {
// Move forward by delta (may cross one or more record batches)
public void forward(long delta) {
- Preconditions.checkArgument(delta >= 0);
- Preconditions.checkArgument(delta + outerPosition < totalRecordCount);
+ if (!enableMarkAndReset) {
+ throw new UnsupportedOperationException("mark and reset disabled for this RecordIterator");
+ }
+ assert delta >= 0;
+ assert (delta + outerPosition) < totalRecordCount;
final long nextOuterPosition = delta + outerPosition;
final RecordBatchData rbdNew = batches.get(nextOuterPosition);
final RecordBatchData rbdOld = batches.get(outerPosition);
- Preconditions.checkArgument(rbdNew != null);
- Preconditions.checkArgument(rbdOld != null);
+ assert rbdNew != null;
+ assert rbdOld != null;
container.transferOut(rbdOld.getContainer());
// Get vectors from new position.
container.transferIn(rbdNew.getContainer());
@@ -172,6 +187,9 @@ public IterOutcome next() {
// No more data, disallow reads unless reset is called.
outerPosition = nextOuterPosition;
lastBatchRead = true;
+ if (!enableMarkAndReset) {
+ container.clear();
+ }
break;
case OK_NEW_SCHEMA:
case OK:
@@ -193,14 +211,19 @@ public IterOutcome next() {
initialized = true;
}
if (innerRecordCount > 0) {
- // Transfer vectors back to old batch.
- if (startBatchPosition != -1 && batches.get(startBatchPosition) != null) {
- container.transferOut(batches.get(outerPosition).getContainer());
+ if (enableMarkAndReset) {
+ // Transfer vectors back to old batch.
+ if (startBatchPosition != -1 && batches.get(startBatchPosition) != null) {
+ container.transferOut(batches.get(outerPosition).getContainer());
+ }
+ container.transferIn(rbd.getContainer());
+ batches.put(Range.closedOpen(nextOuterPosition, nextOuterPosition + innerRecordCount), rbd);
+ } else {
+ container.zeroVectors();
+ container.transferIn(rbd.getContainer());
}
- container.transferIn(rbd.getContainer());
- startBatchPosition = nextOuterPosition;
- batches.put(Range.closedOpen(nextOuterPosition, nextOuterPosition + innerRecordCount), rbd);
innerPosition = 0;
+ startBatchPosition = nextOuterPosition;
outerPosition = nextOuterPosition;
totalRecordCount += innerRecordCount;
} else {
@@ -216,12 +239,13 @@ public IterOutcome next() {
}
} else {
if (nextInnerPosition >= innerRecordCount) {
+ assert enableMarkAndReset;
// move to next batch
final RecordBatchData rbdNew = batches.get(nextOuterPosition);
final RecordBatchData rbdOld = batches.get(outerPosition);
- Preconditions.checkArgument(rbdNew != null);
- Preconditions.checkArgument(rbdOld != null);
- Preconditions.checkArgument(rbdOld != rbdNew);
+ assert rbdNew != null;
+ assert rbdOld != null;
+ assert rbdOld != rbdNew;
container.transferOut(rbdOld.getContainer());
container.transferIn(rbdNew.getContainer());
innerPosition = 0;
@@ -257,40 +281,44 @@ public long getOuterPosition() {
}
public int getCurrentPosition() {
- Preconditions.checkArgument(initialized);
- Preconditions.checkArgument(innerPosition >= 0 && innerPosition < innerRecordCount,
- String.format("innerPosition:%d, outerPosition:%d, innerRecordCount:%d, totalRecordCount:%d",
- innerPosition, outerPosition, innerRecordCount, totalRecordCount));
+ assert initialized;
+ assert innerPosition >= 0;
+ assert innerPosition < innerRecordCount;
return innerPosition;
}
+ // Test purposes only.
+ public Map<Range<Long>, RecordBatchData> cachedBatches() {
+ return batches.asMapOfRanges();
+ }
+
@Override
public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
- Preconditions.checkArgument(initialized);
+ assert initialized;
return container.getValueAccessorById(clazz, ids);
}
@Override
public TypedFieldId getValueVectorId(SchemaPath path) {
- Preconditions.checkArgument(initialized);
+ assert initialized;
return container.getValueVectorId(path);
}
@Override
public BatchSchema getSchema() {
- Preconditions.checkArgument(initialized);
+ assert initialized;
return container.getSchema();
}
@Override
public int getRecordCount() {
- Preconditions.checkArgument(initialized);
+ assert initialized;
return innerRecordCount;
}
@Override
public Iterator<VectorWrapper<?>> iterator() {
- Preconditions.checkArgument(initialized);
+ assert initialized;
return container.iterator();
}
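To make the new mode concrete, here is a hedged sketch of a forward-only scan with mark/reset disabled (it mirrors testSimpleIterator below; `incoming` and `oContext` stand in for whatever the operator already has):
{code}
// With enableMarkAndReset == false the iterator frees each batch as it
// moves on, so cachedBatches() stays empty and memory use is bounded by
// a single batch.
RecordIterator iter = new RecordIterator(incoming, null, oContext, 0, false);
while (true) {
  iter.next();
  if (iter.finished()) {
    break;
  }
  int row = iter.getCurrentPosition(); // row index within the current batch
  // ... read the current record from the iterator's vectors ...
}
// iter.mark() or iter.reset() would throw UnsupportedOperationException here.
{code}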
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
new file mode 100644
index 0000000000..f892f0dc97
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.scanner.ClassPathScanner;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.compile.CodeCompilerTestFactory;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OpProfileDef;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.BitControl;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.ValueVector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+import java.util.List;
+
+public class TestRecordIterator extends PopUnitTestBase {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestRecordIterator.class);
+ DrillConfig c = DrillConfig.create();
+
+ @Test
+ public void testSimpleIterator(@Injectable final DrillbitContext bitContext,
+ @Injectable UserServer.UserClientConnection connection) throws Throwable {
+ new NonStrictExpectations(){{
+ bitContext.getMetrics(); result = new MetricRegistry();
+ bitContext.getAllocator(); result = RootAllocatorFactory.newRoot(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(ClassPathScanner.fromPrescan(c));
+ bitContext.getConfig(); result = c;
+ bitContext.getCompiler(); result = CodeCompilerTestFactory.getTestCompiler(c);
+ }};
+
+ final PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(c);
+
+ final String planStr = Files.toString(FileUtils.getResourceAsFile("/record/test_recorditerator.json"), Charsets.UTF_8);
+
+ final PhysicalPlan plan = reader.readPhysicalPlan(planStr);
+ final FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+ final FragmentContext context = new FragmentContext(bitContext, BitControl.PlanFragment.getDefaultInstance(), connection, registry);
+ final List<PhysicalOperator> operatorList = plan.getSortedOperators(false);
+ SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) operatorList.iterator().next()));
+
+ RecordBatch singleBatch = exec.getIncoming();
+ PhysicalOperator dummyPop = operatorList.iterator().next();
+ OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE, OperatorContext.getChildCount(dummyPop));
+ OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, exec.getContext().getAllocator());
+ RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0, false);
+ int totalRecords = 0;
+ List<ValueVector> vectors = null;
+
+ while (true) {
+ iter.next();
+ if (iter.finished()) {
+ break;
+ } else {
+ // First time save vectors.
+ if (vectors == null) {
+ vectors = Lists.newArrayList();
+ for (VectorWrapper vw : iter) {
+ vectors.add(vw.getValueVector());
+ }
+ }
+ final int position = iter.getCurrentPosition();
+ assertTrue(checkValues(vectors, position));
+ totalRecords++;
+ }
+ assertEquals(0, iter.cachedBatches().size());
+ }
+ assertEquals(11112, totalRecords);
+ try {
+ iter.mark();
+ assertTrue(false);
+ } catch (UnsupportedOperationException e) {}
+ try {
+ iter.reset();
+ assertTrue(false);
+ } catch (UnsupportedOperationException e) {}
+ }
+
+ @Test
+ public void testMarkResetIterator(@Injectable final DrillbitContext bitContext,
+ @Injectable UserServer.UserClientConnection connection) throws Throwable {
+ new NonStrictExpectations(){{
+ bitContext.getMetrics(); result = new MetricRegistry();
+ bitContext.getAllocator(); result = RootAllocatorFactory.newRoot(c);
+ bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(ClassPathScanner.fromPrescan(c));
+ bitContext.getConfig(); result = c;
+ bitContext.getCompiler(); result = CodeCompilerTestFactory.getTestCompiler(c);
+ }};
+
+ final PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(c);
+
+ final String planStr = Files.toString(FileUtils.getResourceAsFile("/record/test_recorditerator.json"), Charsets.UTF_8);
+
+ final PhysicalPlan plan = reader.readPhysicalPlan(planStr);
+ final FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+ final FragmentContext context = new FragmentContext(bitContext, BitControl.PlanFragment.getDefaultInstance(), connection, registry);
+ final List<PhysicalOperator> operatorList = plan.getSortedOperators(false);
+ SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) operatorList.iterator().next()));
+
+ RecordBatch singleBatch = exec.getIncoming();
+ PhysicalOperator dummyPop = operatorList.iterator().next();
+ OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE, OperatorContext.getChildCount(dummyPop));
+ OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, exec.getContext().getAllocator());
+ RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0);
+ List<ValueVector> vectors = null;
+ // batch sizes
+ // 1, 100, 10, 10000, 1, 1000
+ // total = 11112
+
+ // BATCH 1 : 1, starting outerposition: 0
+ iter.next();
+ assertFalse(iter.finished());
+ assertEquals(1, iter.getTotalRecordCount());
+ assertEquals(0, iter.getCurrentPosition());
+ assertEquals(0, iter.getOuterPosition());
+ assertEquals(1, iter.cachedBatches().size());
+ vectors = Lists.newArrayList();
+ for (VectorWrapper vw : iter) {
+ vectors.add(vw.getValueVector());
+ }
+ // mark at position 0
+ iter.mark();
+ checkValues(vectors, 0);
+
+ // BATCH 2: 100, starting outerposition: 1
+ iter.next();
+ assertFalse(iter.finished());
+ assertEquals(101, iter.getTotalRecordCount());
+ assertEquals(0, iter.getCurrentPosition());
+ assertEquals(100, iter.getInnerRecordCount());
+ assertEquals(1, iter.getOuterPosition());
+ assertEquals(2, iter.cachedBatches().size());
+ for (int i = 0; i < 100; i++) {
+ checkValues(vectors, i);
+ iter.next();
+ }
+
+ // BATCH 3 :10, starting outerposition: 101
+ assertFalse(iter.finished());
+ assertEquals(111, iter.getTotalRecordCount());
+ assertEquals(0, iter.getCurrentPosition());
+ assertEquals(10, iter.getInnerRecordCount());
+ assertEquals(101, iter.getOuterPosition());
+ assertEquals(3, iter.cachedBatches().size());
+ for (int i = 0; i < 10; i++) {
+ checkValues(vectors, i);
+ iter.next();
+ }
+
+ // BATCH 4 : 10000, starting outerposition: 111
+ assertFalse(iter.finished());
+ assertEquals(10111, iter.getTotalRecordCount());
+ assertEquals(0, iter.getCurrentPosition());
+ assertEquals(10000, iter.getInnerRecordCount());
+ assertEquals(111, iter.getOuterPosition());
+ assertEquals(4, iter.cachedBatches().size());
+ for (int i = 0; i < 10000; i++) {
+ checkValues(vectors, i);
+ iter.next();
+ }
+
+ // BATCH 5 : 1, starting outerposition: 10111
+ assertFalse(iter.finished());
+ assertEquals(10112, iter.getTotalRecordCount());
+ assertEquals(0, iter.getCurrentPosition());
+ assertEquals(1, iter.getInnerRecordCount());
+ assertEquals(10111, iter.getOuterPosition());
+ assertEquals(5, iter.cachedBatches().size());
+ checkValues(vectors, 0);
+ iter.next();
+
+ // BATCH 6 : 1000, starting outerposition: 10112
+ assertFalse(iter.finished());
+ assertEquals(11112, iter.getTotalRecordCount());
+ assertEquals(0, iter.getCurrentPosition());
+ assertEquals(1000, iter.getInnerRecordCount());
+ assertEquals(10112, iter.getOuterPosition());
+ assertEquals(6, iter.cachedBatches().size());
+ for (int i = 0; i < 1000; i++) {
+ checkValues(vectors, i);
+ iter.next();
+ }
+ assertTrue(iter.finished());
+ assertEquals(6, iter.cachedBatches().size());
+
+ // back to batch 1
+ iter.reset();
+ assertFalse(iter.finished());
+ assertEquals(11112, iter.getTotalRecordCount());
+ assertEquals(6, iter.cachedBatches().size());
+ assertEquals(0, iter.getCurrentPosition());
+ assertEquals(1, iter.getInnerRecordCount());
+ checkValues(vectors, 0);
+
+ iter.next();
+ // mark start of batch 2
+ iter.mark();
+ assertFalse(iter.finished());
+ assertEquals(11112, iter.getTotalRecordCount());
+ assertEquals(5, iter.cachedBatches().size());
+ assertEquals(0, iter.getCurrentPosition());
+ assertEquals(100, iter.getInnerRecordCount());
+ for (int i = 0; i < 100; i++) {
+ iter.next();
+ }
+
+ // mark start of batch 3
+ iter.mark();
+ assertFalse(iter.finished());
+ assertEquals(11112, iter.getTotalRecordCount());
+ assertEquals(4, iter.cachedBatches().size());
+ assertEquals(0, iter.getCurrentPosition());
+ assertEquals(10, iter.getInnerRecordCount());
+ for (int i = 0; i < 10; i++) {
+ iter.next();
+ }
+
+ // jump into middle of largest batch #4.
+ for (int i = 0; i < 5000; i++) {
+ iter.next();
+ }
+ assertEquals(4, iter.cachedBatches().size());
+ iter.mark();
+ assertEquals(3, iter.cachedBatches().size());
+ for (int i = 0; i < 5000; i++) {
+ iter.next();
+ }
+
+ // mark start of batch 5
+ iter.mark();
+ assertFalse(iter.finished());
+ assertEquals(11112, iter.getTotalRecordCount());
+ assertEquals(2, iter.cachedBatches().size());
+ assertEquals(0, iter.getCurrentPosition());
+ assertEquals(1, iter.getInnerRecordCount());
+
+ // move to last batch
+ iter.next();
+ // skip to the middle of last batch
+ for (int i = 0; i < 500; i++) {
+ iter.next();
+ }
+ checkValues(vectors, 499);
+ checkValues(vectors, 500);
+ iter.reset();
+ checkValues(vectors, 0);
+ assertFalse(iter.finished());
+ assertEquals(11112, iter.getTotalRecordCount());
+ assertEquals(2, iter.cachedBatches().size());
+ assertEquals(0, iter.getCurrentPosition());
+ assertEquals(1, iter.getInnerRecordCount());
+ // move to last batch
+ iter.next();
+ assertEquals(0, iter.getCurrentPosition());
+ for (int i = 0; i < 500; i++) {
+ iter.next();
+ }
+ // This should free 5th batch.
+ iter.mark();
+ assertFalse(iter.finished());
+ assertEquals(11112, iter.getTotalRecordCount());
+ assertEquals(1, iter.cachedBatches().size());
+ assertEquals(500, iter.getCurrentPosition());
+ assertEquals(1000, iter.getInnerRecordCount());
+ // go to the end of iterator
+ for (int i = 0; i < 500; i++) {
+ iter.next();
+ }
+ assertTrue(iter.finished());
+ iter.reset();
+ assertFalse(iter.finished());
+ assertEquals(11112, iter.getTotalRecordCount());
+ assertEquals(1, iter.cachedBatches().size());
+ assertEquals(500, iter.getCurrentPosition());
+ assertEquals(1000, iter.getInnerRecordCount());
+ iter.close();
+ assertEquals(0, iter.cachedBatches().size());
+ }
+
+ private static boolean checkValues(List<ValueVector> vectors, int position) {
+ boolean result = true;
+ final int expected = (position % 2 == 0) ? Integer.MIN_VALUE : Integer.MAX_VALUE;
+ for (ValueVector vv : vectors) {
+ final Object o = vv.getAccessor().getObject(position);
+ if (o instanceof Integer) {
+ final Integer v = (Integer)o;
+ result &= (v == expected);
+ } else {
+ System.out.println(String.format("Found wrong type %s at position %d", o.getClass(), position));
+ result = false;
+ break;
+ }
+ }
+ return result;
+ }
+}
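One implicit assumption worth spelling out (inferred from checkValues() above, not stated anywhere else in this thread): the mock scan fills its INT columns so that values alternate by row parity within a batch. A sketch of the rule the test encodes, with a hypothetical helper name:
{code}
// Hypothetical helper naming the expectation checkValues() asserts:
// even positions hold Integer.MIN_VALUE, odd positions Integer.MAX_VALUE.
static int expectedMockValue(int position) {
  return (position % 2 == 0) ? Integer.MIN_VALUE : Integer.MAX_VALUE;
}
{code}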
diff --git a/exec/java-exec/src/test/resources/record/test_recorditerator.json
b/exec/java-exec/src/test/resources/record/test_recorditerator.json
new file mode 100644
index 0000000000..057466b833
--- /dev/null
+++ b/exec/java-exec/src/test/resources/record/test_recorditerator.json
@@ -0,0 +1,71 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph: [
+ {
+ @id:1,
+ pop:"mock-sub-scan",
+ url: "http://apache.org",
+ entries:[
+ {
+ records: 1,
+ types: [
+ {name: "k", type: "INT", mode: "REQUIRED"},
+ {name: "v", type: "INT", mode: "REQUIRED"}
+ ]
+ },
+ {
+ records: 100,
+ types: [
+ {name: "k", type: "INT", mode: "REQUIRED"},
+ {name: "v", type: "INT", mode: "REQUIRED"}
+ ]
+ },
+ {
+ records: 10,
+ types: [
+ {name: "k", type: "INT", mode: "REQUIRED"},
+ {name: "v", type: "INT", mode: "REQUIRED"}
+ ]
+ },
+ {
+ records: 10000,
+ types: [
+ {name: "k", type: "INT", mode: "REQUIRED"},
+ {name: "v", type: "INT", mode: "REQUIRED"}
+ ]
+ },
+ {
+ records: 1,
+ types: [
+ {name: "k", type: "INT", mode: "REQUIRED"},
+ {name: "v", type: "INT", mode: "REQUIRED"}
+ ]
+ },
+ {
+ records: 1000,
+ types: [
+ {name: "k", type: "INT", mode: "REQUIRED"},
+ {name: "v", type: "INT", mode: "REQUIRED"}
+ ]
+ }
+ ]
+ },
+ {
+ @id:2,
+ child:1,
+ pop:"project",
+ exprs:[ { ref : "`*`", expr : "`*`"} ]
+ },
+ {
+ @id:3,
+ child:2,
+ pop:"screen"
+ }
+ ]
+}
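For the record, the six `records` entries in this plan are the per-batch sizes the tests walk through; a quick cross-check of the totals (my arithmetic, not part of the patch):
{code}
// 1 + 100 + 10 + 10000 + 1 + 1000 == 11112, the final value of
// getTotalRecordCount() that both tests assert.
int[] batchSizes = {1, 100, 10, 10000, 1, 1000};
int total = 0;
for (int n : batchSizes) {
  total += n; // running total: 1, 101, 111, 10111, 10112, 11112
}
assert total == 11112;
{code}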
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Don't hold on to batches from left side of merge join
> -----------------------------------------------------
>
> Key: DRILL-4190
> URL: https://issues.apache.org/jira/browse/DRILL-4190
> Project: Apache Drill
> Issue Type: Bug
> Components: Execution - Relational Operators
> Affects Versions: 1.3.0, 1.4.0, 1.5.0
> Reporter: Victoria Markman
> Assignee: amit hadke
> Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: 2990f5f8-ec64-1223-c1d8-97dd7e601cee.sys.drill,
> exception.log, query3.sql
>
>
> Running TPCDS queries with the latest 1.4.0 release, with hash join disabled:
> 22 queries fail with out of memory
> 2 queries return wrong results (I have not yet investigated the nature of the wrong results)
> Only query97.sql is a legitimate failure: we don't support full outer join
> with the merge join.
> It is important to understand what changed between 1.2.0 and 1.4.0 that
> made these tests fail under the same configuration.
> The same tests with the same Drill configuration pass in the 1.2.0 release.
> (I hope I did not make a mistake somewhere in my cluster setup :))
> {code}
> 0: jdbc:drill:schema=dfs> select * from sys.version;
> +-----------------+-------------------------------------------+---------------------------------------------------------------------+----------------------------+--------------+----------------------------+
> | version | commit_id | commit_message | commit_time | build_email | build_time |
> +-----------------+-------------------------------------------+---------------------------------------------------------------------+----------------------------+--------------+----------------------------+
> | 1.4.0-SNAPSHOT | b9068117177c3b47025f52c00f67938e0c3e4732 | DRILL-4165 Add a precondition for size of merge join record batch. | 08.12.2015 @ 01:25:34 UTC | Unknown | 08.12.2015 @ 03:36:25 UTC |
> +-----------------+-------------------------------------------+---------------------------------------------------------------------+----------------------------+--------------+----------------------------+
> 1 row selected (2.211 seconds)
> Execution Failures:
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query50.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query33.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query74.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query68.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query34.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query21.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query46.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query91.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query59.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query3.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query66.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query84.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query97.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query19.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query96.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query43.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query15.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query2.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query60.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query79.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query73.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query45.sql
> Verification Failures
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query52.sql
> /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query40.sql
> Timeout Failures
> ----------------------------------------------------------------------------------------------------------------
> Passing tests: 3
> Execution Failures: 22
> VerificationFailures: 2
> Timeouts: 0
> Canceled: 0
> {code}
> {code}
> 0: jdbc:drill:schema=dfs> select * from sys.version;
> +-----------+----------------+-------------+-------------+------------+
> | commit_id | commit_message | commit_time | build_email | build_time |
> +-----------+----------------+-------------+-------------+------------+
> | f1100a79b4e4fbb1b58b35b0230edff137588777 | DRILL-3947: Use setSafe() for date, time, timestamp types while populating pruning vector (other types were already using setSafe). | 19.10.2015 @ 16:02:00 UTC | Unknown | 19.10.2015 @ 16:25:21 UTC |
> +-----------+----------------+-------------+-------------+------------+
> 1 row selected (2.79 seconds)
> PASS (1.543 min) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query68.sql (connection: 1681915178)
> PASS (29.36 s) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query34.sql (connection: 1681915178)
> PASS (3.311 min) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query21.sql (connection: 1681915178)
> PASS (1.447 min) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query46.sql (connection: 1681915178)
> PASS (34.53 s) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query76.sql (connection: 1681915178)
> PASS (47.13 s) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query91.sql (connection: 1681915178)
> PASS (1.151 min) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query59.sql (connection: 1681915178)
> PASS (32.29 s) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query3.sql (connection: 1681915178)
> PASS (1.939 min) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query66.sql (connection: 1681915178)
> PASS (19.26 s) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query84.sql (connection: 1681915178)
> PASS (1.243 min) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query40.sql (connection: 1681915178)
> [#37] Query failed: oadd.org.apache.drill.common.exceptions.UserRemoteException: SYSTEM ERROR: IllegalArgumentException: Full outer join not currently supported
> [Error Id: 9a400ac2-3f1d-428c-9dc6-5f556cb520aa on atsqa4-133.qa.lab:31010]
> at oadd.org.apache.drill.exec.rpc.user.QueryResultHandler.resultArrived(QueryResultHandler.java:118)
> at oadd.org.apache.drill.exec.rpc.user.UserClient.handleReponse(UserClient.java:110)
> at oadd.org.apache.drill.exec.rpc.BasicClientWithConnection.handle(BasicClientWithConnection.java:47)
> at oadd.org.apache.drill.exec.rpc.BasicClientWithConnection.handle(BasicClientWithConnection.java:32)
> at oadd.org.apache.drill.exec.rpc.RpcBus.handle(RpcBus.java:61)
> at oadd.org.apache.drill.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:233)
> at oadd.org.apache.drill.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:205)
> at oadd.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
> at oadd.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at oadd.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at oadd.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
> at oadd.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at oadd.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at oadd.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> at oadd.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at oadd.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at oadd.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
> at oadd.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at oadd.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at oadd.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
> at oadd.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at oadd.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at oadd.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> at oadd.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at oadd.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at oadd.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at oadd.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at oadd.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at oadd.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:745)
> EXECUTION_FAILURE (2.814 s) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query97.sql (connection: 1681915178)
> PASS (57.04 s) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query19.sql (connection: 1681915178)
> PASS (24.01 s) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query96.sql (connection: 1681915178)
> PASS (28.77 s) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query43.sql (connection: 1681915178)
> PASS (1.833 min) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query93.sql (connection: 1681915178)
> PASS (38.84 s) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query15.sql (connection: 1681915178)
> PASS (55.82 s) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query2.sql (connection: 1681915178)
> PASS (1.308 min) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query60.sql (connection: 1681915178)
> PASS (1.116 min) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query79.sql (connection: 1681915178)
> PASS (27.79 s) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query73.sql (connection: 1681915178)
> PASS (39.85 s) /root/drill-tests-new/framework/resources/Advanced/tpcds/tpcds_sf100/original/query45.sql (connection: 1681915178)
> {code}
> *Cluster configuration:*
> - 4 nodes
> - 48 GB direct memory
> - 10GB memory allocated to sort
> - timeout setup for the framework = 600 seconds
> - queries were executed one at a time
> *System settings:*
> {code}
> 0: jdbc:drill:schema=dfs> select * from sys.options where status like '%CHANGED%';
> +-------------------------------------------+----------+---------+----------+--------------+-------------+-----------+------------+
> | name | kind | type | status | num_val | string_val | bool_val | float_val |
> +-------------------------------------------+----------+---------+----------+--------------+-------------+-----------+------------+
> | planner.enable_decimal_data_type | BOOLEAN | SYSTEM | CHANGED | null | null | true | null |
> | planner.enable_hashjoin | BOOLEAN | SYSTEM | CHANGED | null | null | false | null |
> | planner.memory.max_query_memory_per_node | LONG | SYSTEM | CHANGED | 10737418240 | null | null | null |
> +-------------------------------------------+----------+---------+----------+--------------+-------------+-----------+------------+
> 3 rows selected (3.464 seconds)
> {code}
> TPCDS queries that were executed from the public test framework:
> ./run.sh -s Advanced/tpcds/tpcds_sf100/original -g smoke -t 600
> More details shortly.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)