TAJO-2109: Implement Radix sort. Closes #992
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/9afd9abe Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/9afd9abe Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/9afd9abe Branch: refs/heads/master Commit: 9afd9abe379cbef8c6ae2e17c19e280ed3ec2a07 Parents: 45100ce Author: Jihoon Son <[email protected]> Authored: Mon Apr 18 10:45:10 2016 +0900 Committer: Jihoon Son <[email protected]> Committed: Mon Apr 18 10:45:10 2016 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../main/java/org/apache/tajo/SessionVars.java | 3 + .../apache/tajo/common/type/TajoTypeUtil.java | 19 + .../java/org/apache/tajo/conf/TajoConf.java | 2 + .../main/java/org/apache/tajo/datum/Datum.java | 14 +- .../org/apache/tajo/datum/DatumFactory.java | 6 +- .../tajo/tuple/memory/UnSafeTupleList.java | 4 + .../org/apache/tajo/datum/TestBytesDatum.java | 4 +- .../apache/tajo/datum/TestTimestampDatum.java | 20 +- tajo-core-tests/pom.xml | 12 + .../tajo/engine/eval/TestSQLExpression.java | 2 +- .../engine/function/TestDateTimeFunctions.java | 2 +- .../planner/physical/TestExternalSortExec.java | 119 ++- .../engine/planner/physical/TestRadixSort.java | 260 ++++++ .../apache/tajo/engine/query/TestSortQuery.java | 28 +- .../apache/tajo/engine/util/BenchmarkSort.java | 239 +++++ .../queries/TestSortQuery/testSort.sql | 2 +- .../queries/TestSortQuery/testSortDesc.sql | 2 +- .../TestSortQuery/testSortWithAlias1.sql | 2 +- .../testSortWithAliasButOriginalName.sql | 2 +- .../queries/TestSortQuery/testSortWithExpr1.sql | 2 +- .../queries/TestSortQuery/testTopK.sql | 2 +- .../queries/TestSortQuery/testTopkWithJson.json | 8 + .../TestTajoCli/testHelpSessionVars.result | 1 + .../engine/function/datetime/NowTimestamp.java | 2 +- .../function/datetime/ToTimestampInt.java | 2 +- .../engine/planner/UniformRangePartition.java | 6 +- .../planner/physical/ExternalSortExec.java | 53 +- 
.../tajo/engine/planner/physical/RadixSort.java | 921 +++++++++++++++++++ .../NonForwardQueryResultSystemScanner.java | 4 +- .../java/org/apache/tajo/querymaster/Stage.java | 2 +- .../org/apache/tajo/plan/logical/SortNode.java | 6 +- tajo-project/pom.xml | 8 +- .../org/apache/tajo/storage/StorageUtil.java | 38 - .../org/apache/tajo/storage/TestStorages.java | 2 +- .../apache/tajo/storage/jdbc/JdbcScanner.java | 2 +- 36 files changed, 1686 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 7565faf..ac1a8aa 100644 --- a/CHANGES +++ b/CHANGES @@ -4,6 +4,8 @@ Release 0.12.0 - unreleased NEW FEATURES + TAJO-2109: Implement Radix sort. (jihoon) + TAJO-1955: Add a feature to strip quotes from CSV file. (hyunsik) IMPROVEMENT http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/main/java/org/apache/tajo/SessionVars.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java index ba85549..ab00a41 100644 --- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java +++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java @@ -165,6 +165,8 @@ public enum SessionVars implements ConfigKey { COMPRESSED_RESULT_TRANSFER(ConfVars.$COMPRESSED_RESULT_TRANSFER, "Use compression to optimize result transmission.", CLI_SIDE_VAR, Boolean.class, Validators.bool()), + SORT_ALGORITHM(ConfVars.$SORT_ALGORITHM, "sort algorithm", DEFAULT), + //------------------------------------------------------------------------------- // Only for Unit Testing //------------------------------------------------------------------------------- @@ -174,6 +176,7 @@ public enum SessionVars implements ConfigKey { 
TEST_FILTER_PUSHDOWN_ENABLED(ConfVars.$TEST_FILTER_PUSHDOWN_ENABLED, "filter push down enabled", TEST_VAR), TEST_MIN_TASK_NUM(ConfVars.$TEST_MIN_TASK_NUM, "(test only) min task num", TEST_VAR), TEST_PLAN_SHAPE_FIX_ENABLED(ConfVars.$TEST_PLAN_SHAPE_FIX_ENABLED, "(test only) plan shape fix enabled", TEST_VAR), + TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT(ConfVars.$TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT, "(test only) Tim sort threshold for radix sort", TEST_VAR) ; public static final Map<String, SessionVars> SESSION_VARS = Maps.newHashMap(); http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/main/java/org/apache/tajo/common/type/TajoTypeUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/common/type/TajoTypeUtil.java b/tajo-common/src/main/java/org/apache/tajo/common/type/TajoTypeUtil.java index a70218d..ecaeeb1 100644 --- a/tajo-common/src/main/java/org/apache/tajo/common/type/TajoTypeUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/common/type/TajoTypeUtil.java @@ -175,9 +175,28 @@ public class TajoTypeUtil { case DATE: case TIME: case TIMESTAMP: + case INET4: case VARCHAR: + case CHAR: case TEXT: return false; default: return true; } } + + public static boolean isNumeric(Type type) { + return isNumber(type) || isReal(type); + } + + public static boolean isNumber(Type type) { + return + type == Type.INT2 || + type == Type.INT4 || + type == Type.INT8; + } + + public static boolean isReal(Type type) { + return + type == Type.FLOAT4|| + type == Type.FLOAT8; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index c36f43b..24a5520 100644 --- 
a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -367,6 +367,7 @@ public class TajoConf extends Configuration { $AGG_HASH_TABLE_SIZE("tajo.executor.aggregate.hash-table.size", 10000), $SORT_LIST_SIZE("tajo.executor.sort.list.size", 100000), $JOIN_HASH_TABLE_SIZE("tajo.executor.join.hash-table.size", 100000), + $SORT_ALGORITHM("tajo.executor.sort.algorithm", "TIM"), // for index $INDEX_ENABLED("tajo.query.index.enabled", false), @@ -399,6 +400,7 @@ public class TajoConf extends Configuration { $TEST_FILTER_PUSHDOWN_ENABLED("tajo.test.plan.filter-pushdown.enabled", true), $TEST_MIN_TASK_NUM("tajo.test.min-task-num", -1), $TEST_PLAN_SHAPE_FIX_ENABLED("tajo.test.plan.shape.fix.enabled", false), // used for explain statement test + $TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT("tajo.test.executor.radix-sort.tim-sort-threshold", 65536), // Behavior Control --------------------------------------------------------- $BEHAVIOR_ARITHMETIC_ABORT("tajo.behavior.arithmetic-abort", false), http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java index 6aa11ce..e2173a8 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java @@ -20,10 +20,11 @@ package org.apache.tajo.datum; import com.google.gson.annotations.Expose; import org.apache.tajo.SessionVars; +import org.apache.tajo.common.type.TajoTypeUtil; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.exception.InvalidValueForCastException; import org.apache.tajo.exception.InvalidOperationException; +import 
org.apache.tajo.exception.InvalidValueForCastException; import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.json.CommonGsonHelper; import org.apache.tajo.json.GsonObject; @@ -120,20 +121,15 @@ public abstract class Datum implements Comparable<Datum>, GsonObject { } public boolean isNumeric() { - return isNumber() || isReal(); + return TajoTypeUtil.isNumeric(type); } public boolean isNumber() { - return - this.type == Type.INT2 || - this.type == Type.INT4 || - this.type == Type.INT8; + return TajoTypeUtil.isNumber(type); } public boolean isReal() { - return - this.type == Type.FLOAT4|| - this.type == Type.FLOAT8; + return TajoTypeUtil.isReal(type); } protected static void initAbortWhenDivideByZero(TajoConf tajoConf) { http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java index dd4a4e4..e9ac0c5 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java @@ -289,12 +289,12 @@ public class DatumFactory { return new TimeDatum(DateTimeUtil.toTime(tm)); } - public static TimestampDatum createTimestmpDatumWithJavaMillis(long millis) { + public static TimestampDatum createTimestampDatumWithJavaMillis(long millis) { return new TimestampDatum(DateTimeUtil.javaTimeToJulianTime(millis)); } - public static TimestampDatum createTimestmpDatumWithUnixTime(int unixTime) { - return createTimestmpDatumWithJavaMillis(unixTime * 1000L); + public static TimestampDatum createTimestampDatumWithUnixTime(int unixTime) { + return createTimestampDatumWithJavaMillis(unixTime * 1000L); } public static TimestampDatum createTimestamp(String datetimeStr) { 
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java index 4c4a6cb..7bad396 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java @@ -54,6 +54,10 @@ public class UnSafeTupleList extends ArrayList<UnSafeTuple> { } + public DataType[] getDataTypes() { + return dataTypes; + } + @Override public boolean add(UnSafeTuple tuple) { return addTuple(tuple); http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/test/java/org/apache/tajo/datum/TestBytesDatum.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/test/java/org/apache/tajo/datum/TestBytesDatum.java b/tajo-common/src/test/java/org/apache/tajo/datum/TestBytesDatum.java index 4dcbbee..c3a0e84 100644 --- a/tajo-common/src/test/java/org/apache/tajo/datum/TestBytesDatum.java +++ b/tajo-common/src/test/java/org/apache/tajo/datum/TestBytesDatum.java @@ -25,9 +25,7 @@ import org.junit.Test; import java.nio.ByteBuffer; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class TestBytesDatum { http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-common/src/test/java/org/apache/tajo/datum/TestTimestampDatum.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/test/java/org/apache/tajo/datum/TestTimestampDatum.java b/tajo-common/src/test/java/org/apache/tajo/datum/TestTimestampDatum.java index f82f66d..68b34a6 100644 --- 
a/tajo-common/src/test/java/org/apache/tajo/datum/TestTimestampDatum.java +++ b/tajo-common/src/test/java/org/apache/tajo/datum/TestTimestampDatum.java @@ -47,33 +47,33 @@ public class TestTimestampDatum { @Test public final void testType() { - Datum d = DatumFactory.createTimestmpDatumWithUnixTime(unixtime); + Datum d = DatumFactory.createTimestampDatumWithUnixTime(unixtime); assertEquals(Type.TIMESTAMP, d.type()); } @Test(expected = TajoRuntimeException.class) public final void testAsInt4() { - Datum d = DatumFactory.createTimestmpDatumWithUnixTime(unixtime); + Datum d = DatumFactory.createTimestampDatumWithUnixTime(unixtime); d.asInt4(); } @Test public final void testAsInt8() { - Datum d = DatumFactory.createTimestmpDatumWithJavaMillis(unixtime * 1000); + Datum d = DatumFactory.createTimestampDatumWithJavaMillis(unixtime * 1000); long javaTime = unixtime * 1000; assertEquals(DateTimeUtil.javaTimeToJulianTime(javaTime), d.asInt8()); } @Test(expected = TajoRuntimeException.class) public final void testAsFloat4() { - Datum d = DatumFactory.createTimestmpDatumWithUnixTime(unixtime); + Datum d = DatumFactory.createTimestampDatumWithUnixTime(unixtime); d.asFloat4(); } @Test(expected = TajoRuntimeException.class) public final void testAsFloat8() { int instance = 1386577582; - Datum d = DatumFactory.createTimestmpDatumWithUnixTime(instance); + Datum d = DatumFactory.createTimestampDatumWithUnixTime(instance); d.asFloat8(); } @@ -97,7 +97,7 @@ public class TestTimestampDatum { @Test public final void testSize() { - Datum d = DatumFactory.createTimestmpDatumWithUnixTime(unixtime); + Datum d = DatumFactory.createTimestampDatumWithUnixTime(unixtime); assertEquals(TimestampDatum.SIZE, d.asByteArray().length); } @@ -112,7 +112,7 @@ public class TestTimestampDatum { @Test public final void testToJson() { - Datum d = DatumFactory.createTimestmpDatumWithUnixTime(unixtime); + Datum d = DatumFactory.createTimestampDatumWithUnixTime(unixtime); Datum copy = 
CommonGsonHelper.fromJson(d.toJson(), Datum.class); assertEquals(d, copy); } @@ -168,12 +168,12 @@ public class TestTimestampDatum { assertEquals(uTime, DateTimeUtil.julianTimeToEpoch(julianTimestamp)); assertEquals(jTime, DateTimeUtil.julianTimeToJavaTime(julianTimestamp)); - TimestampDatum datum3 = DatumFactory.createTimestmpDatumWithJavaMillis(jTime); + TimestampDatum datum3 = DatumFactory.createTimestampDatumWithJavaMillis(jTime); assertEquals(cal.get(Calendar.YEAR), datum3.getYear()); assertEquals(cal.get(Calendar.MONTH) + 1, datum3.getMonthOfYear()); assertEquals(cal.get(Calendar.DAY_OF_MONTH), datum3.getDayOfMonth()); - datum3 = DatumFactory.createTimestmpDatumWithUnixTime(uTime); + datum3 = DatumFactory.createTimestampDatumWithUnixTime(uTime); assertEquals(cal.get(Calendar.YEAR), datum3.getYear()); assertEquals(cal.get(Calendar.MONTH) + 1, datum3.getMonthOfYear()); assertEquals(cal.get(Calendar.DAY_OF_MONTH), datum3.getDayOfMonth()); @@ -182,7 +182,7 @@ public class TestTimestampDatum { @Test public final void testNull() { - Datum d = DatumFactory.createTimestmpDatumWithUnixTime(unixtime); + Datum d = DatumFactory.createTimestampDatumWithUnixTime(unixtime); assertEquals(Boolean.FALSE,d.equals(DatumFactory.createNullDatum())); assertEquals(DatumFactory.createNullDatum(),d.equalsTo(DatumFactory.createNullDatum())); assertEquals(-1,d.compareTo(DatumFactory.createNullDatum())); http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-core-tests/pom.xml b/tajo-core-tests/pom.xml index 6de5546..b12642a 100644 --- a/tajo-core-tests/pom.xml +++ b/tajo-core-tests/pom.xml @@ -350,6 +350,18 @@ <artifactId>powermock-api-mockito</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-core</artifactId> + <version>1.11.3</version> + <scope>test</scope> + </dependency> + <dependency> + 
<groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-generator-annprocess</artifactId> + <version>1.11.3</version> + <scope>test</scope> + </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/TestSQLExpression.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/TestSQLExpression.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/TestSQLExpression.java index 2db826b..fa4561a 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/TestSQLExpression.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/TestSQLExpression.java @@ -860,7 +860,7 @@ public class TestSQLExpression extends ExprTestBase { TimeZone tz = TimeZone.getTimeZone("GMT-6"); int unixtime = 1389071574; // (int) (System.currentTimeMillis() / 1000); - TimestampDatum expected = DatumFactory.createTimestmpDatumWithUnixTime(unixtime); + TimestampDatum expected = DatumFactory.createTimestampDatumWithUnixTime(unixtime); testSimpleEval(context, String.format("select to_timestamp(CAST(split_part('%d.999', '.', 1) as INT8));", unixtime), new String[] {TimestampDatum.asChars(expected.asTimeMeta(), tz, false)}); } http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestDateTimeFunctions.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestDateTimeFunctions.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestDateTimeFunctions.java index dc9bd25..36a4a60 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestDateTimeFunctions.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestDateTimeFunctions.java @@ -41,7 +41,7 @@ public class 
TestDateTimeFunctions extends ExprTestBase { @Test public void testToTimestamp() throws TajoException { long expectedTimestamp = System.currentTimeMillis(); - TimestampDatum expected = DatumFactory.createTimestmpDatumWithUnixTime((int)(expectedTimestamp/ 1000)); + TimestampDatum expected = DatumFactory.createTimestampDatumWithUnixTime((int)(expectedTimestamp/ 1000)); // (expectedTimestamp / 1000) means the translation from millis seconds to unix timestamp String q1 = String.format("select to_timestamp(%d);", (expectedTimestamp / 1000)); http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java index 580fe86..788ebeb 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java @@ -30,12 +30,15 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.parser.sql.SQLAnalyzer; +import org.apache.tajo.datum.NullDatum; import org.apache.tajo.engine.planner.PhysicalPlanner; import org.apache.tajo.engine.planner.PhysicalPlannerImpl; import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.engine.planner.physical.ExternalSortExec.SortAlgorithm; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.TajoException; +import org.apache.tajo.parser.sql.SQLAnalyzer; +import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import 
org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.plan.logical.LogicalNode; @@ -46,14 +49,20 @@ import org.apache.tajo.worker.TaskAttemptContext; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.Random; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +@RunWith(Parameterized.class) public class TestExternalSortExec { private TajoConf conf; private TajoTestingCluster util; @@ -61,12 +70,27 @@ public class TestExternalSortExec { private CatalogService catalog; private SQLAnalyzer analyzer; private LogicalPlanner planner; + private LogicalOptimizer optimizer; private Path testDir; + private Schema tableSchema; private final int numTuple = 1000; private Random rnd = new Random(System.currentTimeMillis()); private TableDesc employee; + private String sortAlgorithmString; + + public TestExternalSortExec(String sortAlgorithm) { + this.sortAlgorithmString = sortAlgorithm; + } + + @Parameters + public static Collection<Object[]> generateParameters() { + return Arrays.asList(new Object[][]{ + {SortAlgorithm.TIM.name()}, + {SortAlgorithm.MSD_RADIX.name()}, + }); + } @Before public void setUp() throws Exception { @@ -79,33 +103,81 @@ public class TestExternalSortExec { catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString()); - Schema schema = SchemaFactory.newV1(); - schema.addColumn("managerid", Type.INT4); - schema.addColumn("empid", Type.INT4); - schema.addColumn("deptname", Type.TEXT); + tableSchema = SchemaFactory.newV1(new Column[] { + new Column("managerid", Type.INT8), + new Column("empid", Type.INT4), + new 
Column("deptname", Type.TEXT), + new Column("col1", Type.INT8), + new Column("col2", Type.INT8), + new Column("col3", Type.INT8), + new Column("col4", Type.INT8), + new Column("col5", Type.INT8), + new Column("col6", Type.INT8), + new Column("col7", Type.INT8), + new Column("col8", Type.INT8), + new Column("col9", Type.INT8), + new Column("col10", Type.INT8), + new Column("col11", Type.INT8), + new Column("col12", Type.INT8) + }); TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT"); Path employeePath = new Path(testDir, "employee.csv"); Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) - .getAppender(employeeMeta, schema, employeePath); + .getAppender(employeeMeta, tableSchema, employeePath); appender.enableStats(); appender.init(); - VTuple tuple = new VTuple(schema.size()); + VTuple tuple = new VTuple(tableSchema.size()); for (int i = 0; i < numTuple; i++) { - tuple.put(new Datum[] { - DatumFactory.createInt4(rnd.nextInt(50)), - DatumFactory.createInt4(rnd.nextInt(100)), - DatumFactory.createText("dept_" + i), - }); + if (rnd.nextInt(1000) == 0) { + tuple.put(new Datum[] { + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + }); + } else { + boolean positive = rnd.nextInt(2) == 0; + tuple.put(new Datum[]{ + DatumFactory.createInt8(positive ? 
100_000 + rnd.nextInt(100_000) : (100_000 + rnd.nextInt(100_000)) * -1), + DatumFactory.createInt4(rnd.nextInt(100)), + DatumFactory.createText("dept_" + i), + DatumFactory.createInt8(100_000 + rnd.nextInt(50)), + DatumFactory.createInt8(100_000 + rnd.nextInt(50)), + DatumFactory.createInt8(100_000 + rnd.nextInt(50)), + DatumFactory.createInt8(100_000 + rnd.nextInt(50)), + DatumFactory.createInt8(100_000 + rnd.nextInt(50)), + DatumFactory.createInt8(100_000 + rnd.nextInt(50)), + DatumFactory.createInt8(100_000 + rnd.nextInt(50)), + DatumFactory.createInt8(100_000 + rnd.nextInt(50)), + DatumFactory.createInt8(100_000 + rnd.nextInt(50)), + DatumFactory.createInt8(100_000 + rnd.nextInt(50)), + DatumFactory.createInt8(100_000 + rnd.nextInt(50)), + DatumFactory.createInt8(100_000 + rnd.nextInt(50)), + }); + } appender.addTuple(tuple); } + appender.flush(); appender.close(); - employee = new TableDesc("default.employee", schema, employeeMeta, employeePath.toUri()); + employee = new TableDesc("default.employee", tableSchema, employeeMeta, employeePath.toUri()); catalog.createTable(employee); analyzer = new SQLAnalyzer(); planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); + optimizer = new LogicalOptimizer(conf, catalog, TablespaceManager.getInstance()); } @After @@ -122,7 +194,8 @@ public class TestExternalSortExec { public final void testNext() throws IOException, TajoException { conf.setIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT, 2); QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf); - queryContext.setInt(SessionVars.EXTSORT_BUFFER_SIZE, 1); + queryContext.set(SessionVars.SORT_ALGORITHM.keyname(), sortAlgorithmString); + queryContext.setInt(SessionVars.EXTSORT_BUFFER_SIZE, 4); FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(), new Path(employee.getUri()), Integer.MAX_VALUE); @@ -132,28 +205,32 @@ public class TestExternalSortExec { ctx.setEnforcer(new Enforcer()); Expr expr = 
analyzer.parse(QUERIES[0]); LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr); - LogicalNode rootNode = plan.getRootBlock().getRoot(); + LogicalNode rootNode = optimizer.optimize(plan); PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - ProjectionExec proj = (ProjectionExec) exec; Tuple tuple; Tuple preVal = null; Tuple curVal; int cnt = 0; exec.init(); - long start = System.currentTimeMillis(); - BaseTupleComparator comparator = new BaseTupleComparator(proj.getSchema(), + Schema sortSchema = SchemaFactory.newV1(new Column[] { + new Column("managerid", Type.INT8), + new Column("empid", Type.INT4), + }); + + BaseTupleComparator comparator = new BaseTupleComparator(sortSchema, new SortSpec[]{ - new SortSpec(new Column("managerid", Type.INT4)), - new SortSpec(new Column("empid", Type.INT4)) + new SortSpec(new Column("managerid", Type.INT8)), + new SortSpec(new Column("empid", Type.INT4)), }); + long start = System.currentTimeMillis(); while ((tuple = exec.next()) != null) { curVal = tuple; if (preVal != null) { - assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0); + assertTrue("prev: " + preVal + ", but cur: " + curVal + ", cnt: " + cnt, comparator.compare(preVal, curVal) <= 0); } preVal = new VTuple(curVal); cnt++; http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRadixSort.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRadixSort.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRadixSort.java new file mode 100644 index 0000000..8246834 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestRadixSort.java @@ -0,0 +1,260 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.tajo.SessionVars; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaFactory; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.engine.planner.physical.ExternalSortExec.UnSafeComparator; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.tuple.memory.UnSafeTuple; +import org.apache.tajo.tuple.memory.UnSafeTupleList; +import org.apache.tajo.util.StringUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static 
org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class TestRadixSort { + private final static QueryContext queryContext; + private static UnSafeTupleList tuples; + private static Schema schema; + private static final int tupleNum = 1000; + private static final Random random = new Random(System.currentTimeMillis()); + private SortSpec[] sortSpecs; + private final static Datum MINUS_ONE = DatumFactory.createInt4(-1); + + static { + queryContext = new QueryContext(new TajoConf()); + queryContext.setInt(SessionVars.TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT, 0); + + schema = SchemaFactory.newV1(new Column[]{ + new Column("col0", Type.INT8), + new Column("col1", Type.INT4), + new Column("col2", Type.INT2), + new Column("col3", Type.DATE), + new Column("col4", Type.TIMESTAMP), + new Column("col5", Type.TIME), + new Column("col6", Type.INET4), + new Column("col7", Type.FLOAT4), + new Column("col8", Type.FLOAT8) + }); + } + + private static class Param { + final SortSpec[] sortSpecs; + + public Param(SortSpec[] param) { + this.sortSpecs = param; + } + + @Override + public String toString() { + return StringUtils.join(sortSpecs); + } + } + + public TestRadixSort(Param param) { + this.sortSpecs = param.sortSpecs; + } + + @Parameters(name = "{index}: {0}") + public static Collection<Object[]> generateParameters() { + List<Object[]> params = new ArrayList<>(); + + // Test every single column sort + for (int i = 0; i < schema.size(); i++) { + params.add(new Object[] { + new Param( + new SortSpec[] { + new SortSpec(schema.getColumn(i), random.nextBoolean(), random.nextBoolean()) + }) + }); + } + + // Randomly choose columns + for (int colNum = 2; colNum < 6; colNum++) { + for (int i =0; i < 5; i++) { + SortSpec[] sortSpecs = new SortSpec[colNum]; + for (int j = 0; j <colNum; j++) { + sortSpecs[j] = new SortSpec(schema.getColumn(random.nextInt(schema.size())), + random.nextBoolean(), random.nextBoolean()); + } + params.add(new Object[] {new Param(sortSpecs)}); + 
} + } + + return params; + } + + @Before + public void setup() { + List<DataType> dataTypeList = schema.getRootColumns().stream().map(c -> c.getDataType()).collect(Collectors.toList()); + tuples = new UnSafeTupleList(dataTypeList.toArray(new DataType[dataTypeList.size()]), tupleNum); + + // add null and negative numbers + VTuple tuple = new VTuple(schema.size()); + IntStream.range(0, tupleNum - 6).forEach(i -> { + // Each of null, max, and min tuples makes up about 10% of the total (r == 0, 1, 2 out of 10). + int r = random.nextInt(10); + switch (r) { + case 0: + makeNullTuple(tuple); + break; + case 1: + makeMaxTuple(tuple); + break; + case 2: + makeMinTuple(tuple); + break; + default: + makeRandomTuple(tuple); + break; + } + + tuples.addTuple(tuple); + }); + + // Add at least 2 null, max, min tuples (the 6 slots left free by tupleNum - 6 above). + makeMaxTuple(tuple); + tuples.addTuple(tuple); + makeMinTuple(tuple); + tuples.addTuple(tuple); + makeNullTuple(tuple); + tuples.addTuple(tuple); + makeMaxTuple(tuple); + tuples.addTuple(tuple); + makeMinTuple(tuple); + tuples.addTuple(tuple); + makeNullTuple(tuple); + tuples.addTuple(tuple); + } + + @After + public void teardown() { + tuples.release(); + } + + private static Tuple makeNullTuple(Tuple tuple) { + tuple.put(new Datum[] { + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get() + }); + return tuple; + } + + /** + * Fills {@code tuple} with random values. The signed numeric columns are randomly negated below, + * while DATE, TIMESTAMP and TIME stay non-negative so that the 0 values of makeMinTuple are never undercut. + */ + private static Tuple makeRandomTuple(Tuple tuple) { + tuple.put(new Datum[]{ + DatumFactory.createInt8(random.nextLong()), + DatumFactory.createInt4(random.nextInt()), + DatumFactory.createInt2((short) random.nextInt(Short.MAX_VALUE)), + // Mask the sign bit rather than using Math.abs(): Math.abs(Integer/Long.MIN_VALUE) is still negative. + DatumFactory.createDate(random.nextInt() & Integer.MAX_VALUE), + DatumFactory.createTimestamp(random.nextLong() & Long.MAX_VALUE), + DatumFactory.createTime(random.nextLong() & Long.MAX_VALUE), + DatumFactory.createInet4(random.nextInt()), + DatumFactory.createFloat4(random.nextFloat()), + DatumFactory.createFloat8(random.nextDouble())
+ }); + + // Randomly negate the signed numeric columns: 0-2 (INT8, INT4, INT2) and 7-8 (FLOAT4, FLOAT8). + for (int i = 0; i < 3; i++) { + if (random.nextBoolean()) { + tuple.put(i, tuple.asDatum(i).multiply(MINUS_ONE)); + } + } + + for (int i = 7; i < 9; i++) { + if (random.nextBoolean()) { + tuple.put(i, tuple.asDatum(i).multiply(MINUS_ONE)); + } + } + + return tuple; + } + + private static Tuple makeMaxTuple(Tuple tuple) { + tuple.put(new Datum[]{ + DatumFactory.createInt8(Long.MAX_VALUE), + DatumFactory.createInt4(Integer.MAX_VALUE), + DatumFactory.createInt2(Short.MAX_VALUE), + DatumFactory.createDate(Integer.MAX_VALUE), + DatumFactory.createTimestamp(Long.MAX_VALUE), + DatumFactory.createTime(Long.MAX_VALUE), + DatumFactory.createInet4(Integer.MAX_VALUE), + DatumFactory.createFloat4(Float.MAX_VALUE), + DatumFactory.createFloat8(Double.MAX_VALUE) + }); + + return tuple; + } + + /** + * Fills {@code tuple} with the smallest value each column can take in this test: + * MIN_VALUE for the signed integers, 0 for DATE/TIMESTAMP/TIME (random values are non-negative), + * and -MAX_VALUE for FLOAT4/FLOAT8. + */ + private static Tuple makeMinTuple(Tuple tuple) { + tuple.put(new Datum[]{ + DatumFactory.createInt8(Long.MIN_VALUE), + DatumFactory.createInt4(Integer.MIN_VALUE), + DatumFactory.createInt2(Short.MIN_VALUE), + DatumFactory.createDate(0), + DatumFactory.createTimestamp(0), + DatumFactory.createTime(0), + DatumFactory.createInet4(Integer.MIN_VALUE), + // Float.MIN_VALUE / Double.MIN_VALUE are the smallest POSITIVE values; the most negative ones are -MAX_VALUE. + DatumFactory.createFloat4(-Float.MAX_VALUE), + DatumFactory.createFloat8(-Double.MAX_VALUE) + }); + + return tuple; + } + + @Test + public void testSort() { + Comparator<UnSafeTuple> comparator = new UnSafeComparator(schema, sortSpecs); + + RadixSort.sort(queryContext, tuples, schema, sortSpecs, comparator); + + IntStream.range(0, tuples.size() - 1) + .forEach(i -> { + assertTrue(tuples.get(i) + " precedes " + tuples.get(i + 1) + " at " + i, + comparator.compare(tuples.get(i), tuples.get(i + 1)) <= 0); + }); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java index 582d0b0..ef3336d 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java @@ -25,26 +25,47 @@ import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.KeyValueSet; +import org.junit.AfterClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import java.sql.ResultSet; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import static org.junit.Assert.assertEquals; @Category(IntegrationTest.class) +@RunWith(Parameterized.class) public class TestSortQuery extends QueryTestCaseBase { - public TestSortQuery() { + /** + * @param sortAlgorithm value for the SORT_ALGORITHM session variable, supplied by the Parameterized runner from generateParameters() + */ + public TestSortQuery(String sortAlgorithm) { super(TajoConstants.DEFAULT_DATABASE_NAME); Map<String, String> variables = new HashMap<>(); variables.put(SessionVars.SORT_LIST_SIZE.keyname(), "100"); + variables.put(SessionVars.SORT_ALGORITHM.keyname(), sortAlgorithm); client.updateSessionVariables(variables); } + + /** + * Unsets SORT_ALGORITHM once every parameterized run of this class has finished. + * NOTE(review): presumably so the value does not leak into other test classes that share {@code client}. + */ + @AfterClass + public static void tearDown() throws Exception { + client.unsetSessionVariables(Arrays.asList(SessionVars.SORT_ALGORITHM.keyname())); + } + + /** + * Runs every test once per sort algorithm; the names correspond to the ExternalSortExec.SortAlgorithm constants. + */ + @Parameters + public static Collection<Object[]> generateParameters() { + return Arrays.asList(new Object[][]{ + {"TIM"}, + {"MSD_RADIX"}, + }); + } + @Test public final void testSort() throws Exception { ResultSet res = executeQuery(); @@ -170,6 +191,8 @@ public class TestSortQuery extends QueryTestCaseBase { ResultSet res = executeQuery(); assertResultSet(res); cleanupQuery(res); + + // NOTE(review): not in a finally block, so a failed assertion above leaves the table behind for the next parameterized run. + executeString("drop table testSortWithDate"); } } @@ -188,6 +211,8 @@ public class TestSortQuery
extends QueryTestCaseBase { ResultSet res = executeQuery(); assertResultSet(res); cleanupQuery(res); + + executeString("drop table table2"); } @Test @@ -446,6 +471,7 @@ public class TestSortQuery extends QueryTestCaseBase { cleanupQuery(res); } finally { testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_MIN_TASK_NUM.varname, "0"); + executeString("drop table testOutOfScope"); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/BenchmarkSort.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/BenchmarkSort.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/BenchmarkSort.java new file mode 100644 index 0000000..1cc526f --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/util/BenchmarkSort.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.util; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.SessionVars; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.catalog.*; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.engine.planner.PhysicalPlanner; +import org.apache.tajo.engine.planner.PhysicalPlannerImpl; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.engine.planner.physical.PhysicalExec; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.parser.sql.SQLAnalyzer; +import org.apache.tajo.plan.LogicalOptimizer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.storage.Appender; +import org.apache.tajo.storage.FileTablespace; +import org.apache.tajo.storage.TablespaceManager; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.worker.TaskAttemptContext; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.io.IOException; +import java.util.Random; + +import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; + +@State(Scope.Benchmark) +public class BenchmarkSort { + private TajoConf conf; + private TajoTestingCluster util; + private final
String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/BenchmarkSort"; + private CatalogService catalog; + private SQLAnalyzer analyzer; + private LogicalPlanner planner; + private LogicalOptimizer optimizer; + private Path testDir; + + private final int numTuple = 10000; + private Random rnd = new Random(System.currentTimeMillis()); + + private TableDesc employee; + + String[] QUERIES = { + "select col0 from employee order by col0" + }; + + @State(Scope.Thread) + public static class BenchContext { + int sortBufferSize; + } + + @Setup + public void setup() throws Exception { + this.conf = new TajoConf(); + util = new TajoTestingCluster(); + util.startCatalogCluster(); + catalog = util.getCatalogService(); + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); + catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); + conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString()); + + Schema schema = SchemaFactory.newV1(new Column[] { + new Column("col0", Type.INT8), + new Column("col1", Type.INT4), + new Column("col2", Type.INT2), + new Column("col3", Type.DATE), + new Column("col4", Type.TIMESTAMP), + new Column("col5", Type.TIME), + new Column("col6", Type.INET4), + new Column("col7", Type.FLOAT4), + new Column("col8", Type.FLOAT8), + new Column("col9", Type.INT8), + new Column("col10", Type.INT8), + new Column("col11", Type.INT8), + new Column("col12", Type.INT8), + new Column("col13", Type.INT8), + new Column("col14", Type.INT8), + }); + + TableMeta employeeMeta = CatalogUtil.newTableMeta("TEXT"); + Path employeePath = new Path(testDir, "employee.csv"); + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) + .getAppender(employeeMeta, schema, employeePath); + appender.enableStats(); + appender.init(); + VTuple tuple = new VTuple(schema.size()); + for (int i = 0; i < numTuple; i++) { + if (rnd.nextInt(10000) == 0) {
+ tuple.put(new Datum[] { + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get(), + NullDatum.get() + }); + } else { + tuple.put(new Datum[]{ + DatumFactory.createInt8(rnd.nextLong()), + DatumFactory.createInt4(rnd.nextInt()), + DatumFactory.createInt2((short) rnd.nextInt(Short.MAX_VALUE)), + DatumFactory.createDate(Math.abs(rnd.nextInt())), + DatumFactory.createTimestamp(Math.abs(rnd.nextLong())), + DatumFactory.createTime(Math.abs(rnd.nextLong())), + DatumFactory.createInet4(rnd.nextInt()), + DatumFactory.createFloat4(rnd.nextFloat()), + DatumFactory.createFloat8(rnd.nextDouble()), + DatumFactory.createInt8(rnd.nextLong()), + DatumFactory.createInt8(rnd.nextLong()), + DatumFactory.createInt8(rnd.nextLong()), + DatumFactory.createInt8(rnd.nextLong()), + DatumFactory.createInt8(rnd.nextLong()), + DatumFactory.createInt8(rnd.nextLong()) + }); + } + appender.addTuple(tuple); + } + + appender.flush(); + appender.close(); + + employee = new TableDesc("default.employee", schema, employeeMeta, employeePath.toUri()); + catalog.createTable(employee); + analyzer = new SQLAnalyzer(); + planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); + optimizer = new LogicalOptimizer(conf, catalog, TablespaceManager.getInstance()); + } + + @TearDown + public void tearDown() throws IOException { + CommonTestingUtil.cleanupTestDir(TEST_PATH); + util.shutdownCatalogCluster(); + } + + /** + * Plans {@code QUERIES[0]} against the employee table built in setup() and drains every output tuple, + * with the SORT_ALGORITHM session variable set to {@code sortAlgorithm}. + * + * @param sortAlgorithm SORT_ALGORITHM value, e.g. "TIM" or "MSD_RADIX" + */ + private void runSort(String sortAlgorithm) throws InterruptedException, IOException, TajoException { + QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf); + queryContext.setInt(SessionVars.EXTSORT_BUFFER_SIZE, 200); + queryContext.set(SessionVars.SORT_ALGORITHM.keyname(), sortAlgorithm); + + FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(), + new Path(employee.getUri()), Integer.MAX_VALUE); + Path workDir = new Path(testDir, BenchmarkSort.class.getName()); + TaskAttemptContext ctx = new TaskAttemptContext(queryContext, + LocalTajoTestingUtility.newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(QUERIES[0]); + LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr); + LogicalNode rootNode = optimizer.optimize(plan); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + exec.init(); + try { + while (exec.next() != null) { + // Drain the operator; the tuples themselves are not needed. + } + } finally { + // Release the operator even if next() fails mid-benchmark. + exec.close(); + } + } + + /** Runs the sort query with SORT_ALGORITHM=TIM. */ + @Benchmark + @BenchmarkMode(Mode.All) + public void timSort(BenchContext context) throws InterruptedException, IOException, TajoException { + runSort("TIM"); + } + + /** Runs the sort query with SORT_ALGORITHM=MSD_RADIX. */ + @Benchmark + @BenchmarkMode(Mode.All) + public void msdRadixSort(BenchContext context) throws InterruptedException, IOException, TajoException { + runSort("MSD_RADIX"); + } + + public static void main(String[] args) throws RunnerException { + Options opt =
new OptionsBuilder() + .include(BenchmarkSort.class.getSimpleName()) + .warmupIterations(1) + .measurementIterations(1) + .forks(1) + .build(); + + new Runner(opt).run(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSort.sql ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSort.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSort.sql index 7958002..ac79024 100644 --- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSort.sql +++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSort.sql @@ -1 +1 @@ -select l_linenumber, l_orderkey from lineitem order by l_orderkey; \ No newline at end of file +select l_linenumber, l_orderkey from lineitem order by l_orderkey, l_linenumber; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortDesc.sql ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortDesc.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortDesc.sql index 4252643..6636bed 100644 --- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortDesc.sql +++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortDesc.sql @@ -1 +1 @@ -select l_linenumber, l_orderkey from lineitem order by l_orderkey desc; \ No newline at end of file +select l_linenumber, l_orderkey from lineitem order by l_orderkey desc, l_linenumber asc; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAlias1.sql ---------------------------------------------------------------------- diff --git
a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAlias1.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAlias1.sql index cd8be3e..fd88b7f 100644 --- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAlias1.sql +++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAlias1.sql @@ -1 +1 @@ -select l_linenumber, l_orderkey as sortkey from lineitem order by sortkey; \ No newline at end of file +select l_linenumber, l_orderkey as sortkey from lineitem order by sortkey, l_linenumber; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAliasButOriginalName.sql ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAliasButOriginalName.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAliasButOriginalName.sql index 1d6396a..2be75a8 100644 --- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAliasButOriginalName.sql +++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithAliasButOriginalName.sql @@ -1 +1 @@ -select l_linenumber, l_orderkey as sortkey from lineitem order by l_orderkey; \ No newline at end of file +select l_linenumber, l_orderkey as sortkey from lineitem order by l_orderkey, l_linenumber; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithExpr1.sql ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithExpr1.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithExpr1.sql index 2aeba26..ee3edda 100644 --- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithExpr1.sql +++ 
b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testSortWithExpr1.sql @@ -1 +1 @@ -select l_linenumber, l_orderkey as sortkey from lineitem order by l_orderkey + 1; \ No newline at end of file +select l_linenumber, l_orderkey as sortkey from lineitem order by l_orderkey + 1, l_linenumber; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopK.sql ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopK.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopK.sql index 331f3b4..65519f0 100644 --- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopK.sql +++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopK.sql @@ -1 +1 @@ -select l_orderkey, l_linenumber from lineitem order by l_orderkey desc limit 3; \ No newline at end of file +select l_orderkey, l_linenumber from lineitem order by l_orderkey desc, l_linenumber asc limit 3; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopkWithJson.json ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopkWithJson.json b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopkWithJson.json index e3a264f..333037b 100644 --- a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopkWithJson.json +++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testTopkWithJson.json @@ -32,6 +32,14 @@ }, "IsAsc": false, "IsNullFirst": false + }, + { + "SortKey": { + "ColumnName": "l_linenumber", + "OpType": "Column" + }, + "IsAsc": true, + "IsNullFirst": false } ], "Expr": { 
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result index 46e6b76..5a6198e 100644 --- a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result +++ b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result @@ -48,4 +48,5 @@ Available Session Variables: \set FETCH_ROWNUM [int value] - The number of rows to be fetched from Master at a time \set BLOCK_ON_RESULT [true or false] - Whether to block result set on query execution \set COMPRESSED_RESULT_TRANSFER [true or false] - Use compression to optimize result transmission. +\set SORT_ALGORITHM [text value] - sort algorithm \set DEBUG_ENABLED [true or false] - (debug only) debug mode enabled \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/NowTimestamp.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/NowTimestamp.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/NowTimestamp.java index dd0d195..2f298a7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/NowTimestamp.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/NowTimestamp.java @@ -44,7 +44,7 @@ public class NowTimestamp extends GeneralFunction { @Override public Datum eval(Tuple params) { if (datum == null) { - datum = DatumFactory.createTimestmpDatumWithJavaMillis(System.currentTimeMillis()); + datum = DatumFactory.createTimestampDatumWithJavaMillis(System.currentTimeMillis()); } return datum; } 
http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java index 5468b19..63b725c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java @@ -49,6 +49,6 @@ public class ToTimestampInt extends GeneralFunction { if (params.isBlankOrNull(0)) { return NullDatum.get(); } - return DatumFactory.createTimestmpDatumWithUnixTime(params.getInt4(0)); + return DatumFactory.createTimestampDatumWithUnixTime(params.getInt4(0)); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java index 02e397d..0b8199f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java @@ -657,13 +657,13 @@ public class UniformRangePartition extends RangePartitionAlgorithm { break; case TIMESTAMP: if (overflowFlag[i]) { - end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis( + end.put(i, DatumFactory.createTimestampDatumWithJavaMillis( mergedRange.getStart().getInt8(i) + incs[i].longValue())); } else { if (sortSpecs[i].isAscending()) { - end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(last.getInt8(i) + incs[i].longValue())); + end.put(i, 
DatumFactory.createTimestampDatumWithJavaMillis(last.getInt8(i) + incs[i].longValue())); } else { - end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(last.getInt8(i) - incs[i].longValue())); + end.put(i, DatumFactory.createTimestampDatumWithJavaMillis(last.getInt8(i) - incs[i].longValue())); } } break; http://git-wip-us.apache.org/repos/asf/tajo/blob/9afd9abe/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index ff629c3..e269bf6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -36,11 +36,13 @@ import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.datum.TextDatum; import org.apache.tajo.engine.planner.PhysicalPlanningException; +import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.plan.logical.SortNode; import org.apache.tajo.storage.*; +import org.apache.tajo.storage.Scanner; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.storage.rawfile.DirectRawFileWriter; @@ -53,10 +55,7 @@ import org.apache.tajo.worker.TaskAttemptContext; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.Executors; @@ -74,6 +73,12 @@ import java.util.concurrent.Future; * </ul> */ public class ExternalSortExec extends SortExec { + + /** Sort algorithms that can be selected through the SORT_ALGORITHM session variable. */ + enum SortAlgorithm { + TIM, + MSD_RADIX, + } + /** Class logger */ private static final Log LOG = LogFactory.getLog(ExternalSortExec.class); /** The prefix of fragment name for intermediate */ @@ -117,6 +122,8 @@ public class ExternalSortExec extends SortExec { /** total bytes of input data */ private long inputBytes; + /** Algorithm used by sort(UnSafeTupleList); fixed at construction. */ + private final SortAlgorithm sortAlgorithm; + private ExternalSortExec(final TaskAttemptContext context, final SortNode plan) throws PhysicalPlanningException { super(context, plan.getInSchema(), plan.getOutSchema(), null, plan.getSortKeys()); @@ -133,6 +140,28 @@ public class ExternalSortExec extends SortExec { this.localFS = new RawLocalFileSystem(); this.intermediateMeta = CatalogUtil.newTableMeta(BuiltinStorages.DRAW); this.inputStats = new TableStats(); + this.sortAlgorithm = getSortAlgorithm(context.getQueryContext(), sortSpecs); + LOG.info(sortAlgorithm.name() + " sort is selected"); + } + + /** + * Resolves the sort algorithm from the SORT_ALGORITHM session variable (default TIM), matching names case-insensitively. + * + * <p>Falls back to TIM when any sort key is rejected by RadixSort.isApplicableType (logging a warning only if + * MSD_RADIX was requested), or when the configured name is unknown (always logging a warning). + * + * @param context query context holding the session variables + * @param sortSpecs sort keys of this operator + * @return the algorithm to use; never null + */ + private static SortAlgorithm getSortAlgorithm(QueryContext context, SortSpec[] sortSpecs) { + String sortAlgorithm = context.get(SessionVars.SORT_ALGORITHM, SortAlgorithm.TIM.name()); + if (Arrays.stream(sortSpecs) + .anyMatch(sortSpec -> !RadixSort.isApplicableType(sortSpec))) { + if (sortAlgorithm.equalsIgnoreCase(SortAlgorithm.MSD_RADIX.name())) { + LOG.warn("Non-applicable types exist.
Falling back to " + SortAlgorithm.TIM.name() + " sort"); + } + return SortAlgorithm.TIM; + } + if (sortAlgorithm.equalsIgnoreCase(SortAlgorithm.TIM.name())) { + return SortAlgorithm.TIM; + } else if (sortAlgorithm.equalsIgnoreCase(SortAlgorithm.MSD_RADIX.name())) { + return SortAlgorithm.MSD_RADIX; + } else { + LOG.warn("Unknown sort type: " + sortAlgorithm); + LOG.warn("Falling back to " + SortAlgorithm.TIM.name() + " sort"); + return SortAlgorithm.TIM; + } } public ExternalSortExec(final TaskAttemptContext context,final SortNode plan, final ScanNode scanNode, @@ -172,6 +201,18 @@ public class ExternalSortExec extends SortExec { return this.plan; } + /** + * Sorts {@code tupleBlock} with the algorithm chosen in the constructor, using unSafeComparator. + * + * <p>NOTE(review): both call sites ignore the returned list, so this presumably relies on each algorithm + * reordering {@code tupleBlock} in place; confirm for RadixSort.sort. + * + * @param tupleBlock tuples to sort + * @return the list returned by the selected algorithm + */ + private List<UnSafeTuple> sort(UnSafeTupleList tupleBlock) { + switch (sortAlgorithm) { + case TIM: + return OffHeapRowBlockUtils.sort(tupleBlock, unSafeComparator); + case MSD_RADIX: + return RadixSort.sort(context.getQueryContext(), tupleBlock, inSchema, sortSpecs, unSafeComparator); + default: + // The below line is not reachable. So, an exception should be thrown if it is executed. + throw new TajoRuntimeException(new UnsupportedException(sortAlgorithm.name())); + } + } + /** * Sort a tuple block and store them into a chunk file */ @@ -180,7 +221,7 @@ public class ExternalSortExec extends SortExec { int rowNum = tupleBlock.size(); long sortStart = System.currentTimeMillis(); - OffHeapRowBlockUtils.sort(tupleBlock, unSafeComparator); + this.sort(tupleBlock); long sortEnd = System.currentTimeMillis(); long chunkWriteStart = System.currentTimeMillis(); @@ -527,7 +568,7 @@ public class ExternalSortExec extends SortExec { if (chunk.isMemory()) { long sortStart = System.currentTimeMillis(); - OffHeapRowBlockUtils.sort(inMemoryTable, unSafeComparator); + this.sort(inMemoryTable); Scanner scanner = new MemTableScanner<>(inMemoryTable, inMemoryTable.size(), inMemoryTable.usedMem()); if(LOG.isDebugEnabled()) { debug(LOG, "Memory Chunk sort (" + FileUtil.humanReadableByteCount(inMemoryTable.usedMem(), false)
