http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 074da6b..74cb2e0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -593,7 +593,9 @@ public class MoveTask extends Task<MoveWork> implements Serializable { newSortCols.add(new Order( partn.getCols().get(sortCol.getIndexes().get(0)).getName(), sortCol.getSortOrder() == '+' ? BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC : - BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_DESC)); + BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_DESC, + sortCol.getNullSortOrder() == 'a' ? BaseSemanticAnalyzer.HIVE_COLUMN_NULLS_FIRST : + BaseSemanticAnalyzer.HIVE_COLUMN_NULLS_LAST)); } else { // If the table is sorted on a partition column, not valid for sorting updateSortCols = false;
http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java index fdc1dff..f6471db 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -91,6 +91,8 @@ public class HybridHashTableContainer /** The OI used to deserialize values. We never deserialize keys. */ private LazyBinaryStructObjectInspector internalValueOi; private boolean[] sortableSortOrders; + private byte[] nullMarkers; + private byte[] notNullMarkers; private MapJoinBytesTableContainer.KeyValueHelper writeHelper; private final MapJoinBytesTableContainer.DirectKeyValueWriter directWriteHelper; /* @@ -417,6 +419,14 @@ public class HybridHashTableContainer return sortableSortOrders; } + public byte[] getNullMarkers() { + return nullMarkers; + } + + public byte[] getNotNullMarkers() { + return notNullMarkers; + } + /* For a given row, put it into proper partition based on its hash value. * When memory threshold is reached, the biggest hash table in memory will be spilled to disk. * If the hash table of a specific partition is already on disk, all later rows will be put into @@ -708,7 +718,8 @@ public class HybridHashTableContainer nulls[i] = currentKey[i] == null; } return currentValue.setFromOutput( - MapJoinKey.serializeRow(output, currentKey, vectorKeyOIs, sortableSortOrders)); + MapJoinKey.serializeRow(output, currentKey, vectorKeyOIs, + sortableSortOrders, nullMarkers, notNullMarkers)); } @Override @@ -723,7 +734,8 @@ public class HybridHashTableContainer nulls[keyIndex] = currentKey[keyIndex] == null; } return currentValue.setFromOutput( - MapJoinKey.serializeRow(output, currentKey, ois, sortableSortOrders)); + MapJoinKey.serializeRow(output, currentKey, ois, + sortableSortOrders, nullMarkers, notNullMarkers)); } @Override @@ -1064,6 +1076,12 @@ public class HybridHashTableContainer if (sortableSortOrders == null) { sortableSortOrders = ((BinarySortableSerDe) keySerde).getSortOrders(); } + if (nullMarkers == null) { + nullMarkers = ((BinarySortableSerDe) keySerde).getNullMarkers(); + } + if (notNullMarkers == null) { + notNullMarkers = ((BinarySortableSerDe) keySerde).getNotNullMarkers(); + } } } } http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java index 5c2ff92..a8aa71a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java @@ -25,11 +25,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.debug.Utils; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; @@ -53,9 +50,9 @@ import org.apache.hadoop.hive.serde2.lazybinary.objectinspector.LazyBinaryStruct import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; @@ -63,6 +60,8 @@ import org.apache.hadoop.io.BinaryComparable; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; import org.apache.hive.common.util.HashCodeUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Table container that serializes keys and values using LazyBinarySerDe into @@ -83,6 +82,8 @@ public class MapJoinBytesTableContainer * ordering. Hence, remember the ordering here; it is null if we do use LazyBinarySerDe. */ private boolean[] sortableSortOrders; + private byte[] nullMarkers; + private byte[] notNullMarkers; private KeyValueHelper writeHelper; private DirectKeyValueWriter directWriteHelper; @@ -138,6 +139,14 @@ public class MapJoinBytesTableContainer this.sortableSortOrders = sortableSortOrders; } + public void setNullMarkers(byte[] nullMarkers) { + this.nullMarkers = nullMarkers; + } + + public void setNotNullMarkers(byte[] notNullMarkers) { + this.notNullMarkers = notNullMarkers; + } + public static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource { void setKeyValue(Writable key, Writable val) throws SerDeException; /** Get hash value from the key. */ @@ -269,7 +278,14 @@ public class MapJoinBytesTableContainer fois.add(fields.get(i).getFieldObjectInspector()); } Output output = new Output(); - BinarySortableSerDe.serializeStruct(output, data, fois, new boolean[fields.size()]); + boolean[] sortableSortOrders = new boolean[fields.size()]; + Arrays.fill(sortableSortOrders, false); + byte[] columnNullMarker = new byte[fields.size()]; + Arrays.fill(columnNullMarker, BinarySortableSerDe.ZERO); + byte[] columnNotNullMarker = new byte[fields.size()]; + Arrays.fill(columnNotNullMarker, BinarySortableSerDe.ONE); + BinarySortableSerDe.serializeStruct(output, data, fois, sortableSortOrders, + columnNullMarker, columnNotNullMarker); hasTag = (output.getLength() != b.getLength()); if (hasTag) { LOG.error("Tag found in keys and will be removed. This should not happen."); @@ -360,10 +376,14 @@ public class MapJoinBytesTableContainer writeHelper = new LazyBinaryKvWriter(keySerde, valSoi, valueContext.hasFilterTag()); internalValueOi = valSoi; sortableSortOrders = ((BinarySortableSerDe) keySerde).getSortOrders(); + nullMarkers = ((BinarySortableSerDe) keySerde).getNullMarkers(); + notNullMarkers = ((BinarySortableSerDe) keySerde).getNotNullMarkers(); } else { writeHelper = new KeyValueWriter(keySerde, valSerde, valueContext.hasFilterTag()); internalValueOi = createInternalOi(valueContext); sortableSortOrders = null; + nullMarkers = null; + notNullMarkers = null; } } } @@ -476,7 +496,8 @@ public class MapJoinBytesTableContainer nulls[i] = currentKey[i] == null; } return currentValue.setFromOutput( - MapJoinKey.serializeRow(output, currentKey, vectorKeyOIs, sortableSortOrders)); + MapJoinKey.serializeRow(output, currentKey, vectorKeyOIs, + sortableSortOrders, nullMarkers, notNullMarkers)); } @Override @@ -491,7 +512,8 @@ public class MapJoinBytesTableContainer nulls[keyIndex] = currentKey[keyIndex] == null; } return currentValue.setFromOutput( - MapJoinKey.serializeRow(output, currentKey, ois, sortableSortOrders)); + MapJoinKey.serializeRow(output, currentKey, ois, + sortableSortOrders, nullMarkers, notNullMarkers)); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java index cfb9abc..9f27f56 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java @@ -117,7 +117,8 @@ public abstract class MapJoinKey { */ public static Output serializeVector(Output byteStream, VectorHashKeyWrapper kw, VectorExpressionWriter[] keyOutputWriters, VectorHashKeyWrapperBatch keyWrapperBatch, - boolean[] nulls, boolean[] sortableSortOrders) throws HiveException, SerDeException { + boolean[] nulls, boolean[] sortableSortOrders, byte[] nullMarkers, byte[] notNullMarkers) + throws HiveException, SerDeException { Object[] fieldData = new Object[keyOutputWriters.length]; List<ObjectInspector> fieldOis = new ArrayList<ObjectInspector>(); for (int i = 0; i < keyOutputWriters.length; ++i) { @@ -130,7 +131,8 @@ public abstract class MapJoinKey { nulls[i] = (fieldData[i] == null); } } - return serializeRow(byteStream, fieldData, fieldOis, sortableSortOrders); + return serializeRow(byteStream, fieldData, fieldOis, sortableSortOrders, + nullMarkers, notNullMarkers); } public static MapJoinKey readFromRow(Output output, MapJoinKey key, Object[] keyObject, @@ -145,7 +147,8 @@ public abstract class MapJoinKey { * @param byteStream Output to reuse. Can be null, in that case a new one would be created. */ public static Output serializeRow(Output byteStream, Object[] fieldData, - List<ObjectInspector> fieldOis, boolean[] sortableSortOrders) throws HiveException { + List<ObjectInspector> fieldOis, boolean[] sortableSortOrders, + byte[] nullMarkers, byte[] notNullMarkers) throws HiveException { if (byteStream == null) { byteStream = new Output(); } else { @@ -157,7 +160,8 @@ public abstract class MapJoinKey { } else if (sortableSortOrders == null) { LazyBinarySerDe.serializeStruct(byteStream, fieldData, fieldOis); } else { - BinarySortableSerDe.serializeStruct(byteStream, fieldData, fieldOis, sortableSortOrders); + BinarySortableSerDe.serializeStruct(byteStream, fieldData, fieldOis, sortableSortOrders, + nullMarkers, notNullMarkers); } } catch (SerDeException e) { throw new HiveException("Serialization error", e); http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java index 1510fdd..3f16359 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java @@ -180,7 +180,7 @@ public class DynamicPartitionPruner { numExpectedEventsPerSource.get(s).decrement(); ++sourceInfoCount; String columnName = cit.next(); - String columnType = typit.next(); + String columnType = typit.next(); ExprNodeDesc partKeyExpr = pit.next(); SourceInfo si = createSourceInfo(t, partKeyExpr, columnName, columnType, jobConf); if (!sourceInfoMap.containsKey(s)) { http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java index 7bdd11a..8133aef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java @@ -27,9 +27,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter; import org.apache.hadoop.hive.ql.exec.TerminalOperator; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter; import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion; @@ -39,14 +39,15 @@ import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesSerialized import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; -import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ByteStream.Output; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite; import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; @@ -188,6 +189,60 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re return columnSortOrderIsDesc; } + private byte[] getColumnNullMarker(Properties properties, int columnCount, boolean[] columnSortOrder) { + String columnNullOrder = properties.getProperty(serdeConstants.SERIALIZATION_NULL_SORT_ORDER); + byte[] columnNullMarker = new byte[columnCount]; + for (int i = 0; i < columnNullMarker.length; i++) { + if (columnSortOrder[i]) { + // Descending + if (columnNullOrder != null && columnNullOrder.charAt(i) == 'a') { + // Null first + columnNullMarker[i] = BinarySortableSerDe.ONE; + } else { + // Null last (default for descending order) + columnNullMarker[i] = BinarySortableSerDe.ZERO; + } + } else { + // Ascending + if (columnNullOrder != null && columnNullOrder.charAt(i) == 'z') { + // Null last + columnNullMarker[i] = BinarySortableSerDe.ONE; + } else { + // Null first (default for ascending order) + columnNullMarker[i] = BinarySortableSerDe.ZERO; + } + } + } + return columnNullMarker; + } + + private byte[] getColumnNotNullMarker(Properties properties, int columnCount, boolean[] columnSortOrder) { + String columnNullOrder = properties.getProperty(serdeConstants.SERIALIZATION_NULL_SORT_ORDER); + byte[] columnNotNullMarker = new byte[columnCount]; + for (int i = 0; i < columnNotNullMarker.length; i++) { + if (columnSortOrder[i]) { + // Descending + if (columnNullOrder != null && columnNullOrder.charAt(i) == 'a') { + // Null first + columnNotNullMarker[i] = BinarySortableSerDe.ZERO; + } else { + // Null last (default for descending order) + columnNotNullMarker[i] = BinarySortableSerDe.ONE; + } + } else { + // Ascending + if (columnNullOrder != null && columnNullOrder.charAt(i) == 'z') { + // Null last + columnNotNullMarker[i] = BinarySortableSerDe.ZERO; + } else { + // Null first (default for ascending order) + columnNotNullMarker[i] = BinarySortableSerDe.ONE; + } + } + } + return columnNotNullMarker; + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); @@ -217,8 +272,13 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re TableDesc keyTableDesc = conf.getKeySerializeInfo(); boolean[] columnSortOrder = getColumnSortOrder(keyTableDesc.getProperties(), reduceSinkKeyColumnMap.length); + byte[] columnNullMarker = + getColumnNullMarker(keyTableDesc.getProperties(), reduceSinkKeyColumnMap.length, columnSortOrder); + byte[] columnNotNullMarker = + getColumnNotNullMarker(keyTableDesc.getProperties(), reduceSinkKeyColumnMap.length, columnSortOrder); - keyBinarySortableSerializeWrite = new BinarySortableSerializeWrite(columnSortOrder); + keyBinarySortableSerializeWrite = new BinarySortableSerializeWrite(columnSortOrder, + columnNullMarker, columnNotNullMarker); // Create all nulls key. try { http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index fdc7956..ad17096 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -946,7 +946,7 @@ public class Hive { FieldSchema col = cols.get(i); if (indexedCols.contains(col.getName())) { indexTblCols.add(col); - sortCols.add(new Order(col.getName(), 1)); + sortCols.add(new Order(col.getName(), 1, 0)); k++; } } http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java index b57dc77..677649d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java @@ -357,7 +357,8 @@ abstract public class AbstractSMBJoinProc extends AbstractBucketJoinProc impleme Order o = sortCols.get(pos); if (pos < sortColumnsFirstPartition.size()) { - if (o.getOrder() != sortColumnsFirstPartition.get(pos).getOrder()) { + if (o.getOrder() != sortColumnsFirstPartition.get(pos).getOrder() || + o.getNullOrder() != sortColumnsFirstPartition.get(pos).getNullOrder()) { return false; } } http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java index d5f3057..3d580d8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java @@ -148,23 +148,55 @@ public class BucketingSortingReduceSinkOptimizer extends Transform { // Get the sort positions and sort order for the table // The sort order contains whether the sorting is happening ascending or descending - private ObjectPair<List<Integer>, List<Integer>> getSortPositionsOrder( + private List<Integer> getSortPositions( List<Order> tabSortCols, List<FieldSchema> tabCols) { List<Integer> sortPositions = new ArrayList<Integer>(); - List<Integer> sortOrders = new ArrayList<Integer>(); for (Order sortCol : tabSortCols) { int pos = 0; for (FieldSchema tabCol : tabCols) { if (sortCol.getCol().equals(tabCol.getName())) { sortPositions.add(pos); + break; + } + pos++; + } + } + return sortPositions; + } + + private List<Integer> getSortOrder( + List<Order> tabSortCols, + List<FieldSchema> tabCols) { + List<Integer> sortOrders = new ArrayList<Integer>(); + for (Order sortCol : tabSortCols) { + int pos = 0; + for (FieldSchema tabCol : tabCols) { + if (sortCol.getCol().equals(tabCol.getName())) { sortOrders.add(sortCol.getOrder()); break; } pos++; } } - return new ObjectPair<List<Integer>, List<Integer>>(sortPositions, sortOrders); + return sortOrders; + } + + private List<Integer> getNullSortOrder( + List<Order> tabSortCols, + List<FieldSchema> tabCols) { + List<Integer> nullSortOrders = new ArrayList<Integer>(); + for (Order sortCol : tabSortCols) { + int pos = 0; + for (FieldSchema tabCol : tabCols) { + if (sortCol.getCol().equals(tabCol.getName())) { + nullSortOrders.add(sortCol.getNullOrder()); + break; + } + pos++; + } + } + return nullSortOrders; } // Return true if the partition is bucketed/sorted by the specified positions @@ -174,6 +206,7 @@ public class BucketingSortingReduceSinkOptimizer extends Transform { List<Integer> bucketPositionsDest, List<Integer> sortPositionsDest, List<Integer> sortOrderDest, + List<Integer> sortNullOrderDest, int numBucketsDest) { // The bucketing and sorting positions should exactly match int numBuckets = partition.getBucketCount(); @@ -183,11 +216,16 @@ public class BucketingSortingReduceSinkOptimizer extends Transform { List<Integer> partnBucketPositions = getBucketPositions(partition.getBucketCols(), partition.getTable().getCols()); - ObjectPair<List<Integer>, List<Integer>> partnSortPositionsOrder = - getSortPositionsOrder(partition.getSortCols(), partition.getTable().getCols()); + List<Integer> sortPositions = + getSortPositions(partition.getSortCols(), partition.getTable().getCols()); + List<Integer> sortOrder = + getSortOrder(partition.getSortCols(), partition.getTable().getCols()); + List<Integer> sortNullOrder = + getNullSortOrder(partition.getSortCols(), partition.getTable().getCols()); return bucketPositionsDest.equals(partnBucketPositions) && - sortPositionsDest.equals(partnSortPositionsOrder.getFirst()) && - sortOrderDest.equals(partnSortPositionsOrder.getSecond()); + sortPositionsDest.equals(sortPositions) && + sortOrderDest.equals(sortOrder) && + sortNullOrderDest.equals(sortNullOrder); } // Return true if the table is bucketed/sorted by the specified positions @@ -197,6 +235,7 @@ public class BucketingSortingReduceSinkOptimizer extends Transform { List<Integer> bucketPositionsDest, List<Integer> sortPositionsDest, List<Integer> sortOrderDest, + List<Integer> sortNullOrderDest, int numBucketsDest) { // The bucketing and sorting positions should exactly match int numBuckets = table.getNumBuckets(); @@ -206,11 +245,16 @@ public class BucketingSortingReduceSinkOptimizer extends Transform { List<Integer> tableBucketPositions = getBucketPositions(table.getBucketCols(), table.getCols()); - ObjectPair<List<Integer>, List<Integer>> tableSortPositionsOrder = - getSortPositionsOrder(table.getSortCols(), table.getCols()); + List<Integer> sortPositions = + getSortPositions(table.getSortCols(), table.getCols()); + List<Integer> sortOrder = + getSortOrder(table.getSortCols(), table.getCols()); + List<Integer> sortNullOrder = + getNullSortOrder(table.getSortCols(), table.getCols()); return bucketPositionsDest.equals(tableBucketPositions) && - sortPositionsDest.equals(tableSortPositionsOrder.getFirst()) && - sortOrderDest.equals(tableSortPositionsOrder.getSecond()); + sortPositionsDest.equals(sortPositions) && + sortOrderDest.equals(sortOrder) && + sortNullOrderDest.equals(sortNullOrder); } // Store the bucket path to bucket number mapping in the table scan operator. @@ -288,7 +332,8 @@ public class BucketingSortingReduceSinkOptimizer extends Transform { private boolean validateSMBJoinKeys(SMBJoinDesc smbJoinDesc, List<ExprNodeColumnDesc> sourceTableBucketCols, List<ExprNodeColumnDesc> sourceTableSortCols, - List<Integer> sortOrder) { + List<Integer> sortOrder, + List<Integer> sortNullOrder) { // The sort-merge join creates the output sorted and bucketized by the same columns. // This can be relaxed in the future if there is a requirement. if (!sourceTableBucketCols.equals(sourceTableSortCols)) { @@ -426,10 +471,12 @@ public class BucketingSortingReduceSinkOptimizer extends Transform { // also match for this to be converted to a map-only job. List<Integer> bucketPositions = getBucketPositions(destTable.getBucketCols(), destTable.getCols()); - ObjectPair<List<Integer>, List<Integer>> sortOrderPositions = - getSortPositionsOrder(destTable.getSortCols(), destTable.getCols()); - List<Integer> sortPositions = sortOrderPositions.getFirst(); - List<Integer> sortOrder = sortOrderPositions.getSecond(); + List<Integer> sortPositions = + getSortPositions(destTable.getSortCols(), destTable.getCols()); + List<Integer> sortOrder = + getSortOrder(destTable.getSortCols(), destTable.getCols()); + List<Integer> sortNullOrder = + getNullSortOrder(destTable.getSortCols(), destTable.getCols()); boolean useBucketSortPositions = true; // Only selects and filters are allowed @@ -464,7 +511,7 @@ public class BucketingSortingReduceSinkOptimizer extends Transform { } if (!validateSMBJoinKeys(smbJoinDesc, sourceTableBucketCols, - sourceTableSortCols, sortOrder)) { + sourceTableSortCols, sortOrder, sortNullOrder)) { return null; } @@ -539,7 +586,7 @@ public class BucketingSortingReduceSinkOptimizer extends Transform { } for (Partition partition : partitions) { if (!checkPartition(partition, newBucketPositions, newSortPositions, sortOrder, - numBucketsDestination)) { + sortNullOrder, numBucketsDestination)) { return null; } } @@ -550,7 +597,7 @@ public class BucketingSortingReduceSinkOptimizer extends Transform { } else { if (!checkTable(srcTable, newBucketPositions, newSortPositions, sortOrder, - numBucketsDestination)) { + sortNullOrder, numBucketsDestination)) { return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java index c38c6d7..1e8f30e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java @@ -18,18 +18,16 @@ package org.apache.hadoop.hive.ql.optimizer; +import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.FIXED; + import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.Stack; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -60,11 +58,11 @@ import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.plan.TezWork.VertexType; import org.apache.hadoop.hive.ql.stats.StatsUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Sets; -import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.FIXED; - public class ReduceSinkMapJoinProc implements NodeProcessor { private final static Logger LOG = LoggerFactory.getLogger(ReduceSinkMapJoinProc.class.getName()); @@ -347,11 +345,14 @@ public class ReduceSinkMapJoinProc implements NodeProcessor { Map<Byte, List<ExprNodeDesc>> keyExprMap = mapJoinOp.getConf().getKeys(); List<ExprNodeDesc> keyCols = keyExprMap.get(Byte.valueOf((byte) 0)); StringBuilder keyOrder = new StringBuilder(); + StringBuilder keyNullOrder = new StringBuilder(); for (ExprNodeDesc k: keyCols) { keyOrder.append("+"); + keyNullOrder.append("a"); } TableDesc keyTableDesc = PlanUtils.getReduceKeyTableDesc(PlanUtils - .getFieldSchemasFromColumnList(keyCols, "mapjoinkey"), keyOrder.toString()); + .getFieldSchemasFromColumnList(keyCols, "mapjoinkey"), keyOrder.toString(), + keyNullOrder.toString()); mapJoinOp.getConf().setKeyTableDesc(keyTableDesc); // let the dummy op be the parent of mapjoin op http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java index ad12091..3e6c7c7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; @@ -182,10 +183,9 @@ public class SortedDynPartitionOptimizer extends Transform { // Get the positions for partition, bucket and sort columns List<Integer> bucketPositions = getBucketPositions(destTable.getBucketCols(), destTable.getCols()); - ObjectPair<List<Integer>, List<Integer>> sortOrderPositions = getSortPositionsOrder( - destTable.getSortCols(), destTable.getCols()); List<Integer> sortPositions = null; List<Integer> sortOrder = null; + List<Integer> sortNullOrder = null; if (fsOp.getConf().getWriteType() == AcidUtils.Operation.UPDATE || fsOp.getConf().getWriteType() == AcidUtils.Operation.DELETE) { // When doing updates and deletes we always want to sort on the rowid because the ACID @@ -193,13 +193,16 @@ public class SortedDynPartitionOptimizer extends Transform { // ignore whatever comes from the table and enforce this sort order instead. sortPositions = Arrays.asList(0); sortOrder = Arrays.asList(1); // 1 means asc, could really use enum here in the thrift if + sortNullOrder = Arrays.asList(0); } else { - sortPositions = sortOrderPositions.getFirst(); - sortOrder = sortOrderPositions.getSecond(); + sortPositions = getSortPositions(destTable.getSortCols(), destTable.getCols()); + sortOrder = getSortOrders(destTable.getSortCols(), destTable.getCols()); + sortNullOrder = getSortNullOrders(destTable.getSortCols(), destTable.getCols()); } LOG.debug("Got sort order"); for (int i : sortPositions) LOG.debug("sort position " + i); for (int i : sortOrder) LOG.debug("sort order " + i); + for (int i : sortNullOrder) LOG.debug("sort null order " + i); List<Integer> partitionPositions = getPartitionPositions(dpCtx, fsParent.getSchema()); List<ColumnInfo> colInfos = fsParent.getSchema().getSignature(); ArrayList<ExprNodeDesc> bucketColumns = getPositionsToExprNodes(bucketPositions, colInfos); @@ -214,8 +217,9 @@ public class SortedDynPartitionOptimizer extends Transform { for (ColumnInfo ci : parentCols) { allRSCols.add(new ExprNodeColumnDesc(ci)); } + // Create ReduceSink operator - ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, sortPositions, sortOrder, + ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, sortPositions, sortOrder, sortNullOrder, allRSCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType()); List<ExprNodeDesc> descs = new ArrayList<ExprNodeDesc>(allRSCols.size()); @@ -408,17 +412,19 @@ public class SortedDynPartitionOptimizer extends Transform { } public ReduceSinkOperator getReduceSinkOp(List<Integer> partitionPositions, - List<Integer> sortPositions, List<Integer> sortOrder, ArrayList<ExprNodeDesc> allCols, - ArrayList<ExprNodeDesc> bucketColumns, int numBuckets, + List<Integer> sortPositions, List<Integer> sortOrder, List<Integer> sortNullOrder, + ArrayList<ExprNodeDesc> allCols, ArrayList<ExprNodeDesc> bucketColumns, int numBuckets, Operator<? extends OperatorDesc> parent, AcidUtils.Operation writeType) { // Order of KEY columns // 1) Partition columns // 2) Bucket number column // 3) Sort columns + // 4) Null sort columns Set<Integer> keyColsPosInVal = Sets.newLinkedHashSet(); ArrayList<ExprNodeDesc> keyCols = Lists.newArrayList(); List<Integer> newSortOrder = Lists.newArrayList(); + List<Integer> newSortNullOrder = Lists.newArrayList(); int numPartAndBuck = partitionPositions.size(); keyColsPosInVal.addAll(partitionPositions); @@ -449,8 +455,33 @@ public class SortedDynPartitionOptimizer extends Transform { } } + // if partition and bucket columns are sorted in ascending order, by default + // nulls come first; otherwise nulls come last + Integer nullOrder = order == 1 ? 0 : 1; + if (sortNullOrder != null && !sortNullOrder.isEmpty()) { + if (sortNullOrder.get(0).intValue() == 0) { + nullOrder = 0; + } else { + nullOrder = 1; + } + } + for (int i = 0; i < numPartAndBuck; i++) { + newSortNullOrder.add(nullOrder); + } + newSortNullOrder.addAll(sortNullOrder); + + String nullOrderStr = ""; + for (Integer i : newSortNullOrder) { + if(i.intValue() == 0) { + nullOrderStr += "a"; + } else { + nullOrderStr += "z"; + } + } + Map<String, ExprNodeDesc> colExprMap = Maps.newHashMap(); ArrayList<ExprNodeDesc> partCols = Lists.newArrayList(); + // we will clone here as RS will update bucket column key with its // corresponding with bucket number and hence their OIs for (Integer idx : keyColsPosInVal) { @@ -480,9 +511,11 @@ public class SortedDynPartitionOptimizer extends Transform { ReduceSinkOperator.class); if (parentRSOp != null && parseCtx.getQueryProperties().hasOuterOrderBy()) { String parentRSOpOrder = parentRSOp.getConf().getOrder(); + String parentRSOpNullOrder = parentRSOp.getConf().getNullOrder(); if (parentRSOpOrder != null && !parentRSOpOrder.isEmpty() && sortPositions.isEmpty()) { keyCols.addAll(parentRSOp.getConf().getKeyCols()); orderStr += parentRSOpOrder; + nullOrderStr += parentRSOpNullOrder; } } @@ -504,7 +537,7 @@ public class SortedDynPartitionOptimizer extends Transform { // from Key and Value TableDesc List<FieldSchema> fields = PlanUtils.getFieldSchemasFromColumnList(keyCols, keyColNames, 0, ""); - TableDesc keyTable = PlanUtils.getReduceKeyTableDesc(fields, orderStr); + TableDesc keyTable = PlanUtils.getReduceKeyTableDesc(fields, orderStr, nullOrderStr); List<FieldSchema> valFields = PlanUtils.getFieldSchemasFromColumnList(valCols, valColNames, 0, ""); TableDesc valueTable = PlanUtils.getReduceValueTableDesc(valFields); @@ -523,27 +556,65 @@ public class SortedDynPartitionOptimizer extends Transform { } /** - * Get the sort positions and sort order for the sort columns + * Get the sort positions for the sort columns * @param tabSortCols * @param tabCols * @return */ - private ObjectPair<List<Integer>, List<Integer>> getSortPositionsOrder(List<Order> tabSortCols, + private List<Integer> getSortPositions(List<Order> tabSortCols, List<FieldSchema> tabCols) { List<Integer> sortPositions = Lists.newArrayList(); - List<Integer> sortOrders = Lists.newArrayList(); for (Order sortCol : tabSortCols) { int pos = 0; for (FieldSchema tabCol : tabCols) { if (sortCol.getCol().equals(tabCol.getName())) { sortPositions.add(pos); - sortOrders.add(sortCol.getOrder()); break; } pos++; } } - return new ObjectPair<List<Integer>, List<Integer>>(sortPositions, sortOrders); + return sortPositions; + } + + /** + * Get the sort order for the sort columns + * @param tabSortCols + * @param tabCols + * @return + */ + private List<Integer> getSortOrders(List<Order> tabSortCols, + List<FieldSchema> tabCols) { + List<Integer> sortOrders = Lists.newArrayList(); + for (Order sortCol : tabSortCols) { + for (FieldSchema tabCol : tabCols) { + if (sortCol.getCol().equals(tabCol.getName())) { + sortOrders.add(sortCol.getOrder()); + break; + } + } + } + return sortOrders; + } + + /** + * Get the null sort order for the sort columns + * @param tabSortCols + * @param tabCols + * @return + */ + private List<Integer> getSortNullOrders(List<Order> tabSortCols, + List<FieldSchema> tabCols) { + List<Integer> sortNullOrders = Lists.newArrayList(); + for (Order sortCol : tabSortCols) { + for (FieldSchema tabCol : tabCols) { + if (sortCol.getCol().equals(tabCol.getName())) { + sortNullOrders.add(sortCol.getNullOrder()); + break; + } + } + } + return sortNullOrders; } private ArrayList<ExprNodeDesc> getPositionsToExprNodes(List<Integer> pos, http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java index 02db680..a95da0a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java @@ -33,6 +33,7 @@ import org.apache.calcite.rel.RelCollationTraitDef; import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.RelFieldCollation; import org.apache.calcite.rel.RelFieldCollation.Direction; +import org.apache.calcite.rel.RelFieldCollation.NullDirection; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.logical.LogicalTableScan; import org.apache.calcite.rel.type.RelDataType; @@ -165,7 +166,14 @@ public class RelOptHiveTable extends RelOptAbstractTable { else { direction = Direction.DESCENDING; } - collationList.add(new RelFieldCollation(i,direction)); + NullDirection nullDirection; + if (sortColumn.getNullOrder() == BaseSemanticAnalyzer.HIVE_COLUMN_NULLS_FIRST) { + nullDirection = NullDirection.FIRST; + } + else { + nullDirection = NullDirection.LAST; + } + collationList.add(new RelFieldCollation(i,direction,nullDirection)); break; } } http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelFieldTrimmer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelFieldTrimmer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelFieldTrimmer.java index 997b82c..03002cc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelFieldTrimmer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelFieldTrimmer.java @@ -287,8 +287,8 @@ public class HiveRelFieldTrimmer extends RelFieldTrimmer { final List<RexNode> originalExtraNodes = ImmutableList.copyOf(extraNodes); for (RexNode node : nodes) { fieldCollations.add( - collation(node, RelFieldCollation.Direction.ASCENDING, null, - extraNodes)); + collation(node, RelFieldCollation.Direction.ASCENDING, + RelFieldCollation.NullDirection.FIRST, extraNodes)); } final RexNode offsetNode = offset <= 0 ? null : relBuilder.literal(offset); final RexNode fetchNode = fetch < 0 ? null : relBuilder.literal(fetch); http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java index 3f2267d..de7e2f8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java @@ -52,8 +52,6 @@ import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.ImmutableBitSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; @@ -64,6 +62,8 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.translator.SqlFunctionConvert import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.HiveParser; import org.apache.hadoop.hive.ql.parse.ParseDriver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Iterables; @@ -226,6 +226,24 @@ public class ASTConverter { ASTNode directionAST = c.getDirection() == RelFieldCollation.Direction.ASCENDING ? ASTBuilder .createAST(HiveParser.TOK_TABSORTCOLNAMEASC, "TOK_TABSORTCOLNAMEASC") : ASTBuilder .createAST(HiveParser.TOK_TABSORTCOLNAMEDESC, "TOK_TABSORTCOLNAMEDESC"); + ASTNode nullDirectionAST; + // Null direction + if (c.nullDirection == RelFieldCollation.NullDirection.FIRST) { + nullDirectionAST = ASTBuilder.createAST(HiveParser.TOK_NULLS_FIRST, "TOK_NULLS_FIRST"); + directionAST.addChild(nullDirectionAST); + } else if (c.nullDirection == RelFieldCollation.NullDirection.LAST) { + nullDirectionAST = ASTBuilder.createAST(HiveParser.TOK_NULLS_LAST, "TOK_NULLS_LAST"); + directionAST.addChild(nullDirectionAST); + } else { + // Default + if (c.getDirection() == RelFieldCollation.Direction.ASCENDING) { + nullDirectionAST = ASTBuilder.createAST(HiveParser.TOK_NULLS_FIRST, "TOK_NULLS_FIRST"); + directionAST.addChild(nullDirectionAST); + } else { + nullDirectionAST = ASTBuilder.createAST(HiveParser.TOK_NULLS_LAST, "TOK_NULLS_LAST"); + directionAST.addChild(nullDirectionAST); + } + } // 3 Convert OB expr (OB Expr is usually an input ref except for top // level OB; top level OB will have RexCall kept in a map.) @@ -245,7 +263,7 @@ public class ASTConverter { } // 4 buildup the ob expr AST - directionAST.addChild(astCol); + nullDirectionAST.addChild(astCol); orderAst.addChild(directionAST); } hiveAST.order = orderAst; @@ -430,12 +448,31 @@ public class ASTConverter { if (window.orderKeys != null && !window.orderKeys.isEmpty()) { oByAst = ASTBuilder.createAST(HiveParser.TOK_ORDERBY, "TOK_ORDERBY"); for (RexFieldCollation ok : window.orderKeys) { - ASTNode astNode = ok.getDirection() == RelFieldCollation.Direction.ASCENDING ? ASTBuilder + ASTNode directionAST = ok.getDirection() == RelFieldCollation.Direction.ASCENDING ? ASTBuilder .createAST(HiveParser.TOK_TABSORTCOLNAMEASC, "TOK_TABSORTCOLNAMEASC") : ASTBuilder .createAST(HiveParser.TOK_TABSORTCOLNAMEDESC, "TOK_TABSORTCOLNAMEDESC"); + ASTNode nullDirectionAST; + // Null direction + if (ok.right.contains(SqlKind.NULLS_FIRST)) { + nullDirectionAST = ASTBuilder.createAST(HiveParser.TOK_NULLS_FIRST, "TOK_NULLS_FIRST"); + directionAST.addChild(nullDirectionAST); + } else if (ok.right.contains(SqlKind.NULLS_LAST)) { + nullDirectionAST = ASTBuilder.createAST(HiveParser.TOK_NULLS_LAST, "TOK_NULLS_LAST"); + directionAST.addChild(nullDirectionAST); + } else { + // Default + if (ok.getDirection() == RelFieldCollation.Direction.ASCENDING) { + nullDirectionAST = ASTBuilder.createAST(HiveParser.TOK_NULLS_FIRST, "TOK_NULLS_FIRST"); + directionAST.addChild(nullDirectionAST); + } else { + nullDirectionAST = ASTBuilder.createAST(HiveParser.TOK_NULLS_LAST, "TOK_NULLS_LAST"); + directionAST.addChild(nullDirectionAST); + } + } ASTNode astCol = ok.left.accept(this); - astNode.addChild(astCol); - oByAst.addChild(astNode); + + nullDirectionAST.addChild(astCol); + oByAst.addChild(directionAST); } } http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java index 739faa9..e51b6c4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter.RexVisitor; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter.Schema; import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.NullOrder; import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order; import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression; import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderSpec; @@ -315,7 +316,18 @@ public class ExprNodeConverter extends RexVisitorImpl<ExprNodeDesc> { OrderExpression exprSpec = new OrderExpression(); Order order = ok.getDirection() == RelFieldCollation.Direction.ASCENDING ? Order.ASC : Order.DESC; + NullOrder nullOrder; + if ( ok.right.contains(SqlKind.NULLS_FIRST) ) { + nullOrder = NullOrder.NULLS_FIRST; + } else if ( ok.right.contains(SqlKind.NULLS_LAST) ) { + nullOrder = NullOrder.NULLS_LAST; + } else { + // Default + nullOrder = ok.getDirection() == RelFieldCollation.Direction.ASCENDING ? + NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST; + } exprSpec.setOrder(order); + exprSpec.setNullOrder(nullOrder); ASTNode astNode = ok.left.accept(new RexVisitor(schema)); exprSpec.setExpression(astNode); oSpec.addExpression(exprSpec); http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java index b841315..1307808 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java @@ -41,8 +41,6 @@ import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.StrictChecks; import org.apache.hadoop.hive.ql.exec.ColumnInfo; @@ -65,8 +63,8 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveMultiJoin; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSemiJoin; -import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortExchange; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion; import org.apache.hadoop.hive.ql.parse.JoinCond; @@ -99,6 +97,8 @@ import org.apache.hadoop.hive.ql.plan.SelectDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.UnionDesc; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -435,6 +435,7 @@ public class HiveOpConverter { Map<Integer, RexNode> obRefToCallMap = sortRel.getInputRefToCallMap(); List<ExprNodeDesc> sortCols = new ArrayList<ExprNodeDesc>(); StringBuilder order = new StringBuilder(); + StringBuilder nullOrder = new StringBuilder(); for (RelFieldCollation sortInfo : sortRel.getCollation().getFieldCollations()) { int sortColumnPos = sortInfo.getFieldIndex(); ColumnInfo columnInfo = new ColumnInfo(inputOp.getSchema().getSignature() @@ -447,6 +448,14 @@ public class HiveOpConverter { } else { order.append("+"); } + if (sortInfo.nullDirection == RelFieldCollation.NullDirection.FIRST) { + nullOrder.append("a"); + } else if (sortInfo.nullDirection == RelFieldCollation.NullDirection.LAST) { + nullOrder.append("z"); + } else { + // Default + nullOrder.append(sortInfo.getDirection() == RelFieldCollation.Direction.DESCENDING ? "z" : "a"); + } if (obRefToCallMap != null) { RexNode obExpr = obRefToCallMap.get(sortColumnPos); @@ -474,7 +483,7 @@ public class HiveOpConverter { // 1.b. Generate reduce sink and project operator resultOp = genReduceSinkAndBacktrackSelect(resultOp, sortCols.toArray(new ExprNodeDesc[sortCols.size()]), 0, new ArrayList<ExprNodeDesc>(), - order.toString(), numReducers, Operation.NOT_ACID, hiveConf, keepColumns); + order.toString(), nullOrder.toString(), numReducers, Operation.NOT_ACID, hiveConf, keepColumns); } // 2. If we need to generate limit @@ -626,6 +635,7 @@ public class HiveOpConverter { ArrayList<ExprNodeDesc> keyCols = new ArrayList<ExprNodeDesc>(); ArrayList<ExprNodeDesc> partCols = new ArrayList<ExprNodeDesc>(); StringBuilder order = new StringBuilder(); + StringBuilder nullOrder = new StringBuilder(); for (PartitionExpression partCol : wSpec.getQueryPartitionSpec().getExpressions()) { ExprNodeDesc partExpr = semanticAnalyzer.genExprNodeDesc(partCol.getExpression(), rr); @@ -633,6 +643,7 @@ public class HiveOpConverter { keyCols.add(partExpr); partCols.add(partExpr); order.append('+'); + nullOrder.append('a'); } } @@ -640,19 +651,22 @@ public class HiveOpConverter { for (OrderExpression orderCol : wSpec.getQueryOrderSpec().getExpressions()) { ExprNodeDesc orderExpr = semanticAnalyzer.genExprNodeDesc(orderCol.getExpression(), rr); char orderChar = orderCol.getOrder() == PTFInvocationSpec.Order.ASC ? '+' : '-'; + char nullOrderChar = orderCol.getNullOrder() == PTFInvocationSpec.NullOrder.NULLS_FIRST ? 'a' : 'z'; int index = ExprNodeDescUtils.indexOf(orderExpr, keyCols); if (index >= 0) { order.setCharAt(index, orderChar); + nullOrder.setCharAt(index, nullOrderChar); continue; } keyCols.add(orderExpr); order.append(orderChar); + nullOrder.append(nullOrderChar); } } SelectOperator selectOp = genReduceSinkAndBacktrackSelect(input, keyCols.toArray(new ExprNodeDesc[keyCols.size()]), 0, partCols, - order.toString(), -1, Operation.NOT_ACID, hiveConf); + order.toString(), nullOrder.toString(), -1, Operation.NOT_ACID, hiveConf); // 2. Finally create PTF PTFTranslator translator = new PTFTranslator(); @@ -677,14 +691,14 @@ public class HiveOpConverter { private static SelectOperator genReduceSinkAndBacktrackSelect(Operator<?> input, ExprNodeDesc[] keys, int tag, ArrayList<ExprNodeDesc> partitionCols, String order, - int numReducers, Operation acidOperation, HiveConf hiveConf) + String nullOrder, int numReducers, Operation acidOperation, HiveConf hiveConf) throws SemanticException { - return genReduceSinkAndBacktrackSelect(input, keys, tag, partitionCols, order, + return genReduceSinkAndBacktrackSelect(input, keys, tag, partitionCols, order, nullOrder, numReducers, acidOperation, hiveConf, input.getSchema().getColumnNames()); } private static SelectOperator genReduceSinkAndBacktrackSelect(Operator<?> input, - ExprNodeDesc[] keys, int tag, ArrayList<ExprNodeDesc> partitionCols, String order, + ExprNodeDesc[] keys, int tag, ArrayList<ExprNodeDesc> partitionCols, String order, String nullOrder, int numReducers, Operation acidOperation, HiveConf hiveConf, List<String> keepColNames) throws SemanticException { // 1. Generate RS operator @@ -715,7 +729,8 @@ public class HiveOpConverter { "In CBO return path, genReduceSinkAndBacktrackSelect is expecting only one tableAlias but there is none"); } // 1.2 Now generate RS operator - ReduceSinkOperator rsOp = genReduceSink(input, tableAlias, keys, tag, partitionCols, order, numReducers, acidOperation, hiveConf); + ReduceSinkOperator rsOp = genReduceSink(input, tableAlias, keys, tag, partitionCols, order, + nullOrder, numReducers, acidOperation, hiveConf); // 2. Generate backtrack Select operator Map<String, ExprNodeDesc> descriptors = buildBacktrackFromReduceSink(keepColNames, @@ -737,13 +752,13 @@ public class HiveOpConverter { private static ReduceSinkOperator genReduceSink(Operator<?> input, String tableAlias, ExprNodeDesc[] keys, int tag, int numReducers, Operation acidOperation, HiveConf hiveConf) throws SemanticException { - return genReduceSink(input, tableAlias, keys, tag, new ArrayList<ExprNodeDesc>(), "", numReducers, + return genReduceSink(input, tableAlias, keys, tag, new ArrayList<ExprNodeDesc>(), "", "", numReducers, acidOperation, hiveConf); } @SuppressWarnings({ "rawtypes", "unchecked" }) private static ReduceSinkOperator genReduceSink(Operator<?> input, String tableAlias, ExprNodeDesc[] keys, int tag, - ArrayList<ExprNodeDesc> partitionCols, String order, int numReducers, + ArrayList<ExprNodeDesc> partitionCols, String order, String nullOrder, int numReducers, Operation acidOperation, HiveConf hiveConf) throws SemanticException { Operator dummy = Operator.createDummy(); // dummy for backtracking dummy.setParentOperators(Arrays.asList(input)); @@ -818,7 +833,7 @@ public class HiveOpConverter { reduceKeys.size(), numReducers, acidOperation); } else { rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, outputColumnNames, false, tag, - partitionCols, order, numReducers, acidOperation); + partitionCols, order, nullOrder, numReducers, acidOperation); } ReduceSinkOperator rsOp = (ReduceSinkOperator) OperatorFactory.getAndMakeChild( http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java index 638b91e..59c87a3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java @@ -201,7 +201,8 @@ public class ReduceSinkDeDuplication extends Transform { return false; } - Integer moveRSOrderTo = checkOrder(cRSc.getOrder(), pRSNc.getOrder()); + Integer moveRSOrderTo = checkOrder(cRSc.getOrder(), pRSNc.getOrder(), + cRSc.getNullOrder(), pRSNc.getNullOrder()); if (moveRSOrderTo == null) { return false; } @@ -298,6 +299,7 @@ public class ReduceSinkDeDuplication extends Transform { "Try set " + HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION + "=false;"); } pRS.getConf().setOrder(cRS.getConf().getOrder()); + pRS.getConf().setNullOrder(cRS.getConf().getNullOrder()); } if (result[3] > 0) { @@ -313,7 +315,8 @@ public class ReduceSinkDeDuplication extends Transform { pRS.getConf().setNumDistributionKeys(cRS.getConf().getNumDistributionKeys()); List<FieldSchema> fields = PlanUtils.getFieldSchemasFromColumnList(pRS.getConf() .getKeyCols(), "reducesinkkey"); - TableDesc keyTable = PlanUtils.getReduceKeyTableDesc(fields, pRS.getConf().getOrder()); + TableDesc keyTable = PlanUtils.getReduceKeyTableDesc(fields, pRS.getConf().getOrder(), + pRS.getConf().getNullOrder()); ArrayList<String> outputKeyCols = Lists.newArrayList(); for (int i = 0; i < fields.size(); i++) { outputKeyCols.add(fields.get(i).getName()); @@ -337,7 +340,8 @@ public class ReduceSinkDeDuplication extends Transform { throws SemanticException { ReduceSinkDesc cConf = cRS.getConf(); ReduceSinkDesc pConf = pRS.getConf(); - Integer moveRSOrderTo = checkOrder(cConf.getOrder(), pConf.getOrder()); + Integer moveRSOrderTo = checkOrder(cConf.getOrder(), pConf.getOrder(), + cConf.getNullOrder(), pConf.getNullOrder()); if (moveRSOrderTo == null) { return null; } @@ -447,7 +451,10 @@ public class ReduceSinkDeDuplication extends Transform { } // order of overlapping keys should be exactly the same - protected Integer checkOrder(String corder, String porder) { + protected Integer checkOrder(String corder, String porder, + String cNullOrder, String pNullOrder) { + assert corder.length() == cNullOrder.length(); + assert porder.length() == pNullOrder.length(); if (corder == null || corder.trim().equals("")) { if (porder == null || porder.trim().equals("")) { return 0; @@ -459,8 +466,11 @@ public class ReduceSinkDeDuplication extends Transform { } corder = corder.trim(); porder = porder.trim(); + cNullOrder = cNullOrder.trim(); + pNullOrder = pNullOrder.trim(); int target = Math.min(corder.length(), porder.length()); - if (!corder.substring(0, target).equals(porder.substring(0, target))) { + if (!corder.substring(0, target).equals(porder.substring(0, target)) || + !cNullOrder.substring(0, target).equals(pNullOrder.substring(0, target))) { return null; } return Integer.valueOf(corder.length()).compareTo(porder.length()); http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java index 296fecb..ea3e179 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java @@ -201,14 +201,16 @@ public class BucketingSortingCtx implements NodeProcessorCtx { private List<Integer> indexes = new ArrayList<Integer>(); // Sort order (+|-) private char sortOrder; + private char nullSortOrder; - public SortCol(String name, int index, char sortOrder) { - this(sortOrder); + public SortCol(String name, int index, char sortOrder, char nullSortOrder) { + this(sortOrder, nullSortOrder); addAlias(name, index); } - public SortCol(char sortOrder) { + public SortCol(char sortOrder, char nullSortOrder) { this.sortOrder = sortOrder; + this.nullSortOrder = nullSortOrder; } @@ -232,11 +234,16 @@ public class BucketingSortingCtx implements NodeProcessorCtx { return sortOrder; } + public char getNullSortOrder() { + return nullSortOrder; + } + @Override // Chooses a representative alias, index, and order to use as the String, the first is used // because it is set in the constructor public String toString() { - return "name: " + names.get(0) + " index: " + indexes.get(0) + " order: " + sortOrder; + return "name: " + names.get(0) + " index: " + indexes.get(0) + " order: " + sortOrder + + " nullOrder: " + nullSortOrder; } } } http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java index aa41200..9159120 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java @@ -166,6 +166,7 @@ public class BucketingSortingOpProcFactory { } String sortOrder = rsDesc.getOrder(); + String nullSortOrder = rsDesc.getNullOrder(); List<ExprNodeDesc> keyCols = rsDesc.getKeyCols(); List<ExprNodeDesc> valCols = ExprNodeDescUtils.backtrack(joinValues, jop, parent); @@ -186,7 +187,8 @@ public class BucketingSortingOpProcFactory { newSortCols[keyIndex].addAlias(vname, vindex); } else { newBucketCols[keyIndex] = new BucketCol(vname, vindex); - newSortCols[keyIndex] = new SortCol(vname, vindex, sortOrder.charAt(keyIndex)); + newSortCols[keyIndex] = new SortCol(vname, vindex, sortOrder.charAt(keyIndex), + nullSortOrder.charAt(keyIndex)); } } } @@ -311,7 +313,8 @@ public class BucketingSortingOpProcFactory { int sortIndex = indexOfColName(sortCols, columnExpr.getColumn()); if (sortIndex != -1) { if (newSortCols[sortIndex] == null) { - newSortCols[sortIndex] = new SortCol(sortCols.get(sortIndex).getSortOrder()); + newSortCols[sortIndex] = new SortCol(sortCols.get(sortIndex).getSortOrder(), + sortCols.get(sortIndex).getNullSortOrder()); } newSortCols[sortIndex].addAlias( colInfos.get(colInfosIndex).getInternalName(), colInfosIndex); @@ -436,7 +439,7 @@ public class BucketingSortingOpProcFactory { private static List<SortCol> getNewSortCols(List<SortCol> sortCols, List<ColumnInfo> colInfos) { List<SortCol> newSortCols = new ArrayList<SortCol>(sortCols.size()); for (int i = 0; i < sortCols.size(); i++) { - SortCol sortCol = new SortCol(sortCols.get(i).getSortOrder()); + SortCol sortCol = new SortCol(sortCols.get(i).getSortOrder(), sortCols.get(i).getNullSortOrder()); for (Integer index : sortCols.get(i).getIndexes()) { // The only time this condition should be false is in the case of dynamic partitioning if (index < colInfos.size()) { @@ -537,6 +540,7 @@ public class BucketingSortingOpProcFactory { static List<SortCol> extractSortCols(ReduceSinkOperator rop, List<ExprNodeDesc> outputValues) { String sortOrder = rop.getConf().getOrder(); + String nullSortOrder = rop.getConf().getNullOrder(); List<SortCol> sortCols = new ArrayList<SortCol>(); ArrayList<ExprNodeDesc> keyCols = rop.getConf().getKeyCols(); for (int i = 0; i < keyCols.size(); i++) { @@ -548,7 +552,8 @@ public class BucketingSortingOpProcFactory { if (index < 0) { break; } - sortCols.add(new SortCol(((ExprNodeColumnDesc) keyCol).getColumn(), index, sortOrder.charAt(i))); + sortCols.add(new SortCol(((ExprNodeColumnDesc) keyCol).getColumn(), index, + sortOrder.charAt(i), nullSortOrder.charAt(i))); } // If the sorted columns can't all be found in the values then the data is only sorted on // the columns seen up until now @@ -649,6 +654,7 @@ public class BucketingSortingOpProcFactory { GroupByDesc groupByDesc = gop.getConf(); String sortOrder = rop.getConf().getOrder(); + String nullSortOrder = rop.getConf().getNullOrder(); List<BucketCol> bucketCols = new ArrayList<BucketCol>(); List<SortCol> sortCols = new ArrayList<SortCol>(); assert rop.getConf().getKeyCols().size() <= rop.getSchema().getSignature().size(); @@ -659,7 +665,7 @@ public class BucketingSortingOpProcFactory { } String colName = rop.getSchema().getSignature().get(i).getInternalName(); bucketCols.add(new BucketCol(colName, i)); - sortCols.add(new SortCol(colName, i, sortOrder.charAt(i))); + sortCols.add(new SortCol(colName, i, sortOrder.charAt(i), nullSortOrder.charAt(i))); } bctx.setBucketedCols(rop, bucketCols); bctx.setSortedCols(rop, sortCols); http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java index f48fac1..f296a53 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java @@ -18,10 +18,13 @@ package org.apache.hadoop.hive.ql.optimizer.spark; -import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; @@ -52,13 +55,10 @@ import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty; import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc; import org.apache.hadoop.hive.ql.plan.SparkWork; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Stack; +import com.google.common.base.Preconditions; public class SparkReduceSinkMapJoinProc implements NodeProcessor { @@ -209,11 +209,14 @@ public class SparkReduceSinkMapJoinProc implements NodeProcessor { Map<Byte, List<ExprNodeDesc>> keyExprMap = mapJoinOp.getConf().getKeys(); List<ExprNodeDesc> keyCols = keyExprMap.get(Byte.valueOf((byte) 0)); StringBuilder keyOrder = new StringBuilder(); + StringBuilder keyNullOrder = new StringBuilder(); for (int i = 0; i < keyCols.size(); i++) { keyOrder.append("+"); + keyNullOrder.append("a"); } TableDesc keyTableDesc = PlanUtils.getReduceKeyTableDesc(PlanUtils - .getFieldSchemasFromColumnList(keyCols, "mapjoinkey"), keyOrder.toString()); + .getFieldSchemasFromColumnList(keyCols, "mapjoinkey"), keyOrder.toString(), + keyNullOrder.toString()); mapJoinOp.getConf().setKeyTableDesc(keyTableDesc); // let the dummy op be the parent of mapjoin op http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index 6523288..f10a40a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -109,6 +109,8 @@ public abstract class BaseSemanticAnalyzer { public static int HIVE_COLUMN_ORDER_ASC = 1; public static int HIVE_COLUMN_ORDER_DESC = 0; + public static int HIVE_COLUMN_NULLS_FIRST = 0; + public static int HIVE_COLUMN_NULLS_LAST = 1; /** * ReadEntities that are passed to the hooks. @@ -657,11 +659,23 @@ public abstract class BaseSemanticAnalyzer { for (int i = 0; i < numCh; i++) { ASTNode child = (ASTNode) ast.getChild(i); if (child.getToken().getType() == HiveParser.TOK_TABSORTCOLNAMEASC) { - colList.add(new Order(unescapeIdentifier(child.getChild(0).getText()).toLowerCase(), - HIVE_COLUMN_ORDER_ASC)); + child = (ASTNode) child.getChild(0); + if (child.getToken().getType() == HiveParser.TOK_NULLS_FIRST) { + colList.add(new Order(unescapeIdentifier(child.getChild(0).getText()).toLowerCase(), + HIVE_COLUMN_ORDER_ASC, HIVE_COLUMN_NULLS_FIRST)); + } else { + colList.add(new Order(unescapeIdentifier(child.getChild(0).getText()).toLowerCase(), + HIVE_COLUMN_ORDER_ASC, HIVE_COLUMN_NULLS_LAST)); + } } else { - colList.add(new Order(unescapeIdentifier(child.getChild(0).getText()).toLowerCase(), - HIVE_COLUMN_ORDER_DESC)); + child = (ASTNode) child.getChild(0); + if (child.getToken().getType() == HiveParser.TOK_NULLS_LAST) { + colList.add(new Order(unescapeIdentifier(child.getChild(0).getText()).toLowerCase(), + HIVE_COLUMN_ORDER_DESC, HIVE_COLUMN_NULLS_LAST)); + } else { + colList.add(new Order(unescapeIdentifier(child.getChild(0).getText()).toLowerCase(), + HIVE_COLUMN_ORDER_DESC, HIVE_COLUMN_NULLS_FIRST)); + } } } return colList; http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index d056c5d..c36aa9d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -2413,6 +2413,7 @@ public class CalcitePlanner extends SemanticAnalyzer { List<Node> obASTExprLst = obAST.getChildren(); ASTNode obASTExpr; + ASTNode nullObASTExpr; List<Pair<ASTNode, TypeInfo>> vcASTTypePairs = new ArrayList<Pair<ASTNode, TypeInfo>>(); RowResolver inputRR = relToHiveRR.get(srcRel); RowResolver outputRR = new RowResolver(); @@ -2425,9 +2426,11 @@ public class CalcitePlanner extends SemanticAnalyzer { for (int i = 0; i < obASTExprLst.size(); i++) { // 2.1 Convert AST Expr to ExprNode obASTExpr = (ASTNode) obASTExprLst.get(i); + nullObASTExpr = (ASTNode) obASTExpr.getChild(0); + ASTNode ref = (ASTNode) nullObASTExpr.getChild(0); Map<ASTNode, ExprNodeDesc> astToExprNDescMap = TypeCheckProcFactory.genExprNode( obASTExpr, new TypeCheckCtx(inputRR)); - ExprNodeDesc obExprNDesc = astToExprNDescMap.get(obASTExpr.getChild(0)); + ExprNodeDesc obExprNDesc = astToExprNDescMap.get(ref); if (obExprNDesc == null) throw new SemanticException("Invalid order by expression: " + obASTExpr.toString()); @@ -2442,18 +2445,26 @@ public class CalcitePlanner extends SemanticAnalyzer { } else { fieldIndex = srcRelRecordSz + newVCLst.size(); newVCLst.add(rnd); - vcASTTypePairs.add(new Pair<ASTNode, TypeInfo>((ASTNode) obASTExpr.getChild(0), - obExprNDesc.getTypeInfo())); + vcASTTypePairs.add(new Pair<ASTNode, TypeInfo>(ref, obExprNDesc.getTypeInfo())); } // 2.4 Determine the Direction of order by - org.apache.calcite.rel.RelFieldCollation.Direction order = RelFieldCollation.Direction.DESCENDING; + RelFieldCollation.Direction order = RelFieldCollation.Direction.DESCENDING; if (obASTExpr.getType() == HiveParser.TOK_TABSORTCOLNAMEASC) { order = RelFieldCollation.Direction.ASCENDING; } + RelFieldCollation.NullDirection nullOrder; + if (nullObASTExpr.getType() == HiveParser.TOK_NULLS_FIRST) { + nullOrder = RelFieldCollation.NullDirection.FIRST; + } else if (nullObASTExpr.getType() == HiveParser.TOK_NULLS_LAST) { + nullOrder = RelFieldCollation.NullDirection.LAST; + } else { + throw new SemanticException( + "Unexpected null ordering option: " + nullObASTExpr.getType()); + } // 2.5 Add to field collations - fieldCollations.add(new RelFieldCollation(fieldIndex, order)); + fieldCollations.add(new RelFieldCollation(fieldIndex, order, nullOrder)); } // 3. Add Child Project Rel if needed, Generate Output RR, input Sel Rel @@ -2583,8 +2594,17 @@ public class CalcitePlanner extends SemanticAnalyzer { ExprNodeDesc exp = genExprNodeDesc(oExpr.getExpression(), inputRR, tcCtx); RexNode ordExp = converter.convert(exp); Set<SqlKind> flags = new HashSet<SqlKind>(); - if (oExpr.getOrder() == org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order.DESC) + if (oExpr.getOrder() == org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order.DESC) { flags.add(SqlKind.DESCENDING); + } + if (oExpr.getNullOrder() == org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.NullOrder.NULLS_FIRST) { + flags.add(SqlKind.NULLS_FIRST); + } else if (oExpr.getNullOrder() == org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.NullOrder.NULLS_LAST) { + flags.add(SqlKind.NULLS_LAST); + } else { + throw new SemanticException( + "Unexpected null ordering option: " + oExpr.getNullOrder()); + } oKeys.add(new RexFieldCollation(ordExp, flags)); } } http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g index 3f92d16..dd997f0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g @@ -55,6 +55,8 @@ KW_EXISTS : 'EXISTS'; KW_ASC : 'ASC'; KW_DESC : 'DESC'; +KW_NULLS : 'NULLS'; +KW_LAST : 'LAST'; KW_ORDER : 'ORDER'; KW_GROUP : 'GROUP'; KW_BY : 'BY'; http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g index 9cca100..50c53db 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g @@ -81,6 +81,8 @@ TOK_GROUPING_SETS; TOK_GROUPING_SETS_EXPRESSION; TOK_HAVING; TOK_ORDERBY; +TOK_NULLS_FIRST; +TOK_NULLS_LAST; TOK_CLUSTERBY; TOK_DISTRIBUTEBY; TOK_SORTBY; @@ -402,6 +404,8 @@ import org.apache.hadoop.hive.conf.HiveConf; xlateMap.put("KW_ASC", "ASC"); xlateMap.put("KW_DESC", "DESC"); + xlateMap.put("KW_NULLS", "NULLS"); + xlateMap.put("KW_LAST", "LAST"); xlateMap.put("KW_ORDER", "ORDER"); xlateMap.put("KW_BY", "BY"); xlateMap.put("KW_GROUP", "GROUP"); @@ -2014,13 +2018,34 @@ skewedValueLocationElement skewedColumnValue | skewedColumnValuePair ; - + +orderSpecification +@init { pushMsg("order specification", state); } +@after { popMsg(state); } + : KW_ASC | KW_DESC ; + +nullOrdering +@init { pushMsg("nulls ordering", state); } +@after { popMsg(state); } + : KW_NULLS KW_FIRST -> ^(TOK_NULLS_FIRST) + | KW_NULLS KW_LAST -> ^(TOK_NULLS_LAST) + ; + columnNameOrder @init { pushMsg("column name order", state); } @after { popMsg(state); } - : identifier (asc=KW_ASC | desc=KW_DESC)? - -> {$desc == null}? ^(TOK_TABSORTCOLNAMEASC identifier) - -> ^(TOK_TABSORTCOLNAMEDESC identifier) + : identifier orderSpec=orderSpecification? nullSpec=nullOrdering? + -> {$orderSpec.tree == null && $nullSpec.tree == null}? + ^(TOK_TABSORTCOLNAMEASC ^(TOK_NULLS_FIRST identifier)) + -> {$orderSpec.tree == null}? + ^(TOK_TABSORTCOLNAMEASC ^($nullSpec identifier)) + -> {$nullSpec.tree == null && $orderSpec.tree.getType()==HiveParser.KW_ASC}? + ^(TOK_TABSORTCOLNAMEASC ^(TOK_NULLS_FIRST identifier)) + -> {$nullSpec.tree == null && $orderSpec.tree.getType()==HiveParser.KW_DESC}? + ^(TOK_TABSORTCOLNAMEDESC ^(TOK_NULLS_LAST identifier)) + -> {$orderSpec.tree.getType()==HiveParser.KW_ASC}? + ^(TOK_TABSORTCOLNAMEASC ^($nullSpec identifier)) + -> ^(TOK_TABSORTCOLNAMEDESC ^($nullSpec identifier)) ; columnNameCommentList @@ -2039,9 +2064,18 @@ columnNameComment columnRefOrder @init { pushMsg("column order", state); } @after { popMsg(state); } - : expression (asc=KW_ASC | desc=KW_DESC)? - -> {$desc == null}? ^(TOK_TABSORTCOLNAMEASC expression) - -> ^(TOK_TABSORTCOLNAMEDESC expression) + : expression orderSpec=orderSpecification? nullSpec=nullOrdering? + -> {$orderSpec.tree == null && $nullSpec.tree == null}? + ^(TOK_TABSORTCOLNAMEASC ^(TOK_NULLS_FIRST expression)) + -> {$orderSpec.tree == null}? + ^(TOK_TABSORTCOLNAMEASC ^($nullSpec expression)) + -> {$nullSpec.tree == null && $orderSpec.tree.getType()==HiveParser.KW_ASC}? + ^(TOK_TABSORTCOLNAMEASC ^(TOK_NULLS_FIRST expression)) + -> {$nullSpec.tree == null && $orderSpec.tree.getType()==HiveParser.KW_DESC}? + ^(TOK_TABSORTCOLNAMEDESC ^(TOK_NULLS_LAST expression)) + -> {$orderSpec.tree.getType()==HiveParser.KW_ASC}? + ^(TOK_TABSORTCOLNAMEASC ^($nullSpec expression)) + -> ^(TOK_TABSORTCOLNAMEDESC ^($nullSpec expression)) ; columnNameType http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g index 61bd10c..a192fa7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g @@ -653,8 +653,8 @@ nonReserved | KW_ENABLE | KW_ESCAPED | KW_EXCLUSIVE | KW_EXPLAIN | KW_EXPORT | KW_FIELDS | KW_FILE | KW_FILEFORMAT | KW_FIRST | KW_FORMAT | KW_FORMATTED | KW_FUNCTIONS | KW_HOLD_DDLTIME | KW_HOUR | KW_IDXPROPERTIES | KW_IGNORE | KW_INDEX | KW_INDEXES | KW_INPATH | KW_INPUTDRIVER | KW_INPUTFORMAT | KW_ITEMS | KW_JAR - | KW_KEYS | KW_KEY_TYPE | KW_LIMIT | KW_OFFSET | KW_LINES | KW_LOAD | KW_LOCATION | KW_LOCK | KW_LOCKS | KW_LOGICAL | KW_LONG - | KW_MAPJOIN | KW_MATERIALIZED | KW_METADATA | KW_MINUS | KW_MINUTE | KW_MONTH | KW_MSCK | KW_NOSCAN | KW_NO_DROP | KW_OFFLINE + | KW_KEYS | KW_KEY_TYPE | KW_LAST | KW_LIMIT | KW_OFFSET | KW_LINES | KW_LOAD | KW_LOCATION | KW_LOCK | KW_LOCKS | KW_LOGICAL | KW_LONG + | KW_MAPJOIN | KW_MATERIALIZED | KW_METADATA | KW_MINUS | KW_MINUTE | KW_MONTH | KW_MSCK | KW_NOSCAN | KW_NO_DROP | KW_NULLS | KW_OFFLINE | KW_OPTION | KW_OUTPUTDRIVER | KW_OUTPUTFORMAT | KW_OVERWRITE | KW_OWNER | KW_PARTITIONED | KW_PARTITIONS | KW_PLUS | KW_PRETTY | KW_PRINCIPALS | KW_PROTECTION | KW_PURGE | KW_READ | KW_READONLY | KW_REBUILD | KW_RECORDREADER | KW_RECORDWRITER | KW_RELOAD | KW_RENAME | KW_REPAIR | KW_REPLACE | KW_REPLICATION | KW_RESTRICT | KW_REWRITE http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java index a8980eb..ecf3cfc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java @@ -508,18 +508,27 @@ public class PTFInvocationSpec { DESC; } + public static enum NullOrder + { + NULLS_FIRST, + NULLS_LAST; + } + public static class OrderExpression extends PartitionExpression { Order order; + NullOrder nullOrder; public OrderExpression() { order = Order.ASC; + nullOrder = NullOrder.NULLS_FIRST; } public OrderExpression(PartitionExpression peSpec) { super(peSpec); order = Order.ASC; + nullOrder = NullOrder.NULLS_FIRST; } public Order getOrder() @@ -532,12 +541,23 @@ public class PTFInvocationSpec { this.order = order; } + public NullOrder getNullOrder() + { + return nullOrder; + } + + public void setNullOrder(NullOrder nullOrder) + { + this.nullOrder = nullOrder; + } + @Override public int hashCode() { final int prime = 31; int result = super.hashCode(); result = prime * result + ((order == null) ? 0 : order.hashCode()); + result = prime * result + ((nullOrder == null) ? 0 : nullOrder.hashCode()); return result; } @@ -557,13 +577,16 @@ public class PTFInvocationSpec { if (order != other.order) { return false; } + if (nullOrder != other.nullOrder) { + return false; + } return true; } @Override public String toString() { - return String.format("%s %s", super.toString(), order); + return String.format("%s %s %s", super.toString(), order, nullOrder); } } http://git-wip-us.apache.org/repos/asf/hive/blob/9350b693/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java index 2370ec0..9921b21 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java @@ -499,6 +499,7 @@ public class PTFTranslator { throws SemanticException { OrderExpressionDef oexpDef = new OrderExpressionDef(); oexpDef.setOrder(oExpr.getOrder()); + oexpDef.setNullOrder(oExpr.getNullOrder()); try { PTFExpressionDef expDef = buildExpressionDef(inpShape, oExpr.getExpression()); oexpDef.setExpressionTreeString(expDef.getExpressionTreeString());
