[ 
https://issues.apache.org/jira/browse/HIVE-23434?focusedWorklogId=434957&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-434957
 ]

ASF GitHub Bot logged work on HIVE-23434:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/May/20 13:58
            Start Date: 19/May/20 13:58
    Worklog Time Spent: 10m 
      Work Description: kgyrtkirk commented on a change in pull request #1017:
URL: https://github.com/apache/hive/pull/1017#discussion_r427322559



##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRule.java
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories.ProjectFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableBitSet.Builder;
+import org.apache.hadoop.hive.ql.exec.DataSketchesFunctions;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hive.plugin.api.HiveUDFPlugin.UDFDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * This rule can rewrite aggregate calls so that they are calculated using sketch-based functions.
+ *
+ * <br/>
+ * Currently it can rewrite:
+ * <ul>
+ *  <li>{@code count(distinct(x))} to distinct counting sketches</li>
+ *  <li>{@code percentile_cont(0.2) within group (order by id)}</li>
+ *  </ul>
+ *
+ * <p>
+ *   The transformation here works on Aggregate nodes; the operations done are 
the following:
+ * </p>
+ * <ol>
+ * <li>Identify candidate aggregate calls</li>
+ * <li>A new Project is inserted below the Aggregate; to help with data 
pre-processing</li>
+ * <li>A new Aggregate is created in which the aggregation is done by the 
sketch function</li>
+ * <li>A new Project is inserted on top of the Aggregate; it unwraps the resulting value
+ *    (count-distinct estimate or quantile) from the sketch representation</li>
+ * </ol>
+ */
+public final class HiveRewriteToDataSketchesRule extends RelOptRule {
+
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HiveRewriteToDataSketchesRule.class);
+  private final Optional<String> countDistinctSketchType;
+  private final Optional<String> percentileContSketchType;
+  private final ProjectFactory projectFactory;
+
+  /**
+   * Creates the rule; its operand matches any {@link HiveAggregate}.
+   *
+   * @param countDistinctSketchType sketch type to use for {@code count(distinct x)};
+   *          when empty that rewrite is not registered
+   * @param percentileContSketchType sketch type to use for {@code percentile_cont};
+   *          when empty that rewrite is not registered
+   */
+  public HiveRewriteToDataSketchesRule(Optional<String> countDistinctSketchType,
+      Optional<String> percentileContSketchType) {
+    super(operand(HiveAggregate.class, any()));
+    this.projectFactory = HiveRelFactories.HIVE_PROJECT_FACTORY;
+    this.percentileContSketchType = percentileContSketchType;
+    this.countDistinctSketchType = countDistinctSketchType;
+  }
+
+  /**
+   * Replaces the matched aggregate by a Project / Aggregate / Project stack when at least one
+   * of its aggregate calls was rewritten to a sketch function.
+   *
+   * <p>Nothing is done for aggregates having more than one grouping set, nor when the computed
+   * aggregate calls are equal to the original ones.</p>
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Aggregate aggregate = call.rel(0);
+
+    if (aggregate.getGroupSets().size() != 1) {
+      // not yet supported
+      return;
+    }
+
+    VBuilder vb = new VBuilder(aggregate);
+
+    if (aggregate.getAggCallList().equals(vb.newAggCalls)) {
+      // rule didn't make any changes
+      return;
+    }
+
+    // synthetic names for the fields of the Project placed below the Aggregate
+    List<String> fieldNames = new ArrayList<String>(vb.newProjectsBelow.size());
+    for (int i = 0; i < vb.newProjectsBelow.size(); i++) {
+      fieldNames.add("ff_" + i);
+    }
+    RelNode newProjectBelow =
+        projectFactory.createProject(aggregate.getInput(), vb.newProjectsBelow, fieldNames);
+
+    RelNode newAgg = aggregate.copy(aggregate.getTraitSet(), newProjectBelow, aggregate.getGroupSet(),
+        aggregate.getGroupSets(), vb.newAggCalls);
+
+    // the top Project carries the field names of the original aggregate
+    RelNode newProject =
+        projectFactory.createProject(newAgg, vb.newProjects, aggregate.getRowType().getFieldNames());
+
+    call.transformTo(newProject);
+  }
+
+  /**
+   * Helper class to help in building a new Aggregate and Project.
+   */
+  // NOTE: methods in this class are not re-entrant; drop-to-frame to 
constructor during debugging
+  private class VBuilder {
+
+    private final RexBuilder rexBuilder;
+
+    private Aggregate aggregate;
+    private List<AggregateCall> newAggCalls;
+    private List<RexNode> newProjects;
+    private List<RexNode> newProjectsBelow;
+    private List<RewriteProcedure> rewrites;
+
+    /**
+     * Computes the new aggregate calls and both projection lists for the given aggregate.
+     * All the work is done here; the results are read from the fields afterwards.
+     */
+    public VBuilder(Aggregate aggregate) {
+      this.aggregate = aggregate;
+      this.rexBuilder = aggregate.getCluster().getRexBuilder();
+      this.newAggCalls = new ArrayList<AggregateCall>();
+      this.newProjects = new ArrayList<RexNode>();
+      this.newProjectsBelow = new ArrayList<RexNode>();
+      this.rewrites = new ArrayList<RewriteProcedure>();
+
+      // add identity projections
+      addProjectedFields();
+
+      // a rewrite is registered only when its sketch type is configured; count-distinct goes first
+      countDistinctSketchType.ifPresent(sketchType -> rewrites.add(new CountDistinctRewrite(sketchType)));
+      percentileContSketchType.ifPresent(sketchType -> rewrites.add(new PercentileContRewrite(sketchType)));
+
+      aggregate.getAggCallList().forEach(this::processAggCall);
+    }
+
+    /**
+     * Seeds both projection lists with identity references.
+     *
+     * <ul>
+     * <li>{@code newProjects} (the Project on top of the new Aggregate) gets one reference per
+     * group key.</li>
+     * <li>{@code newProjectsBelow} (the Project below the new Aggregate) gets one reference per
+     * field of the aggregate's input, so every index used by the aggregate stays valid; the
+     * rewrites append their extra expressions after these.</li>
+     * </ul>
+     */
+    private void addProjectedFields() {
+      // Output field i of the aggregate is its i-th group key, which is not necessarily input
+      // field i; take the reference type from the aggregate's own row type.
+      for (int i = 0; i < aggregate.getGroupCount(); i++) {
+        newProjects.add(rexBuilder.makeInputRef(aggregate, i));
+      }
+      // Project every input field. The previous "i < max(referenced index)" loop left out the
+      // highest referenced field and failed with NoSuchElementException when the aggregate
+      // referenced no field at all (e.g. a lone count(*)).
+      int numInputFields = aggregate.getInput().getRowType().getFieldCount();
+      for (int i = 0; i < numInputFields; i++) {
+        newProjectsBelow.add(rexBuilder.makeInputRef(aggregate.getInput(), i));
+      }
+    }
+
+    /**
+     * Handles a single aggregate call: the first registered rewrite which accepts the call
+     * rewrites it; when there is none the call is carried over unchanged.
+     */
+    private void processAggCall(AggregateCall aggCall) {
+      Optional<RewriteProcedure> applicable =
+          rewrites.stream().filter(candidate -> candidate.isApplicable(aggCall)).findFirst();
+      if (applicable.isPresent()) {
+        applicable.get().rewrite(aggCall);
+      } else {
+        appendAggCall(aggCall);
+      }
+    }
+
+    private void appendAggCall(AggregateCall aggCall) {
+      RexNode projRex = rexBuilder.makeInputRef(aggCall.getType(), 
newProjects.size());
+
+      newAggCalls.add(aggCall);
+      newProjects.add(projRex);
+    }
+
+    /**
+     * Base class of the rewrites; it is bound to a sketch class and resolves the sketch
+     * functions of that class by name.
+     */
+    abstract class RewriteProcedure {
+
+      private final String sketchClass;
+
+      public RewriteProcedure(String sketchClass) {
+        this.sketchClass = sketchClass;
+      }
+
+      /** Tells whether this procedure can rewrite the given aggregate call. */
+      abstract boolean isApplicable(AggregateCall aggCall);
+
+      /** Rewrites the given aggregate call; the results are added to the lists of the builder. */
+      abstract void rewrite(AggregateCall aggCall);
+
+      /**
+       * Looks up the function {@code fnName} of the sketch class.
+       *
+       * @throws RuntimeException when the function has no Calcite function associated with it
+       */
+      protected SqlOperator getSqlOperator(String fnName) {
+        UDFDescriptor descriptor = DataSketchesFunctions.INSTANCE.getSketchFunction(sketchClass, fnName);
+        if (descriptor.getCalciteFunction().isPresent()) {
+          return descriptor.getCalciteFunction().get();
+        }
+        throw new RuntimeException(
+            descriptor.toString() + " doesn't have a Calcite function associated with it");
+      }
+
+    }
+
+    /**
+     * Rewrites an unfiltered, single-argument {@code count(distinct x)}: the argument is fed
+     * into the data-to-sketch aggregate and the top projection turns the sketch into a rounded
+     * estimate which is cast to the original type.
+     */
+    class CountDistinctRewrite extends RewriteProcedure {
+
+      public CountDistinctRewrite(String sketchClass) {
+        super(sketchClass);
+      }
+
+      @Override
+      boolean isApplicable(AggregateCall aggCall) {
+        if (!aggCall.isDistinct() || aggCall.hasFilter()) {
+          return false;
+        }
+        return aggCall.getArgList().size() == 1
+            && aggCall.getAggregation().getName().equalsIgnoreCase("count");
+      }
+
+      @Override
+      void rewrite(AggregateCall aggCall) {
+        // position of this call in the output; read before newProjects grows
+        final int outputIdx = newProjects.size();
+        RelDataType origType = aggregate.getRowType().getFieldList().get(outputIdx).getType();
+
+        // expose the counted column as an extra field of the Project below
+        Integer argIndex = aggCall.getArgList().get(0);
+        newProjectsBelow.add(rexBuilder.makeInputRef(aggregate.getInput(), argIndex));
+        List<Integer> sketchArgs = Lists.newArrayList(newProjectsBelow.size() - 1);
+
+        SqlAggFunction sketchFn = (SqlAggFunction) getSqlOperator(DataSketchesFunctions.DATA_TO_SKETCH);
+        RelDataType sketchType = rexBuilder.deriveReturnType(sketchFn, Collections.emptyList());
+        AggregateCall newAgg = AggregateCall.create(sketchFn, /* distinct */ false, /* approximate */ true,
+            /* ignoreNulls */ true, sketchArgs, aggCall.filterArg, aggCall.getCollation(), sketchType,
+            sketchFn.getName());
+
+        // cast(round(estimate(sketch)) as original type)
+        SqlOperator estimateFn = getSqlOperator(DataSketchesFunctions.SKETCH_TO_ESTIMATE);
+        RexNode sketchRef = rexBuilder.makeInputRef(newAgg.getType(), outputIdx);
+        RexNode estimate = rexBuilder.makeCall(estimateFn, ImmutableList.of(sketchRef));
+        RexNode rounded = rexBuilder.makeCall(SqlStdOperatorTable.ROUND, ImmutableList.of(estimate));
+        RexNode result = rexBuilder.makeCast(origType, rounded);
+
+        newAggCalls.add(newAgg);
+        newProjects.add(result);
+      }
+
+    }
+
+    /**
+     * Rewrites an unfiltered, non-distinct, four-argument {@code percentile_cont}: argument 1 is
+     * cast to float and fed into the data-to-sketch aggregate; argument 0 is looked up in the
+     * Project under the aggregate and passed as the fraction to the quantile function in the
+     * top projection.
+     */
+    class PercentileContRewrite extends RewriteProcedure {
+
+      public PercentileContRewrite(String sketchClass) {
+        super(sketchClass);
+      }
+
+      @Override
+      boolean isApplicable(AggregateCall aggCall) {
+        // FIXME: also check that args are: ?,?,1,0 - other cases are not supported
+        if (aggCall.isDistinct() || aggCall.hasFilter()) {
+          return false;
+        }
+        return aggCall.getArgList().size() == 4
+            && aggCall.getAggregation().getName().equalsIgnoreCase("percentile_cont");
+      }
+
+      @Override
+      void rewrite(AggregateCall aggCall) {
+        // position of this call in the output; read before newProjects grows
+        final int outputIdx = newProjects.size();
+        RelDataType origType = aggregate.getRowType().getFieldList().get(outputIdx).getType();
+
+        // expose argument 1, cast to float, as an extra field of the Project below
+        Integer argIndex = aggCall.getArgList().get(1);
+        RexNode valueRef = rexBuilder.makeInputRef(aggregate.getInput(), argIndex);
+        RelDataType floatType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.FLOAT);
+        newProjectsBelow.add(rexBuilder.makeCast(floatType, valueRef));
+        List<Integer> sketchArgs = Lists.newArrayList(newProjectsBelow.size() - 1);
+
+        SqlAggFunction sketchFn = (SqlAggFunction) getSqlOperator(DataSketchesFunctions.DATA_TO_SKETCH);
+        RelDataType sketchType = rexBuilder.deriveReturnType(sketchFn, Collections.emptyList());
+        AggregateCall newAgg = AggregateCall.create(sketchFn, /* distinct */ false, /* approximate */ true,
+            /* ignoreNulls */ true, sketchArgs, aggCall.filterArg, aggCall.getCollation(), sketchType,
+            sketchFn.getName());
+
+        // the fraction expression is taken from the Project which is the input of the aggregate
+        Integer origFractionIdx = aggCall.getArgList().get(0);
+        RexNode origFraction = getProject(aggregate.getInput()).getChildExps().get(origFractionIdx);
+        RexNode fraction = rexBuilder.makeCast(floatType, origFraction);
+
+        // cast(quantile(sketch, fraction) as original type)
+        SqlOperator quantileFn = getSqlOperator(DataSketchesFunctions.GET_QUANTILE);
+        RexNode sketchRef = rexBuilder.makeInputRef(newAgg.getType(), outputIdx);
+        RexNode quantile = rexBuilder.makeCall(quantileFn, ImmutableList.of(sketchRef, fraction));
+        RexNode result = rexBuilder.makeCast(origType, quantile);
+
+        newAggCalls.add(newAgg);
+        newProjects.add(result);
+
+      }
+
+    }
+
+    private Project getProject(RelNode input) {
+      if (input instanceof Project) {
+        return (Project) input;
+      }
+      if (input instanceof HepRelVertex) {

Review comment:
       ok - I've updated it - I'll separate the 3 rules differently in the next patch




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 434957)
    Time Spent: 2h  (was: 1h 50m)

> Add option to rewrite PERCENTILE_CONT to sketch functions
> ---------------------------------------------------------
>
>                 Key: HIVE-23434
>                 URL: https://issues.apache.org/jira/browse/HIVE-23434
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: Zoltan Haindrich
>            Assignee: Zoltan Haindrich
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-23434.01.patch, HIVE-23434.02.patch, 
> HIVE-23434.03.patch
>
>          Time Spent: 2h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to