TAJO-904: ORDER BY Null first support. (Hyoungjun Kim via hyunsik) Closes #70
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/47d4fe22 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/47d4fe22 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/47d4fe22 Branch: refs/heads/index_support Commit: 47d4fe22b8e5594d82054fa5a7b5cdc23f578be6 Parents: aee7874 Author: Hyunsik Choi <[email protected]> Authored: Tue Jul 15 14:55:47 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Tue Jul 15 14:55:47 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/catalog/statistics/ColumnStats.java | 10 ++- .../engine/planner/RangePartitionAlgorithm.java | 2 +- .../org/apache/tajo/engine/utils/TupleUtil.java | 70 +++++++++++++++++--- .../tajo/master/querymaster/Repartitioner.java | 19 +++++- .../main/java/org/apache/tajo/worker/Task.java | 4 +- .../apache/tajo/engine/query/TestSortQuery.java | 49 ++++++++++++++ .../org/apache/tajo/storage/RowStoreUtil.java | 1 + .../apache/tajo/storage/TableStatistics.java | 5 +- .../tajo/pullserver/TajoPullServerService.java | 2 +- 10 files changed, 141 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 473db19..d819d20 100644 --- a/CHANGES +++ b/CHANGES @@ -84,6 +84,8 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-904: ORDER BY Null first support. (Hyoungjun Kim via hyunsik) + TAJO-936: TestStorages::testSplitable is failed occasionally. (jinho) TAJO-673: Assign proper number of tasks when inserting into partitioned table. (jaehwa) http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/ColumnStats.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/ColumnStats.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/ColumnStats.java index 4d65d9a..1f3bca3 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/ColumnStats.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/ColumnStats.java @@ -24,13 +24,13 @@ package org.apache.tajo.catalog.statistics; import com.google.common.base.Objects; import com.google.gson.annotations.Expose; import com.google.protobuf.ByteString; -import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.json.GsonObject; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.json.CatalogGsonHelper; +import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.ProtoObject; import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.json.GsonObject; import org.apache.tajo.util.TUtil; public class ColumnStats implements ProtoObject<CatalogProtos.ColumnStatsProto>, Cloneable, GsonObject { @@ -109,6 +109,10 @@ public class ColumnStats implements ProtoObject<CatalogProtos.ColumnStatsProto>, this.numNulls = numNulls; } + public boolean hasNullValue() { + return numNulls > 0; + } + public boolean equals(Object obj) { if (obj instanceof ColumnStats) { ColumnStats other = (ColumnStats) obj; http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java index a3522c7..0aa6f97 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java @@ -115,7 +115,7 @@ public abstract class RangePartitionAlgorithm { break; case TEXT: final char textStart = (start instanceof NullDatum || start.size() == 0) ? '0' : start.asChars().charAt(0); - final char textEnd = (end instanceof NullDatum || start.size() == 0) ? '0' : end.asChars().charAt(0); + final char textEnd = (end instanceof NullDatum || end.size() == 0) ? '0' : end.asChars().charAt(0); if (isAscending) { columnCard = new BigDecimal(textEnd - textStart); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java index 86f4935..f2e47bc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java @@ -22,6 +22,8 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.codec.binary.Base64; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; @@ -44,6 +46,7 @@ import java.util.List; import java.util.Map; public class TupleUtil { + private static final Log LOG = LogFactory.getLog(TupleUtil.class); public static String rangeToQuery(Schema schema, TupleRange range, boolean last) throws UnsupportedEncodingException { @@ -72,7 +75,41 @@ public class TupleUtil { return sb.toString(); } - public static TupleRange columnStatToRange(SortSpec [] sortSpecs, Schema target, List<ColumnStats> colStats) { + /** + * if max value is null, set ranges[last] + * @param sortSpecs + * @param sortSchema + * @param colStats + * @param ranges + */ + public static void setMaxRangeIfNull(SortSpec[] sortSpecs, Schema sortSchema, + List<ColumnStats> colStats, TupleRange[] ranges) { + Map<Column, ColumnStats> statMap = Maps.newHashMap(); + for (ColumnStats stat : colStats) { + statMap.put(stat.getColumn(), stat); + } + + int i = 0; + for (Column col : sortSchema.getColumns()) { + ColumnStats columnStat = statMap.get(col); + if (columnStat == null) { + continue; + } + if (columnStat.hasNullValue()) { + int rangeIndex = sortSpecs[i].isAscending() ? ranges.length - 1 : 0; + VTuple rangeTuple = sortSpecs[i].isAscending() ? (VTuple) ranges[rangeIndex].getEnd() : (VTuple) ranges[rangeIndex].getStart(); + if (LOG.isDebugEnabled()) { + LOG.debug("Set null into range: " + col.getQualifiedName() + ", previous tuple is " + rangeTuple); + } + rangeTuple.put(i, NullDatum.get()); + LOG.info("Set null into range: " + col.getQualifiedName() + ", current tuple is " + rangeTuple); + } + i++; + } + } + + public static TupleRange columnStatToRange(SortSpec [] sortSpecs, Schema target, List<ColumnStats> colStats, + boolean checkNull) { Map<Column, ColumnStats> statSet = Maps.newHashMap(); for (ColumnStats stat : colStats) { @@ -98,16 +135,29 @@ public class TupleUtil { else startTuple.put(i, DatumFactory.createNullDatum()); - if (statSet.get(col).getMaxValue() != null) - endTuple.put(i, statSet.get(col).getMaxValue()); - else - endTuple.put(i, DatumFactory.createNullDatum()); + if (checkNull) { + if (statSet.get(col).hasNullValue() || statSet.get(col).getMaxValue() == null) + endTuple.put(i, DatumFactory.createNullDatum()); + else + endTuple.put(i, statSet.get(col).getMaxValue()); + } else { + if (statSet.get(col).getMaxValue() != null) + endTuple.put(i, statSet.get(col).getMaxValue()); + else + endTuple.put(i, DatumFactory.createNullDatum()); + } } else { - if (statSet.get(col).getMaxValue() != null) - startTuple.put(i, statSet.get(col).getMaxValue()); - else - startTuple.put(i, DatumFactory.createNullDatum()); - + if (checkNull) { + if (statSet.get(col).hasNullValue() || statSet.get(col).getMaxValue() == null) + startTuple.put(i, DatumFactory.createNullDatum()); + else + startTuple.put(i, statSet.get(col).getMaxValue()); + } else { + if (statSet.get(col).getMaxValue() != null) + startTuple.put(i, statSet.get(col).getMaxValue()); + else + startTuple.put(i, DatumFactory.createNullDatum()); + } if (statSet.get(col).getMinValue() != null) endTuple.put(i, statSet.get(col).getMinValue()); else http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index 1cc5b78..055e9a2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -572,7 +572,7 @@ public class Repartitioner { if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0 ) { return; } - TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats()); + TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(), false); RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs); BigDecimal card = partitioner.getTotalCardinality(); @@ -580,16 +580,29 @@ public class Repartitioner { // we set the the number of tasks to the number of range cardinality. int determinedTaskNum; if (card.compareTo(new BigDecimal(maxNum)) < 0) { - LOG.info("The range cardinality (" + card + LOG.info(subQuery.getId() + ", The range cardinality (" + card + ") is less then the desired number of tasks (" + maxNum + ")"); determinedTaskNum = card.intValue(); } else { determinedTaskNum = maxNum; } - LOG.info("Try to divide " + mergedRange + " into " + determinedTaskNum + + // for LOG + TupleRange mergedRangeForPrint = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(), true); + LOG.info(subQuery.getId() + ", Try to divide " + mergedRangeForPrint + " into " + determinedTaskNum + " sub ranges (total units: " + determinedTaskNum + ")"); TupleRange [] ranges = partitioner.partition(determinedTaskNum); + if (ranges == null || ranges.length == 0) { + LOG.warn(subQuery.getId() + " no range infos."); + } + TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges); + if (LOG.isDebugEnabled()) { + if (ranges != null) { + for (TupleRange eachRange : ranges) { + LOG.debug(subQuery.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd()); + } + } + } FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST}); SubQuery.scheduleFragment(subQuery, dummyFragment); http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 991dc4b..c3f3827 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -342,10 +342,10 @@ public class Task { builder.setResultStats(new TableStats().getProto()); } - Iterator<Entry<Integer,String>> it = context.getShuffleFileOutputs(); + Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs(); if (it.hasNext()) { do { - Entry<Integer,String> entry = it.next(); + Entry<Integer, String> entry = it.next(); ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder(); part.setPartId(entry.getKey()); http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java index 0b1831c..a520e56 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java @@ -21,13 +21,21 @@ package org.apache.tajo.engine.query; import org.apache.tajo.IntegrationTest; import org.apache.tajo.QueryTestCaseBase; import org.apache.tajo.TajoConstants; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.util.KeyValueSet; import org.junit.Test; import org.junit.experimental.categories.Category; import java.sql.ResultSet; import java.util.TimeZone; +import static org.junit.Assert.assertEquals; + @Category(IntegrationTest.class) public class TestSortQuery extends QueryTestCaseBase { @@ -169,4 +177,45 @@ public class TestSortQuery extends QueryTestCaseBase { assertResultSet(res); cleanupQuery(res); } + + @Test + public final void testSortNullColumn() throws Exception { + try { + testingCluster.setAllTajoDaemonConfValue(ConfVars.TESTCASE_MIN_TASK_NUM.varname, "2"); + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.put(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.put(StorageConstants.CSVFILE_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("name", Type.TEXT); + String[] data = new String[]{ + "1|BRAZIL", + "2|ALGERIA", + "3|ARGENTINA", + "4|CANADA" + }; + TajoTestingCluster.createTable("nullsort", schema, tableOptions, data, 2); + + ResultSet res = executeString( + "select * from (" + + "select case when id > 2 then null else id end as col1, name as col2 from nullsort) a " + + "order by col1, col2" + ); + + String expected = "col1,col2\n" + + "-------------------------------\n" + + "1,BRAZIL\n" + + "2,ALGERIA\n" + + "null,ARGENTINA\n" + + "null,CANADA\n"; + + assertEquals(expected, resultSetToString(res)); + + cleanupQuery(res); + } finally { + testingCluster.setAllTajoDaemonConfValue(ConfVars.TESTCASE_MIN_TASK_NUM.varname, "0"); + executeString("DROP TABLE nullsort PURGE;").close(); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java index 33b2ff3..5140a63 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java @@ -181,6 +181,7 @@ public class RowStoreUtil { for (int i = 0; i < schema.size(); i++) { if (tuple.isNull(i)) { nullFlags.set(i); + continue; } col = schema.getColumn(i); http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java index ac9bd8a..a2c08de 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java @@ -40,7 +40,6 @@ public class TableStatistics { private long numRows = 0; private long numBytes = 0; - private boolean [] comparable; public TableStatistics(Schema schema) { @@ -113,10 +112,10 @@ public class TableStatistics { LOG.warn("Wrong statistics column type (" + minValues.get(i).type() + ", expected=" + schema.getColumn(i).getDataType().getType() + ")"); } - if (minValues.get(i) == null || schema.getColumn(i).getDataType().getType() == minValues.get(i).type()) { + if (maxValues.get(i) == null || schema.getColumn(i).getDataType().getType() == maxValues.get(i).type()) { columnStats.setMaxValue(maxValues.get(i)); } else { - LOG.warn("Wrong statistics column type (" + minValues.get(i).type() + + LOG.warn("Wrong statistics column type (" + maxValues.get(i).type() + ", expected=" + schema.getColumn(i).getDataType().getType() + ")"); } stat.addColumnStat(columnStats); http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index 5b76da5..12cd1a3 100644 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -597,7 +597,7 @@ public class TajoPullServerService extends AbstractService { if (comparator.compare(end, idxReader.getFirstKey()) < 0 || comparator.compare(idxReader.getLastKey(), start) < 0) { - LOG.info("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + + LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + "], but request start:" + start + ", end: " + end); return null; }
