kgyrtkirk commented on a change in pull request #1031:
URL: https://github.com/apache/hive/pull/1031#discussion_r434513609



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRules.java
##########
@@ -68,25 +82,32 @@
  *       ⇒ SELECT ds_kll_quantile(ds_kll_sketch(CAST(id AS FLOAT)), 0.2) FROM sketch_input;
  *    </pre>
  *  </li>
+ *  <li>{@code cume_dist() over (order by id)}

Review comment:
       I think these apidocs could be moved to the rewrite rules - but they are also meaningful here... maybe move them and add a briefer description here?

##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -2495,19 +2495,22 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVE_OPTIMIZE_BI_REWRITE_COUNTDISTINCT_ENABLED("hive.optimize.bi.rewrite.countdistinct.enabled",
        true,
        "Enables to rewrite COUNT(DISTINCT(X)) queries to be rewritten to use sketch functions."),
-    HIVE_OPTIMIZE_BI_REWRITE_COUNT_DISTINCT_SKETCH(
-        "hive.optimize.bi.rewrite.countdistinct.sketch", "hll",
+    HIVE_OPTIMIZE_BI_REWRITE_COUNT_DISTINCT_SKETCH("hive.optimize.bi.rewrite.countdistinct.sketch", "hll",
        new StringSet("hll"),

Review comment:
       about enabling other sketches for count-distinct: I think they should just work - however, they might need a little testing; probably more important would be to provide some way to change the sketch construction parameters... actually, for our rewrites the sketch type could be considered part of those parameters
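   a minimal qfile-style sketch of the idea (the first set statement uses the conf introduced in this patch; the second one is purely hypothetical and only illustrates one possible shape for bundling the sketch type with its construction parameters - the property name and the lgK value are made up here, not part of the patch):
   ```
   -- existing switch from this patch: pick the sketch family used for the COUNT(DISTINCT) rewrite
   set hive.optimize.bi.rewrite.countdistinct.sketch=hll;

   -- hypothetical shape for also carrying construction parameters (names invented for illustration only):
   -- set hive.optimize.bi.rewrite.countdistinct.sketch.params=type=hll,lgK=12;
   ```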

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelBuilder.java
##########
@@ -165,4 +166,10 @@ protected boolean shouldMergeProject() {
     return false;
   }
 
+  /** Make the method visible */
+  @Override
+  public AggCall aggregateCall(SqlAggFunction aggFunction, boolean distinct, boolean approximate, boolean ignoreNulls,

Review comment:
       this method is needed to use the RelBuilder to create aggregates;
   the overridden method is protected... and there is no way to access this level of detail without exposing it
   

##########
File path: ql/src/test/queries/clientpositive/sketches_rewrite_cume_dist.q
##########
@@ -0,0 +1,47 @@
+--! qt:transactional
+
+
+create table sketch_input (id int, category char(1))
+STORED AS ORC
+TBLPROPERTIES ('transactional'='true');
+
+insert into table sketch_input values
+  (1,'a'),(1, 'a'), (2, 'a'), (3, 'a'), (4, 'a'), (5, 'a'), (6, 'a'), (7, 'a'), (8, 'a'), (9, 'a'), (10, 'a'),
+  (6,'b'),(6, 'b'), (7, 'b'), (8, 'b'), (9, 'b'), (10, 'b'), (11, 'b'), (12, 'b'), (13, 'b'), (14, 'b'), (15, 'b')
+; 
+
+select id,cume_dist() over (order by id) from sketch_input;
+
+select id,cume_dist() over (order by id),1.0-ds_kll_cdf(ds, CAST(-id AS FLOAT) )[0]

Review comment:
       these commands nicely show the original expression and the rewritten one alongside each other - it would be nice to also add an assertion that they are in the same neighbourhood
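   a minimal sketch of what such an assertion could look like (hedged: it assumes the same sketch/join shape as the query above, a column name ds for the kll sketch, and an arbitrary 0.1 tolerance):
   ```
   -- assert that the exact cume_dist() and the sketch based estimate stay close to each other
   select assert_true(abs(cd - est) < 0.1)
   from (
     select id,
            cume_dist() over (order by id) as cd,
            1.0 - ds_kll_cdf(ds, cast(-id as float))[0] as est
     from sketch_input
     cross join (select ds_kll_sketch(cast(-id as float)) as ds from sketch_input) s
   ) t;
   ```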

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRules.java
##########
@@ -368,4 +388,210 @@ void rewrite(AggregateCall aggCall) {
       }
     }
   }
+
+  /**
+   * Generic support for rewriting Windowing expression into a different form usually using joins.
+   */
+  private static abstract class WindowingToProjectAggregateJoinProject extends RelOptRule {
+
+    protected final String sketchType;
+
+    public WindowingToProjectAggregateJoinProject(String sketchType) {
+      super(operand(HiveProject.class, any()), HiveRelFactories.HIVE_BUILDER, null);
+      this.sketchType = sketchType;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final Project project = call.rel(0);
+
+      VbuilderPAP vb = buildProcessor(call);
+      RelNode newProject = vb.processProject(project);
+
+      if (newProject == project) {
+        return;
+      } else {
+        call.transformTo(newProject);
+      }
+    }
+
+    protected abstract VbuilderPAP buildProcessor(RelOptRuleCall call);
+
+    protected static abstract class VbuilderPAP {
+      private final String sketchClass;
+      protected final RelBuilder relBuilder;
+      protected final RexBuilder rexBuilder;
+
+      protected VbuilderPAP(String sketchClass, RelBuilder relBuilder) {
+        this.sketchClass = sketchClass;
+        this.relBuilder = relBuilder;
+        rexBuilder = relBuilder.getRexBuilder();
+      }
+
+      final class ProcessShuttle extends RexShuttle {
+        public RexNode visitOver(RexOver over) {
+          return processCall(over);
+        }
+      };
+
+      protected final RelNode processProject(Project project) {
+        RelNode origInput = project.getInput();
+        relBuilder.push(origInput);
+        RexShuttle shuttle = new ProcessShuttle();
+        List<RexNode> newProjects = new ArrayList<RexNode>();
+        for (RexNode expr : project.getChildExps()) {
+          newProjects.add(expr.accept(shuttle));
+        }
+        if (relBuilder.peek() == origInput) {
+          relBuilder.clear();
+          return project;
+        }
+        relBuilder.project(newProjects);
+        return relBuilder.build();
+      }
+
+      private final RexNode processCall(RexNode expr) {
+        if (expr instanceof RexOver) {
+          RexOver over = (RexOver) expr;
+          if (isApplicable(over)) {
+            return rewrite(over);
+          }
+        }
+        return expr;
+      }
+
+      protected final SqlOperator getSqlOperator(String fnName) {
+        UDFDescriptor fn = DataSketchesFunctions.INSTANCE.getSketchFunction(sketchClass, fnName);
+        if (!fn.getCalciteFunction().isPresent()) {
+          throw new RuntimeException(fn.toString() + " doesn't have a Calcite function associated with it");
+        }
+        return fn.getCalciteFunction().get();
+      }
+
+      /**
+       * Do the rewrite for the given expression.
+       *
+       * When this method is invoked the {@link #relBuilder} will only contain the current input.
+       * Expectation is to leave the new input there after the method finishes.
+       */
+      abstract RexNode rewrite(RexOver expr);
+
+      abstract boolean isApplicable(RexOver expr);
+
+    }
+  }
+
+  public static class CumeDistRewrite extends WindowingToProjectAggregateJoinProject {
+
+    public CumeDistRewrite(String sketchType) {
+      super(sketchType);
+    }
+
+    @Override
+    protected VbuilderPAP buildProcessor(RelOptRuleCall call) {
+      return new VB(sketchType, call.builder());
+    }
+
+    private static class VB extends VbuilderPAP {
+
+      protected VB(String sketchClass, RelBuilder relBuilder) {
+        super(sketchClass, relBuilder);
+      }
+
+      @Override
+      boolean isApplicable(RexOver over) {
+        SqlAggFunction aggOp = over.getAggOperator();
+        RexWindow window = over.getWindow();
+        if (aggOp.getName().equalsIgnoreCase("cume_dist") && window.orderKeys.size() == 1
+            && window.getLowerBound().isUnbounded() && window.getUpperBound().isUnbounded()) {
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      RexNode rewrite(RexOver over) {
+        RexWindow w = over.getWindow();
+        RexFieldCollation orderKey = w.orderKeys.get(0);
+        // we don't really support nulls in aggregate/etc...they are actually ignored
+        // so some hack will be needed for NULLs anyway..
+        ImmutableList<RexNode> partitionKeys = w.partitionKeys;
+
+        relBuilder.push(relBuilder.peek());
+        // the CDF function utilizes the '<' operator;
+        // negating the input will mirror the values on the x axis
+        // by using 1-CDF(-x) we could get a <= operator
+        RexNode key = orderKey.getKey();
+        key = rexBuilder.makeCall(SqlStdOperatorTable.UNARY_MINUS, key);
+        key = rexBuilder.makeCast(getFloatType(), key);
+
+        AggCall aggCall = ((HiveRelBuilder) relBuilder).aggregateCall(
+            (SqlAggFunction) getSqlOperator(DataSketchesFunctions.DATA_TO_SKETCH),
+            /* distinct */ false,
+            /* approximate */ false,
+            /* ignoreNulls */ true,
+            null,
+            ImmutableList.of(),
+            null,
+            ImmutableList.of(key));
+
+        relBuilder.aggregate(relBuilder.groupKey(partitionKeys), aggCall);
+
+        List<RexNode> joinConditions;
+        joinConditions = Ord.zip(partitionKeys).stream().map(o -> {
+          RexNode f = relBuilder.field(2, 1, o.i);
+          return rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_DISTINCT_FROM, o.e, f);

Review comment:
       interestingly: CUME_DIST returns NULL when the partitioning column is null - but the sketch-based estimation doesn't (and neither does Postgres)
   opened HIVE-22939
   
   since this is not a simple '=' we could possibly lose some optimizations...
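   to make the NULL handling difference concrete (a tiny illustration, not part of the patch): a plain '=' join condition evaluates to NULL for NULL keys, so an inner join would silently drop those partitions, while a null-safe comparison (semantically Hive's <=> operator) keeps them matched:
   ```
   select cast(null as int) = cast(null as int);   -- NULL, the row would be filtered by an inner join
   select cast(null as int) <=> cast(null as int); -- true, the row is kept
   ```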

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
##########
@@ -1974,13 +1974,19 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv
      if (!isMaterializedViewMaintenance() && conf.getBoolVar(ConfVars.HIVE_OPTIMIZE_BI_ENABLED)) {
        // Rewrite to datasketches if enabled
        if (conf.getBoolVar(ConfVars.HIVE_OPTIMIZE_BI_REWRITE_COUNTDISTINCT_ENABLED)) {
-          String countDistinctSketchType = conf.getVar(ConfVars.HIVE_OPTIMIZE_BI_REWRITE_COUNT_DISTINCT_SKETCH);
-          RelOptRule rule = new HiveRewriteToDataSketchesRules.CountDistinctRewrite(countDistinctSketchType);
+          String sketchType = conf.getVar(ConfVars.HIVE_OPTIMIZE_BI_REWRITE_COUNT_DISTINCT_SKETCH);
+          RelOptRule rule = new HiveRewriteToDataSketchesRules.CountDistinctRewrite(sketchType);
           generatePartialProgram(program, true, HepMatchOrder.TOP_DOWN, rule);
         }
         if (conf.getBoolVar(ConfVars.HIVE_OPTIMIZE_BI_REWRITE_PERCENTILE_DISC_ENABLED)) {
-          String percentileDiscSketchType = conf.getVar(ConfVars.HIVE_OPTIMIZE_BI_REWRITE_PERCENTILE_DISC_SKETCH);
-          RelOptRule rule = new HiveRewriteToDataSketchesRules.PercentileDiscRewrite(percentileDiscSketchType);
+          String sketchType = conf.getVar(ConfVars.HIVE_OPTIMIZE_BI_REWRITE_PERCENTILE_DISC_SKETCH);
+          RelOptRule rule = new HiveRewriteToDataSketchesRules.PercentileDiscRewrite(sketchType);
+          generatePartialProgram(program, true, HepMatchOrder.TOP_DOWN, rule);
+        }
+        if (conf.getBoolVar(ConfVars.HIVE_OPTIMIZE_BI_REWRITE_CUME_DIST_ENABLED)) {
+          String sketchType = conf.getVar(ConfVars.HIVE_OPTIMIZE_BI_REWRITE_CUME_DIST_SKETCH);
+          //          RelBuilderFactory factory=HiveRelFactories.HIVE_BUILDER.create(basePlan.getCluster(), null);

Review comment:
       remove this line

##########
File path: ql/src/test/queries/clientpositive/sketches_materialized_view_cume_dist.q
##########
@@ -0,0 +1,54 @@
+--! qt:transactional
+set hive.fetch.task.conversion=none;
+
+create table sketch_input (id int, category char(1))
+STORED AS ORC
+TBLPROPERTIES ('transactional'='true');
+
+insert into table sketch_input values

Review comment:
       this table should be put into a dataset or something in the next patch...

##########
File path: ql/src/test/queries/clientpositive/sketches_materialized_view_cume_dist.q
##########
@@ -0,0 +1,54 @@
+--! qt:transactional
+set hive.fetch.task.conversion=none;
+
+create table sketch_input (id int, category char(1))
+STORED AS ORC
+TBLPROPERTIES ('transactional'='true');
+
+insert into table sketch_input values
+  (1,'a'),(1, 'a'), (2, 'a'), (3, 'a'), (4, 'a'), (5, 'a'), (6, 'a'), (7, 'a'), (8, 'a'), (9, 'a'), (10, 'a'),
+  (6,'b'),(6, 'b'), (7, 'b'), (8, 'b'), (9, 'b'), (10, 'b'), (11, 'b'), (12, 'b'), (13, 'b'), (14, 'b'), (15, 'b')
+; 
+
+-- create an mv for the intermediate results
+create  materialized view mv_1 as
+  select category,ds_kll_sketch(cast(-id as float)) from sketch_input group by category;
+
+-- bi mode on
+set hive.optimize.bi.enabled=true;
+
+explain
+select 'rewrite; mv matching', id, cume_dist() over (order by id) from sketch_input order by id;
+select 'rewrite; mv matching', id, cume_dist() over (order by id) from sketch_input order by id;

Review comment:
       it would be handy to introduce an instruction stating that the next sql statement "must" touch the table "mv_1"
   
   ```
   --! qt:plan:mustreadtable:mv_1
   ```
   I don't know how much of a science fiction implementing something like this would be - it's certainly possible; however, wiring it in might be tricky...

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRules.java
##########
@@ -368,4 +388,210 @@ void rewrite(AggregateCall aggCall) {
       }
     }
   }
+
+  /**
+   * Generic support for rewriting Windowing expression into a different form usually using joins.
+   */
+  private static abstract class WindowingToProjectAggregateJoinProject extends RelOptRule {
+
+    protected final String sketchType;
+
+    public WindowingToProjectAggregateJoinProject(String sketchType) {
+      super(operand(HiveProject.class, any()), HiveRelFactories.HIVE_BUILDER, null);
+      this.sketchType = sketchType;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final Project project = call.rel(0);
+
+      VbuilderPAP vb = buildProcessor(call);
+      RelNode newProject = vb.processProject(project);
+
+      if (newProject == project) {
+        return;
+      } else {
+        call.transformTo(newProject);
+      }
+    }
+
+    protected abstract VbuilderPAP buildProcessor(RelOptRuleCall call);
+
+    protected static abstract class VbuilderPAP {
+      private final String sketchClass;
+      protected final RelBuilder relBuilder;
+      protected final RexBuilder rexBuilder;
+
+      protected VbuilderPAP(String sketchClass, RelBuilder relBuilder) {
+        this.sketchClass = sketchClass;
+        this.relBuilder = relBuilder;
+        rexBuilder = relBuilder.getRexBuilder();
+      }
+
+      final class ProcessShuttle extends RexShuttle {
+        public RexNode visitOver(RexOver over) {
+          return processCall(over);
+        }
+      };
+
+      protected final RelNode processProject(Project project) {
+        RelNode origInput = project.getInput();
+        relBuilder.push(origInput);
+        RexShuttle shuttle = new ProcessShuttle();
+        List<RexNode> newProjects = new ArrayList<RexNode>();
+        for (RexNode expr : project.getChildExps()) {
+          newProjects.add(expr.accept(shuttle));
+        }
+        if (relBuilder.peek() == origInput) {
+          relBuilder.clear();
+          return project;
+        }
+        relBuilder.project(newProjects);
+        return relBuilder.build();
+      }
+
+      private final RexNode processCall(RexNode expr) {
+        if (expr instanceof RexOver) {
+          RexOver over = (RexOver) expr;
+          if (isApplicable(over)) {
+            return rewrite(over);
+          }
+        }
+        return expr;
+      }
+
+      protected final SqlOperator getSqlOperator(String fnName) {
+        UDFDescriptor fn = DataSketchesFunctions.INSTANCE.getSketchFunction(sketchClass, fnName);
+        if (!fn.getCalciteFunction().isPresent()) {
+          throw new RuntimeException(fn.toString() + " doesn't have a Calcite function associated with it");
+        }
+        return fn.getCalciteFunction().get();
+      }
+
+      /**
+       * Do the rewrite for the given expression.
+       *
+       * When this method is invoked the {@link #relBuilder} will only contain the current input.
+       * Expectation is to leave the new input there after the method finishes.
+       */
+      abstract RexNode rewrite(RexOver expr);
+
+      abstract boolean isApplicable(RexOver expr);
+
+    }
+  }
+
+  public static class CumeDistRewrite extends WindowingToProjectAggregateJoinProject {
+
+    public CumeDistRewrite(String sketchType) {
+      super(sketchType);
+    }
+
+    @Override
+    protected VbuilderPAP buildProcessor(RelOptRuleCall call) {
+      return new VB(sketchType, call.builder());
+    }
+
+    private static class VB extends VbuilderPAP {
+
+      protected VB(String sketchClass, RelBuilder relBuilder) {
+        super(sketchClass, relBuilder);
+      }
+
+      @Override
+      boolean isApplicable(RexOver over) {
+        SqlAggFunction aggOp = over.getAggOperator();
+        RexWindow window = over.getWindow();
+        if (aggOp.getName().equalsIgnoreCase("cume_dist") && window.orderKeys.size() == 1
+            && window.getLowerBound().isUnbounded() && window.getUpperBound().isUnbounded()) {
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      RexNode rewrite(RexOver over) {
+        RexWindow w = over.getWindow();
+        RexFieldCollation orderKey = w.orderKeys.get(0);
+        // we don't really support nulls in aggregate/etc...they are actually ignored
+        // so some hack will be needed for NULLs anyway..
+        ImmutableList<RexNode> partitionKeys = w.partitionKeys;
+
+        relBuilder.push(relBuilder.peek());
+        // the CDF function utilizes the '<' operator;
+        // negating the input will mirror the values on the x axis
+        // by using 1-CDF(-x) we could get a <= operator
+        RexNode key = orderKey.getKey();
+        key = rexBuilder.makeCall(SqlStdOperatorTable.UNARY_MINUS, key);
+        key = rexBuilder.makeCast(getFloatType(), key);
+
+        AggCall aggCall = ((HiveRelBuilder) relBuilder).aggregateCall(
+            (SqlAggFunction) getSqlOperator(DataSketchesFunctions.DATA_TO_SKETCH),
+            /* distinct */ false,
+            /* approximate */ false,
+            /* ignoreNulls */ true,
+            null,
+            ImmutableList.of(),
+            null,
+            ImmutableList.of(key));
+
+        relBuilder.aggregate(relBuilder.groupKey(partitionKeys), aggCall);
+
+        List<RexNode> joinConditions;
+        joinConditions = Ord.zip(partitionKeys).stream().map(o -> {
+          RexNode f = relBuilder.field(2, 1, o.i);
+          return rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_DISTINCT_FROM, o.e, f);
+        }).collect(Collectors.toList());
+        relBuilder.join(JoinRelType.INNER, joinConditions);
+
+        int sketchFieldIndex = relBuilder.peek().getRowType().getFieldCount() - 1;
+        RexInputRef sketchInputRef = relBuilder.field(sketchFieldIndex);
+        SqlOperator projectOperator = getSqlOperator(DataSketchesFunctions.GET_CDF);
+
+        // NULLs will be replaced by this value - to be before / after the other values
+        // note: the sketch will ignore NULLs entirely but they will be placed at 0.0 or 1.0
+        final RexNode nullReplacement =
+            relBuilder.literal(orderKey.getNullDirection() == NullDirection.FIRST ? Float.MAX_VALUE : -Float.MAX_VALUE);
+
+        // long story short: CAST(1.0f-CDF(CAST(COALESCE(-X, nullReplacement) AS FLOAT))[0] AS targetType)

Review comment:
       I don't 100% like this approach - let's see if we could serve everything this way...
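   for reference, spelled out over the test table the produced expression has roughly this shape (a hedged reconstruction of the "long story short" comment above; the ds column name, the cross join, and the -3.4028235E38 literal standing in for -Float.MAX_VALUE are assumptions made only for this illustration):
   ```
   select id,
          cast(1.0 - ds_kll_cdf(ds, cast(coalesce(-id, -3.4028235E38) as float))[0] as double) as cume_dist_estimate
   from sketch_input
   cross join (select ds_kll_sketch(cast(-id as float)) as ds from sketch_input) s;
   ```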




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


