http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java new file mode 100644 index 0000000..178af47 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java @@ -0,0 +1,1143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner; + +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.BaseTupleComparator; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.TupleRange; +import org.apache.tajo.storage.VTuple; +import org.junit.Test; + +import java.math.BigInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestUniformRangePartition { + + @Test + public void testPartitionForINT2Asc() { + Schema schema = new Schema() + .addColumn("col1", Type.INT2); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(1); + VTuple e = new VTuple(1); + s.put(0, DatumFactory.createInt2((short) 1)); + e.put(0, DatumFactory.createInt2((short) 30000)); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + int partNum = 64; + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForINT2Desc() { + Schema schema = new Schema() + .addColumn("col1", Type.INT2); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + sortSpecs[0].setDescOrder(); + + VTuple s = new VTuple(1); + VTuple e = new VTuple(1); + s.put(0, DatumFactory.createInt2((short) 30000)); + e.put(0, DatumFactory.createInt2((short) 1)); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + int partNum = 64; + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForINT4Asc() { + Schema schema = new Schema() + .addColumn("col1", Type.INT4); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(1); + VTuple e = new VTuple(1); + s.put(0, DatumFactory.createInt4(1)); + e.put(0, DatumFactory.createInt4(10000)); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + int partNum = 64; + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForINT4Desc() { + Schema schema = new Schema() + .addColumn("col1", Type.INT4); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + sortSpecs[0].setDescOrder(); + + VTuple s = new VTuple(1); + VTuple e = new VTuple(1); + s.put(0, DatumFactory.createInt4(10000)); + e.put(0, DatumFactory.createInt4(1)); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + int partNum = 64; + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForINT8Asc() { + Schema schema = new Schema() + .addColumn("col1", Type.INT8); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(1); + VTuple e = new VTuple(1); + s.put(0, DatumFactory.createInt8(1)); + e.put(0, DatumFactory.createInt8(10000)); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + int partNum = 64; + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForInt8Desc() { + Schema schema = new Schema() + .addColumn("col1", Type.INT8); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + sortSpecs[0].setDescOrder(); + + VTuple s = new VTuple(1); + VTuple e = new VTuple(1); + s.put(0, DatumFactory.createInt8(10000)); + e.put(0, DatumFactory.createInt8(1)); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + int partNum = 64; + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForFloat4Asc() { + Schema schema = new Schema() + .addColumn("col1", Type.FLOAT4); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(1); + VTuple e = new VTuple(1); + s.put(0, DatumFactory.createFloat4((float) 1.0)); + e.put(0, DatumFactory.createFloat4((float) 10000.0)); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + int partNum = 64; + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForFloat4Desc() { + Schema schema = new Schema() + .addColumn("col1", Type.FLOAT4); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + sortSpecs[0].setDescOrder(); + + VTuple s = new VTuple(1); + VTuple e = new VTuple(1); + s.put(0, DatumFactory.createFloat4((float) 10000.0)); + e.put(0, DatumFactory.createFloat4((float) 1.0)); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + int partNum = 64; + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForFloat8Asc() { + Schema schema = new Schema() + .addColumn("col1", Type.FLOAT8); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(1); + VTuple e = new VTuple(1); + s.put(0, DatumFactory.createFloat8(1.0)); + e.put(0, DatumFactory.createFloat8(10000.0)); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + int partNum = 64; + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForFloat8Desc() { + Schema schema = new Schema() + .addColumn("col1", Type.FLOAT8); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + sortSpecs[0].setDescOrder(); + + VTuple s = new VTuple(1); + VTuple e = new VTuple(1); + s.put(0, DatumFactory.createFloat8((float) 10000.0)); + e.put(0, DatumFactory.createFloat8((float) 1.0)); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + int partNum = 64; + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + /** + * It verify overflow and increment in normal case. + */ + @Test + public void testIncrementOfText() { + Schema schema = new Schema() + .addColumn("l_returnflag", Type.TEXT) + .addColumn("l_linestatus", Type.TEXT); + + SortSpec[] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(2); + s.put(0, DatumFactory.createText("A")); + s.put(1, DatumFactory.createText("A")); + VTuple e = new VTuple(2); + e.put(0, DatumFactory.createText("D")); + e.put(1, DatumFactory.createText("C")); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + assertEquals(12, partitioner.getTotalCardinality().intValue()); + + String [] result = new String[12]; + result[0] = "AA"; + result[1] = "AB"; + result[2] = "AC"; + result[3] = "BA"; + result[4] = "BB"; + result[5] = "BC"; + result[6] = "CA"; + result[7] = "CB"; + result[8] = "CC"; + result[9] = "DA"; + result[10] = "DB"; + result[11] = "DC"; + + Tuple end = partitioner.increment(s, BigInteger.valueOf(1), 1); + assertEquals("A", end.getText(0)); + assertEquals("B", end.getText(1)); + for (int i = 2; i < 11; i++ ) { + end = partitioner.increment(end, BigInteger.valueOf(1), 1); + assertEquals(result[i].charAt(0), end.getText(0).charAt(0)); + assertEquals(result[i].charAt(1), end.getText(1).charAt(0)); + } + } + + /** + * It verify overflow with the number that exceeds the last digit. + */ + @Test + public void testIncrementOfText2() { + Schema schema = new Schema() + .addColumn("l_returnflag", Type.TEXT) + .addColumn("l_linestatus", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(2); + s.put(0, DatumFactory.createText("A")); + s.put(1, DatumFactory.createText("A")); + VTuple e = new VTuple(2); + e.put(0, DatumFactory.createText("D")); + e.put(1, DatumFactory.createText("C")); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + assertEquals(12, partitioner.getTotalCardinality().intValue()); + + String [] result = new String[12]; + result[0] = "AA"; + result[1] = "AB"; + result[2] = "AC"; + result[3] = "BA"; + result[4] = "BB"; + result[5] = "BC"; + result[6] = "CA"; + result[7] = "CB"; + result[8] = "CC"; + result[9] = "DA"; + result[10] = "DB"; + result[11] = "DC"; + + Tuple end = partitioner.increment(s, BigInteger.valueOf(6), 1); + assertEquals("C", end.getText(0)); + assertEquals("A", end.getText(1)); + end = partitioner.increment(end, BigInteger.valueOf(5), 1); + assertEquals("D", end.getText(0)); + assertEquals("C", end.getText(1)); + } + + /** + * It verify the case where two or more digits are overflow. + */ + @Test + public void testIncrementOfText3() { + Schema schema = new Schema() + .addColumn("l_returnflag", Type.TEXT) + .addColumn("l_linestatus", Type.TEXT) + .addColumn("final", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(3); + s.put(0, DatumFactory.createText("A")); + s.put(1, DatumFactory.createText("A")); + s.put(2, DatumFactory.createText("A")); + VTuple e = new VTuple(3); + e.put(0, DatumFactory.createText("D")); // 4 + e.put(1, DatumFactory.createText("B")); // 2 + e.put(2, DatumFactory.createText("C")); // x3 = 24 + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + assertEquals(24, partitioner.getTotalCardinality().intValue()); + + Tuple overflowBefore = partitioner.increment(s, BigInteger.valueOf(5), 2); + assertEquals("A", overflowBefore.getText(0)); + assertEquals("B", overflowBefore.getText(1)); + assertEquals("C", overflowBefore.getText(2)); + Tuple overflowed = partitioner.increment(overflowBefore, BigInteger.valueOf(1), 2); + assertEquals("B", overflowed.getText(0)); + assertEquals("A", overflowed.getText(1)); + assertEquals("A", overflowed.getText(2)); + } + + @Test + public void testIncrementOfUnicode() { + Schema schema = new Schema() + .addColumn("col1", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(1); + s.put(0, DatumFactory.createText("ê°ê°ê°")); + VTuple e = new VTuple(1); + e.put(0, DatumFactory.createText("ííí")); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + BaseTupleComparator comp = new BaseTupleComparator(schema, sortSpecs); + + Tuple tuple = s; + Tuple prevTuple = null; + for (int i = 0; i < 100; i++) { + tuple = partitioner.increment(tuple, BigInteger.valueOf(30000), 0); + if (prevTuple != null) { + assertTrue("prev=" + prevTuple + ", current=" + tuple, comp.compare(prevTuple, tuple) < 0); + } + prevTuple = tuple; + } + } + + @Test + public void testIncrementOfUnicodeOneCharSinglePartition() { + Schema schema = new Schema() + .addColumn("col1", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(1); + s.put(0, DatumFactory.createText("ê°")); + VTuple e = new VTuple(1); + e.put(0, DatumFactory.createText("ë¤")); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + int partNum = 1; + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testIncrementOfUnicodeOneCharMultiPartition() { + Schema schema = new Schema() + .addColumn("col1", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(1); + s.put(0, DatumFactory.createText("ê°")); + VTuple e = new VTuple(1); + e.put(0, DatumFactory.createText("ê½¥")); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + int partNum = 8; + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForUnicodeTextAsc() { + Schema schema = new Schema() + .addColumn("col1", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(1); + VTuple e = new VTuple(1); + s.put(0, DatumFactory.createText("ê°ê°ê°")); + e.put(0, DatumFactory.createText("ííí")); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + int partNum = 64; + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForUnicodeDiffLenBeginTextAsc() { + Schema schema = new Schema() + .addColumn("col1", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(1); + VTuple e = new VTuple(1); + s.put(0, DatumFactory.createText("ê°")); + e.put(0, DatumFactory.createText("ííí")); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + int partNum = 64; + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForUnicodeDiffLenEndTextAsc() { + Schema schema = new Schema() + .addColumn("col1", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(1); + VTuple e = new VTuple(1); + s.put(0, DatumFactory.createText("ê°ê°ê°")); + e.put(0, DatumFactory.createText("í")); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + int partNum = 64; + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForUnicodeTextDesc() { + Schema schema = new Schema() + .addColumn("col1", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + sortSpecs[0].setDescOrder(); + + VTuple s = new VTuple(1); + VTuple e = new VTuple(1); + s.put(0, DatumFactory.createText("ííí")); + e.put(0, DatumFactory.createText("ê°ê°ê°")); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + int partNum = 64; + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForUnicodeDiffLenBeginTextDesc() { + Schema schema = new Schema() + .addColumn("col1", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + sortSpecs[0].setDescOrder(); + + VTuple s = new VTuple(1); + VTuple e = new VTuple(1); + s.put(0, DatumFactory.createText("í")); + e.put(0, DatumFactory.createText("ê°ê°ê°")); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + int partNum = 64; + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForUnicodeDiffLenEndTextDesc() { + Schema schema = new Schema() + .addColumn("col1", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + sortSpecs[0].setDescOrder(); + + VTuple s = new VTuple(1); + VTuple e = new VTuple(1); + s.put(0, DatumFactory.createText("í")); + e.put(0, DatumFactory.createText("ê°ê°ê°")); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + int partNum = 64; + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testIncrementOfInt8() { + Schema schema = new Schema() + .addColumn("l_orderkey", Type.INT8) + .addColumn("l_linenumber", Type.INT8); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(2); + s.put(0, DatumFactory.createInt8(10)); + s.put(1, DatumFactory.createInt8(20)); + VTuple e = new VTuple(2); + e.put(0, DatumFactory.createInt8(19)); + e.put(1, DatumFactory.createInt8(39)); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + assertEquals(200, partitioner.getTotalCardinality().longValue()); + + Tuple range2 = partitioner.increment(s, BigInteger.valueOf(100), 1); + assertEquals(15, range2.getInt4(0)); + assertEquals(20, range2.getInt4(1)); + Tuple range3 = partitioner.increment(range2, BigInteger.valueOf(99), 1); + assertEquals(19, range3.getInt4(0)); + assertEquals(39, range3.getInt4(1)); + } + + @Test public void testIncrementOfInt8AndFinal() { + Schema schema = new Schema() + .addColumn("l_orderkey", Type.INT8) + .addColumn("l_linenumber", Type.INT8) + .addColumn("final", Type.INT8); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(3); + s.put(0, DatumFactory.createInt8(1)); + s.put(1, DatumFactory.createInt8(1)); + s.put(2, DatumFactory.createInt8(1)); + VTuple e = new VTuple(3); + e.put(0, DatumFactory.createInt8(4)); // 4 + e.put(1, DatumFactory.createInt8(2)); // 2 + e.put(2, DatumFactory.createInt8(3)); //x3 = 24 + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + assertEquals(24, partitioner.getTotalCardinality().longValue()); + + Tuple beforeOverflow = partitioner.increment(s, BigInteger.valueOf(5), 2); + assertEquals(1, beforeOverflow.getInt8(0)); + assertEquals(2, beforeOverflow.getInt8(1)); + assertEquals(3, beforeOverflow.getInt8(2)); + Tuple overflow = partitioner.increment(beforeOverflow, BigInteger.valueOf(1), 2); + assertEquals(2, overflow.getInt8(0)); + assertEquals(1, overflow.getInt8(1)); + assertEquals(1, overflow.getInt8(2)); + } + + @Test + public void testIncrementOfFloat8() { + Schema schema = new Schema() + .addColumn("l_orderkey", Type.FLOAT8) + .addColumn("l_linenumber", Type.FLOAT8) + .addColumn("final", Type.FLOAT8); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(3); + s.put(0, DatumFactory.createFloat8(1.1d)); + s.put(1, DatumFactory.createFloat8(1.1d)); + s.put(2, DatumFactory.createFloat8(1.1d)); + VTuple e = new VTuple(3); + e.put(0, DatumFactory.createFloat8(4.1d)); // 4 + e.put(1, DatumFactory.createFloat8(2.1d)); // 2 + e.put(2, DatumFactory.createFloat8(3.1d)); //x3 = 24 + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + assertEquals(24, partitioner.getTotalCardinality().longValue()); + + Tuple beforeOverflow = partitioner.increment(s, BigInteger.valueOf(5), 2); + assertTrue(1.1d == beforeOverflow.getFloat8(0)); + assertTrue(2.1d == beforeOverflow.getFloat8(1)); + assertTrue(3.1d == beforeOverflow.getFloat8(2)); + Tuple overflow = partitioner.increment(beforeOverflow, BigInteger.valueOf(1), 2); + assertTrue(2.1d == overflow.getFloat8(0)); + assertTrue(1.1d == overflow.getFloat8(1)); + assertTrue(1.1d == overflow.getFloat8(2)); + } + + @Test + public void testIncrementOfInet4() { + Schema schema = new Schema() + .addColumn("l_orderkey", Type.INET4) + .addColumn("l_linenumber", Type.INET4) + .addColumn("final", Type.INET4); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(3); + s.put(0, DatumFactory.createInet4("127.0.1.1")); + s.put(1, DatumFactory.createInet4("127.0.0.1")); + s.put(2, DatumFactory.createInet4("128.0.0.253")); + VTuple e = new VTuple(3); + e.put(0, DatumFactory.createInet4("127.0.1.4")); // 4 + e.put(1, DatumFactory.createInet4("127.0.0.2")); // 2 + e.put(2, DatumFactory.createInet4("128.0.0.255")); //x3 = 24 + + TupleRange expected = new TupleRange(sortSpecs, s, e); + + UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); + assertEquals(24, partitioner.getTotalCardinality().longValue()); + + Tuple beforeOverflow = partitioner.increment(s, BigInteger.valueOf(5), 2); + assertTrue("127.0.1.1".equals(beforeOverflow.getText(0))); + assertTrue("127.0.0.2".equals(beforeOverflow.getText(1))); + assertTrue("128.0.0.255".equals(beforeOverflow.getText(2))); + Tuple overflow = partitioner.increment(beforeOverflow, BigInteger.valueOf(1), 2); + assertTrue("127.0.1.2".equals(overflow.getText(0))); + assertTrue("127.0.0.1".equals(overflow.getText(1))); + assertTrue("128.0.0.253".equals(overflow.getText(2))); + } + + @Test + public void testPartition() { + Schema schema = new Schema(); + schema.addColumn("l_returnflag", Type.TEXT); + schema.addColumn("l_linestatus", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(2); + s.put(0, DatumFactory.createText("A")); + s.put(1, DatumFactory.createText("F")); + VTuple e = new VTuple(2); + e.put(0, DatumFactory.createText("R")); + e.put(1, DatumFactory.createText("O")); + TupleRange expected = new TupleRange(sortSpecs, s, e); + RangePartitionAlgorithm partitioner + = new UniformRangePartition(expected, sortSpecs, true); + TupleRange [] ranges = partitioner.partition(31); + + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + } + + @Test + public void testPartitionForOnePartNum() { + Schema schema = new Schema() + .addColumn("l_returnflag", Type.TEXT) + .addColumn("l_linestatus", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(2); + s.put(0, DatumFactory.createText("A")); + s.put(1, DatumFactory.createText("F")); + VTuple e = new VTuple(2); + e.put(0, DatumFactory.createText("R")); + e.put(1, DatumFactory.createText("O")); + TupleRange expected = new TupleRange(sortSpecs, s, e); + RangePartitionAlgorithm partitioner = + new UniformRangePartition(expected, sortSpecs, true); + TupleRange [] ranges = partitioner.partition(1); + + assertEquals(expected, ranges[0]); + } + + @Test + public void testPartitionForOnePartNumWithOneOfTheValueNull() { + Schema schema = new Schema() + .addColumn("l_returnflag", Type.TEXT) + .addColumn("l_linestatus", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(2); + s.put(0, DatumFactory.createNullDatum()); + s.put(1, DatumFactory.createText("F")); + VTuple e = new VTuple(2); + e.put(0, DatumFactory.createText("R")); + e.put(1, DatumFactory.createNullDatum()); + TupleRange expected = new TupleRange(sortSpecs, s, e); + RangePartitionAlgorithm partitioner = + new UniformRangePartition(expected, sortSpecs, true); + TupleRange [] ranges = partitioner.partition(1); + + assertEquals(expected, ranges[0]); + } + + @Test + public void testPartitionForMultipleChars() { + Schema schema = new Schema() + .addColumn("KEY1", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(1); + s.put(0, DatumFactory.createText("AAA")); + VTuple e = new VTuple(1); + e.put(0, DatumFactory.createText("ZZZ")); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + RangePartitionAlgorithm partitioner = + new UniformRangePartition(expected, sortSpecs, true); + TupleRange [] ranges = partitioner.partition(48); + + TupleRange prev = null; + for (int i = 0; i < ranges.length; i++) { + if (prev != null) { + assertTrue(i + "th, prev=" + prev + ",cur=" + ranges[i], prev.compareTo(ranges[i]) < 0); + } + prev = ranges[i]; + } + assertEquals(48, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[47].getEnd().equals(e)); + } + + @Test + public void testPartitionForMultipleChars2() { + Schema schema = new Schema() + .addColumn("KEY1", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(1); + s.put(0, DatumFactory.createText("A1")); + VTuple e = new VTuple(1); + e.put(0, DatumFactory.createText("A999975")); + + final int partNum = 2; + + TupleRange expected = new TupleRange(sortSpecs, s, e); + RangePartitionAlgorithm partitioner = + new UniformRangePartition(expected, sortSpecs, true); + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForMultipleChars2Desc() { + Schema schema = new Schema() + .addColumn("KEY1", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + sortSpecs[0].setDescOrder(); + + VTuple s = new VTuple(1); + s.put(0, DatumFactory.createText("A999975")); + VTuple e = new VTuple(1); + e.put(0, DatumFactory.createText("A1")); + + final int partNum = 48; + + TupleRange expected = new TupleRange(sortSpecs, s, e); + RangePartitionAlgorithm partitioner = + new UniformRangePartition(expected, sortSpecs, true); + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForMultipleCharsWithSameFirstChar() { + Schema schema = new Schema() + .addColumn("KEY1", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(1); + s.put(0, DatumFactory.createText("AAA")); + VTuple e = new VTuple(1); + e.put(0, DatumFactory.createText("AAZ")); + + final int partNum = 4; + + TupleRange expected = new TupleRange(sortSpecs, s, e); + RangePartitionAlgorithm partitioner = + new UniformRangePartition(expected, sortSpecs, true); + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForOnePartNumWithBothValueNull() { + Schema schema = new Schema() + .addColumn("l_returnflag", Type.TEXT) + .addColumn("l_linestatus", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(2); + s.put(0, DatumFactory.createNullDatum()); + s.put(1, DatumFactory.createNullDatum()); + VTuple e = new VTuple(2); + e.put(0, DatumFactory.createNullDatum()); + e.put(1, DatumFactory.createNullDatum()); + TupleRange expected = new TupleRange(sortSpecs, s, e); + RangePartitionAlgorithm partitioner = + new UniformRangePartition(expected, sortSpecs, true); + TupleRange [] ranges = partitioner.partition(1); + + assertEquals(expected, ranges[0]); + } + + @Test + public void testPartitionWithNull() { + Schema schema = new Schema(); + schema.addColumn("l_returnflag", Type.TEXT); + schema.addColumn("l_linestatus", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(2); + s.put(0, DatumFactory.createNullDatum()); + s.put(1, DatumFactory.createText("F")); + VTuple e = new VTuple(2); + e.put(0, DatumFactory.createNullDatum()); + e.put(1, DatumFactory.createText("O")); + TupleRange expected = new TupleRange(sortSpecs, s, e); + RangePartitionAlgorithm partitioner + = new UniformRangePartition(expected, sortSpecs, true); + TupleRange [] ranges = partitioner.partition(10); + + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + } + + @Test + public void testPartitionWithINET4() { + Schema schema = new Schema(); + schema.addColumn("l_returnflag", Type.INET4); + schema.addColumn("l_linestatus", Type.INET4); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + VTuple s = new VTuple(2); + s.put(0, DatumFactory.createInet4("127.0.1.10")); + s.put(1, DatumFactory.createInet4("127.0.2.10")); + VTuple e = new VTuple(2); + e.put(0, DatumFactory.createInet4("127.0.1.20")); + e.put(1, DatumFactory.createInet4("127.0.2.20")); + TupleRange expected = new TupleRange(sortSpecs, s, e); + RangePartitionAlgorithm partitioner + = new UniformRangePartition(expected, sortSpecs, true); + TupleRange [] ranges = partitioner.partition(10); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev != null) { + assertTrue(prev.compareTo(r) < 0); + } + prev = r; + } + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java new file mode 100644 index 0000000..817d27a --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.global; + +import org.apache.tajo.LocalTajoTestingUtility; +import org.junit.Test; + +import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestMasterPlan { + + @Test + public void testConnect() { + MasterPlan masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null); + + ExecutionBlock eb1 = masterPlan.newExecutionBlock(); + ExecutionBlock eb2 = masterPlan.newExecutionBlock(); + ExecutionBlock eb3 = masterPlan.newExecutionBlock(); + + masterPlan.addConnect(eb1, eb2, ShuffleType.RANGE_SHUFFLE); + assertTrue(masterPlan.isConnected(eb1.getId(), eb2.getId())); + assertTrue(masterPlan.isReverseConnected(eb2.getId(), eb1.getId())); + + masterPlan.addConnect(eb3, eb2, ShuffleType.RANGE_SHUFFLE); + assertTrue(masterPlan.isConnected(eb1.getId(), eb2.getId())); + assertTrue(masterPlan.isConnected(eb3.getId(), eb2.getId())); + + assertTrue(masterPlan.isReverseConnected(eb2.getId(), eb1.getId())); + assertTrue(masterPlan.isReverseConnected(eb2.getId(), eb3.getId())); + + masterPlan.disconnect(eb3, eb2); + assertFalse(masterPlan.isConnected(eb3, eb2)); + assertFalse(masterPlan.isReverseConnected(eb2, eb3)); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java new file mode 100644 index 0000000..71bad20 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.catalog.*; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.planner.PhysicalPlanner; +import org.apache.tajo.engine.planner.PhysicalPlannerImpl; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.JoinNode; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.worker.TaskAttemptContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestBNLJoinExec { + private TajoConf conf; + private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestBNLJoinExec"; + private TajoTestingCluster util; + private CatalogService catalog; + private SQLAnalyzer analyzer; + private LogicalPlanner planner; + private Path testDir; + + private static int OUTER_TUPLE_NUM = 1000; + private static int INNER_TUPLE_NUM = 1000; + + private TableDesc employee; + private TableDesc people; + + @Before + public void setUp() throws Exception { + util = new TajoTestingCluster(); + catalog = util.startCatalogCluster().getCatalog(); + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); + catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); + conf = util.getConfiguration(); + + Schema schema = new Schema(); + schema.addColumn("managerid", Type.INT4); + schema.addColumn("empid", Type.INT4); + schema.addColumn("memid", Type.INT4); + schema.addColumn("deptname", Type.TEXT); + + TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT"); + Path employeePath = new Path(testDir, "employee.csv"); + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) + .getAppender(employeeMeta, schema, employeePath); + appender.init(); + VTuple tuple = new VTuple(schema.size()); + for (int i = 0; i < OUTER_TUPLE_NUM; i++) { + tuple.put(new Datum[] { DatumFactory.createInt4(i), + DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i), + DatumFactory.createText("dept_" + i) }); + appender.addTuple(tuple); + } + appender.flush(); + appender.close(); + employee = CatalogUtil.newTableDesc("default.employee", schema, employeeMeta, employeePath); + catalog.createTable(employee); + + Schema peopleSchema = new Schema(); + peopleSchema.addColumn("empid", Type.INT4); + peopleSchema.addColumn("fk_memid", Type.INT4); + peopleSchema.addColumn("name", Type.TEXT); + peopleSchema.addColumn("age", Type.INT4); + TableMeta peopleMeta = CatalogUtil.newTableMeta("TEXT"); + Path peoplePath = new Path(testDir, "people.csv"); + appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(peopleMeta, peopleSchema, peoplePath); + appender.init(); + tuple = new VTuple(peopleSchema.size()); + for (int i = 1; i < INNER_TUPLE_NUM; i += 2) { + tuple.put(new Datum[] { DatumFactory.createInt4(i), + DatumFactory.createInt4(10 + i), + DatumFactory.createText("name_" + i), + DatumFactory.createInt4(30 + i) }); + appender.addTuple(tuple); + } + appender.flush(); + appender.close(); + + people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath); + catalog.createTable(people); + analyzer = new SQLAnalyzer(); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); + } + + @After + public void tearDown() throws Exception { + util.shutdownCatalogCluster(); + } + + // employee (managerId, empId, memId, deptName) + // people (empId, fk_memId, name, age) + String[] QUERIES = { + "select managerId, e.empId, deptName, e.memId from employee as e, people p", + "select managerId, e.empId, deptName, e.memId from employee as e " + + "inner join people as p on e.empId = p.empId and e.memId = p.fk_memId" }; + + @Test + public final void testBNLCrossJoin() throws IOException, TajoException { + Expr expr = analyzer.parse(QUERIES[0]); + LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), + expr).getRootBlock().getRoot(); + JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN); + Enforcer enforcer = new Enforcer(); + enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN); + + FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", employee.getMeta(), + new Path(employee.getUri()), + Integer.MAX_VALUE); + FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", people.getMeta(), + new Path(people.getUri()), + Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testBNLCrossJoin"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(enforcer); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + ProjectionExec proj = (ProjectionExec) exec; + assertTrue(proj.getChild() instanceof BNLJoinExec); + + int i = 0; + exec.init(); + while (exec.next() != null) { + i++; + } + exec.close(); + assertEquals(OUTER_TUPLE_NUM * INNER_TUPLE_NUM / 2, i); // expected 10 * 5 + } + + @Test + public final void testBNLInnerJoin() throws IOException, TajoException { + Expr context = analyzer.parse(QUERIES[1]); + LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), + context).getRootBlock().getRoot(); + + FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", employee.getMeta(), + new Path(employee.getUri()), Integer.MAX_VALUE); + FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", people.getMeta(), + new Path(people.getUri()), Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); + + + JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN); + Enforcer enforcer = new Enforcer(); + enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testBNLInnerJoin"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), + merged, workDir); + ctx.setEnforcer(enforcer); + + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + ProjectionExec proj = (ProjectionExec) exec; + assertTrue(proj.getChild() instanceof BNLJoinExec); + + Tuple tuple; + int i = 1; + int count = 0; + exec.init(); + while ((tuple = exec.next()) != null) { + count++; + assertTrue(i == tuple.getInt4(0)); + assertTrue(i == tuple.getInt4(1)); + assertTrue(("dept_" + i).equals(tuple.getText(2))); + assertTrue(10 + i == tuple.getInt4(3)); + i += 2; + } + exec.close(); + assertEquals(INNER_TUPLE_NUM / 2, count); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java new file mode 100644 index 0000000..4ae880f --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.catalog.*; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.planner.PhysicalPlanner; +import org.apache.tajo.engine.planner.PhysicalPlannerImpl; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.worker.TaskAttemptContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Random; + +import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestExternalSortExec { + private TajoConf conf; + private TajoTestingCluster util; + private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestExternalSortExec"; + private CatalogService catalog; + private SQLAnalyzer analyzer; + private LogicalPlanner planner; + private Path testDir; + + private final int numTuple = 100000; + private Random rnd = new Random(System.currentTimeMillis()); + + private TableDesc employee; + + @Before + public void setUp() throws Exception { + this.conf = new TajoConf(); + util = new TajoTestingCluster(); + catalog = util.startCatalogCluster().getCatalog(); + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); + catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); + conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString()); + + Schema schema = new Schema(); + schema.addColumn("managerid", Type.INT4); + schema.addColumn("empid", Type.INT4); + schema.addColumn("deptname", Type.TEXT); + + TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT"); + Path employeePath = new Path(testDir, "employee.csv"); + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) + .getAppender(employeeMeta, schema, employeePath); + appender.enableStats(); + appender.init(); + VTuple tuple = new VTuple(schema.size()); + for (int i = 0; i < numTuple; i++) { + tuple.put(new Datum[] { + DatumFactory.createInt4(rnd.nextInt(50)), + DatumFactory.createInt4(rnd.nextInt(100)), + DatumFactory.createText("dept_" + i), + }); + appender.addTuple(tuple); + } + appender.flush(); + appender.close(); + + employee = new TableDesc("default.employee", schema, employeeMeta, employeePath.toUri()); + catalog.createTable(employee); + analyzer = new SQLAnalyzer(); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); + } + + @After + public void tearDown() throws Exception { + CommonTestingUtil.cleanupTestDir(TEST_PATH); + util.shutdownCatalogCluster(); + } + + String[] QUERIES = { + "select managerId, empId from employee order by managerId, empId" + }; + + @Test + public final void testNext() throws IOException, TajoException { + FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(), + new Path(employee.getUri()), Integer.MAX_VALUE); + Path workDir = new Path(testDir, TestExternalSortExec.class.getName()); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(QUERIES[0]); + LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr); + LogicalNode rootNode = plan.getRootBlock().getRoot(); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + + ProjectionExec proj = (ProjectionExec) exec; + + // TODO - should be planed with user's optimization hint + ExternalSortExec extSort; + if (!(proj.getChild() instanceof ExternalSortExec)) { + UnaryPhysicalExec sortExec = proj.getChild(); + SeqScanExec scan = sortExec.getChild(); + + extSort = new ExternalSortExec(ctx, ((MemSortExec)sortExec).getPlan(), scan); + proj.setChild(extSort); + } else { + extSort = proj.getChild(); + } + extSort.setSortBufferBytesNum(1024*1024); + + Tuple tuple; + Tuple preVal = null; + Tuple curVal; + int cnt = 0; + exec.init(); + long start = System.currentTimeMillis(); + BaseTupleComparator comparator = new BaseTupleComparator(proj.getSchema(), + new SortSpec[]{ + new SortSpec(new Column("managerid", Type.INT4)), + new SortSpec(new Column("empid", Type.INT4)) + }); + + while ((tuple = exec.next()) != null) { + curVal = tuple; + if (preVal != null) { + assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0); + } + preVal = new VTuple(curVal); + cnt++; + } + long end = System.currentTimeMillis(); + assertEquals(numTuple, cnt); + + // for rescan test + preVal = null; + exec.rescan(); + cnt = 0; + while ((tuple = exec.next()) != null) { + curVal = tuple; + if (preVal != null) { + assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0); + } + preVal = curVal; + cnt++; + } + assertEquals(numTuple, cnt); + exec.close(); + System.out.println("Sort Time: " + (end - start) + " msc"); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java new file mode 100644 index 0000000..5901c49 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java @@ -0,0 +1,414 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.catalog.*; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.planner.PhysicalPlanner; +import org.apache.tajo.engine.planner.PhysicalPlannerImpl; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.JoinNode; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.Appender; +import org.apache.tajo.storage.FileTablespace; +import org.apache.tajo.storage.TablespaceManager; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.worker.TaskAttemptContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm; +import static org.junit.Assert.*; + +public class TestFullOuterHashJoinExec { + private TajoConf conf; + private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestFullOuterHashJoinExec"; + private TajoTestingCluster util; + private CatalogService catalog; + private SQLAnalyzer analyzer; + private LogicalPlanner planner; + private Path testDir; + private QueryContext defaultContext; + + private TableDesc dep3; + private TableDesc job3; + private TableDesc emp3; + private TableDesc phone3; + + private final String DEP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3"); + private final String JOB3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3"); + private final String EMP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3"); + private final String PHONE3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "phone3"); + + @Before + public void setUp() throws Exception { + util = new TajoTestingCluster(); + util.initTestDir(); + catalog = util.startCatalogCluster().getCatalog(); + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); + catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); + conf = util.getConfiguration(); + + //----------------- dep3 ------------------------------ + // dep_id | dep_name | loc_id + //-------------------------------- + // 0 | dep_0 | 1000 + // 1 | dep_1 | 1001 + // 2 | dep_2 | 1002 + // 3 | dep_3 | 1003 + // 4 | dep_4 | 1004 + // 5 | dep_5 | 1005 + // 6 | dep_6 | 1006 + // 7 | dep_7 | 1007 + // 8 | dep_8 | 1008 + // 9 | dep_9 | 1009 + Schema dep3Schema = new Schema(); + dep3Schema.addColumn("dep_id", Type.INT4); + dep3Schema.addColumn("dep_name", Type.TEXT); + dep3Schema.addColumn("loc_id", Type.INT4); + + + TableMeta dep3Meta = CatalogUtil.newTableMeta("TEXT"); + Path dep3Path = new Path(testDir, "dep3.csv"); + Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path); + appender1.init(); + VTuple tuple = new VTuple(dep3Schema.size()); + for (int i = 0; i < 10; i++) { + tuple.put(new Datum[] { DatumFactory.createInt4(i), + DatumFactory.createText("dept_" + i), + DatumFactory.createInt4(1000 + i) }); + appender1.addTuple(tuple); + } + + appender1.flush(); + appender1.close(); + dep3 = CatalogUtil.newTableDesc(DEP3_NAME, dep3Schema, dep3Meta, dep3Path); + catalog.createTable(dep3); + + //----------------- job3 ------------------------------ + // job_id | job_title + // ---------------------- + // 101 | job_101 + // 102 | job_102 + // 103 | job_103 + + Schema job3Schema = new Schema(); + job3Schema.addColumn("job_id", Type.INT4); + job3Schema.addColumn("job_title", Type.TEXT); + + + TableMeta job3Meta = CatalogUtil.newTableMeta("TEXT"); + Path job3Path = new Path(testDir, "job3.csv"); + Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(job3Meta, job3Schema, job3Path); + appender2.init(); + VTuple tuple2 = new VTuple(job3Schema.size()); + for (int i = 1; i < 4; i++) { + int x = 100 + i; + tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i), + DatumFactory.createText("job_" + x) }); + appender2.addTuple(tuple2); + } + + appender2.flush(); + appender2.close(); + job3 = CatalogUtil.newTableDesc(JOB3_NAME, job3Schema, job3Meta, job3Path); + catalog.createTable(job3); + + + + //---------------------emp3 -------------------- + // emp_id | first_name | last_name | dep_id | salary | job_id + // ------------------------------------------------------------ + // 11 | fn_11 | ln_11 | 1 | 123 | 101 + // 13 | fn_13 | ln_13 | 3 | 369 | 103 + // 15 | fn_15 | ln_15 | 5 | 615 | null + // 17 | fn_17 | ln_17 | 7 | 861 | null + // 19 | fn_19 | ln_19 | 9 | 1107 | null + // 21 | fn_21 | ln_21 | 1 | 123 | 101 + // 23 | fn_23 | ln_23 | 3 | 369 | 103 + + Schema emp3Schema = new Schema(); + emp3Schema.addColumn("emp_id", Type.INT4); + emp3Schema.addColumn("first_name", Type.TEXT); + emp3Schema.addColumn("last_name", Type.TEXT); + emp3Schema.addColumn("dep_id", Type.INT4); + emp3Schema.addColumn("salary", Type.FLOAT4); + emp3Schema.addColumn("job_id", Type.INT4); + + + TableMeta emp3Meta = CatalogUtil.newTableMeta("TEXT"); + Path emp3Path = new Path(testDir, "emp3.csv"); + Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(emp3Meta, emp3Schema, emp3Path); + appender3.init(); + VTuple tuple3 = new VTuple(emp3Schema.size()); + + for (int i = 1; i < 4; i += 2) { + int x = 10 + i; + tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i), + DatumFactory.createText("firstname_" + x), + DatumFactory.createText("lastname_" + x), + DatumFactory.createInt4(i), + DatumFactory.createFloat4(123 * i), + DatumFactory.createInt4(100 + i) }); + appender3.addTuple(tuple3); + + int y = 20 + i; + tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i), + DatumFactory.createText("firstname_" + y), + DatumFactory.createText("lastname_" + y), + DatumFactory.createInt4(i), + DatumFactory.createFloat4(123 * i), + DatumFactory.createInt4(100 + i) }); + appender3.addTuple(tuple3); + } + + for (int i = 5; i < 10; i += 2) { + int x= 10 + i; + tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i), + DatumFactory.createText("firstname_" + x), + DatumFactory.createText("lastname_" + x), + DatumFactory.createInt4(i), + DatumFactory.createFloat4(123 * i), + DatumFactory.createNullDatum() }); + appender3.addTuple(tuple3); + } + + appender3.flush(); + appender3.close(); + emp3 = CatalogUtil.newTableDesc(EMP3_NAME, emp3Schema, emp3Meta, emp3Path); + catalog.createTable(emp3); + + //---------------------phone3 -------------------- + // emp_id | phone_number + // ----------------------------------------------- + // this table is empty, no rows + + Schema phone3Schema = new Schema(); + phone3Schema.addColumn("emp_id", Type.INT4); + phone3Schema.addColumn("phone_number", Type.TEXT); + + + TableMeta phone3Meta = CatalogUtil.newTableMeta("TEXT"); + Path phone3Path = new Path(testDir, "phone3.csv"); + Appender appender5 = ((FileTablespace) TablespaceManager.getLocalFs()) + .getAppender(phone3Meta, phone3Schema, phone3Path); + appender5.init(); + + appender5.flush(); + appender5.close(); + phone3 = CatalogUtil.newTableDesc(PHONE3_NAME, phone3Schema, phone3Meta, phone3Path); + catalog.createTable(phone3); + + analyzer = new SQLAnalyzer(); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); + + defaultContext = LocalTajoTestingUtility.createDummyContext(conf); + } + + @After + public void tearDown() throws Exception { + util.shutdownCatalogCluster(); + } + + String[] QUERIES = { + // [0] no nulls + "select dep3.dep_id, dep_name, emp_id, salary from dep3 full outer join emp3 on dep3.dep_id = emp3.dep_id", + // [1] nulls on the right operand + "select job3.job_id, job_title, emp_id, salary from job3 full outer join emp3 on job3.job_id=emp3.job_id", + // [2] nulls on the left side + "select job3.job_id, job_title, emp_id, salary from emp3 full outer join job3 on job3.job_id=emp3.job_id", + // [3] one operand is empty + "select emp3.emp_id, first_name, phone_number from emp3 full outer join phone3 on emp3.emp_id = phone3.emp_id" + }; + + @Test + public final void testFullOuterHashJoinExec0() throws IOException, TajoException { + Expr expr = analyzer.parse(QUERIES[0]); + LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); + JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN); + Enforcer enforcer = new Enforcer(); + enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN); + + FileFragment[] dep3Frags = FileTablespace.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getUri()), + Integer.MAX_VALUE); + FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()), + Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestFullOuterHashJoinExec0"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(enforcer); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + ProjectionExec proj = (ProjectionExec) exec; + assertTrue(proj.getChild() instanceof HashFullOuterJoinExec); + + int count = 0; + exec.init(); + + while (exec.next() != null) { + //TODO check contents + count = count + 1; + } + assertNull(exec.next()); + exec.close(); + assertEquals(12, count); + + } + + + @Test + public final void testFullOuterHashJoinExec1() throws IOException, TajoException { + Expr expr = analyzer.parse(QUERIES[1]); + LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); + JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN); + Enforcer enforcer = new Enforcer(); + enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN); + + FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getUri()), + Integer.MAX_VALUE); + FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()), + Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestFullOuter_HashJoinExec1"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(enforcer); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + ProjectionExec proj = (ProjectionExec) exec; + assertTrue(proj.getChild() instanceof HashFullOuterJoinExec); + + int count = 0; + exec.init(); + + while (exec.next() != null) { + //TODO check contents + count = count + 1; + } + assertNull(exec.next()); + exec.close(); + assertEquals(8, count); + + } + + @Test + public final void testFullOuterHashJoinExec2() throws IOException, TajoException { + Expr expr = analyzer.parse(QUERIES[2]); + LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); + JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN); + Enforcer enforcer = new Enforcer(); + enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN); + + FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()), + Integer.MAX_VALUE); + FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getUri()), + Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestFullOuterHashJoinExec2"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(enforcer); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + ProjectionExec proj = (ProjectionExec) exec; + assertTrue(proj.getChild() instanceof HashFullOuterJoinExec); + + int count = 0; + exec.init(); + + while (exec.next() != null) { + //TODO check contents + count = count + 1; + } + assertNull(exec.next()); + exec.close(); + assertEquals(8, count); + + } + + + @Test + public final void testFullOuterHashJoinExec3() throws IOException, TajoException { + Expr expr = analyzer.parse(QUERIES[3]); + LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); + JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN); + Enforcer enforcer = new Enforcer(); + enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN); + + FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()), + Integer.MAX_VALUE); + FileFragment[] phone3Frags = FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getUri()), + Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestFullOuterHashJoinExec3"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, + workDir); + ctx.setEnforcer(enforcer); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, plan); + + ProjectionExec proj = (ProjectionExec) exec; + assertTrue(proj.getChild() instanceof HashFullOuterJoinExec); + + int count = 0; + exec.init(); + while (exec.next() != null) { + //TODO check contents + count = count + 1; + } + assertNull(exec.next()); + exec.close(); + assertEquals(7, count); + } +}
