okumin commented on code in PR #6389:
URL: https://github.com/apache/hive/pull/6389#discussion_r3036375924


##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java:
##########
@@ -210,10 +210,17 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       LinkedList<Integer> customSortOrder = new LinkedList<>(dpCtx.getCustomSortOrder());
       LinkedList<Integer> customNullOrder = new LinkedList<>(dpCtx.getCustomSortNullOrder());
 
-      // If custom expressions (partition or sort) are present, there is an explicit requirement to do sorting
-      if (customPartitionExprs.isEmpty() && customSortExprs.isEmpty() && !shouldDo(partitionPositions, fsParent)) {
+      // If custom sort expressions are present, there is an explicit requirement to do sorting.
+      // Custom partition expressions are evaluated inside shouldDo based on column stats.
+      if (customSortExprs.isEmpty() && !shouldDo(partitionPositions, customPartitionExprs, fsParent, allRSCols)) {
         return null;
       }
+
+      // Mark that sorting will be applied with custom partition expressions, so the writer layer
+      // (e.g. Iceberg) knows the input is ordered and can use a clustered writer.
+      if (!customPartitionExprs.isEmpty()) {
+        dpCtx.setHasCustomPartitionOrSortExpression(true);
+      }

Review Comment:
   Should this be located after all the bailouts are evaluated? As written, we may mutate the context even when a later bailout, such as `if (!removeRSInsertedByEnforceBucketing(fsOp)) {`, subsequently aborts the optimization.
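   If the concern holds, a minimal sketch of the suggested reordering (condition and method names taken from the hunk above; the intermediate bailouts are abbreviated):
   
   ```java
   // Sketch: evaluate every bailout first and mutate the shared context last,
   // so dpCtx stays untouched whenever the optimizer gives up on this FileSink.
   if (customSortExprs.isEmpty() && !shouldDo(partitionPositions, customPartitionExprs, fsParent, allRSCols)) {
     return null;
   }
   if (!removeRSInsertedByEnforceBucketing(fsOp)) {
     return null; // with the current placement, dpCtx would already be mutated here
   }
   // ... remaining bailouts ...
   if (!customPartitionExprs.isEmpty()) {
     dpCtx.setHasCustomPartitionOrSortExpression(true);
   }
   ```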



##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java:
##########
@@ -880,34 +890,77 @@ private boolean shouldDo(List<Integer> partitionPos, Operator<? extends Operator
       if (colStats == null || colStats.isEmpty()) {
         return true;
       }
-      long partCardinality = 1;
-
-      // compute cardinality for partition columns
-      for (Integer idx : partitionPos) {
-        ColumnInfo ci = fsParent.getSchema().getSignature().get(idx);
-        ColStatistics partStats = fsParent.getStatistics().getColumnStatisticsFromColName(ci.getInternalName());
-        if (partStats == null) {
-          // statistics for this partition are for some reason not available
-          return true;
-        }
-        partCardinality = partCardinality * partStats.getCountDistint();
+      long partCardinality = computePartCardinality(
+          partitionPos, customPartitionExprs, tStats, fsParent, allRSCols);
+      if (partCardinality == 0) {
+        // no partition columns at all
+        return false;
+      }
+      if (partCardinality < 0) {
+        // stats unavailable, be conservative -> sort
+        return true;
       }
 
       if (MAX_WRITERS < 0) {
-        double orcMemPool = this.parseCtx.getConf().getDouble(OrcConf.MEMORY_POOL.getHiveConfName(),
-            (Double) OrcConf.MEMORY_POOL.getDefaultValue());
-        long orcStripSize = this.parseCtx.getConf().getLong(OrcConf.STRIPE_SIZE.getHiveConfName(),
-            (Long) OrcConf.STRIPE_SIZE.getDefaultValue());
-        MemoryInfo memoryInfo = new MemoryInfo(this.parseCtx.getConf());
-        LOG.debug("Memory info during SDPO opt: {}", memoryInfo);
-        long executorMem = memoryInfo.getMaxExecutorMemory();
-        MAX_WRITERS = (long) (executorMem * orcMemPool) / orcStripSize;
+        MAX_WRITERS = computeMaxWriters();
+      }
+      return partCardinality > MAX_WRITERS;
+    }
+
+    private long computeMaxWriters() {
+      double orcMemPool = this.parseCtx.getConf().getDouble(OrcConf.MEMORY_POOL.getHiveConfName(),
+          (Double) OrcConf.MEMORY_POOL.getDefaultValue());
+      long orcStripSize = this.parseCtx.getConf().getLong(OrcConf.STRIPE_SIZE.getHiveConfName(),
+          (Long) OrcConf.STRIPE_SIZE.getDefaultValue());
+          (Long) OrcConf.STRIPE_SIZE.getDefaultValue());
+      MemoryInfo memoryInfo = new MemoryInfo(this.parseCtx.getConf());
+      LOG.debug("Memory info during SDPO opt: {}", memoryInfo);
+      long executorMem = memoryInfo.getMaxExecutorMemory();
+      return (long) (executorMem * orcMemPool) / orcStripSize;
+    }
 
+    /**
+     * Computes the partition cardinality based on column NDV statistics.
+     * @return positive value = estimated cardinality, 0 = no partition columns, -1 = stats unavailable
+     */
+    private long computePartCardinality(List<Integer> partitionPos,
+        List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customPartitionExprs,
+        Statistics tStats, Operator<? extends OperatorDesc> fsParent,
+        ArrayList<ExprNodeDesc> allRSCols) {
+
+      if (!partitionPos.isEmpty()) {
+        long cardinality = 1;
+        for (Integer idx : partitionPos) {
+          ColumnInfo ci = fsParent.getSchema().getSignature().get(idx);
+          ColStatistics partStats = tStats.getColumnStatisticsFromColName(ci.getInternalName());
+          if (partStats == null) {
+            return -1;
+          }
+          cardinality *= partStats.getCountDistint();
+        }
+        return cardinality;
       }
-      if (partCardinality <= MAX_WRITERS) {
-        return false;
+
+      if (!customPartitionExprs.isEmpty()) {
+        // extract source column names from custom expressions (same approach as allStaticPartitions)
+        Set<String> partColNames = new HashSet<>();
+        for (Function<List<ExprNodeDesc>, ExprNodeDesc> expr : customPartitionExprs) {
+          ExprNodeDesc resolved = expr.apply(allRSCols);
+          for (ExprNodeColumnDesc colDesc : ExprNodeDescUtils.findAllColumnDescs(resolved)) {
+            partColNames.add(colDesc.getColumn());
+          }
+        }
+        long cardinality = 1;
+        for (String colName : partColNames) {
+          ColStatistics partStats = tStats.getColumnStatisticsFromColName(colName);
+          if (partStats == null) {
+            return -1;
+          }
+          cardinality *= partStats.getCountDistint();

Review Comment:
   I might be wrong. I presume this evaluates `iceberg_bucket(ndv_100, 8 (=num buckets))` as 100. Since `iceberg_*` transforms always narrow the cardinality of the source column, the current implementation likely overestimates the partition cardinality and degrades performance for any workload.
   This also means we have a chance to further improve the cost-based optimizer. Since the cardinality of `iceberg_bucket(x, 8)` is obviously `min(cardinality(x), 8)`, we could enable the optimization in more cases.
   
   I guess we can achieve that by implementing the following API in the `iceberg_*` UDFs:
   
https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/stats/estimator/StatEstimator.java
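   
   For illustration only, a rough sketch of such an estimator, assuming the `StatEstimator` contract linked above (a single `estimate(List<ColStatistics>)` method returning `Optional<ColStatistics>`); the class name and constructor are hypothetical:
   
   ```java
   import java.util.List;
   import java.util.Optional;
   
   import org.apache.hadoop.hive.ql.plan.ColStatistics;
   import org.apache.hadoop.hive.ql.stats.estimator.StatEstimator;
   
   /**
    * Hypothetical estimator for iceberg_bucket(x, N): the transform can emit
    * at most N distinct values, so the output NDV is min(ndv(x), N).
    */
   public class IcebergBucketStatEstimator implements StatEstimator {
   
     private final int numBuckets;
   
     public IcebergBucketStatEstimator(int numBuckets) {
       this.numBuckets = numBuckets;
     }
   
     @Override
     public Optional<ColStatistics> estimate(List<ColStatistics> argStats) {
       if (argStats.isEmpty() || argStats.get(0) == null) {
         // no stats for the source column -> nothing to estimate
         return Optional.empty();
       }
       ColStatistics result = argStats.get(0).clone();
       // cap the source column's NDV at the bucket count
       result.setCountDistint(Math.min(result.getCountDistint(), numBuckets));
       return Optional.of(result);
     }
   }
   ```
   
   The UDF would then expose it, presumably via the `StatEstimatorProvider` interface in the same package, so that stats annotation picks up the capped NDV instead of the raw source-column NDV.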




