This is an automated email from the ASF dual-hosted git repository.
difin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 1374460ae65 HIVE-29458: Iceberg: Sort expressions should not be added
for distribution. Partition transforms are added for clustering (distribution
and sorting) (#6325)
1374460ae65 is described below
commit 1374460ae65994c9da5ba8341fb80ec8f7a24e44
Author: kokila-19 <[email protected]>
AuthorDate: Wed Mar 11 02:20:34 2026 +0530
HIVE-29458: Iceberg: Sort expressions should not be added for distribution.
Partition transforms are added for clustering (distribution and sorting) (#6325)
---
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 58 +++--
.../mr/hive/IcebergTransformSortFunctionUtil.java | 4 +-
.../iceberg_create_locally_zordered_table.q | 22 ++
.../llap/iceberg_alter_locally_ordered_table.q.out | 1 -
.../iceberg_alter_locally_zordered_table.q.out | 1 -
.../iceberg_create_locally_zordered_table.q.out | 241 ++++++++++++++++++++-
.../hadoop/hive/ql/exec/FileSinkOperator.java | 2 +-
.../ql/optimizer/SortedDynPartitionOptimizer.java | 133 +++++++-----
.../hadoop/hive/ql/plan/DynamicPartitionCtx.java | 34 ++-
9 files changed, 419 insertions(+), 77 deletions(-)
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 7bf2aef49a7..0643c26834b 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -890,9 +890,9 @@ public Map<Integer, List<TransformSpec>>
getPartitionTransformSpecs(
Pair::first, Collectors.mapping(Pair::second, Collectors.toList())));
}
- private List<TransformSpec> getSortTransformSpec(Table table) {
- return table.sortOrder().fields().stream().map(s ->
- IcebergTableUtil.getTransformSpec(table, s.transform().toString(),
s.sourceId()))
+ private List<TransformSpec> getWriteSortTransformSpecs(Table table) {
+ return table.sortOrder().fields().stream()
+ .map(s -> IcebergTableUtil.getTransformSpec(table,
s.transform().toString(), s.sourceId()))
.toList();
}
@@ -913,11 +913,16 @@ public DynamicPartitionCtx createDPContext(
hiveConf.getVar(ConfVars.DEFAULT_PARTITION_NAME),
hiveConf.getIntVar(ConfVars.DYNAMIC_PARTITION_MAX_PARTS_PER_NODE));
+ // Add Iceberg partition transforms as custom partition expressions.
+ // These are required for clustering by partition spec/values for
clustered writers.
if (table.spec().isPartitioned() &&
- hiveConf.getIntVar(ConfVars.HIVE_OPT_SORT_DYNAMIC_PARTITION_THRESHOLD) >= 0) {
- addCustomSortExpr(table, hmsTable, writeOperation, dpCtx,
getPartitionTransformSpec(hmsTable));
+ hiveConf.getIntVar(ConfVars.HIVE_OPT_SORT_DYNAMIC_PARTITION_THRESHOLD) >= 0) {
+ addCustomPartitionTransformExpressions(table, hmsTable, writeOperation,
dpCtx,
+ getPartitionTransformSpec(hmsTable));
}
+ // Add write sort order expressions as custom sort expressions.
+ // These are used ONLY for sorting within reducers, NOT for distribution.
SortOrder sortOrder = table.sortOrder();
if (sortOrder.isSorted()) {
List<Integer> customSortPositions = Lists.newLinkedList();
@@ -943,7 +948,8 @@ public DynamicPartitionCtx createDPContext(
}
}
- addCustomSortExpr(table, hmsTable, writeOperation, dpCtx,
getSortTransformSpec(table));
+ addCustomWriteSortExpressions(table, hmsTable, writeOperation, dpCtx,
+ getWriteSortTransformSpecs(table));
}
// Even if table has no explicit sort order, honor z-order if configured
@@ -999,21 +1005,43 @@ private void addZOrderCustomExpr(Map<String, String>
props, DynamicPartitionCtx
}
}
- private void addCustomSortExpr(Table table,
org.apache.hadoop.hive.ql.metadata.Table hmsTable,
- Operation writeOperation, DynamicPartitionCtx dpCtx,
- List<TransformSpec> transformSpecs) {
+ private void addCustomPartitionTransformExpressions(Table table,
+ org.apache.hadoop.hive.ql.metadata.Table hmsTable, Operation
writeOperation,
+ DynamicPartitionCtx dpCtx, List<TransformSpec> transformSpecs) {
+ Map<String, Integer> fieldOrderMap = buildFieldOrderMap(table);
+ int offset = getWriteRowOffset(hmsTable, writeOperation);
+
+ dpCtx.addCustomPartitionExpressions(transformSpecs.stream()
+ .map(spec -> IcebergTransformSortFunctionUtil.getCustomTransformExpr(
+ spec, fieldOrderMap.get(spec.getColumnName()) + offset))
+ .toList());
+ }
+
+ private void addCustomWriteSortExpressions(Table table,
+ org.apache.hadoop.hive.ql.metadata.Table hmsTable, Operation
writeOperation,
+ DynamicPartitionCtx dpCtx, List<TransformSpec> transformSpecs) {
+ Map<String, Integer> fieldOrderMap = buildFieldOrderMap(table);
+ int offset = getWriteRowOffset(hmsTable, writeOperation);
+
+ dpCtx.addCustomSortExpressions(transformSpecs.stream()
+ .map(spec -> IcebergTransformSortFunctionUtil.getCustomTransformExpr(
+ spec, fieldOrderMap.get(spec.getColumnName()) + offset))
+ .toList());
+ }
+
+ private Map<String, Integer> buildFieldOrderMap(Table table) {
List<Types.NestedField> fields = table.schema().columns();
Map<String, Integer> fieldOrderMap =
Maps.newHashMapWithExpectedSize(fields.size());
for (int i = 0; i < fields.size(); ++i) {
fieldOrderMap.put(fields.get(i).name(), i);
}
+ return fieldOrderMap;
+ }
- int offset = (shouldOverwrite(hmsTable, writeOperation) ?
- ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA : acidSelectColumns(hmsTable,
writeOperation)).size();
-
- dpCtx.addCustomSortExpressions(transformSpecs.stream().map(spec ->
- IcebergTransformSortFunctionUtil.getCustomSortExprs(spec,
fieldOrderMap.get(spec.getColumnName()) + offset)
- ).collect(Collectors.toList()));
+ private int getWriteRowOffset(org.apache.hadoop.hive.ql.metadata.Table
hmsTable, Operation writeOperation) {
+ return (shouldOverwrite(hmsTable, writeOperation) ?
+ ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA :
+ acidSelectColumns(hmsTable, writeOperation)).size();
}
@Override
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTransformSortFunctionUtil.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTransformSortFunctionUtil.java
index f2bc36abca4..b0ceba4d2d5 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTransformSortFunctionUtil.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTransformSortFunctionUtil.java
@@ -37,7 +37,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
/**
- * A utility class which provides Iceberg transform sort functions.
+ * Utility for building Iceberg transform expressions used by both
partitioning and sorting.
*/
public final class IcebergTransformSortFunctionUtil {
@@ -136,7 +136,7 @@ private IcebergTransformSortFunctionUtil() {
}
};
- public static Function<List<ExprNodeDesc>, ExprNodeDesc>
getCustomSortExprs(TransformSpec spec, int index) {
+ public static Function<List<ExprNodeDesc>, ExprNodeDesc>
getCustomTransformExpr(TransformSpec spec, int index) {
switch (spec.getTransformType()) {
case BUCKET:
return BUCKET_SORT_EXPR.apply(index, spec.getTransformParam());
diff --git
a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_create_locally_zordered_table.q
b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_create_locally_zordered_table.q
index df79ad94179..fbdd66470bb 100644
---
a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_create_locally_zordered_table.q
+++
b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_create_locally_zordered_table.q
@@ -109,3 +109,25 @@ INSERT INTO default.zorder_props VALUES (3, 'B'),(1,
'A'),(7, 'C'),(2, 'A'),(9,
DESCRIBE FORMATTED default.zorder_props;
SELECT * FROM default.zorder_props;
DROP TABLE default.zorder_props;
+
+-- Validates partition transform (bucket) + z-order sort together
+CREATE TABLE default.zorder_tsdl_test (
+ ts timestamp,
+ dd double,
+ ll int)
+PARTITIONED BY SPEC (bucket(4, ll))
+WRITE ORDERED BY zorder (ts, dd)
+STORED BY iceberg
+STORED As orc;
+
+DESCRIBE FORMATTED default.zorder_tsdl_test;
+EXPLAIN INSERT INTO default.zorder_tsdl_test VALUES (TIMESTAMP '2022-01-01
00:00:00', 0.0, 0);
+
+INSERT INTO default.zorder_tsdl_test VALUES
+ (TIMESTAMP '2022-01-01 00:00:00', 0.0, 0),
+ (TIMESTAMP '2030-12-31 23:59:59', 9999.99, 1),
+ (TIMESTAMP '2026-06-15 12:00:00', 5000.5, 2);
+
+SELECT * FROM default.zorder_tsdl_test;
+DROP TABLE default.zorder_tsdl_test;
+
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_ordered_table.q.out
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_ordered_table.q.out
index e42a27153cb..74a2945b82d 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_ordered_table.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_ordered_table.q.out
@@ -158,7 +158,6 @@ STAGE PLANS:
key expressions: _col0 (type: int), _col1 (type:
string)
null sort order: az
sort order: -+
- Map-reduce partition columns: _col0 (type: int),
_col1 (type: string)
Statistics: Num rows: 1 Data size: 8 Basic stats:
COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: int), _col1 (type:
string), _col2 (type: int), _col3 (type: string)
Select Operator
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_zordered_table.q.out
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_zordered_table.q.out
index 83947b35e9a..53293d3798c 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_zordered_table.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_zordered_table.q.out
@@ -264,7 +264,6 @@ STAGE PLANS:
key expressions: iceberg_zorder(_col0, _col2) (type:
binary)
null sort order: z
sort order: +
- Map-reduce partition columns: iceberg_zorder(_col0,
_col2) (type: binary)
Statistics: Num rows: 1 Data size: 8 Basic stats:
COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: int), _col1 (type:
string), _col2 (type: int), _col3 (type: string)
Select Operator
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_create_locally_zordered_table.q.out
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_create_locally_zordered_table.q.out
index d5ab8aafda8..42f0631140f 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_create_locally_zordered_table.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_create_locally_zordered_table.q.out
@@ -109,7 +109,6 @@ STAGE PLANS:
key expressions: iceberg_zorder(_col0, _col1) (type:
binary)
null sort order: z
sort order: +
- Map-reduce partition columns: iceberg_zorder(_col0,
_col1) (type: binary)
Statistics: Num rows: 1 Data size: 8 Basic stats:
COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: int), _col1 (type:
string)
Select Operator
@@ -610,3 +609,243 @@ POSTHOOK: type: DROPTABLE
POSTHOOK: Input: default@zorder_props
POSTHOOK: Output: database:default
POSTHOOK: Output: default@zorder_props
+PREHOOK: query: CREATE TABLE default.zorder_tsdl_test (
+ ts timestamp,
+ dd double,
+ ll int)
+PARTITIONED BY SPEC (bucket(4, ll))
+WRITE ORDERED BY zorder (ts, dd)
+STORED BY iceberg
+STORED As orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@zorder_tsdl_test
+POSTHOOK: query: CREATE TABLE default.zorder_tsdl_test (
+ ts timestamp,
+ dd double,
+ ll int)
+PARTITIONED BY SPEC (bucket(4, ll))
+WRITE ORDERED BY zorder (ts, dd)
+STORED BY iceberg
+STORED As orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@zorder_tsdl_test
+PREHOOK: query: DESCRIBE FORMATTED default.zorder_tsdl_test
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@zorder_tsdl_test
+POSTHOOK: query: DESCRIBE FORMATTED default.zorder_tsdl_test
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@zorder_tsdl_test
+# col_name data_type comment
+ts timestamp
+dd double
+ll int
+
+# Partition Information
+# col_name data_type comment
+ll int Transform: bucket[4]
+
+# Partition Transform Information
+# col_name transform_type
+ll BUCKET[4]
+
+# Detailed Table Information
+Database: default
+#### A masked pattern was here ####
+Retention: 0
+#### A masked pattern was here ####
+Table Type: EXTERNAL_TABLE
+Table Parameters:
+ COLUMN_STATS_ACCURATE
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"dd\":\"true\",\"ll\":\"true\",\"ts\":\"true\"}}
+ EXTERNAL TRUE
+ bucketing_version 2
+ current-schema
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"ts\",\"required\":false,\"type\":\"timestamp\"},{\"id\":2,\"name\":\"dd\",\"required\":false,\"type\":\"double\"},{\"id\":3,\"name\":\"ll\",\"required\":false,\"type\":\"int\"}]}
+ default-partition-spec
{\"spec-id\":0,\"fields\":[{\"name\":\"ll_bucket\",\"transform\":\"bucket[4]\",\"source-id\":3,\"field-id\":1000}]}
+ format-version 2
+ iceberg.orc.files.only true
+#### A masked pattern was here ####
+ numFiles 0
+ numRows 0
+ parquet.compression zstd
+ rawDataSize 0
+ serialization.format 1
+ snapshot-count 0
+ sort.columns ts,dd
+ sort.order ZORDER
+ storage_handler
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+ table_type ICEBERG
+ totalSize #Masked#
+#### A masked pattern was here ####
+ uuid #Masked#
+ write.delete.mode merge-on-read
+ write.format.default orc
+ write.merge.mode merge-on-read
+ write.metadata.delete-after-commit.enabled true
+ write.update.mode merge-on-read
+
+# Storage Information
+SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+Compressed: No
+Sort Columns: []
+PREHOOK: query: EXPLAIN INSERT INTO default.zorder_tsdl_test VALUES (TIMESTAMP
'2022-01-01 00:00:00', 0.0, 0)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@zorder_tsdl_test
+POSTHOOK: query: EXPLAIN INSERT INTO default.zorder_tsdl_test VALUES
(TIMESTAMP '2022-01-01 00:00:00', 0.0, 0)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@zorder_tsdl_test
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-2 depends on stages: Stage-1
+ Stage-0 depends on stages: Stage-2
+ Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+ Reducer 3 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: _dummy_table
+ Row Limit Per Split: 1
+ Statistics: Num rows: 1 Data size: 10 Basic stats: COMPLETE
Column stats: COMPLETE
+ Select Operator
+ expressions: array(const struct(TIMESTAMP'2022-01-01
00:00:00',0,0)) (type: array<struct<col1:timestamp,col2:decimal(1,0),col3:int>>)
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 48 Basic stats:
COMPLETE Column stats: COMPLETE
+ UDTF Operator
+ Statistics: Num rows: 1 Data size: 48 Basic stats:
COMPLETE Column stats: COMPLETE
+ function name: inline
+ Select Operator
+ expressions: col1 (type: timestamp), UDFToDouble(col2)
(type: double), col3 (type: int)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 1 Data size: 8 Basic stats:
COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: iceberg_bucket(_col2, 4) (type:
int), iceberg_zorder(_col0, _col1) (type: binary)
+ null sort order: zz
+ sort order: ++
+ Map-reduce partition columns: iceberg_bucket(_col2,
4) (type: int)
+ Statistics: Num rows: 1 Data size: 8 Basic stats:
COMPLETE Column stats: COMPLETE
+ value expressions: _col0 (type: timestamp), _col1
(type: double), _col2 (type: int)
+ Select Operator
+ expressions: _col0 (type: timestamp), _col1 (type:
double), _col2 (type: int)
+ outputColumnNames: ts, dd, ll
+ Statistics: Num rows: 1 Data size: 8 Basic stats:
COMPLETE Column stats: COMPLETE
+ Group By Operator
+ aggregations: min(ts), max(ts), count(1),
count(ts), compute_bit_vector_hll(ts), min(dd), max(dd), count(dd),
compute_bit_vector_hll(dd), min(ll), max(ll), count(ll),
compute_bit_vector_hll(ll)
+ keys: iceberg_bucket(ll, 4) (type: int)
+ minReductionHashAggr: 0.4
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2, _col3,
_col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13
+ Statistics: Num rows: 1 Data size: 572 Basic
stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ null sort order: z
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 1 Data size: 572 Basic
stats: COMPLETE Column stats: COMPLETE
+ value expressions: _col1 (type: timestamp),
_col2 (type: timestamp), _col3 (type: bigint), _col4 (type: bigint), _col5
(type: binary), _col6 (type: double), _col7 (type: double), _col8 (type:
bigint), _col9 (type: binary), _col10 (type: int), _col11 (type: int), _col12
(type: bigint), _col13 (type: binary)
+ Execution mode: llap
+ LLAP IO: no inputs
+ Reducer 2
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Select Operator
+ expressions: VALUE._col0 (type: timestamp), VALUE._col1 (type:
double), VALUE._col2 (type: int), KEY.iceberg_bucket(_col2, 4) (type: int),
KEY.iceberg_zorder(_col0, _col1) (type: binary)
+ outputColumnNames: _col0, _col1, _col2, iceberg_bucket(_col2,
4), iceberg_zorder(_col0, _col1)
+ File Output Operator
+ compressed: false
+ Dp Sort State: PARTITION_SORTED
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE
Column stats: COMPLETE
+ table:
+ input format:
org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+ output format:
org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+ serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+ name: default.zorder_tsdl_test
+ Reducer 3
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: min(VALUE._col0), max(VALUE._col1),
count(VALUE._col2), count(VALUE._col3), compute_bit_vector_hll(VALUE._col4),
min(VALUE._col5), max(VALUE._col6), count(VALUE._col7),
compute_bit_vector_hll(VALUE._col8), min(VALUE._col9), max(VALUE._col10),
count(VALUE._col11), compute_bit_vector_hll(VALUE._col12)
+ keys: KEY._col0 (type: int)
+ mode: mergepartial
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5,
_col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13
+ Statistics: Num rows: 1 Data size: 572 Basic stats: COMPLETE
Column stats: COMPLETE
+ Select Operator
+ expressions: 'TIMESTAMP' (type: string), _col1 (type:
timestamp), _col2 (type: timestamp), (_col3 - _col4) (type: bigint),
COALESCE(ndv_compute_bit_vector(_col5),0) (type: bigint), _col5 (type: binary),
'DOUBLE' (type: string), _col6 (type: double), _col7 (type: double), (_col3 -
_col8) (type: bigint), COALESCE(ndv_compute_bit_vector(_col9),0) (type:
bigint), _col9 (type: binary), 'LONG' (type: string), UDFToLong(_col10) (type:
bigint), UDFToLong(_col11) (type: bigint), [...]
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5,
_col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15,
_col16, _col17, _col18
+ Statistics: Num rows: 1 Data size: 907 Basic stats: COMPLETE
Column stats: COMPLETE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 907 Basic stats:
COMPLETE Column stats: COMPLETE
+ table:
+ input format:
org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format:
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde:
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-2
+ Dependency Collection
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ replace: false
+ table:
+ input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+ output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+ serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+ name: default.zorder_tsdl_test
+
+ Stage: Stage-3
+ Stats Work
+ Basic Stats Work:
+ Column Stats Desc:
+ Columns: ts, dd, ll
+ Column Types: timestamp, double, int
+ Table: default.zorder_tsdl_test
+
+PREHOOK: query: INSERT INTO default.zorder_tsdl_test VALUES
+ (TIMESTAMP '2022-01-01 00:00:00', 0.0, 0),
+ (TIMESTAMP '2030-12-31 23:59:59', 9999.99, 1),
+ (TIMESTAMP '2026-06-15 12:00:00', 5000.5, 2)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@zorder_tsdl_test
+POSTHOOK: query: INSERT INTO default.zorder_tsdl_test VALUES
+ (TIMESTAMP '2022-01-01 00:00:00', 0.0, 0),
+ (TIMESTAMP '2030-12-31 23:59:59', 9999.99, 1),
+ (TIMESTAMP '2026-06-15 12:00:00', 5000.5, 2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@zorder_tsdl_test
+PREHOOK: query: SELECT * FROM default.zorder_tsdl_test
+PREHOOK: type: QUERY
+PREHOOK: Input: default@zorder_tsdl_test
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM default.zorder_tsdl_test
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@zorder_tsdl_test
+#### A masked pattern was here ####
+2022-01-01 00:00:00 0.0 0
+2026-06-15 12:00:00 5000.5 2
+2030-12-31 23:59:59 9999.99 1
+PREHOOK: query: DROP TABLE default.zorder_tsdl_test
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@zorder_tsdl_test
+PREHOOK: Output: database:default
+PREHOOK: Output: default@zorder_tsdl_test
+POSTHOOK: query: DROP TABLE default.zorder_tsdl_test
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@zorder_tsdl_test
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@zorder_tsdl_test
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 2c038d17b83..5d3c2dccf5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -640,7 +640,7 @@ protected void initializeOp(Configuration hconf) throws
HiveException {
jc = new JobConf(hconf);
setWriteOperation(jc, getConf().getTableInfo().getTableName(),
getConf().getWriteOperation());
setWriteOperationIsSorted(jc, getConf().getTableInfo().getTableName(),
- dpCtx != null && dpCtx.hasCustomSortExpression());
+ dpCtx != null && dpCtx.hasCustomPartitionOrSortExpression());
setMergeTaskEnabled(jc, getConf().getTableInfo().getTableName(),
Boolean.parseBoolean((String)
getConf().getTableInfo().getProperties().get(
MERGE_TASK_ENABLED + getConf().getTableInfo().getTableName())));
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
index 60be9c9466d..a057f4137e3 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
@@ -31,6 +31,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.conf.Constants;
@@ -202,13 +203,15 @@ public Object process(Node nd, Stack<Node> stack,
NodeProcessorCtx procCtx,
}
List<Integer> partitionPositions = getPartitionPositions(dpCtx,
fsParent.getSchema());
+ LinkedList<Function<List<ExprNodeDesc>, ExprNodeDesc>>
customPartitionExprs =
+ new LinkedList<>(dpCtx.getCustomPartitionExpressions());
LinkedList<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs =
new LinkedList<>(dpCtx.getCustomSortExpressions());
LinkedList<Integer> customSortOrder = new
LinkedList<>(dpCtx.getCustomSortOrder());
LinkedList<Integer> customNullOrder = new
LinkedList<>(dpCtx.getCustomSortNullOrder());
- // If custom sort expressions are present, there is an explicit
requirement to do sorting
- if (customSortExprs.isEmpty() && !shouldDo(partitionPositions,
fsParent)) {
+ // If custom expressions (partition or sort) are present, there is an
explicit requirement to do sorting
+ if (customPartitionExprs.isEmpty() && customSortExprs.isEmpty() &&
!shouldDo(partitionPositions, fsParent)) {
return null;
}
// if RS is inserted by enforce bucketing or sorting, we need to remove
it
@@ -308,8 +311,8 @@ public Object process(Node nd, Stack<Node> stack,
NodeProcessorCtx procCtx,
// Create ReduceSink operator
ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions,
sortPositions, sortOrder,
- sortNullOrder, customSortExprs, customSortOrder, customNullOrder,
allRSCols, bucketColumns, numBuckets,
- fsParent, fsOp.getConf().getWriteType());
+ sortNullOrder, customPartitionExprs, customSortExprs,
customSortOrder, customNullOrder,
+ allRSCols, bucketColumns, numBuckets, fsParent,
fsOp.getConf().getWriteType());
// we have to make sure not to reorder the child operators as it might
cause weird behavior in the tasks at
// the same level. when there is auto stats gather at the same level as
another operation then it might
// cause unnecessary preemption. Maintaining the order here to avoid
such preemption and possible errors
@@ -349,19 +352,18 @@ public Object process(Node nd, Stack<Node> stack,
NodeProcessorCtx procCtx,
customSortExprs.add(BUCKET_SORT_EXPRESSION);
}
- for (Function<List<ExprNodeDesc>, ExprNodeDesc> customSortExpr :
customSortExprs) {
- ExprNodeDesc colExpr = customSortExpr.apply(allRSCols);
- String customSortColName = colExpr.getExprString();
- TypeInfo customSortColTypeInfo = colExpr.getTypeInfo();
-
- descs.add(new ExprNodeColumnDesc(customSortColTypeInfo,
ReduceField.KEY + "." + customSortColName,
- null, false));
- colNames.add(customSortColName);
- ColumnInfo ci = new ColumnInfo(
- customSortColName, customSortColTypeInfo,
selRS.getSignature().get(0).getTabAlias(), true, true);
- selRS.getSignature().add(ci);
- rsOp.getSchema().getSignature().add(ci);
- }
+ Stream.concat(customPartitionExprs.stream(), customSortExprs.stream())
+ .forEach(customExpr -> {
+ ExprNodeDesc colExpr = customExpr.apply(allRSCols);
+ String columnName = colExpr.getExprString();
+ TypeInfo colTypeInfo = colExpr.getTypeInfo();
+ descs.add(new ExprNodeColumnDesc(colTypeInfo, ReduceField.KEY +
"." + columnName, null, false));
+ colNames.add(columnName);
+ ColumnInfo ci = new ColumnInfo(
+ columnName, colTypeInfo,
selRS.getSignature().get(0).getTabAlias(), true, true);
+ selRS.getSignature().add(ci);
+ rsOp.getSchema().getSignature().add(ci);
+ });
// Create SelectDesc
SelectDesc selConf = new SelectDesc(descs, colNames);
@@ -410,29 +412,30 @@ private boolean allStaticPartitions(Operator<? extends
OperatorDesc> op, List<Ex
return false;
}
- List<String> referencedSortColumnNames = new LinkedList<>();
- List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs =
dynPartCtx.getCustomSortExpressions();
+ List<String> referencedPartitionColumnNames = new LinkedList<>();
+ List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customPartitionExprs =
+ dynPartCtx.getCustomPartitionExpressions();
- if (customSortExprs != null && !customSortExprs.isEmpty()) {
+ if (!customPartitionExprs.isEmpty()) {
Set<ExprNodeColumnDesc> columnDescs = new HashSet<>();
- // Find relevant column descs (e.g. _col0, _col2) for each sort
expression
- for (Function<List<ExprNodeDesc>, ExprNodeDesc> customSortExpr :
customSortExprs) {
- ExprNodeDesc sortExpressionForRSSchema =
customSortExpr.apply(allRSCols);
- columnDescs.addAll(ExprNodeDescUtils.findAllColumnDescs(sortExpressionForRSSchema));
+ // Find relevant column descs (e.g. _col0, _col2) for each partition
expression
+ for (Function<List<ExprNodeDesc>, ExprNodeDesc> customPartitionExpr :
customPartitionExprs) {
+ ExprNodeDesc partExpressionForRSSchema =
customPartitionExpr.apply(allRSCols);
+ columnDescs.addAll(ExprNodeDescUtils.findAllColumnDescs(partExpressionForRSSchema));
}
for (ExprNodeColumnDesc columnDesc : columnDescs) {
- referencedSortColumnNames.add(columnDesc.getColumn());
+ referencedPartitionColumnNames.add(columnDesc.getColumn());
}
} else {
int numDpCols = dynPartCtx.getNumDPCols();
int numCols = op.getSchema().getColumnNames().size();
- referencedSortColumnNames.addAll(op.getSchema().getColumnNames().subList(numCols - numDpCols, numCols));
+ referencedPartitionColumnNames.addAll(op.getSchema().getColumnNames().subList(numCols - numDpCols, numCols));
}
- for(String dpCol : referencedSortColumnNames) {
+ for (String dpCol : referencedPartitionColumnNames) {
ExprNodeDesc end = ExprNodeDescUtils.findConstantExprOrigin(dpCol, op);
if (!(end instanceof ExprNodeConstantDesc)) {
// There is at least 1 column with no constant mapping -> we will
need to do the sorting
@@ -440,11 +443,17 @@ private boolean allStaticPartitions(Operator<? extends
OperatorDesc> op, List<Ex
}
}
+ // Custom sort expressions require SDPO for sorting
+ if (!dynPartCtx.getCustomSortExpressions().isEmpty()) {
+ return false;
+ }
+
+ LOG.debug("SDPO: all dynamic partition columns constant folded: {}",
referencedPartitionColumnNames);
// All columns had constant mappings
return true;
}
- // Remove RS and SEL introduced by enforce bucketing/sorting config
+ // Remove RS and SEL introduced by enforce bucketing/sorting conf
// Convert PARENT -> RS -> SEL -> FS to PARENT -> FS
private boolean removeRSInsertedByEnforceBucketing(FileSinkOperator fsOp) {
@@ -580,25 +589,28 @@ private void inferSortPositions(Operator<? extends
OperatorDesc> fsParent,
public ReduceSinkOperator getReduceSinkOp(List<Integer>
partitionPositions, List<Integer> sortPositions,
List<Integer> sortOrder, List<Integer> sortNullOrder,
+ List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customPartitionExprs,
List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs,
List<Integer> customSortOrder, List<Integer> customSortNullOrder,
ArrayList<ExprNodeDesc> allCols, ArrayList<ExprNodeDesc> bucketColumns,
int numBuckets, Operator<? extends OperatorDesc> parent,
AcidUtils.Operation writeType) {
- // Order of KEY columns, if custom sort is present partition and bucket
columns are disregarded:
- // 0) Custom sort expressions
- // 1) Partition columns
- // 2) Bucket number column
- // 3) Sort columns
+ // Order of KEY columns, if custom expressions are present:
+ // 0) Custom partition expressions (for distribution AND sorting)
+ // 1) Custom sort expressions (for sorting ONLY)
+ // 2) Partition columns
+ // 3) Bucket number column
+ // 4) Sort columns
+ boolean customPartitionExprPresent = customPartitionExprs != null &&
!customPartitionExprs.isEmpty();
boolean customSortExprPresent = customSortExprs != null &&
!customSortExprs.isEmpty();
+ boolean customExprPresent = customPartitionExprPresent ||
customSortExprPresent;
Set<Integer> keyColsPosInVal = Sets.newLinkedHashSet();
ArrayList<ExprNodeDesc> keyCols = Lists.newArrayList();
List<Integer> newSortOrder = Lists.newArrayList();
- List<Integer> newSortNullOrder = Lists.newArrayList();
- if (customSortExprPresent) {
+ if (customPartitionExprPresent) {
partitionPositions = new ArrayList<>();
bucketColumns = new ArrayList<>();
numBuckets = -1;
@@ -618,15 +630,22 @@ public ReduceSinkOperator getReduceSinkOp(List<Integer>
partitionPositions, List
}
}
- for (Integer ignored : keyColsPosInVal) {
- newSortOrder.add(order);
- }
-
- if (customSortExprPresent) {
- for (int i = 0; i < customSortExprs.size() - customSortOrder.size();
i++) {
+ if (customExprPresent) {
+ int numPartitionExprs = customPartitionExprs != null ?
customPartitionExprs.size() : 0;
+ for (int i = 0; i < numPartitionExprs; i++) {
newSortOrder.add(order);
}
- newSortOrder.addAll(customSortOrder);
+
+ int numSortExprs = customSortExprs != null ? customSortExprs.size() :
0;
+ for (int i = 0; i < numSortExprs; i++) {
+ newSortOrder.add(customSortOrder != null && i <
customSortOrder.size() ?
+ customSortOrder.get(i) :
+ order);
+ }
+ }
+
+ for (Integer ignored : keyColsPosInVal) {
+ newSortOrder.add(order);
}
String orderStr = "";
@@ -643,7 +662,10 @@ public ReduceSinkOperator getReduceSinkOp(List<Integer> partitionPositions, List
nullOrder = NullOrdering.fromCode(sortNullOrder.get(0)).getSign();
}
- StringBuilder nullOrderStr = new StringBuilder(StringUtils.repeat(nullOrder, keyColsPosInVal.size()));
+ StringBuilder nullOrderStr = new StringBuilder();
+ if (customPartitionExprPresent) {
+ nullOrderStr.append(StringUtils.repeat(nullOrder, customPartitionExprs.size()));
+ }
if (customSortExprPresent) {
for (int i = 0; i < customSortExprs.size() - customSortNullOrder.size(); i++) {
nullOrderStr.append(nullOrder);
@@ -652,17 +674,28 @@ public ReduceSinkOperator getReduceSinkOp(List<Integer> partitionPositions, List
nullOrderStr.append(NullOrdering.fromCode(customSortNullOrder.get(i)).getSign());
}
}
+ nullOrderStr.append(StringUtils.repeat(nullOrder, keyColsPosInVal.size()));
Map<String, ExprNodeDesc> colExprMap = Maps.newHashMap();
ArrayList<ExprNodeDesc> partCols = Lists.newArrayList();
- for (Function<List<ExprNodeDesc>, ExprNodeDesc> customSortExpr : customSortExprs) {
- ExprNodeDesc colExpr = customSortExpr.apply(allCols);
- // Custom sort expressions are marked as KEYs, which is required for sorting the rows that are going for
- // a particular reducer instance. They also need to be marked as 'partition' columns for MapReduce shuffle
- // phase, in order to gather the same keys to the same reducer instances.
- keyCols.add(colExpr);
- partCols.add(colExpr);
+ // Process custom partition expressions (e.g., Iceberg partition transforms).
+ // These are used for BOTH distribution and sorting.
+ if (customPartitionExprs != null) {
+ for (Function<List<ExprNodeDesc>, ExprNodeDesc> partExpr : customPartitionExprs) {
+ ExprNodeDesc colExpr = partExpr.apply(allCols);
+ keyCols.add(colExpr); // Add to sort keys
+ partCols.add(colExpr); // Add to distribution keys
+ }
+ }
+
+ // Process custom sort expressions (e.g., Z-order).
+ // These are used ONLY for sorting, NOT for distribution.
+ if (customSortExprs != null) {
+ for (Function<List<ExprNodeDesc>, ExprNodeDesc> sortExpr : customSortExprs) {
+ ExprNodeDesc colExpr = sortExpr.apply(allCols);
+ keyCols.add(colExpr); // Add to sort keys only
+ }
}
// we will clone here as RS will update bucket column key with its
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index 1eda24ec3b6..61c519aa62f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -19,6 +19,7 @@
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -53,7 +54,8 @@ public class DynamicPartitionCtx implements Serializable {
private String defaultPartName; // default partition name in case of null or empty value
private int maxPartsPerNode; // maximum dynamic partitions created per mapper/reducer
private Pattern whiteListPattern;
- private boolean hasCustomSortExpr = false;
+ private boolean hasCustomPartitionOrSortExpr = false;
+ private transient List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customPartitionExpressions;
/**
* Expressions describing a custom way of sorting the table before write. Expressions can reference simple
* column descriptions or a tree of expressions containing more columns and UDFs.
@@ -131,6 +133,7 @@ public DynamicPartitionCtx(Map<String, String> partSpec, String defaultPartName,
throw new SemanticException(e);
}
this.whiteListPattern = confVal == null || confVal.isEmpty() ? null : Pattern.compile(confVal);
+ this.customPartitionExpressions = new LinkedList<>();
this.customSortExpressions = new LinkedList<>();
this.customSortOrder = new LinkedList<>();
this.customSortNullOrder = new LinkedList<>();
@@ -148,6 +151,8 @@ public DynamicPartitionCtx(DynamicPartitionCtx dp) {
this.defaultPartName = dp.defaultPartName;
this.maxPartsPerNode = dp.maxPartsPerNode;
this.whiteListPattern = dp.whiteListPattern;
+ this.customPartitionExpressions = new LinkedList<>();
+ addCustomPartitionExpressions(dp.customPartitionExpressions);
this.customSortExpressions = new LinkedList<>();
addCustomSortExpressions(dp.customSortExpressions);
this.customSortOrder = dp.customSortOrder;
@@ -238,13 +243,30 @@ public String getSPPath() {
return this.spPath;
}
+ public List<Function<List<ExprNodeDesc>, ExprNodeDesc>> getCustomPartitionExpressions() {
+ return customPartitionExpressions == null
+ ? Collections.emptyList()
+ : customPartitionExpressions;
+ }
+
+ public void addCustomPartitionExpressions(
+ List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customPartitionExpressions) {
+ if (!org.apache.commons.collections.CollectionUtils.isEmpty(customPartitionExpressions)) {
+ this.hasCustomPartitionOrSortExpr = true;
+ this.customPartitionExpressions.addAll(customPartitionExpressions);
+ }
+ }
+
public List<Function<List<ExprNodeDesc>, ExprNodeDesc>> getCustomSortExpressions() {
- return customSortExpressions;
+ return customSortExpressions == null
+ ? Collections.emptyList()
+ : customSortExpressions;
}
- public void addCustomSortExpressions(List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExpressions) {
+ public void addCustomSortExpressions(
+ List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExpressions) {
if (!CollectionUtils.isEmpty(customSortExpressions)) {
- this.hasCustomSortExpr = true;
+ this.hasCustomPartitionOrSortExpr = true;
this.customSortExpressions.addAll(customSortExpressions);
}
}
@@ -265,7 +287,7 @@ public void setCustomSortNullOrder(List<Integer> customSortNullOrder) {
this.customSortNullOrder = customSortNullOrder;
}
- public boolean hasCustomSortExpression() {
- return hasCustomSortExpr;
+ public boolean hasCustomPartitionOrSortExpression() {
+ return hasCustomPartitionOrSortExpr;
}
}