okumin commented on code in PR #6389:
URL: https://github.com/apache/hive/pull/6389#discussion_r3039107571
##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java:
##########
@@ -880,34 +890,77 @@ private boolean shouldDo(List<Integer> partitionPos,
Operator<? extends Operator
if (colStats == null || colStats.isEmpty()) {
return true;
}
- long partCardinality = 1;
-
- // compute cardinality for partition columns
- for (Integer idx : partitionPos) {
- ColumnInfo ci = fsParent.getSchema().getSignature().get(idx);
- ColStatistics partStats =
fsParent.getStatistics().getColumnStatisticsFromColName(ci.getInternalName());
- if (partStats == null) {
- // statistics for this partition are for some reason not available
- return true;
- }
- partCardinality = partCardinality * partStats.getCountDistint();
+ long partCardinality = computePartCardinality(
+ partitionPos, customPartitionExprs, tStats, fsParent, allRSCols);
+ if (partCardinality == 0) {
+ // no partition columns at all
+ return false;
+ }
+ if (partCardinality < 0) {
+ // stats unavailable, be conservative -> sort
+ return true;
}
if (MAX_WRITERS < 0) {
- double orcMemPool =
this.parseCtx.getConf().getDouble(OrcConf.MEMORY_POOL.getHiveConfName(),
- (Double) OrcConf.MEMORY_POOL.getDefaultValue());
- long orcStripSize =
this.parseCtx.getConf().getLong(OrcConf.STRIPE_SIZE.getHiveConfName(),
- (Long) OrcConf.STRIPE_SIZE.getDefaultValue());
- MemoryInfo memoryInfo = new MemoryInfo(this.parseCtx.getConf());
- LOG.debug("Memory info during SDPO opt: {}", memoryInfo);
- long executorMem = memoryInfo.getMaxExecutorMemory();
- MAX_WRITERS = (long) (executorMem * orcMemPool) / orcStripSize;
+ MAX_WRITERS = computeMaxWriters();
+ }
+ return partCardinality > MAX_WRITERS;
+ }
+
+ private long computeMaxWriters() {
+ double orcMemPool =
this.parseCtx.getConf().getDouble(OrcConf.MEMORY_POOL.getHiveConfName(),
+ (Double) OrcConf.MEMORY_POOL.getDefaultValue());
+ long orcStripSize =
this.parseCtx.getConf().getLong(OrcConf.STRIPE_SIZE.getHiveConfName(),
+ (Long) OrcConf.STRIPE_SIZE.getDefaultValue());
+ MemoryInfo memoryInfo = new MemoryInfo(this.parseCtx.getConf());
+ LOG.debug("Memory info during SDPO opt: {}", memoryInfo);
+ long executorMem = memoryInfo.getMaxExecutorMemory();
+ return (long) (executorMem * orcMemPool) / orcStripSize;
+ }
+ /**
+ * Computes the partition cardinality based on column NDV statistics.
+ * @return positive value = estimated cardinality, 0 = no partition
columns, -1 = stats unavailable
+ */
+ private long computePartCardinality(List<Integer> partitionPos,
+ List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customPartitionExprs,
+ Statistics tStats, Operator<? extends OperatorDesc> fsParent,
+ ArrayList<ExprNodeDesc> allRSCols) {
+
+ if (!partitionPos.isEmpty()) {
+ long cardinality = 1;
+ for (Integer idx : partitionPos) {
+ ColumnInfo ci = fsParent.getSchema().getSignature().get(idx);
+ ColStatistics partStats =
tStats.getColumnStatisticsFromColName(ci.getInternalName());
+ if (partStats == null) {
+ return -1;
+ }
+ cardinality *= partStats.getCountDistint();
+ }
+ return cardinality;
}
- if (partCardinality <= MAX_WRITERS) {
- return false;
+
+ if (!customPartitionExprs.isEmpty()) {
+ // extract source column names from custom expressions (same approach
as allStaticPartitions)
+ Set<String> partColNames = new HashSet<>();
+ for (Function<List<ExprNodeDesc>, ExprNodeDesc> expr :
customPartitionExprs) {
+ ExprNodeDesc resolved = expr.apply(allRSCols);
+ for (ExprNodeColumnDesc colDesc :
ExprNodeDescUtils.findAllColumnDescs(resolved)) {
+ partColNames.add(colDesc.getColumn());
+ }
+ }
+ long cardinality = 1;
+ for (String colName : partColNames) {
+ ColStatistics partStats =
tStats.getColumnStatisticsFromColName(colName);
+ if (partStats == null) {
+ return -1;
+ }
+ cardinality *= partStats.getCountDistint();
Review Comment:
I might be wrong, but I presume this evaluates `iceberg_bucket(ndv_100, 8)` (where 8 is the
number of buckets) as 100. Since the `iceberg_*` transforms always narrow the cardinality of the
source column, the current implementation is likely to overestimate the partition cardinality and
degrade performance for such workloads.
This also means we have a chance to further improve the cost-based
decision. Since the cardinality of `iceberg_bucket(x, 8)` is obviously
`min(cardinality(x), 8)`, we could enable the optimization in more cases.
I guess we can achieve this if we implement the following API in the
`iceberg_*` UDFs and use it to resolve a more accurate cardinality in this
`computePartCardinality` method:
https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/stats/estimator/StatEstimator.java
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]