okumin commented on code in PR #6389:
URL: https://github.com/apache/hive/pull/6389#discussion_r3036375924
##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java:
##########
@@ -210,10 +210,17 @@ public Object process(Node nd, Stack<Node> stack,
NodeProcessorCtx procCtx,
LinkedList<Integer> customSortOrder = new
LinkedList<>(dpCtx.getCustomSortOrder());
LinkedList<Integer> customNullOrder = new
LinkedList<>(dpCtx.getCustomSortNullOrder());
- // If custom expressions (partition or sort) are present, there is an
explicit requirement to do sorting
- if (customPartitionExprs.isEmpty() && customSortExprs.isEmpty() &&
!shouldDo(partitionPositions, fsParent)) {
+ // If custom sort expressions are present, there is an explicit
requirement to do sorting.
+ // Custom partition expressions are evaluated inside shouldDo based on
column stats.
+ if (customSortExprs.isEmpty() && !shouldDo(partitionPositions,
customPartitionExprs, fsParent, allRSCols)) {
return null;
}
+
+ // Mark that sorting will be applied with custom partition expressions,
so the writer layer
+ // (e.g. Iceberg) knows the input is ordered and can use a clustered
writer.
+ if (!customPartitionExprs.isEmpty()) {
+ dpCtx.setHasCustomPartitionOrSortExpression(true);
+ }
Review Comment:
Should this be located after all the bailouts are evaluated? It means we may
mutate the context after `if (!removeRSInsertedByEnforceBucketing(fsOp)) {`
finished.
##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java:
##########
@@ -880,34 +890,77 @@ private boolean shouldDo(List<Integer> partitionPos,
Operator<? extends Operator
if (colStats == null || colStats.isEmpty()) {
return true;
}
- long partCardinality = 1;
-
- // compute cardinality for partition columns
- for (Integer idx : partitionPos) {
- ColumnInfo ci = fsParent.getSchema().getSignature().get(idx);
- ColStatistics partStats =
fsParent.getStatistics().getColumnStatisticsFromColName(ci.getInternalName());
- if (partStats == null) {
- // statistics for this partition are for some reason not available
- return true;
- }
- partCardinality = partCardinality * partStats.getCountDistint();
+ long partCardinality = computePartCardinality(
+ partitionPos, customPartitionExprs, tStats, fsParent, allRSCols);
+ if (partCardinality == 0) {
+ // no partition columns at all
+ return false;
+ }
+ if (partCardinality < 0) {
+ // stats unavailable, be conservative -> sort
+ return true;
}
if (MAX_WRITERS < 0) {
- double orcMemPool =
this.parseCtx.getConf().getDouble(OrcConf.MEMORY_POOL.getHiveConfName(),
- (Double) OrcConf.MEMORY_POOL.getDefaultValue());
- long orcStripSize =
this.parseCtx.getConf().getLong(OrcConf.STRIPE_SIZE.getHiveConfName(),
- (Long) OrcConf.STRIPE_SIZE.getDefaultValue());
- MemoryInfo memoryInfo = new MemoryInfo(this.parseCtx.getConf());
- LOG.debug("Memory info during SDPO opt: {}", memoryInfo);
- long executorMem = memoryInfo.getMaxExecutorMemory();
- MAX_WRITERS = (long) (executorMem * orcMemPool) / orcStripSize;
+ MAX_WRITERS = computeMaxWriters();
+ }
+ return partCardinality > MAX_WRITERS;
+ }
+
+ private long computeMaxWriters() {
+ double orcMemPool =
this.parseCtx.getConf().getDouble(OrcConf.MEMORY_POOL.getHiveConfName(),
+ (Double) OrcConf.MEMORY_POOL.getDefaultValue());
+ long orcStripSize =
this.parseCtx.getConf().getLong(OrcConf.STRIPE_SIZE.getHiveConfName(),
+ (Long) OrcConf.STRIPE_SIZE.getDefaultValue());
+ MemoryInfo memoryInfo = new MemoryInfo(this.parseCtx.getConf());
+ LOG.debug("Memory info during SDPO opt: {}", memoryInfo);
+ long executorMem = memoryInfo.getMaxExecutorMemory();
+ return (long) (executorMem * orcMemPool) / orcStripSize;
+ }
+ /**
+ * Computes the partition cardinality based on column NDV statistics.
+ * @return positive value = estimated cardinality, 0 = no partition
columns, -1 = stats unavailable
+ */
+ private long computePartCardinality(List<Integer> partitionPos,
+ List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customPartitionExprs,
+ Statistics tStats, Operator<? extends OperatorDesc> fsParent,
+ ArrayList<ExprNodeDesc> allRSCols) {
+
+ if (!partitionPos.isEmpty()) {
+ long cardinality = 1;
+ for (Integer idx : partitionPos) {
+ ColumnInfo ci = fsParent.getSchema().getSignature().get(idx);
+ ColStatistics partStats =
tStats.getColumnStatisticsFromColName(ci.getInternalName());
+ if (partStats == null) {
+ return -1;
+ }
+ cardinality *= partStats.getCountDistint();
+ }
+ return cardinality;
}
- if (partCardinality <= MAX_WRITERS) {
- return false;
+
+ if (!customPartitionExprs.isEmpty()) {
+ // extract source column names from custom expressions (same approach
as allStaticPartitions)
+ Set<String> partColNames = new HashSet<>();
+ for (Function<List<ExprNodeDesc>, ExprNodeDesc> expr :
customPartitionExprs) {
+ ExprNodeDesc resolved = expr.apply(allRSCols);
+ for (ExprNodeColumnDesc colDesc :
ExprNodeDescUtils.findAllColumnDescs(resolved)) {
+ partColNames.add(colDesc.getColumn());
+ }
+ }
+ long cardinality = 1;
+ for (String colName : partColNames) {
+ ColStatistics partStats =
tStats.getColumnStatisticsFromColName(colName);
+ if (partStats == null) {
+ return -1;
+ }
+ cardinality *= partStats.getCountDistint();
Review Comment:
I might be wrong. I presume this evaluates `iceberg_bucket(ndv_100, 8 (=num
buckets))` as 100. As `iceberg_*` always narrows the source column, the current
implementation is likely to degrade performance for any workload.
This also means we have a chance to further optimize the cost-based
optimizer. Since the cardinality of `iceberg_bucket(x, 8)` is obviously
`min(cardinality(x), 8)`, we can enable the optimization for more cases.
I guess we can achieve the optimization if we implement the following API in
`iceberg_*` UDFs.
https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/stats/estimator/StatEstimator.java
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]