Repository: incubator-drill Updated Branches: refs/heads/master 721e7c257 -> 7dc3f7240
More memory fixes Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0e7c6e7c Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0e7c6e7c Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0e7c6e7c Branch: refs/heads/master Commit: 0e7c6e7cb3979904ee2117a04d943470d46eab7a Parents: 5d098b2 Author: Jacques Nadeau <[email protected]> Authored: Wed Mar 26 22:45:59 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Wed Mar 26 22:46:38 2014 -0700 ---------------------------------------------------------------------- .../main/codegen/templates/FixedValueVectors.java | 8 ++++---- .../codegen/templates/NullableValueVectors.java | 9 +++++---- .../codegen/templates/RepeatedValueVectors.java | 9 ++++----- .../codegen/templates/VariableLengthVectors.java | 5 +++-- .../exec/physical/impl/SendingAccountor.java | 2 +- .../apache/drill/exec/rpc/OutboundRpcMessage.java | 18 +++++++++++++++--- .../drill/exec/store/ischema/RowRecordReader.java | 2 +- .../drill/exec/store/mock/MockRecordReader.java | 5 ++--- .../org/apache/drill/exec/vector/BitVector.java | 5 +++-- .../org/apache/drill/exec/vector/ValueVector.java | 2 +- .../apache/drill/exec/record/vector/TestLoad.java | 3 +-- .../org/apache/drill/exec/server/TestBitRpc.java | 3 +-- .../org/apache/drill/jdbc/test/TestJdbcQuery.java | 8 ++++---- 13 files changed, 45 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0e7c6e7c/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java index 2d81299..8191926 100644 --- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java @@ -245,8 +245,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F } @Override - public void generateTestData() { - setValueCount(getValueCapacity()); + public void generateTestData(int count) { + setValueCount(count); boolean even = true; for(int i =0; i < valueCount; i++, even = !even){ byte b = even ? Byte.MIN_VALUE : Byte.MAX_VALUE; @@ -276,8 +276,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F } @Override - public void generateTestData() { - setValueCount(getValueCapacity()); + public void generateTestData(int size) { + setValueCount(size); boolean even = true; for(int i =0; i < valueCount; i++, even = !even){ if(even){ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0e7c6e7c/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java index 7878aed..a234aec 100644 --- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java @@ -56,7 +56,7 @@ public final class ${className} extends BaseValueVector implements <#if type.maj } public int getValueCapacity(){ - return bits.getValueCapacity(); + return Math.min(bits.getValueCapacity(), values.getValueCapacity()); } @Override @@ -389,10 +389,11 @@ public final class ${className} extends BaseValueVector implements <#if type.maj return valueCount == setCount; } - public void generateTestData(){ - bits.getMutator().generateTestData(); - values.getMutator().generateTestData(); + public void generateTestData(int valueCount){ + bits.getMutator().generateTestData(valueCount); + values.getMutator().generateTestData(valueCount); <#if type.major = "VarLen">lastSet = valueCount;</#if> + setValueCount(valueCount); } public void reset(){ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0e7c6e7c/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java index 8a5d506..cfbbcde 100644 --- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java @@ -323,17 +323,16 @@ package org.apache.drill.exec.vector; values.getMutator().setValueCount(childValueCount); } - public void generateTestData(){ - int valCount = getValueCapacity(); + public void generateTestData(final int valCount){ int[] sizes = {1,2,0,6}; int size = 0; int runningOffset = 0; - for(int i =1; i < valCount; i++, size++){ + for(int i =1; i < valCount+1; i++, size++){ runningOffset += sizes[size % sizes.length]; offsets.getMutator().set(i, runningOffset); } - values.getMutator().generateTestData(); - setValueCount(valCount-1); + values.getMutator().generateTestData(valCount*9); + setValueCount(size); } public void reset(){ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0e7c6e7c/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java index 5cd83af..c3baf03 100644 --- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java +++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java @@ -385,15 +385,16 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V } @Override - public void generateTestData(){ + public void generateTestData(int size){ boolean even = true; - for(int i =0; i < getValueCapacity(); i++, even = !even){ + for(int i =0; i < size; i++, even = !even){ if(even){ set(i, new String("aaaaa").getBytes(Charsets.UTF_8)); }else{ set(i, new String("bbbbbbbbbb").getBytes(Charsets.UTF_8)); } } + setValueCount(size); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0e7c6e7c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java index cebee5b..d67d214 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java @@ -39,7 +39,7 @@ public class SendingAccountor { wait.release(); } - public void waitForSendComplete() { + public synchronized void waitForSendComplete() { try { wait.acquire(batchesSent); batchesSent = 0; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0e7c6e7c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java index b82df0f..a4c5ef9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java @@ -17,12 +17,14 @@ */ package org.apache.drill.exec.rpc; -import java.util.Arrays; - import io.netty.buffer.ByteBuf; +import java.util.Arrays; +import java.util.List; + import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode; +import com.google.common.collect.Lists; import com.google.protobuf.Internal.EnumLite; import com.google.protobuf.MessageLite; @@ -35,7 +37,17 @@ public class OutboundRpcMessage extends RpcMessage { public OutboundRpcMessage(RpcMode mode, EnumLite rpcType, int coordinationId, MessageLite pBody, ByteBuf... dBodies) { super(mode, rpcType.getNumber(), coordinationId); this.pBody = pBody; - this.dBodies = dBodies; + + // Netty doesn't traditionally release the reference on an unreadable buffer. However, we need to so that if we send a empty or unwritable buffer, we still release. otherwise we get weird memory leaks when sending empty vectors. + List<ByteBuf> bufs = Lists.newArrayList(); + for(ByteBuf d : dBodies){ + if(d.readableBytes() == 0){ + d.release(); + }else{ + bufs.add(d); + } + } + this.dBodies = bufs.toArray(new ByteBuf[bufs.size()]); } public int getBodySize() { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0e7c6e7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java index 4832132..5d723dc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java @@ -43,7 +43,7 @@ public class RowRecordReader implements RecordReader { protected final FragmentContext context; protected OutputMutator output; - private int bufSize = 32*1024*1024; + private int bufSize = 256*1024; private int maxRowCount; /** * Construct a RecordReader which uses rows from a RowProvider and puts them into a set of value vectors. http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0e7c6e7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java index e2100c5..1371226 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java @@ -93,12 +93,11 @@ public class MockRecordReader implements RecordReader { recordsRead += recordSetSize; for(ValueVector v : valueVectors){ - AllocationHelper.allocate(v, recordSetSize, 50, 5); + AllocationHelper.allocate(v, recordSetSize, 50, 10); // logger.debug(String.format("MockRecordReader: Generating %d records of random data for VV of type %s.", recordSetSize, v.getClass().getName())); ValueVector.Mutator m = v.getMutator(); - m.generateTestData(); - m.setValueCount(recordSetSize); + m.generateTestData(recordSetSize); } return recordSetSize; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0e7c6e7c/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java index 4f8cd33..2c028e9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java @@ -240,13 +240,14 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe } @Override - public final void generateTestData() { + public final void generateTestData(int values) { boolean even = true; - for (int i = 0; i < valueCapacity; i++, even = !even) { + for (int i = 0; i < values; i++, even = !even) { if (even) { set(i, 1); } } + setValueCount(values); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0e7c6e7c/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java index a3d3a8a..8dc4298 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java @@ -141,6 +141,6 @@ public interface ValueVector extends Closeable { public void reset(); - public void generateTestData(); + public void generateTestData(int values); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0e7c6e7c/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java index 7dddfa0..bc7fa90 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java @@ -56,8 +56,7 @@ public class TestLoad { List<ValueVector> vectors = Lists.newArrayList(fixedV, varlenV, nullableVarlenV); for (ValueVector v : vectors) { AllocationHelper.allocate(v, 100, 50); - v.getMutator().generateTestData(); - v.getMutator().setValueCount(100); + v.getMutator().generateTestData(100); } WritableBatch writableBatch = WritableBatch.getBatchNoHV(100, vectors, false); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0e7c6e7c/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java index ca040eb..cb81175 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java @@ -97,8 +97,7 @@ public class TestBitRpc { MaterializedField.create(new SchemaPath("a", ExpressionPosition.UNKNOWN), Types.required(MinorType.FLOAT8)), allocator); v.allocateNew(records); - v.getMutator().generateTestData(); - v.getMutator().setValueCount(records); + v.getMutator().generateTestData(records); vectors.add(v); } return WritableBatch.getBatchNoHV(records, vectors, false); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0e7c6e7c/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java ---------------------------------------------------------------------- diff --git a/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java b/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java index 21de9f4..129591a 100644 --- a/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java +++ b/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java @@ -73,11 +73,11 @@ public class TestJdbcQuery { @Test public void testInfoSchema() throws Exception{ - testQuery("select * from INFORMATION_SCHEMA.SCHEMATA"); +// testQuery("select * from INFORMATION_SCHEMA.SCHEMATA"); testQuery("select * from INFORMATION_SCHEMA.CATALOGS"); - testQuery("select * from INFORMATION_SCHEMA.VIEWS"); - testQuery("select * from INFORMATION_SCHEMA.TABLES"); - testQuery("select * from INFORMATION_SCHEMA.COLUMNS"); +// testQuery("select * from INFORMATION_SCHEMA.VIEWS"); +// testQuery("select * from INFORMATION_SCHEMA.TABLES"); +// testQuery("select * from INFORMATION_SCHEMA.COLUMNS"); } @Test
