Repository: tajo Updated Branches: refs/heads/master eeaf379a4 -> 6b16264c1
TAJO-966: Range partition should support split of multiple characters. Closes #91 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/6b16264c Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/6b16264c Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/6b16264c Branch: refs/heads/master Commit: 6b16264c1860f7e8156466fd806ba9251147a702 Parents: eeaf379 Author: Hyunsik Choi <[email protected]> Authored: Mon Aug 4 14:52:11 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Mon Aug 4 14:52:26 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../java/org/apache/tajo/util/BytesUtils.java | 35 +++ .../exception/RangeOverflowException.java | 5 +- .../engine/planner/RangePartitionAlgorithm.java | 97 +++---- .../engine/planner/UniformRangePartition.java | 259 ++++++++++++------- .../tajo/master/querymaster/Repartitioner.java | 10 +- .../planner/TestUniformRangePartition.java | 159 ++++++++++-- 7 files changed, 408 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/6b16264c/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index d6ac279..f98da28 100644 --- a/CHANGES +++ b/CHANGES @@ -29,6 +29,9 @@ Release 0.9.0 - unreleased IMPROVEMENT + TAJO-966: Range partition should support split of multiple characters. + (hyunsik) + TAJO-987: Hash shuffle should be balanced according to intermediate volumes. 
(hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/6b16264c/tajo-common/src/main/java/org/apache/tajo/util/BytesUtils.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/BytesUtils.java b/tajo-common/src/main/java/org/apache/tajo/util/BytesUtils.java index 5f309c2..59ed4fb 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/BytesUtils.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/BytesUtils.java @@ -179,4 +179,39 @@ public class BytesUtils { } return (byte[][]) list.toArray(new byte[list.size()][]); } + + /** + * It gets the maximum length among all given the array of bytes. + * Then, it adds padding (i.e., \0) to byte arrays which are shorter + * than the maximum length. + * + * @param bytes Byte arrays to be padded + * @return The array of padded bytes + */ + public static byte[][] padBytes(byte []...bytes) { + byte [][] padded = new byte[bytes.length][]; + + int maxLen = Integer.MIN_VALUE; + + for (int i = 0; i < bytes.length; i++) { + maxLen = Math.max(maxLen, bytes[i].length); + } + + for (int i = 0; i < bytes.length; i++) { + int padLen = maxLen - bytes[i].length; + if (padLen == 0) { + padded[i] = bytes[i]; + } else if (padLen > 0) { + padded[i] = Bytes.padTail(bytes[i], padLen); + } else { + throw new RuntimeException("maximum length: " + maxLen + ", bytes[" + i + "].length:" + bytes[i].length); + } + } + + return padded; + } + + public static byte [] trimBytes(byte [] bytes) { + return new String(bytes).trim().getBytes(); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/6b16264c/tajo-core/src/main/java/org/apache/tajo/engine/exception/RangeOverflowException.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/exception/RangeOverflowException.java b/tajo-core/src/main/java/org/apache/tajo/engine/exception/RangeOverflowException.java index 409d6ed..013c9c2 
100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/exception/RangeOverflowException.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/exception/RangeOverflowException.java @@ -22,7 +22,8 @@ import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleRange; public class RangeOverflowException extends RuntimeException { - public RangeOverflowException(TupleRange range, Tuple overflowValue, long inc) { - super("Overflow Error: tried to increase " + inc + " to " + overflowValue + ", but the range " + range); + public RangeOverflowException(TupleRange range, Tuple overflowValue, long inc, boolean ascending) { + super("Overflow Error: tried to " + (ascending ? "increase " : "decrease ") + inc + " to " + overflowValue + + ", but the range " + range); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/6b16264c/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java index 0aa6f97..db53cd7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java @@ -18,20 +18,19 @@ package org.apache.tajo.engine.planner; -import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.NullDatum; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleRange; +import org.apache.tajo.util.Bytes; -import java.math.BigDecimal; +import java.math.BigInteger; public abstract class RangePartitionAlgorithm { protected SortSpec [] sortSpecs; - protected TupleRange range; - protected final BigDecimal totalCard; + 
protected TupleRange mergedRange; + protected final BigInteger totalCard; /** true if the end of the range is inclusive. Otherwise, it should be false. */ protected final boolean inclusive; @@ -43,7 +42,7 @@ public abstract class RangePartitionAlgorithm { */ public RangePartitionAlgorithm(SortSpec [] sortSpecs, TupleRange totalRange, boolean inclusive) { this.sortSpecs = sortSpecs; - this.range = totalRange; + this.mergedRange = totalRange; this.inclusive = inclusive; this.totalCard = computeCardinalityForAllColumns(sortSpecs, totalRange, inclusive); } @@ -56,117 +55,119 @@ public abstract class RangePartitionAlgorithm { * @param end * @return */ - public static BigDecimal computeCardinality(DataType dataType, Datum start, Datum end, + public static BigInteger computeCardinality(DataType dataType, Datum start, Datum end, boolean inclusive, boolean isAscending) { - BigDecimal columnCard; + BigInteger columnCard; switch (dataType.getType()) { case BOOLEAN: - columnCard = new BigDecimal(2); + columnCard = BigInteger.valueOf(2); break; case CHAR: if (isAscending) { - columnCard = new BigDecimal(end.asChar() - start.asChar()); + columnCard = BigInteger.valueOf((int)end.asChar() - (int)start.asChar()); } else { - columnCard = new BigDecimal(start.asChar() - end.asChar()); + columnCard = BigInteger.valueOf(start.asChar() - end.asChar()); } break; case BIT: if (isAscending) { - columnCard = new BigDecimal(end.asByte() - start.asByte()); + columnCard = BigInteger.valueOf(end.asByte() - start.asByte()); } else { - columnCard = new BigDecimal(start.asByte() - end.asByte()); + columnCard = BigInteger.valueOf(start.asByte() - end.asByte()); } break; case INT2: if (isAscending) { - columnCard = new BigDecimal(end.asInt2() - start.asInt2()); + columnCard = BigInteger.valueOf(end.asInt2() - start.asInt2()); } else { - columnCard = new BigDecimal(start.asInt2() - end.asInt2()); + columnCard = BigInteger.valueOf(start.asInt2() - end.asInt2()); } break; case INT4: if (isAscending) { 
- columnCard = new BigDecimal(end.asInt4() - start.asInt4()); + columnCard = BigInteger.valueOf(end.asInt4() - start.asInt4()); } else { - columnCard = new BigDecimal(start.asInt4() - end.asInt4()); + columnCard = BigInteger.valueOf(start.asInt4() - end.asInt4()); } break; - case INT8: + case INT8: + case TIME: + case TIMESTAMP: if (isAscending) { - columnCard = new BigDecimal(end.asInt8() - start.asInt8()); + columnCard = BigInteger.valueOf(end.asInt8() - start.asInt8()); } else { - columnCard = new BigDecimal(start.asInt8() - end.asInt8()); + columnCard = BigInteger.valueOf(start.asInt8() - end.asInt8()); } break; case FLOAT4: if (isAscending) { - columnCard = new BigDecimal(end.asInt4() - start.asInt4()); + columnCard = BigInteger.valueOf(end.asInt4() - start.asInt4()); } else { - columnCard = new BigDecimal(start.asInt4() - end.asInt4()); + columnCard = BigInteger.valueOf(start.asInt4() - end.asInt4()); } break; case FLOAT8: if (isAscending) { - columnCard = new BigDecimal(end.asInt8() - start.asInt8()); + columnCard = BigInteger.valueOf(end.asInt8() - start.asInt8()); } else { - columnCard = new BigDecimal(start.asInt8() - end.asInt8()); + columnCard = BigInteger.valueOf(start.asInt8() - end.asInt8()); } break; - case TEXT: - final char textStart = (start instanceof NullDatum || start.size() == 0) ? '0' : start.asChars().charAt(0); - final char textEnd = (end instanceof NullDatum || end.size() == 0) ? 
'0' : end.asChars().charAt(0); + case TEXT: { + byte [] a; + byte [] b; if (isAscending) { - columnCard = new BigDecimal(textEnd - textStart); + a = start.asByteArray(); + b = end.asByteArray(); } else { - columnCard = new BigDecimal(textStart - textEnd); + b = start.asByteArray(); + a = end.asByteArray(); } + + byte [] prependHeader = {1, 0}; + final BigInteger startBI = new BigInteger(Bytes.add(prependHeader, a)); + final BigInteger stopBI = new BigInteger(Bytes.add(prependHeader, b)); + BigInteger diffBI = stopBI.subtract(startBI); + columnCard = diffBI; break; + } case DATE: if (isAscending) { - columnCard = new BigDecimal(end.asInt4() - start.asInt4()); - } else { - columnCard = new BigDecimal(start.asInt4() - end.asInt4()); - } - break; - case TIME: - case TIMESTAMP: - if (isAscending) { - columnCard = new BigDecimal(end.asInt8() - start.asInt8()); + columnCard = BigInteger.valueOf(end.asInt4() - start.asInt4()); } else { - columnCard = new BigDecimal(start.asInt8() - end.asInt8()); + columnCard = BigInteger.valueOf(start.asInt4() - end.asInt4()); } break; case INET4: if (isAscending) { - columnCard = new BigDecimal(end.asInt4() - start.asInt4()); + columnCard = BigInteger.valueOf(end.asInt4() - start.asInt4()); } else { - columnCard = new BigDecimal(start.asInt4() - end.asInt4()); + columnCard = BigInteger.valueOf(start.asInt4() - end.asInt4()); } break; default: throw new UnsupportedOperationException(dataType + " is not supported yet"); } - return inclusive ? columnCard.add(new BigDecimal(1)).abs() : columnCard.abs(); + return inclusive ? columnCard.add(BigInteger.valueOf(1)).abs() : columnCard.abs(); } /** * It computes the value cardinality of a tuple range. 
* @return */ - public static BigDecimal computeCardinalityForAllColumns(SortSpec[] sortSpecs, TupleRange range, boolean inclusive) { + public static BigInteger computeCardinalityForAllColumns(SortSpec[] sortSpecs, TupleRange range, boolean inclusive) { Tuple start = range.getStart(); Tuple end = range.getEnd(); - Column col; - BigDecimal cardinality = new BigDecimal(1); - BigDecimal columnCard; + BigInteger cardinality = BigInteger.ONE; + BigInteger columnCard; for (int i = 0; i < sortSpecs.length; i++) { columnCard = computeCardinality(sortSpecs[i].getSortKey().getDataType(), start.get(i), end.get(i), inclusive, sortSpecs[i].isAscending()); - if (new BigDecimal(0).compareTo(columnCard) < 0) { + if (BigInteger.ZERO.compareTo(columnCard) < 0) { cardinality = cardinality.multiply(columnCard); } } @@ -174,7 +175,7 @@ public abstract class RangePartitionAlgorithm { return cardinality; } - public BigDecimal getTotalCardinality() { + public BigInteger getTotalCardinality() { return totalCard; } http://git-wip-us.apache.org/repos/asf/tajo/blob/6b16264c/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java index 88cb061..0a1389a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java @@ -20,26 +20,29 @@ package org.apache.tajo.engine.planner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.primitives.UnsignedLong; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.Datum; import 
org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.datum.NullDatum; import org.apache.tajo.engine.exception.RangeOverflowException; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleRange; import org.apache.tajo.storage.VTuple; import org.apache.tajo.util.Bytes; +import org.apache.tajo.util.BytesUtils; import java.math.BigDecimal; +import java.math.BigInteger; import java.math.RoundingMode; import java.util.List; public class UniformRangePartition extends RangePartitionAlgorithm { private int variableId; - private BigDecimal[] cardForEachDigit; - private BigDecimal[] colCards; + private BigInteger[] cardForEachDigit; + private BigInteger[] colCards; /** * @@ -47,15 +50,18 @@ public class UniformRangePartition extends RangePartitionAlgorithm { * @param sortSpecs The description of sort keys * @param inclusive true if the end of the range is inclusive */ - public UniformRangePartition(TupleRange totalRange, SortSpec[] sortSpecs, boolean inclusive) { + public UniformRangePartition(final TupleRange totalRange, final SortSpec[] sortSpecs, boolean inclusive) { super(sortSpecs, totalRange, inclusive); - colCards = new BigDecimal[sortSpecs.length]; + colCards = new BigInteger[sortSpecs.length]; + + normalize(sortSpecs, this.mergedRange); + for (int i = 0; i < sortSpecs.length; i++) { colCards[i] = computeCardinality(sortSpecs[i].getSortKey().getDataType(), totalRange.getStart().get(i), totalRange.getEnd().get(i), inclusive, sortSpecs[i].isAscending()); } - cardForEachDigit = new BigDecimal[colCards.length]; + cardForEachDigit = new BigInteger[colCards.length]; for (int i = 0; i < colCards.length ; i++) { if (i == 0) { cardForEachDigit[i] = colCards[i]; @@ -74,17 +80,17 @@ public class UniformRangePartition extends RangePartitionAlgorithm { Preconditions.checkArgument(partNum > 0, "The number of partitions must be positive, but the given number: " + partNum); - Preconditions.checkArgument(totalCard.compareTo(new BigDecimal(partNum)) >= 0, + 
Preconditions.checkArgument(totalCard.compareTo(BigInteger.valueOf(partNum)) >= 0, "the number of partition cannot exceed total cardinality (" + totalCard + ")"); int varId; for (varId = 0; varId < cardForEachDigit.length; varId++) { - if (cardForEachDigit[varId].compareTo(new BigDecimal(partNum)) >= 0) + if (cardForEachDigit[varId].compareTo(BigInteger.valueOf(partNum)) >= 0) break; } this.variableId = varId; - BigDecimal [] reverseCardsForDigit = new BigDecimal[variableId+1]; + BigInteger [] reverseCardsForDigit = new BigInteger[variableId+1]; for (int i = variableId; i >= 0; i--) { if (i == variableId) { reverseCardsForDigit[i] = colCards[i]; @@ -94,25 +100,81 @@ public class UniformRangePartition extends RangePartitionAlgorithm { } List<TupleRange> ranges = Lists.newArrayList(); - BigDecimal term = reverseCardsForDigit[0].divide( - new BigDecimal(partNum), RoundingMode.CEILING); - BigDecimal reminder = reverseCardsForDigit[0]; - Tuple last = range.getStart(); - while(reminder.compareTo(new BigDecimal(0)) > 0) { + + BigDecimal x = new BigDecimal(reverseCardsForDigit[0]); + + BigInteger term = x.divide(BigDecimal.valueOf(partNum), RoundingMode.CEILING).toBigInteger(); + BigInteger reminder = reverseCardsForDigit[0]; + Tuple last = mergedRange.getStart(); + TupleRange tupleRange; + while(reminder.compareTo(BigInteger.ZERO) > 0) { if (reminder.compareTo(term) <= 0) { // final one is inclusive - ranges.add(new TupleRange(sortSpecs, last, range.getEnd())); + tupleRange = new TupleRange(sortSpecs, last, mergedRange.getEnd()); } else { - Tuple next = increment(last, term.longValue(), variableId); - ranges.add(new TupleRange(sortSpecs, last, next)); + Tuple next = increment(last, term, variableId); + tupleRange = new TupleRange(sortSpecs, last, next); } + + ranges.add(tupleRange); last = ranges.get(ranges.size() - 1).getEnd(); reminder = reminder.subtract(term); } + for (TupleRange r : ranges) { + denormalize(sortSpecs, r); + } + return ranges.toArray(new 
TupleRange[ranges.size()]); } /** + * It normalizes the start and end keys of TEXT sort columns so that both have byte arrays of the same length. + * + * @param sortSpecs The sort specs + * @param range Tuple range to be normalized + */ + public static void normalize(final SortSpec [] sortSpecs, TupleRange range) { + // normalize text fields to have the same byte length + for (int i = 0; i < sortSpecs.length; i++) { + if (sortSpecs[i].getSortKey().getDataType().getType() == TajoDataTypes.Type.TEXT) { + byte [] startBytes; + byte [] endBytes; + if (range.getStart().isNull(i)) { + startBytes = BigInteger.ZERO.toByteArray(); + } else { + startBytes = range.getStart().getBytes(i); + } + + if (range.getEnd().isNull(i)) { + endBytes = BigInteger.ZERO.toByteArray(); + } else { + endBytes = range.getEnd().getBytes(i); + } + + byte [][] padded = BytesUtils.padBytes(startBytes, endBytes); + range.getStart().put(i, DatumFactory.createText(padded[0])); + range.getEnd().put(i, DatumFactory.createText(padded[1])); + } + } + } + + /** + * Normalized keys contain padding values, which would cause key mismatches in the pull server. + * So, this denormalizes the normalized keys again. + * + * @param sortSpecs The sort specs + * @param range Tuple range to be denormalized + */ + public static void denormalize(SortSpec [] sortSpecs, TupleRange range) { + for (int i = 0; i < sortSpecs.length; i++) { + if (sortSpecs[i].getSortKey().getDataType().getType() == TajoDataTypes.Type.TEXT) { + range.getStart().put(i,DatumFactory.createText(BytesUtils.trimBytes(range.getStart().getBytes(i)))); + range.getEnd().put(i,DatumFactory.createText(BytesUtils.trimBytes(range.getEnd().getBytes(i)))); + } + } + } + + /** + * Check whether an overflow occurs or not.
* * @param colId The column id to be checked @@ -121,105 +183,111 @@ public class UniformRangePartition extends RangePartitionAlgorithm { * @param sortSpecs * @return */ - public boolean isOverflow(int colId, Datum last, BigDecimal inc, SortSpec [] sortSpecs) { + public boolean isOverflow(int colId, Datum last, BigInteger inc, SortSpec [] sortSpecs) { Column column = sortSpecs[colId].getSortKey(); + BigDecimal incDecimal = new BigDecimal(inc); BigDecimal candidate; boolean overflow = false; switch (column.getDataType().getType()) { case BIT: { if (sortSpecs[colId].isAscending()) { - candidate = inc.add(new BigDecimal(last.asByte())); - return new BigDecimal(range.getEnd().get(colId).asByte()).compareTo(candidate) < 0; + candidate = incDecimal.add(new BigDecimal(last.asByte())); + return new BigDecimal(mergedRange.getEnd().get(colId).asByte()).compareTo(candidate) < 0; } else { - candidate = new BigDecimal(last.asByte()).subtract(inc); - return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asByte())) < 0; + candidate = new BigDecimal(last.asByte()).subtract(incDecimal); + return candidate.compareTo(new BigDecimal(mergedRange.getEnd().get(colId).asByte())) < 0; } } case CHAR: { if (sortSpecs[colId].isAscending()) { - candidate = inc.add(new BigDecimal((int)last.asChar())); - return new BigDecimal((int)range.getEnd().get(colId).asChar()).compareTo(candidate) < 0; + candidate = incDecimal.add(new BigDecimal((int)last.asChar())); + return new BigDecimal((int) mergedRange.getEnd().get(colId).asChar()).compareTo(candidate) < 0; } else { - candidate = new BigDecimal((int)last.asChar()).subtract(inc); - return candidate.compareTo(new BigDecimal((int)range.getEnd().get(colId).asChar())) < 0; + candidate = new BigDecimal((int)last.asChar()).subtract(incDecimal); + return candidate.compareTo(new BigDecimal((int) mergedRange.getEnd().get(colId).asChar())) < 0; } } case INT2: { if (sortSpecs[colId].isAscending()) { - candidate = inc.add(new 
BigDecimal(last.asInt2())); - return new BigDecimal(range.getEnd().get(colId).asInt2()).compareTo(candidate) < 0; + candidate = incDecimal.add(new BigDecimal(last.asInt2())); + return new BigDecimal(mergedRange.getEnd().get(colId).asInt2()).compareTo(candidate) < 0; } else { - candidate = new BigDecimal(last.asInt2()).subtract(inc); - return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt2())) < 0; + candidate = new BigDecimal(last.asInt2()).subtract(incDecimal); + return candidate.compareTo(new BigDecimal(mergedRange.getEnd().get(colId).asInt2())) < 0; } } case DATE: case INT4: { if (sortSpecs[colId].isAscending()) { - candidate = inc.add(new BigDecimal(last.asInt4())); - return new BigDecimal(range.getEnd().get(colId).asInt4()).compareTo(candidate) < 0; + candidate = incDecimal.add(new BigDecimal(last.asInt4())); + return new BigDecimal(mergedRange.getEnd().get(colId).asInt4()).compareTo(candidate) < 0; } else { - candidate = new BigDecimal(last.asInt4()).subtract(inc); - return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt4())) < 0; + candidate = new BigDecimal(last.asInt4()).subtract(incDecimal); + return candidate.compareTo(new BigDecimal(mergedRange.getEnd().get(colId).asInt4())) < 0; } } case TIME: case TIMESTAMP: case INT8: { if (sortSpecs[colId].isAscending()) { - candidate = inc.add(new BigDecimal(last.asInt8())); - return new BigDecimal(range.getEnd().get(colId).asInt8()).compareTo(candidate) < 0; + candidate = incDecimal.add(new BigDecimal(last.asInt8())); + return new BigDecimal(mergedRange.getEnd().get(colId).asInt8()).compareTo(candidate) < 0; } else { - candidate = new BigDecimal(last.asInt8()).subtract(inc); - return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt8())) < 0; + candidate = new BigDecimal(last.asInt8()).subtract(incDecimal); + return candidate.compareTo(new BigDecimal(mergedRange.getEnd().get(colId).asInt8())) < 0; } } case FLOAT4: { if (sortSpecs[colId].isAscending()) { - 
candidate = inc.add(new BigDecimal(last.asFloat4())); - return new BigDecimal(range.getEnd().get(colId).asFloat4()).compareTo(candidate) < 0; + candidate = incDecimal.add(new BigDecimal(last.asFloat4())); + return new BigDecimal(mergedRange.getEnd().get(colId).asFloat4()).compareTo(candidate) < 0; } else { - candidate = new BigDecimal(last.asFloat4()).subtract(inc); - return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asFloat4())) < 0; + candidate = new BigDecimal(last.asFloat4()).subtract(incDecimal); + return candidate.compareTo(new BigDecimal(mergedRange.getEnd().get(colId).asFloat4())) < 0; } } case FLOAT8: { if (sortSpecs[colId].isAscending()) { - candidate = inc.add(new BigDecimal(last.asFloat8())); - return new BigDecimal(range.getEnd().get(colId).asFloat8()).compareTo(candidate) < 0; + candidate = incDecimal.add(new BigDecimal(last.asFloat8())); + return new BigDecimal(mergedRange.getEnd().get(colId).asFloat8()).compareTo(candidate) < 0; } else { - candidate = new BigDecimal(last.asFloat8()).subtract(inc); - return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asFloat8())) < 0; + candidate = new BigDecimal(last.asFloat8()).subtract(incDecimal); + return candidate.compareTo(new BigDecimal(mergedRange.getEnd().get(colId).asFloat8())) < 0; } } case TEXT: { + byte [] lastBytes = last.asByteArray(); + byte [] endBytes = mergedRange.getEnd().getBytes(colId); + + Preconditions.checkState(lastBytes.length == endBytes.length); + if (sortSpecs[colId].isAscending()) { - candidate = inc.add(new BigDecimal((int)(last instanceof NullDatum ? 
'0' : last.asChars().charAt(0)))); - return new BigDecimal(range.getEnd().get(colId).asChars().charAt(0)).compareTo(candidate) < 0; + candidate = incDecimal.add(new BigDecimal(new BigInteger(lastBytes))); + return new BigDecimal(new BigInteger(endBytes)).compareTo(candidate) < 0; } else { - candidate = new BigDecimal((int)(last.asChars().charAt(0))).subtract(inc); - return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asChars().charAt(0))) < 0; + candidate = new BigDecimal(new BigInteger(lastBytes)).subtract(incDecimal); + return candidate.compareTo(new BigDecimal(new BigInteger(endBytes))) < 0; } } case INET4: { int candidateIntVal; byte[] candidateBytesVal = new byte[4]; if (sortSpecs[colId].isAscending()) { - candidateIntVal = inc.intValue() + last.asInt4(); - if (candidateIntVal - inc.intValue() != last.asInt4()) { + candidateIntVal = incDecimal.intValue() + last.asInt4(); + if (candidateIntVal - incDecimal.intValue() != last.asInt4()) { return true; } Bytes.putInt(candidateBytesVal, 0, candidateIntVal); - return Bytes.compareTo(range.getEnd().get(colId).asByteArray(), candidateBytesVal) < 0; + return Bytes.compareTo(mergedRange.getEnd().get(colId).asByteArray(), candidateBytesVal) < 0; } else { - candidateIntVal = last.asInt4() - inc.intValue(); - if (candidateIntVal + inc.intValue() != last.asInt4()) { + candidateIntVal = last.asInt4() - incDecimal.intValue(); + if (candidateIntVal + incDecimal.intValue() != last.asInt4()) { return true; } Bytes.putInt(candidateBytesVal, 0, candidateIntVal); - return Bytes.compareTo(candidateBytesVal, range.getEnd().get(colId).asByteArray()) < 0; + return Bytes.compareTo(candidateBytesVal, mergedRange.getEnd().get(colId).asByteArray()) < 0; } } } @@ -232,20 +300,20 @@ public class UniformRangePartition extends RangePartitionAlgorithm { switch (column.getDataType().getType()) { case BIT: { long candidate = last.asByte() + inc; - byte end = range.getEnd().get(colId).asByte(); + byte end = 
mergedRange.getEnd().get(colId).asByte(); reminder = candidate - end; break; } case CHAR: { long candidate = last.asChar() + inc; - char end = range.getEnd().get(colId).asChar(); + char end = mergedRange.getEnd().get(colId).asChar(); reminder = candidate - end; break; } case DATE: case INT4: { int candidate = (int) (last.asInt4() + inc); - int end = range.getEnd().get(colId).asInt4(); + int end = mergedRange.getEnd().get(colId).asInt4(); reminder = candidate - end; break; } @@ -254,26 +322,34 @@ public class UniformRangePartition extends RangePartitionAlgorithm { case INT8: case INET4: { long candidate = last.asInt8() + inc; - long end = range.getEnd().get(colId).asInt8(); + long end = mergedRange.getEnd().get(colId).asInt8(); reminder = candidate - end; break; } case FLOAT4: { float candidate = last.asFloat4() + inc; - float end = range.getEnd().get(colId).asFloat4(); + float end = mergedRange.getEnd().get(colId).asFloat4(); reminder = (long) (candidate - end); break; } case FLOAT8: { double candidate = last.asFloat8() + inc; - double end = range.getEnd().get(colId).asFloat8(); + double end = mergedRange.getEnd().get(colId).asFloat8(); reminder = (long) Math.ceil(candidate - end); break; } case TEXT: { - char candidate = ((char)(last.asChars().charAt(0) + inc)); - char end = range.getEnd().get(colId).asChars().charAt(0); - reminder = (char) (candidate - end); + byte [] lastBytes = last.asByteArray(); + byte [] endBytes = mergedRange.getEnd().get(colId).asByteArray(); + + Preconditions.checkState(lastBytes.length == endBytes.length); + + BigInteger lastBInt = new BigInteger(lastBytes); + BigInteger endBInt = new BigInteger(endBytes); + BigInteger incBInt = BigInteger.valueOf(inc); + + BigInteger candidate = lastBInt.add(incBInt); + reminder = candidate.subtract(endBInt).longValue(); break; } } @@ -285,16 +361,16 @@ public class UniformRangePartition extends RangePartitionAlgorithm { /** * * @param last - * @param inc + * @param interval * @return */ - public Tuple 
increment(final Tuple last, final long inc, final int baseDigit) { - BigDecimal [] incs = new BigDecimal[last.size()]; + public Tuple increment(final Tuple last, BigInteger interval, final int baseDigit) { + BigInteger [] incs = new BigInteger[last.size()]; boolean [] overflowFlag = new boolean[last.size()]; - BigDecimal [] result; - BigDecimal value = new BigDecimal(inc); + BigInteger [] result; + BigInteger value = interval; - BigDecimal [] reverseCardsForDigit = new BigDecimal[baseDigit + 1]; + BigInteger [] reverseCardsForDigit = new BigInteger[baseDigit + 1]; for (int i = baseDigit; i >= 0; i--) { if (i == baseDigit) { reverseCardsForDigit[i] = colCards[i]; @@ -313,11 +389,11 @@ public class UniformRangePartition extends RangePartitionAlgorithm { for (int i = finalId; i >= 0; i--) { if (isOverflow(i, last.get(i), incs[i], sortSpecs)) { if (i == 0) { - throw new RangeOverflowException(range, last, incs[i].longValue()); + throw new RangeOverflowException(mergedRange, last, incs[i].longValue(), sortSpecs[i].isAscending()); } long rem = incrementAndGetReminder(i, last.get(i), value.longValue()); - incs[i] = new BigDecimal(rem); - incs[i - 1] = incs[i-1].add(new BigDecimal(1)); + incs[i] = BigInteger.valueOf(rem); + incs[i - 1] = incs[i-1].add(BigInteger.ONE); overflowFlag[i] = true; } else { if (i > 0) { @@ -329,7 +405,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm { for (int i = 0; i < incs.length; i++) { if (incs[i] == null) { - incs[i] = new BigDecimal(0); + incs[i] = BigInteger.ZERO; } } @@ -340,7 +416,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm { switch (column.getDataType().getType()) { case CHAR: if (overflowFlag[i]) { - end.put(i, DatumFactory.createChar((char) (range.getStart().get(i).asChar() + incs[i].longValue()))); + end.put(i, DatumFactory.createChar((char) (mergedRange.getStart().get(i).asChar() + incs[i].longValue()))); } else { end.put(i, DatumFactory.createChar((char) (last.get(i).asChar() + 
incs[i].longValue()))); } @@ -348,7 +424,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm { case BIT: if (overflowFlag[i]) { end.put(i, DatumFactory.createBit( - (byte) (range.getStart().get(i).asByte() + incs[i].longValue()))); + (byte) (mergedRange.getStart().get(i).asByte() + incs[i].longValue()))); } else { end.put(i, DatumFactory.createBit((byte) (last.get(i).asByte() + incs[i].longValue()))); } @@ -356,7 +432,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm { case INT2: if (overflowFlag[i]) { end.put(i, DatumFactory.createInt2( - (short) (range.getStart().get(i).asInt2() + incs[i].longValue()))); + (short) (mergedRange.getStart().get(i).asInt2() + incs[i].longValue()))); } else { end.put(i, DatumFactory.createInt2((short) (last.get(i).asInt2() + incs[i].longValue()))); } @@ -364,7 +440,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm { case INT4: if (overflowFlag[i]) { end.put(i, DatumFactory.createInt4( - (int) (range.getStart().get(i).asInt4() + incs[i].longValue()))); + (int) (mergedRange.getStart().get(i).asInt4() + incs[i].longValue()))); } else { if (sortSpecs[i].isAscending()) { end.put(i, DatumFactory.createInt4((int) (last.get(i).asInt4() + incs[i].longValue()))); @@ -376,7 +452,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm { case INT8: if (overflowFlag[i]) { end.put(i, DatumFactory.createInt8( - range.getStart().get(i).asInt8() + incs[i].longValue())); + mergedRange.getStart().get(i).asInt8() + incs[i].longValue())); } else { end.put(i, DatumFactory.createInt8(last.get(i).asInt8() + incs[i].longValue())); } @@ -384,7 +460,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm { case FLOAT4: if (overflowFlag[i]) { end.put(i, DatumFactory.createFloat4( - range.getStart().get(i).asFloat4() + incs[i].longValue())); + mergedRange.getStart().get(i).asFloat4() + incs[i].longValue())); } else { end.put(i, 
DatumFactory.createFloat4(last.get(i).asFloat4() + incs[i].longValue())); } @@ -392,30 +468,35 @@ public class UniformRangePartition extends RangePartitionAlgorithm { case FLOAT8: if (overflowFlag[i]) { end.put(i, DatumFactory.createFloat8( - range.getStart().get(i).asFloat8() + incs[i].longValue())); + mergedRange.getStart().get(i).asFloat8() + incs[i].longValue())); } else { end.put(i, DatumFactory.createFloat8(last.get(i).asFloat8() + incs[i].longValue())); } break; case TEXT: if (overflowFlag[i]) { - end.put(i, DatumFactory.createText(((char) (range.getStart().get(i).asChars().charAt(0) + end.put(i, DatumFactory.createText(((char) (mergedRange.getStart().get(i).asChars().charAt(0) + incs[i].longValue())) + "")); } else { - end.put(i, DatumFactory.createText( - ((char) ((last.get(i) instanceof NullDatum ? '0': last.get(i).asChars().charAt(0)) + incs[i].longValue())) + "")); + BigInteger lastBigInt; + if (last.isNull(i)) { + lastBigInt = BigInteger.valueOf(0); + } else { + lastBigInt = UnsignedLong.valueOf(new BigInteger(last.get(i).asByteArray())).bigIntegerValue(); + } + end.put(i, DatumFactory.createText(lastBigInt.add(incs[i]).toByteArray())); } break; case DATE: if (overflowFlag[i]) { - end.put(i, DatumFactory.createDate((int) (range.getStart().get(i).asInt4() + incs[i].longValue()))); + end.put(i, DatumFactory.createDate((int) (mergedRange.getStart().get(i).asInt4() + incs[i].longValue()))); } else { end.put(i, DatumFactory.createDate((int) (last.get(i).asInt4() + incs[i].longValue()))); } break; case TIME: if (overflowFlag[i]) { - end.put(i, DatumFactory.createTime(range.getStart().get(i).asInt8() + incs[i].longValue())); + end.put(i, DatumFactory.createTime(mergedRange.getStart().get(i).asInt8() + incs[i].longValue())); } else { end.put(i, DatumFactory.createTime(last.get(i).asInt8() + incs[i].longValue())); } @@ -423,7 +504,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm { case TIMESTAMP: if (overflowFlag[i]) { end.put(i, 
DatumFactory.createTimestmpDatumWithJavaMillis( - range.getStart().get(i).asInt8() + incs[i].longValue())); + mergedRange.getStart().get(i).asInt8() + incs[i].longValue())); } else { end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(last.get(i).asInt8() + incs[i].longValue())); } @@ -431,7 +512,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm { case INET4: byte[] ipBytes; if (overflowFlag[i]) { - ipBytes = range.getStart().get(i).asByteArray(); + ipBytes = mergedRange.getStart().get(i).asByteArray(); assert ipBytes.length == 4; end.put(i, DatumFactory.createInet4(ipBytes)); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/6b16264c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index 1fa3f11..43d6fd2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -56,7 +56,7 @@ import org.apache.tajo.worker.FetchImpl; import java.io.IOException; import java.io.UnsupportedEncodingException; -import java.math.BigDecimal; +import java.math.BigInteger; import java.net.URI; import java.util.*; import java.util.Map.Entry; @@ -574,12 +574,12 @@ public class Repartitioner { } TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(), false); RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs); - BigDecimal card = partitioner.getTotalCardinality(); + BigInteger card = partitioner.getTotalCardinality(); // if the number of the range cardinality is less than the desired number of tasks, // we set the the number of tasks to the number of range cardinality. 
int determinedTaskNum; - if (card.compareTo(new BigDecimal(maxNum)) < 0) { + if (card.compareTo(BigInteger.valueOf(maxNum)) < 0) { LOG.info(subQuery.getId() + ", The range cardinality (" + card + ") is less then the desired number of tasks (" + maxNum + ")"); determinedTaskNum = card.intValue(); @@ -587,9 +587,7 @@ public class Repartitioner { determinedTaskNum = maxNum; } - // for LOG - TupleRange mergedRangeForPrint = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(), true); - LOG.info(subQuery.getId() + ", Try to divide " + mergedRangeForPrint + " into " + determinedTaskNum + + LOG.info(subQuery.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum + " sub ranges (total units: " + determinedTaskNum + ")"); TupleRange [] ranges = partitioner.partition(determinedTaskNum); if (ranges == null || ranges.length == 0) { http://git-wip-us.apache.org/repos/asf/tajo/blob/6b16264c/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java index f4c114f..58653d1 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java @@ -27,6 +27,8 @@ import org.apache.tajo.storage.TupleRange; import org.apache.tajo.storage.VTuple; import org.junit.Test; +import java.math.BigInteger; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -68,11 +70,11 @@ public class TestUniformRangePartition { result[10] = "DB"; result[11] = "DC"; - Tuple end = partitioner.increment(s, 1, 1); + Tuple end = partitioner.increment(s, BigInteger.valueOf(1), 1); assertEquals("A", end.get(0).asChars()); 
assertEquals("B", end.get(1).asChars()); for (int i = 2; i < 11; i++ ) { - end = partitioner.increment(end, 1, 1); + end = partitioner.increment(end, BigInteger.valueOf(1), 1); assertEquals(result[i].charAt(0), end.get(0).asChars().charAt(0)); assertEquals(result[i].charAt(1), end.get(1).asChars().charAt(0)); } @@ -115,10 +117,10 @@ public class TestUniformRangePartition { result[10] = "DB"; result[11] = "DC"; - Tuple end = partitioner.increment(s, 6, 1); + Tuple end = partitioner.increment(s, BigInteger.valueOf(6), 1); assertEquals("C", end.get(0).asChars()); assertEquals("A", end.get(1).asChars()); - end = partitioner.increment(end, 5, 1); + end = partitioner.increment(end, BigInteger.valueOf(5), 1); assertEquals("D", end.get(0).asChars()); assertEquals("C", end.get(1).asChars()); } @@ -149,11 +151,11 @@ public class TestUniformRangePartition { UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); assertEquals(24, partitioner.getTotalCardinality().intValue()); - Tuple overflowBefore = partitioner.increment(s, 5, 2); + Tuple overflowBefore = partitioner.increment(s, BigInteger.valueOf(5), 2); assertEquals("A", overflowBefore.get(0).asChars()); assertEquals("B", overflowBefore.get(1).asChars()); assertEquals("C", overflowBefore.get(2).asChars()); - Tuple overflowed = partitioner.increment(overflowBefore, 1, 2); + Tuple overflowed = partitioner.increment(overflowBefore, BigInteger.valueOf(1), 2); assertEquals("B", overflowed.get(0).asChars()); assertEquals("A", overflowed.get(1).asChars()); assertEquals("A", overflowed.get(2).asChars()); @@ -179,10 +181,10 @@ public class TestUniformRangePartition { UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); assertEquals(200, partitioner.getTotalCardinality().longValue()); - Tuple range2 = partitioner.increment(s, 100, 1); + Tuple range2 = partitioner.increment(s, BigInteger.valueOf(100), 1); assertEquals(15, range2.get(0).asInt4()); assertEquals(20, 
range2.get(1).asInt4()); - Tuple range3 = partitioner.increment(range2, 99, 1); + Tuple range3 = partitioner.increment(range2, BigInteger.valueOf(99), 1); assertEquals(19, range3.get(0).asInt4()); assertEquals(39, range3.get(1).asInt4()); } @@ -209,11 +211,11 @@ public class TestUniformRangePartition { UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); assertEquals(24, partitioner.getTotalCardinality().longValue()); - Tuple beforeOverflow = partitioner.increment(s, 5, 2); + Tuple beforeOverflow = partitioner.increment(s, BigInteger.valueOf(5), 2); assertEquals(1, beforeOverflow.get(0).asInt8()); assertEquals(2, beforeOverflow.get(1).asInt8()); assertEquals(3, beforeOverflow.get(2).asInt8()); - Tuple overflow = partitioner.increment(beforeOverflow, 1, 2); + Tuple overflow = partitioner.increment(beforeOverflow, BigInteger.valueOf(1), 2); assertEquals(2, overflow.get(0).asInt8()); assertEquals(1, overflow.get(1).asInt8()); assertEquals(1, overflow.get(2).asInt8()); @@ -242,11 +244,11 @@ public class TestUniformRangePartition { UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); assertEquals(24, partitioner.getTotalCardinality().longValue()); - Tuple beforeOverflow = partitioner.increment(s, 5, 2); + Tuple beforeOverflow = partitioner.increment(s, BigInteger.valueOf(5), 2); assertTrue(1.1d == beforeOverflow.get(0).asFloat8()); assertTrue(2.1d == beforeOverflow.get(1).asFloat8()); assertTrue(3.1d == beforeOverflow.get(2).asFloat8()); - Tuple overflow = partitioner.increment(beforeOverflow, 1, 2); + Tuple overflow = partitioner.increment(beforeOverflow, BigInteger.valueOf(1), 2); assertTrue(2.1d == overflow.get(0).asFloat8()); assertTrue(1.1d == overflow.get(1).asFloat8()); assertTrue(1.1d == overflow.get(2).asFloat8()); @@ -275,11 +277,11 @@ public class TestUniformRangePartition { UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); assertEquals(24, 
partitioner.getTotalCardinality().longValue()); - Tuple beforeOverflow = partitioner.increment(s, 5, 2); + Tuple beforeOverflow = partitioner.increment(s, BigInteger.valueOf(5), 2); assertTrue("127.0.1.1".equals(beforeOverflow.get(0).asChars())); assertTrue("127.0.0.2".equals(beforeOverflow.get(1).asChars())); assertTrue("128.0.0.255".equals(beforeOverflow.get(2).asChars())); - Tuple overflow = partitioner.increment(beforeOverflow, 1, 2); + Tuple overflow = partitioner.increment(beforeOverflow, BigInteger.valueOf(1), 2); assertTrue("127.0.1.2".equals(overflow.get(0).asChars())); assertTrue("127.0.0.1".equals(overflow.get(1).asChars())); assertTrue("128.0.0.253".equals(overflow.get(2).asChars())); @@ -360,6 +362,133 @@ public class TestUniformRangePartition { } @Test + public void testPartitionForMultipleChars() { + Schema schema = new Schema() + .addColumn("KEY1", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + Tuple s = new VTuple(1); + s.put(0, DatumFactory.createText("AAA")); + Tuple e = new VTuple(1); + e.put(0, DatumFactory.createText("ZZZ")); + + TupleRange expected = new TupleRange(sortSpecs, s, e); + RangePartitionAlgorithm partitioner = + new UniformRangePartition(expected, sortSpecs, true); + TupleRange [] ranges = partitioner.partition(48); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev == null) { + prev = r; + } else { + assertTrue(prev.compareTo(r) < 0); + } + } + assertEquals(48, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[47].getEnd().equals(e)); + } + + @Test + public void testPartitionForMultipleChars2() { + Schema schema = new Schema() + .addColumn("KEY1", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + Tuple s = new VTuple(1); + s.put(0, DatumFactory.createText("A1")); + Tuple e = new VTuple(1); + e.put(0, DatumFactory.createText("A999975")); + + final int partNum = 2; + + TupleRange expected = new 
TupleRange(sortSpecs, s, e); + RangePartitionAlgorithm partitioner = + new UniformRangePartition(expected, sortSpecs, true); + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev == null) { + prev = r; + } else { + assertTrue(prev.compareTo(r) < 0); + } + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForMultipleChars2Desc() { + Schema schema = new Schema() + .addColumn("KEY1", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + sortSpecs[0].setDescOrder(); + + Tuple s = new VTuple(1); + s.put(0, DatumFactory.createText("A999975")); + Tuple e = new VTuple(1); + e.put(0, DatumFactory.createText("A1")); + + final int partNum = 48; + + TupleRange expected = new TupleRange(sortSpecs, s, e); + RangePartitionAlgorithm partitioner = + new UniformRangePartition(expected, sortSpecs, true); + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; + for (TupleRange r : ranges) { + if (prev == null) { + prev = r; + } else { + assertTrue(prev.compareTo(r) > 0); + } + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test + public void testPartitionForMultipleCharsWithSameFirstChar() { + Schema schema = new Schema() + .addColumn("KEY1", Type.TEXT); + + SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema); + + Tuple s = new VTuple(1); + s.put(0, DatumFactory.createText("AAA")); + Tuple e = new VTuple(1); + e.put(0, DatumFactory.createText("AAZ")); + + final int partNum = 4; + + TupleRange expected = new TupleRange(sortSpecs, s, e); + RangePartitionAlgorithm partitioner = + new UniformRangePartition(expected, sortSpecs, true); + TupleRange [] ranges = partitioner.partition(partNum); + + TupleRange prev = null; 
+ for (TupleRange r : ranges) { + if (prev == null) { + prev = r; + } else { + assertTrue(prev.compareTo(r) < 0); + } + } + assertEquals(partNum, ranges.length); + assertTrue(ranges[0].getStart().equals(s)); + assertTrue(ranges[partNum - 1].getEnd().equals(e)); + } + + @Test public void testPartitionForOnePartNumWithBothValueNull() { Schema schema = new Schema() .addColumn("l_returnflag", Type.TEXT) @@ -406,7 +535,7 @@ public class TestUniformRangePartition { if (prev == null) { prev = r; } else { - assertTrue(prev.compareTo(r) > 0); + assertTrue(prev.compareTo(r) < 0); } } }
