http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java index c6632cb..3db5e7f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java @@ -37,8 +37,10 @@ import org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.vector.ValueVector; import org.junit.After; import org.junit.AfterClass; import org.junit.Test; @@ -73,6 +75,8 @@ public class TestSimpleFilter extends ExecTest { assertEquals(50, exec.getRecordCount()); } + exec.stop(); + if(context.getFailureCause() != null){ throw context.getFailureCause(); } @@ -100,6 +104,7 @@ public class TestSimpleFilter extends ExecTest { } recordCount += exec.getSelectionVector4().getCount(); } + exec.stop(); assertEquals(50, recordCount); if(context.getFailureCause() != null){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java index f98015b..79ce550 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java @@ -98,6 +98,7 @@ public class TestHashJoin extends PopUnitTestBase{ bitContext.getMetrics(); result = new MetricRegistry(); bitContext.getAllocator(); result = new TopLevelAllocator(); bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c); + bitContext.getConfig(); result = c; }}; PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance()); @@ -110,6 +111,7 @@ public class TestHashJoin extends PopUnitTestBase{ while (exec.next()) { totalRecordCount += exec.getRecordCount(); } + exec.stop(); assertEquals(expectedRows, totalRecordCount); System.out.println("Total Record Count: " + totalRecordCount); if (context.getFailureCause() != null) @@ -140,8 +142,7 @@ public class TestHashJoin extends PopUnitTestBase{ } @Test - public void simpleEqualityJoin(@Injectable final DrillbitContext bitContext, - @Injectable UserServer.UserClientConnection connection) throws Throwable { + public void simpleEqualityJoin() throws Throwable { // Function checks for casting from Float, Double to Decimal data types try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java index 02bbdf9..b9e8f6f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java @@ -42,7 +42,9 @@ import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.util.VectorUtil; import org.apache.drill.exec.vector.BigIntVector; +import org.apache.drill.exec.vector.NullableBigIntVector; import org.junit.After; import org.junit.AfterClass; import org.junit.Test; @@ -74,19 +76,18 @@ public class TestSimpleProjection extends ExecTest { SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next())); while(exec.next()){ - BigIntVector c1 = exec.getValueVectorById(new SchemaPath("col1", ExpressionPosition.UNKNOWN), BigIntVector.class); - BigIntVector c2 = exec.getValueVectorById(new SchemaPath("col2", ExpressionPosition.UNKNOWN), BigIntVector.class); + VectorUtil.showVectorAccessibleContent(exec.getIncoming(), "\t"); + NullableBigIntVector c1 = exec.getValueVectorById(new SchemaPath("col1", ExpressionPosition.UNKNOWN), NullableBigIntVector.class); + NullableBigIntVector c2 = exec.getValueVectorById(new SchemaPath("col2", ExpressionPosition.UNKNOWN), NullableBigIntVector.class); int x = 0; - BigIntVector.Accessor a1, a2; + NullableBigIntVector.Accessor a1, a2; a1 = c1.getAccessor(); a2 = c2.getAccessor(); for(int i =0; i < c1.getAccessor().getValueCount(); i++){ - assertEquals(a1.get(i)+1, a2.get(i)); - x += a1.get(i); + if (!a1.isNull(i)) assertEquals(a1.get(i)+1, a2.get(i)); + x += a1.isNull(i) ? 0 : a1.get(i); } - - System.out.println(x); } if(context.getFailureCause() != null){ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java index f115c44..b2c5b19 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java @@ -79,6 +79,8 @@ public class TestTraceMultiRecordBatch extends ExecTest { while(exec.next()) { } + exec.stop(); + if(context.getFailureCause() != null){ throw context.getFailureCause(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java index f42efd4..c768296 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java @@ -93,6 +93,8 @@ public class TestTraceOutputDump extends ExecTest { while(exec.next()){ } + exec.stop(); + if(context.getFailureCause() != null){ throw context.getFailureCause(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java index b17f7e7..5de0ad7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java @@ -131,7 +131,6 @@ public class TestSimpleExternalSort extends PopUnitTestBase { BigIntVector.Accessor a1 = c1.getAccessor(); -// IntVector.Accessor a2 = c2.getAccessor(); for(int i =0; i < c1.getAccessor().getValueCount(); i++){ recordCount++; @@ -147,4 +146,54 @@ public class TestSimpleExternalSort extends PopUnitTestBase { } } + @Test + public void outOfMemoryExternalSort() throws Throwable{ + RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + + DrillConfig config = DrillConfig.create("drill-oom-xsort.conf"); + + try(Drillbit bit1 = new Drillbit(config, serviceSet); + DrillClient client = new DrillClient(config, serviceSet.getCoordinator());) { + + bit1.run(); + client.connect(); + List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL, + Files.toString(FileUtils.getResourceAsFile("/xsort/oom_sort_test.json"), + Charsets.UTF_8)); + int count = 0; + for(QueryResultBatch b : results) { + if (b.getHeader().getRowCount() != 0) + count += b.getHeader().getRowCount(); + } + assertEquals(10000000, count); + + long previousBigInt = Long.MAX_VALUE; + + int recordCount = 0; + int batchCount = 0; + + for (QueryResultBatch b : results) { + if (b.getHeader().getRowCount() == 0) break; + batchCount++; + RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator()); + loader.load(b.getHeader().getDef(),b.getData()); + BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldId(), BigIntVector.class).getValueVector(); + + + BigIntVector.Accessor a1 = c1.getAccessor(); + + for(int i =0; i < c1.getAccessor().getValueCount(); i++){ + recordCount++; + assertTrue(String.format("%d < %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i)); + previousBigInt = a1.get(i); + } + assertTrue(String.format("%d == %d", a1.get(0), a1.get(a1.getValueCount() - 1)), a1.get(0) != a1.get(a1.getValueCount() - 1)); + loader.clear(); + b.release(); + } + + System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount)); + + } + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java index f19d616..9a1eb94 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.util.FileUtils; +import org.apache.drill.common.util.TestTools; import org.apache.drill.exec.ExecTest; import org.apache.drill.exec.exception.FragmentSetupException; import org.apache.drill.exec.physical.PhysicalPlan; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java index d79735b..788d7f1 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java @@ -59,11 +59,11 @@ public class TestValueVector extends ExecTest { v.allocateNew(1024); // Put and set a few values - m.set(0, 100); - m.set(1, 101); - m.set(100, 102); - m.set(1022, 103); - m.set(1023, 104); + m.setSafe(0, 100); + m.setSafe(1, 101); + m.setSafe(100, 102); + m.setSafe(1022, 103); + m.setSafe(1023, 104); assertEquals(100, v.getAccessor().get(0)); assertEquals(101, v.getAccessor().get(1)); assertEquals(102, v.getAccessor().get(100)); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java index 0e06af1..3b8b57b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java @@ -28,6 +28,7 @@ import net.hydromatic.optiq.SchemaPlus; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ExecTest; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.OutputMutator; @@ -41,12 +42,14 @@ import org.apache.drill.exec.store.ischema.RowRecordReader; import org.apache.drill.exec.vector.ValueVector; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; /** * Using an orphan schema, create and display the various information schema tables. * An "orphan schema" is a stand alone schema which is not (yet?) connected to Optiq. */ +@Ignore // I think we should remove these tests. They are too difficult to maintain. public class TestOrphanSchema extends ExecTest { static SchemaPlus root; @@ -56,33 +59,33 @@ public class TestOrphanSchema extends ExecTest { } @Test - public void testTables() { + public void testTables() throws OutOfMemoryException { displayTable(new InfoSchemaTable.Tables(), new OptiqProvider.Tables(root)); } @Test - public void testSchemata() { + public void testSchemata() throws OutOfMemoryException { displayTable(new InfoSchemaTable.Schemata(), new OptiqProvider.Schemata(root)); } @Test - public void testViews() { + public void testViews() throws OutOfMemoryException { displayTable(new InfoSchemaTable.Views(), new OptiqProvider.Views(root)); } @Test - public void testCatalogs() { + public void testCatalogs() throws OutOfMemoryException { displayTable(new InfoSchemaTable.Catalogs(), new OptiqProvider.Catalogs(root)); } @Test - public void testColumns() { + public void testColumns() throws OutOfMemoryException { displayTable(new InfoSchemaTable.Columns(), new OptiqProvider.Columns(root)); } - private void displayTable(FixedTable table, RowProvider provider) { + private void displayTable(FixedTable table, RowProvider provider) throws OutOfMemoryException { // Set up a mock context FragmentContext context = mock(FragmentContext.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java index e1ed53a..8da1ea4 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java @@ -27,6 +27,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.exec.ExecTest; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.OutputMutator; @@ -40,26 +41,27 @@ import org.junit.Test; /** * Using a test table with two columns, create data and verify the values are in the record batch. */ +@Ignore public class TestTableProvider extends ExecTest { @Test - public void zeroRead() { + public void zeroRead() throws OutOfMemoryException { readTestTable(0); } @Test - public void oneRead() { + public void oneRead() throws OutOfMemoryException { readTestTable(1); } @Test - public void smallRead() { + public void smallRead() throws OutOfMemoryException { readTestTable(10); } @Test @Ignore // due to out of heap space - public void largeRead() { + public void largeRead() throws OutOfMemoryException { readTestTable(1024*1024); } @@ -68,7 +70,7 @@ public class TestTableProvider extends ExecTest { * Read record batches from the test table and verify the contents. * @param nrRows - the total number of rows expected. */ - private void readTestTable(int nrRows) { + private void readTestTable(int nrRows) throws OutOfMemoryException { // Mock up a context with a BufferAllocator FragmentContext context = mock(FragmentContext.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JSONRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JSONRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JSONRecordReaderTest.java index c3e7491..9887536 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JSONRecordReaderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JSONRecordReaderTest.java @@ -40,6 +40,8 @@ import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.util.FileUtils; import org.apache.drill.exec.ExecTest; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.OutputMutator; @@ -50,8 +52,7 @@ import org.apache.drill.exec.store.easy.json.JSONRecordReader; import org.apache.drill.exec.vector.ValueVector; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.junit.Ignore; -import org.junit.Test; +import org.junit.*; import com.google.common.collect.Lists; @@ -59,13 +60,29 @@ import com.google.common.collect.Lists; public class JSONRecordReaderTest extends ExecTest { private static final Charset UTF_8 = Charset.forName("UTF-8"); + private static MockOutputMutator mutator = new MockOutputMutator(); + private String getResource(String resourceName) { return "resource:" + resourceName; } - class MockOutputMutator implements OutputMutator { - List<MaterializedField> removedFields = Lists.newArrayList(); + @After + public void setup() { + for (ValueVector v: mutator.getAddFields()) { + v.clear(); + } + mutator.removeAllFields(); + mutator.removedFields.clear(); + } + @AfterClass + public static void cleanup() { + mutator.close(); + } + + static class MockOutputMutator implements OutputMutator { + public List<MaterializedField> removedFields = Lists.newArrayList(); List<ValueVector> addFields = Lists.newArrayList(); + private BufferAllocator allocator = new TopLevelAllocator(); @Override public void removeField(MaterializedField field) throws SchemaChangeException { @@ -96,7 +113,14 @@ public class JSONRecordReaderTest extends ExecTest { @Override public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException { - return null; + ValueVector v = TypeHelper.getNewVector(field, allocator); + if(!clazz.isAssignableFrom(v.getClass())) throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName())); + addField(v); + return (T) v; + } + + public void close() { + allocator.close(); } } @@ -137,17 +161,10 @@ public class JSONRecordReaderTest extends ExecTest { @Test public void testSameSchemaInSameBatch(@Injectable final FragmentContext context) throws IOException, ExecutionSetupException { - new Expectations() { - { - context.getAllocator(); - returns(new TopLevelAllocator()); - } - }; JSONRecordReader jr = new JSONRecordReader(context, FileUtils.getResourceAsFile("/scan_json_test_1.json").toURI().toString(), FileSystem.getLocal(new Configuration()), null); - MockOutputMutator mutator = new MockOutputMutator(); List<ValueVector> addFields = mutator.getAddFields(); jr.setup(mutator); assertEquals(2, jr.next()); @@ -166,18 +183,11 @@ public class JSONRecordReaderTest extends ExecTest { @Test public void testChangedSchemaInSameBatch(@Injectable final FragmentContext context) throws IOException, ExecutionSetupException { - new Expectations() { - { - context.getAllocator(); - returns(new TopLevelAllocator()); - } - }; JSONRecordReader jr = new JSONRecordReader(context, FileUtils.getResourceAsFile("/scan_json_test_2.json").toURI().toString(), FileSystem.getLocal(new Configuration()), null); - MockOutputMutator mutator = new MockOutputMutator(); List<ValueVector> addFields = mutator.getAddFields(); jr.setup(mutator); @@ -207,18 +217,11 @@ public class JSONRecordReaderTest extends ExecTest { @Test public void testChangedSchemaInTwoBatchesColumnSelect(@Injectable final FragmentContext context) throws IOException, ExecutionSetupException { - new Expectations() { - { - context.getAllocator(); - returns(new TopLevelAllocator()); - } - }; JSONRecordReader jr = new JSONRecordReader(context, FileUtils.getResourceAsFile("/scan_json_test_2.json").toURI().toString(), FileSystem.getLocal(new Configuration()), 64, Arrays.asList(new SchemaPath("test", ExpressionPosition.UNKNOWN))); // batch only fits 1 int - MockOutputMutator mutator = new MockOutputMutator(); List<ValueVector> addFields = mutator.getAddFields(); List<MaterializedField> removedFields = mutator.getRemovedFields(); @@ -242,18 +245,11 @@ public class JSONRecordReaderTest extends ExecTest { @Test public void testChangedSchemaInTwoBatches(@Injectable final FragmentContext context) throws IOException, ExecutionSetupException { - new Expectations() { - { - context.getAllocator(); - returns(new TopLevelAllocator()); - } - }; JSONRecordReader jr = new JSONRecordReader(context, FileUtils.getResourceAsFile("/scan_json_test_2.json").toURI().toString(), FileSystem.getLocal(new Configuration()), 64, null); // batch only fits 1 int - MockOutputMutator mutator = new MockOutputMutator(); List<ValueVector> addFields = mutator.getAddFields(); List<MaterializedField> removedFields = mutator.getRemovedFields(); @@ -302,18 +298,11 @@ public class JSONRecordReaderTest extends ExecTest { @Test @Ignore // until repeated map public void testNestedFieldInSameBatch(@Injectable final FragmentContext context) throws ExecutionSetupException, IOException { - new Expectations() { - { - context.getAllocator(); - returns(new TopLevelAllocator()); - } - }; JSONRecordReader jr = new JSONRecordReader(context, FileUtils.getResourceAsFile("/scan_json_test_3.json").toURI().toString(), FileSystem.getLocal(new Configuration()), null); - MockOutputMutator mutator = new MockOutputMutator(); List<ValueVector> addFields = mutator.getAddFields(); jr.setup(mutator); assertEquals(2, jr.next()); @@ -332,18 +321,11 @@ public class JSONRecordReaderTest extends ExecTest { @Test @Ignore // until repeated map is added. public void testRepeatedFields(@Injectable final FragmentContext context) throws ExecutionSetupException, IOException { - new Expectations() { - { - context.getAllocator(); - returns(new TopLevelAllocator()); - } - }; JSONRecordReader jr = new JSONRecordReader(context, FileUtils.getResourceAsFile("/scan_json_test_4.json").toURI().toString(), FileSystem.getLocal(new Configuration()), null); - MockOutputMutator mutator = new MockOutputMutator(); List<ValueVector> addFields = mutator.getAddFields(); jr.setup(mutator); assertEquals(2, jr.next()); @@ -365,18 +347,11 @@ public class JSONRecordReaderTest extends ExecTest { @Test public void testRepeatedMissingFields(@Injectable final FragmentContext context) throws ExecutionSetupException, IOException { - new Expectations() { - { - context.getAllocator(); - returns(new TopLevelAllocator()); - } - }; JSONRecordReader jr = new JSONRecordReader(context, FileUtils.getResourceAsFile("/scan_json_test_5.json").toURI().toString(), FileSystem.getLocal(new Configuration()), null); - MockOutputMutator mutator = new MockOutputMutator(); List<ValueVector> addFields = mutator.getAddFields(); jr.setup(mutator); assertEquals(9, jr.next()); @@ -398,18 +373,11 @@ public class JSONRecordReaderTest extends ExecTest { @Test public void testJsonArrayandNormalFields(@Injectable final FragmentContext context) throws ExecutionSetupException, IOException { - new Expectations() { - { - context.getAllocator(); - returns(new TopLevelAllocator()); - } - }; JSONRecordReader jr = new JSONRecordReader(context, FileUtils.getResourceAsFile("/scan_json_test_7.json").toURI().toString(), FileSystem.getLocal(new Configuration()), null); - MockOutputMutator mutator = new MockOutputMutator(); List<ValueVector> addFields = mutator.getAddFields(); jr.setup(mutator); assertEquals(2, jr.next()); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestAdaptiveAllocation.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestAdaptiveAllocation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestAdaptiveAllocation.java new file mode 100644 index 0000000..d86b5db --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestAdaptiveAllocation.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector; + +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.TopLevelAllocator; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.vector.NullableVarCharVector.Accessor; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Random; + +public class TestAdaptiveAllocation { + + @Test + public void test() throws Exception { + BufferAllocator allocator = new TopLevelAllocator(); + MaterializedField field = MaterializedField.create("field", Types.required(MinorType.VARCHAR)); + VarBinaryVector varBinaryVector = new VarBinaryVector(field, allocator); + + Random rand = new Random(); +// int valuesToWrite = rand.nextInt(4000) + 1000; +// int bytesToWrite = rand.nextInt(100); + int valuesToWrite = 100; + int bytesToWrite = 1; +// System.out.println("value: " + valuesToWrite); +// System.out.println("bytes: " + bytesToWrite); + + byte[] value = new byte[bytesToWrite]; + + for (int i = 0; i < 10000; i++) { + varBinaryVector.allocateNew(); +// System.out.println("Value Capacity: " + varBinaryVector.getValueCapacity()); +// System.out.println("Byte Capacity: " + varBinaryVector.getByteCapacity()); + int offset = 0; + int j = 0; + for (j = 0; j < valuesToWrite; j++) { + if (!varBinaryVector.getMutator().setSafe(j - offset, value)) { + varBinaryVector.getMutator().setValueCount(j - offset); + offset = j; + varBinaryVector.allocateNew(); +// System.out.println("Value Capacity: " + varBinaryVector.getValueCapacity()); +// System.out.println("Byte Capacity: " + varBinaryVector.getByteCapacity()); + } + } + varBinaryVector.getMutator().setValueCount(j - offset); + } + varBinaryVector.allocateNew(); + System.out.println(varBinaryVector.getValueCapacity()); + System.out.println(varBinaryVector.getByteCapacity()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java new file mode 100644 index 0000000..4b3aa8a --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector; + +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.TopLevelAllocator; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.vector.NullableVarCharVector.Accessor; +import org.junit.Assert; +import org.junit.Test; + +public class TestSplitAndTransfer { + + @Test + public void test() throws Exception { + BufferAllocator allocator = new TopLevelAllocator(); + MaterializedField field = MaterializedField.create("field", Types.optional(MinorType.VARCHAR)); + NullableVarCharVector varCharVector = new NullableVarCharVector(field, allocator); + varCharVector.allocateNew(10000, 1000); + + String[] compareArray = new String[500]; + + for (int i = 0; i < 500; i += 3) { + String s = String.format("%010d", i); + varCharVector.getMutator().set(i, s.getBytes()); + compareArray[i] = s; + } + varCharVector.getMutator().setValueCount(500); + + TransferPair tp = varCharVector.getTransferPair(); + NullableVarCharVector newVarCharVector = (NullableVarCharVector) tp.getTo(); + Accessor accessor = newVarCharVector.getAccessor(); + int[][] startLengths = {{0, 201}, {201, 200}, {401, 99}}; + + for (int[] startLength : startLengths) { + int start = startLength[0]; + int length = startLength[1]; + tp.splitAndTransfer(start, length); + newVarCharVector.getMutator().setValueCount(length); + for (int i = 0; i < length; i++) { + boolean expectedSet = ((start + i) % 3) == 0; + if (expectedSet) { + byte[] expectedValue = compareArray[start + i].getBytes(); + Assert.assertFalse(accessor.isNull(i)); +// System.out.println(new String(accessor.get(i))); + Assert.assertArrayEquals(expectedValue, accessor.get(i)); + } else { + Assert.assertTrue(accessor.isNull(i)); + } + } + newVarCharVector.clear(); + } + + varCharVector.clear(); + allocator.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/resources/drill-oom-xsort.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/drill-oom-xsort.conf b/exec/java-exec/src/test/resources/drill-oom-xsort.conf new file mode 100644 index 0000000..c617a29 --- /dev/null +++ b/exec/java-exec/src/test/resources/drill-oom-xsort.conf @@ -0,0 +1,18 @@ +// This file tells Drill to consider this module when class path scanning. +// This file can also include any supplementary configuration information. +// This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information. + +drill.logical.function.packages += "org.apache.drill.exec.expr.fn.impl" + +drill.exec: { + memory: { + fragment: { + max: 50000000, + initial: 2000000 + }, + operator: { + max: 30000000, + initial: 2000000 + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/resources/project/test1.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/project/test1.json b/exec/java-exec/src/test/resources/project/test1.json index 2a7c935..3a84fd0 100644 --- a/exec/java-exec/src/test/resources/project/test1.json +++ b/exec/java-exec/src/test/resources/project/test1.json @@ -14,8 +14,9 @@ entries:[ {records: 100, types: [ {name: "blue", type: "INT", mode: "REQUIRED"}, - {name: "red", type: "BIGINT", mode: "REQUIRED"}, - {name: "green", type: "INT", mode: "REQUIRED"} + {name: "red", type: "BIGINT", mode: "OPTIONAL"}, + {name: "green", type: "INT", mode: "REQUIRED"}, + {name: "orange", type: "VARCHAR", mode: "OPTIONAL"} ]} ] }, @@ -25,7 +26,9 @@ pop:"project", exprs: [ { ref: "col1", expr:"red + 1" }, - { ref: "col2", expr:"red + 2" } + { ref: "col2", expr:"red + 2" }, + { ref: "col3", expr:"orange"}, + { ref: "col4", expr:"orange"} ] }, { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/resources/xsort/oom_sort_test.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/xsort/oom_sort_test.json b/exec/java-exec/src/test/resources/xsort/oom_sort_test.json new file mode 100644 index 0000000..af5bc43 --- /dev/null +++ b/exec/java-exec/src/test/resources/xsort/oom_sort_test.json @@ -0,0 +1,57 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + { + @id:1, + pop:"mock-scan", + url: "http://apache.org", + entries:[ + {records: 10000000, types: [ + {name: "green", type: "BIGINT", mode: "REQUIRED"} + ]} + ] + }, + { + @id: 2, + pop: "project", + child: 1, + exprs: [ + { ref: "blue", expr: "randomBigInt(100000)" } + ] + }, + { + @id: 3, + pop: "union-exchange", + child: 2, + maxAllocation: 1000000 + }, + { + @id:4, + child: 3, + pop:"external-sort", + orderings: [ + {expr: "blue", order : "DESC"} + ], + initialAllocation: 1000000, + maxAllocation: 30000000 + }, + { + @id:5, + child: 4, + pop:"selection-vector-remover", + maxAllocation: 1000000 + }, + { + @id: 6, + child: 5, + pop: "screen", + maxAllocation: 1000000 + } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 7225348..7ef07ed 100644 --- a/pom.xml +++ b/pom.xml @@ -258,8 +258,8 @@ <artifactId>maven-surefire-plugin</artifactId> <version>2.17</version> <configuration> - <argLine>-Xms1g -Xmx2g -XX:MaxDirectMemorySize=6096M </argLine> - <forkCount>8</forkCount> + <argLine>-Xms1g -Xmx2g -XX:MaxDirectMemorySize=10096M </argLine> + <forkCount>1</forkCount> <reuseForks>true</reuseForks> <additionalClasspathElements> <additionalClasspathElement>./exec/jdbc/src/test/resources/storage-plugins.json</additionalClasspathElement> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java ---------------------------------------------------------------------- diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java index 78343e6..37e8a18 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java @@ -1267,6 +1267,16 @@ public final class BitData { * <code>optional bool isLastBatch = 5;</code> */ boolean getIsLastBatch(); + + // optional bool isOutOfMemory = 6 [default = false]; + /** + * <code>optional bool isOutOfMemory = 6 [default = false];</code> + */ + boolean hasIsOutOfMemory(); + /** + * <code>optional bool isOutOfMemory = 6 [default = false];</code> + */ + boolean getIsOutOfMemory(); } /** * Protobuf type {@code exec.bit.data.FragmentRecordBatch} @@ -1360,6 +1370,11 @@ public final class BitData { isLastBatch_ = input.readBool(); break; } + case 48: { + bitField0_ |= 0x00000020; + isOutOfMemory_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -1492,12 +1507,29 @@ public final class BitData { return isLastBatch_; } + // optional bool isOutOfMemory = 6 [default = false]; + public static final int ISOUTOFMEMORY_FIELD_NUMBER = 6; + private boolean isOutOfMemory_; + /** + * <code>optional bool isOutOfMemory = 6 [default = false];</code> + */ + public boolean hasIsOutOfMemory() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * <code>optional bool isOutOfMemory = 6 [default = false];</code> + */ + public boolean getIsOutOfMemory() { + return isOutOfMemory_; + } + private void initFields() { handle_ = org.apache.drill.exec.proto.ExecProtos.FragmentHandle.getDefaultInstance(); sendingMajorFragmentId_ = 0; sendingMinorFragmentId_ = 0; def_ = org.apache.drill.exec.proto.UserBitShared.RecordBatchDef.getDefaultInstance(); isLastBatch_ = false; + isOutOfMemory_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -1526,6 +1558,9 @@ public final class BitData { if (((bitField0_ & 0x00000010) == 0x00000010)) { output.writeBool(5, isLastBatch_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeBool(6, isOutOfMemory_); + } getUnknownFields().writeTo(output); } @@ -1555,6 +1590,10 @@ public final class BitData { size += com.google.protobuf.CodedOutputStream .computeBoolSize(5, isLastBatch_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(6, isOutOfMemory_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1691,6 +1730,8 @@ public final class BitData { bitField0_ = (bitField0_ & ~0x00000008); isLastBatch_ = false; bitField0_ = (bitField0_ & ~0x00000010); + isOutOfMemory_ = false; + bitField0_ = (bitField0_ & ~0x00000020); return this; } @@ -1747,6 +1788,10 @@ public final class BitData { to_bitField0_ |= 0x00000010; } result.isLastBatch_ = isLastBatch_; + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000020; + } + result.isOutOfMemory_ = isOutOfMemory_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1778,6 +1823,9 @@ public final class BitData { if (other.hasIsLastBatch()) { setIsLastBatch(other.getIsLastBatch()); } + if (other.hasIsOutOfMemory()) { + setIsOutOfMemory(other.getIsOutOfMemory()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -2138,6 +2186,39 @@ public final class BitData { return this; } + // optional bool isOutOfMemory = 6 [default = false]; + private boolean isOutOfMemory_ ; + /** + * <code>optional bool isOutOfMemory = 6 [default = false];</code> + */ + public boolean hasIsOutOfMemory() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * <code>optional bool isOutOfMemory = 6 [default = false];</code> + */ + public boolean getIsOutOfMemory() { + return isOutOfMemory_; + } + /** + * <code>optional bool isOutOfMemory = 6 [default = false];</code> + */ + public Builder setIsOutOfMemory(boolean value) { + bitField0_ |= 0x00000020; + isOutOfMemory_ = value; + onChanged(); + return this; + } + /** + * <code>optional bool isOutOfMemory = 6 [default = false];</code> + */ + public Builder clearIsOutOfMemory() { + bitField0_ = (bitField0_ & ~0x00000020); + isOutOfMemory_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:exec.bit.data.FragmentRecordBatch) } @@ -2179,15 +2260,16 @@ public final class BitData { "\013rpc_version\030\001 \001(\005\0222\n\007channel\030\002 \001(\0162\027.ex" + "ec.shared.RpcChannel:\010BIT_DATA\022(\n\006handle" + "\030\003 \001(\0132\030.exec.bit.FragmentHandle\")\n\022BitS" + - "erverHandshake\022\023\n\013rpc_version\030\001 \001(\005\"\304\001\n\023" + + "erverHandshake\022\023\n\013rpc_version\030\001 \001(\005\"\342\001\n\023" + "FragmentRecordBatch\022(\n\006handle\030\001 \001(\0132\030.ex" + "ec.bit.FragmentHandle\022!\n\031sending_major_f" + "ragment_id\030\002 \001(\005\022!\n\031sending_minor_fragme", "nt_id\030\003 \001(\005\022(\n\003def\030\004 \001(\0132\033.exec.shared.R" + - "ecordBatchDef\022\023\n\013isLastBatch\030\005 \001(\010*D\n\007Rp" + - "cType\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE" + - "\020\002\022\024\n\020REQ_RECORD_BATCH\020\003B(\n\033org.apache.d" + - "rill.exec.protoB\007BitDataH\001" + "ecordBatchDef\022\023\n\013isLastBatch\030\005 \001(\010\022\034\n\ris" + + "OutOfMemory\030\006 \001(\010:\005false*D\n\007RpcType\022\r\n\tH" + + "ANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_" + + "RECORD_BATCH\020\003B(\n\033org.apache.drill.exec." + + "protoB\007BitDataH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -2211,7 +2293,7 @@ public final class BitData { internal_static_exec_bit_data_FragmentRecordBatch_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_exec_bit_data_FragmentRecordBatch_descriptor, - new java.lang.String[] { "Handle", "SendingMajorFragmentId", "SendingMinorFragmentId", "Def", "IsLastBatch", }); + new java.lang.String[] { "Handle", "SendingMajorFragmentId", "SendingMinorFragmentId", "Def", "IsLastBatch", "IsOutOfMemory", }); return null; } }; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/protocol/src/main/protobuf/BitData.proto ---------------------------------------------------------------------- diff --git a/protocol/src/main/protobuf/BitData.proto b/protocol/src/main/protobuf/BitData.proto index 5356398..de8e9e7 100644 --- a/protocol/src/main/protobuf/BitData.proto +++ b/protocol/src/main/protobuf/BitData.proto @@ -31,4 +31,5 @@ message FragmentRecordBatch{ optional int32 sending_minor_fragment_id = 3; optional exec.shared.RecordBatchDef def = 4; optional bool isLastBatch = 5; + optional bool isOutOfMemory = 6 [ default = false ]; }
