TAJO-1407: Minor performance improvement of MemSortExec. Closes #426
Signed-off-by: Jihoon Son <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/d7b5212c Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/d7b5212c Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/d7b5212c Branch: refs/heads/index_support Commit: d7b5212ceb569ba145318d78cff46ca49bef973c Parents: a921585 Author: Jihoon Son <[email protected]> Authored: Tue Mar 24 17:09:20 2015 +0900 Committer: Jihoon Son <[email protected]> Committed: Tue Mar 24 17:09:20 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../org/apache/tajo/datum/BooleanDatum.java | 9 +- .../org/apache/tajo/datum/DatumFactory.java | 4 + .../java/org/apache/tajo/datum/TextDatum.java | 4 +- .../engine/planner/physical/MemSortExec.java | 28 ++- .../tajo/engine/planner/physical/SortExec.java | 4 +- .../engine/planner/physical/TupleSorter.java | 27 +++ .../planner/physical/VectorizedSorter.java | 198 +++++++++++++++++++ .../planner/physical/TestTupleSorter.java | 132 +++++++++++++ 9 files changed, 389 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/d7b5212c/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 811992c..58c061a 100644 --- a/CHANGES +++ b/CHANGES @@ -9,6 +9,9 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1407: Minor performance improvement of MemSortExec. (Contributed by + navis, Committed by jihoon) + TAJO-1403: Improve 'Simple Query' with only partition columns and constant values. (Contributed by Dongjoon Hyun, Committed by jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/d7b5212c/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java index 93933a8..596540f 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java @@ -18,6 +18,7 @@ package org.apache.tajo.datum; +import com.google.common.primitives.Booleans; import com.google.gson.annotations.Expose; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.exception.InvalidOperationException; @@ -167,13 +168,7 @@ public class BooleanDatum extends Datum { public int compareTo(Datum datum) { switch (datum.type()) { case BOOLEAN: - if (val && !datum.asBool()) { - return -1; - } else if (val && datum.asBool()) { - return 1; - } else { - return 0; - } + return Booleans.compare(val, datum.asBool()); default: throw new InvalidOperationException(datum.type()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/d7b5212c/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java index 11ba791..9f48cad 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java @@ -317,6 +317,10 @@ public class DatumFactory { return new IntervalDatum(interval); } + public static IntervalDatum createInterval(int month, long interval) { + return new IntervalDatum(month, interval); + } + public static DateDatum createDate(Datum datum) { switch (datum.type()) { case INT4: http://git-wip-us.apache.org/repos/asf/tajo/blob/d7b5212c/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java index ba45e71..ffd6ca2 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java @@ -33,7 +33,6 @@ import java.util.Comparator; public class TextDatum extends Datum { public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8"); - @Expose private final int size; /* encoded in UTF-8 */ @Expose private final byte[] bytes; @@ -44,7 +43,6 @@ public class TextDatum extends Datum { public TextDatum(byte[] bytes) { super(TajoDataTypes.Type.TEXT); this.bytes = bytes; - this.size = bytes.length; } public TextDatum(String string) { @@ -108,7 +106,7 @@ public class TextDatum extends Datum { @Override public int size() { - return size; + return bytes.length; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/d7b5212c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java index c77313e..a2e039c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java @@ -29,12 +29,12 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; -public class MemSortExec extends SortExec { +public class MemSortExec extends SortExec implements TupleSorter { private SortNode plan; private List<Tuple> tupleSlots; private boolean sorted = false; private Iterator<Tuple> iterator; - + public MemSortExec(final TaskAttemptContext context, SortNode plan, PhysicalExec child) { super(context, plan.getInSchema(), plan.getOutSchema(), child, plan.getSortKeys()); @@ -43,7 +43,7 @@ public class MemSortExec extends SortExec { public void init() throws IOException { super.init(); - this.tupleSlots = new ArrayList<Tuple>(1000); + this.tupleSlots = new ArrayList<Tuple>(10000); } @Override @@ -54,12 +54,10 @@ public class MemSortExec extends SortExec { while (!context.isStopped() && (tuple = child.next()) != null) { tupleSlots.add(new VTuple(tuple)); } - - Collections.sort(tupleSlots, getComparator()); - this.iterator = tupleSlots.iterator(); + iterator = getSorter().sort(); sorted = true; } - + if (iterator.hasNext()) { return this.iterator.next(); } else { @@ -67,6 +65,14 @@ public class MemSortExec extends SortExec { } } + private TupleSorter getSorter() { + try { + return new VectorizedSorter(tupleSlots, sortSpecs, comparator.getSortKeyIds()); + } catch (Exception e) { + return this; + } + } + @Override public void rescan() throws IOException { super.rescan(); @@ -86,4 +92,10 @@ public class MemSortExec extends SortExec { public SortNode getPlan() { return this.plan; } -} + + @Override + public Iterator<Tuple> sort() { + Collections.sort(tupleSlots, comparator); + return tupleSlots.iterator(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/d7b5212c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java index c0703dd..fb6a3b2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java @@ -29,8 +29,8 @@ import java.io.IOException; import java.util.Comparator; public abstract class SortExec extends UnaryPhysicalExec { - private final TupleComparator comparator; - private final SortSpec [] sortSpecs; + protected final BaseTupleComparator comparator; + protected final SortSpec [] sortSpecs; public SortExec(TaskAttemptContext context, Schema inSchema, Schema outSchema, PhysicalExec child, SortSpec [] sortSpecs) { http://git-wip-us.apache.org/repos/asf/tajo/blob/d7b5212c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java new file mode 100644 index 0000000..d240e4a --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.tajo.storage.Tuple; + +import java.util.Iterator; + +public interface TupleSorter { + Iterator<Tuple> sort(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/d7b5212c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java new file mode 100644 index 0000000..891d104 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import com.google.common.primitives.Booleans; +import com.google.common.primitives.Doubles; +import com.google.common.primitives.Floats; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import com.google.common.primitives.Shorts; +import org.apache.hadoop.util.IndexedSortable; +import org.apache.hadoop.util.QuickSort; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.storage.Tuple; + +import java.util.BitSet; +import java.util.Iterator; +import java.util.List; + +/** + * Extract raw level values (primitive or String/byte[]) from each of key columns before sorting + * Uses indirection for efficient swapping + */ +public class VectorizedSorter implements IndexedSortable, TupleSorter { + + private final Tuple[] tuples; // source tuples + private final TupleVector[] vectors; // values of key columns + private final int[] mappings; // index indirection + + public VectorizedSorter(List<Tuple> source, SortSpec[] sortKeys, int[] keyIndex) { + this.tuples = source.toArray(new Tuple[source.size()]); + vectors = new TupleVector[sortKeys.length]; + mappings = new int[tuples.length]; + for (int i = 0; i < vectors.length; i++) { + TajoDataTypes.Type type = sortKeys[i].getSortKey().getDataType().getType(); + boolean nullFirst = sortKeys[i].isNullFirst(); + boolean ascending = sortKeys[i].isAscending(); + boolean nullInvert = nullFirst && ascending || !nullFirst && !ascending; + vectors[i] = new TupleVector(TupleVector.getType(type), tuples.length, nullInvert, ascending); + } + for (int i = 0; i < tuples.length; i++) { + for (int j = 0; j < keyIndex.length; j++) { + vectors[j].add(tuples[i].get(keyIndex[j])); + } + mappings[i] = i; + } + } + + @Override + public int compare(int i1, int i2) { + final int index1 = mappings[i1]; + final int index2 = mappings[i2]; + for (TupleVector vector : vectors) { + int compare = vector.compare(index1, index2); + if (compare != 0) { + return compare; + } + } + return 0; + } + + @Override + public void swap(int i1, int i2) { + int v1 = mappings[i1]; + mappings[i1] = mappings[i2]; + mappings[i2] = v1; + } + + @Override + public Iterator<Tuple> sort() { + new QuickSort().sort(VectorizedSorter.this, 0, mappings.length); + return new Iterator<Tuple>() { + int index; + public boolean hasNext() { return index < mappings.length; } + public Tuple next() { return tuples[mappings[index++]]; } + public void remove() { throw new UnsupportedException(); } + }; + } + + private static class TupleVector { + + private final int type; + private final BitSet nulls; + private final boolean nullInvert; + private final boolean ascending; + + private boolean[] booleans; + private byte[] bits; + private short[] shorts; + private int[] ints; + private long[] longs; + private float[] floats; + private double[] doubles; + private byte[][] bytes; + + private int index; + + private TupleVector(int type, int length, boolean nullInvert, boolean ascending) { + this.type = type; + this.nulls = new BitSet(length); + this.nullInvert = nullInvert; + this.ascending = ascending; + switch (type) { + case 0: booleans = new boolean[length]; break; + case 1: bits = new byte[length]; break; + case 2: shorts = new short[length]; break; + case 3: ints = new int[length]; break; + case 4: longs = new long[length]; break; + case 5: floats = new float[length]; break; + case 6: doubles = new double[length]; break; + case 7: bytes = new byte[length][]; break; + default: + throw new IllegalArgumentException(); + } + } + + private void add(Datum datum) { + if (datum.isNull()) { + nulls.set(index++); + return; + } + switch (type) { + case 0: booleans[index] = datum.asBool(); break; + case 1: bits[index] = datum.asByte(); break; + case 2: shorts[index] = datum.asInt2(); break; + case 3: ints[index] = datum.asInt4(); break; + case 4: longs[index] = datum.asInt8(); break; + case 5: floats[index] = datum.asFloat4(); break; + case 6: doubles[index] = datum.asFloat8(); break; + case 7: bytes[index] = datum.asByteArray(); break; + default: + throw new IllegalArgumentException(); + } + index++; + } + + private int compare(int index1, int index2) { + final boolean n1 = nulls.get(index1); + final boolean n2 = nulls.get(index2); + if (n1 && n2) { + return 0; + } + if (n1 ^ n2) { + int compVal = n1 ? 1 : -1; + return nullInvert ? -compVal : compVal; + } + int compare; + switch (type) { + case 0: compare = Booleans.compare(booleans[index1], booleans[index2]); break; + case 1: compare = bits[index1] - bits[index2]; break; + case 2: compare = Shorts.compare(shorts[index1], shorts[index2]); break; + case 3: compare = Ints.compare(ints[index1], ints[index2]); break; + case 4: compare = Longs.compare(longs[index1], longs[index2]); break; + case 5: compare = Floats.compare(floats[index1], floats[index2]); break; + case 6: compare = Doubles.compare(doubles[index1], doubles[index2]); break; + case 7: compare = TextDatum.COMPARATOR.compare(bytes[index1], bytes[index2]); break; + default: + throw new IllegalArgumentException(); + } + return ascending ? compare : -compare; + } + + public static int getType(TajoDataTypes.Type type) { + switch (type) { + case BOOLEAN: return 0; + case BIT: case INT1: return 1; + case INT2: return 2; + case INT4: case DATE: case INET4: return 3; + case INT8: case TIME: case TIMESTAMP: case INTERVAL: return 4; + case FLOAT4: return 5; + case FLOAT8: return 6; + case TEXT: case CHAR: case BLOB: return 7; + } + // todo + throw new UnsupportedException(type.name()); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/d7b5212c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java new file mode 100644 index 0000000..fc43d42 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import com.google.common.collect.Iterators; +import com.google.common.primitives.Ints; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.storage.BaseTupleComparator; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; + +import static org.junit.Assert.assertArrayEquals; + +public class TestTupleSorter { + + private static final Log LOG = LogFactory.getLog(TestTupleSorter.class); + + private static final Random rnd = new Random(-1); + + @Test + public final void testSortBench() { + final int MAX_SORT_KEY = 3; + final int ITERATION = 10; + final int LENGTH = 1000000; + final int SAMPLING = 100; + + Tuple[] tuples = new Tuple[LENGTH]; + for (int i = 0; i < LENGTH; i++) { + Datum[] datums = new Datum[]{ + DatumFactory.createInt4(rnd.nextInt(Short.MAX_VALUE)), + DatumFactory.createInt4(rnd.nextInt()), + DatumFactory.createText("dept_" + rnd.nextInt()), + DatumFactory.createBool(rnd.nextBoolean()), + DatumFactory.createInt8(rnd.nextLong()), + DatumFactory.createInterval(rnd.nextInt(), rnd.nextLong())}; + tuples[i] = new VTuple(datums); + } + + Column col0 = new Column("col0", Type.INT2); + Column col1 = new Column("col1", Type.INT4); + Column col2 = new Column("col2", Type.TEXT); + Column col3 = new Column("col3", Type.BOOLEAN); + Column col4 = new Column("col4", Type.INT8); + Column col5 = new Column("col5", Type.INTERVAL); + + Schema schema = new Schema(new Column[] {col0, col1, col2, col3, col4, col5}); + + long[] time1 = new long[ITERATION]; + long[] time2 = new long[ITERATION]; + for(int iteration = 0; iteration < ITERATION; iteration++) { + List<Tuple> target = Arrays.asList(Arrays.copyOf(tuples, tuples.length)); + Set<Integer> keys = new TreeSet<Integer>(); + for (int i = 0; i < MAX_SORT_KEY; i++) { + keys.add(rnd.nextInt(schema.size())); + } + int[] keyIndices = Ints.toArray(keys); + SortSpec[] sortKeys = new SortSpec[keyIndices.length]; + for (int i = 0; i < keyIndices.length; i++) { + sortKeys[i] = new SortSpec(schema.getColumn(keyIndices[i]), rnd.nextBoolean(), rnd.nextBoolean()); + } + + long start = System.currentTimeMillis(); + VectorizedSorter sorter = new VectorizedSorter(target, sortKeys, keyIndices); + Iterator<Tuple> iterator = sorter.sort(); + + String[] result1 = new String[SAMPLING]; + for (int i = 0; i < result1.length; i++) { + Tuple tuple = Iterators.get(iterator, LENGTH / result1.length - 1); + StringBuilder builder = new StringBuilder(); + for (int keyIndex : keyIndices) { + builder.append(tuple.get(keyIndex).asChars()); + } + result1[i] = builder.toString(); + } + time1[iteration] = System.currentTimeMillis() - start; + + BaseTupleComparator comparator = new BaseTupleComparator(schema, sortKeys); + + start = System.currentTimeMillis(); + Collections.sort(target, comparator); + iterator = target.iterator(); + + String[] result2 = new String[SAMPLING]; + for (int i = 0; i < result2.length; i++) { + Tuple tuple = Iterators.get(iterator, LENGTH / result2.length - 1); + StringBuilder builder = new StringBuilder(); + for (int keyIndex : keyIndices) { + builder.append(tuple.get(keyIndex).asChars()); + } + result2[i] = builder.toString(); + } + time2[iteration] = System.currentTimeMillis() - start; + + LOG.info("Sort on keys " + Arrays.toString(keyIndices) + + ": Vectorized " + time1[iteration]+ " msec, Original " + time2[iteration] + " msec"); + + assertArrayEquals(result1, result2); + } + } +}
