[ 
https://issues.apache.org/jira/browse/HIVE-20683?focusedWorklogId=308708&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-308708
 ]

ASF GitHub Bot logged work on HIVE-20683:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Sep/19 09:40
            Start Date: 09/Sep/19 09:40
    Worklog Time Spent: 10m 
      Work Description: b-slim commented on pull request #723: [HIVE-20683] Add 
the Ability to push Dynamic Between and Bloom filters to Druid
URL: https://github.com/apache/hive/pull/723#discussion_r322150545
 
 

 ##########
 File path: 
druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
 ##########
 @@ -894,4 +945,255 @@ public static IndexSpec getIndexSpec(Configuration jc) {
     ImmutableList<AggregatorFactory> aggregatorFactories = 
aggregatorFactoryBuilder.build();
     return Pair.of(dimensions, aggregatorFactories.toArray(new 
AggregatorFactory[0]));
   }
+
+  // Druid only supports String,Long,Float,Double selectors
+  private static Set<TypeInfo> druidSupportedTypeInfos = 
ImmutableSet.<TypeInfo>of(
+      TypeInfoFactory.stringTypeInfo, TypeInfoFactory.charTypeInfo,
+      TypeInfoFactory.varcharTypeInfo, TypeInfoFactory.byteTypeInfo,
+      TypeInfoFactory.intTypeInfo, TypeInfoFactory.longTypeInfo,
+      TypeInfoFactory.shortTypeInfo, TypeInfoFactory.doubleTypeInfo
+  );
+
+  private static Set<TypeInfo> stringTypeInfos = ImmutableSet.<TypeInfo>of(
+      TypeInfoFactory.stringTypeInfo,
+      TypeInfoFactory.charTypeInfo, TypeInfoFactory.varcharTypeInfo
+  );
+
+
+  public static org.apache.druid.query.Query 
addDynamicFilters(org.apache.druid.query.Query query,
+      ExprNodeGenericFuncDesc filterExpr, Configuration conf, boolean 
resolveDynamicValues
+  ) {
+    List<VirtualColumn> virtualColumns = Arrays
+        .asList(getVirtualColumns(query).getVirtualColumns());
+    org.apache.druid.query.Query rv = query;
+    DimFilter joinReductionFilter = toDruidFilter(filterExpr, conf, 
virtualColumns,
+        resolveDynamicValues
+    );
+    if(joinReductionFilter != null) {
+      String type = query.getType();
+      DimFilter filter = new AndDimFilter(joinReductionFilter, 
query.getFilter());
+      switch (type) {
+      case org.apache.druid.query.Query.TIMESERIES:
+        rv = Druids.TimeseriesQueryBuilder.copy((TimeseriesQuery) query)
+            .filters(filter)
+            .virtualColumns(VirtualColumns.create(virtualColumns))
+            .build();
+        break;
+      case org.apache.druid.query.Query.TOPN:
+        rv = new TopNQueryBuilder((TopNQuery) query)
+            .filters(filter)
+            .virtualColumns(VirtualColumns.create(virtualColumns))
+            .build();
+        break;
+      case org.apache.druid.query.Query.GROUP_BY:
+        rv = new GroupByQuery.Builder((GroupByQuery) query)
+            .setDimFilter(filter)
+            .setVirtualColumns(VirtualColumns.create(virtualColumns))
+            .build();
+        break;
+      case org.apache.druid.query.Query.SCAN:
+        rv = ScanQuery.ScanQueryBuilder.copy((ScanQuery) query)
+            .filters(filter)
+            .virtualColumns(VirtualColumns.create(virtualColumns))
+            .build();
+        break;
+      case org.apache.druid.query.Query.SELECT:
+        rv = Druids.SelectQueryBuilder.copy((SelectQuery) query)
+            .filters(filter)
+            .virtualColumns(VirtualColumns.create(virtualColumns))
+            .build();
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported Query type " + 
type);
+      }
+    }
+    return rv;
+  }
+
+  @Nullable
+  private static DimFilter toDruidFilter(ExprNodeDesc filterExpr, 
Configuration configuration,
+      List<VirtualColumn> virtualColumns, boolean resolveDynamicValues
+  ) {
+    if(filterExpr == null) {
+      return null;
+    }
+    Class<? extends GenericUDF> genericUDFClass = 
getGenericUDFClassFromExprDesc(filterExpr);
+    if(FunctionRegistry.isOpAnd(filterExpr)) {
+      Iterator<ExprNodeDesc> iterator = filterExpr.getChildren().iterator();
+      List<DimFilter> delegates = Lists.newArrayList();
+      while (iterator.hasNext()) {
+        DimFilter filter = toDruidFilter(iterator.next(), configuration, 
virtualColumns,
+            resolveDynamicValues
+        );
+        if(filter != null) {
+          delegates.add(filter);
+        }
+      }
+      if(delegates != null && !delegates.isEmpty()) {
+        return new AndDimFilter(delegates);
+      }
+    }
+    if(FunctionRegistry.isOpOr(filterExpr)) {
+      Iterator<ExprNodeDesc> iterator = filterExpr.getChildren().iterator();
+      List<DimFilter> delegates = Lists.newArrayList();
+      while (iterator.hasNext()) {
+        DimFilter filter = toDruidFilter(iterator.next(), configuration, 
virtualColumns,
+            resolveDynamicValues
+        );
+        if(filter != null) {
+          delegates.add(filter);
+        }
+      }
+      if(delegates != null) {
+        return new OrDimFilter(delegates);
+      }
+    } else if(GenericUDFBetween.class == genericUDFClass) {
+      List<ExprNodeDesc> child = filterExpr.getChildren();
+      String col = extractColName(child.get(1), virtualColumns);
+      if(col != null) {
+        try {
+          StringComparator comparator = 
stringTypeInfos.contains(child.get(1).getTypeInfo())
+              ? StringComparators.LEXICOGRAPHIC
+              : StringComparators.NUMERIC;
+          String lower = evaluate(child.get(2), configuration, 
resolveDynamicValues);
+          String upper = evaluate(child.get(3), configuration, 
resolveDynamicValues);
+          return new BoundDimFilter(col, lower, upper, false, false, null, 
null,
+              comparator
+          );
+
+        } catch (HiveException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    } else if(GenericUDFInBloomFilter.class == genericUDFClass) {
+      List<ExprNodeDesc> child = filterExpr.getChildren();
+      String col = extractColName(child.get(0), virtualColumns);
+      if(col != null) {
+        try {
+          BloomKFilter bloomFilter = evaluateBloomFilter(child.get(1), 
configuration,
+              resolveDynamicValues
+          );
+          return new BloomDimFilter(col, 
BloomKFilterHolder.fromBloomKFilter(bloomFilter), null);
+        } catch (HiveException e) {
+          throw new RuntimeException(e);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+    return null;
+  }
+
+  private static String evaluate(ExprNodeDesc desc, Configuration 
configuration,
+      boolean resolveDynamicValue
+  ) throws HiveException {
+    ExprNodeEvaluator exprNodeEvaluator = ExprNodeEvaluatorFactory.get(desc, 
configuration);
+    if(exprNodeEvaluator instanceof ExprNodeDynamicValueEvaluator && 
!resolveDynamicValue) {
+      return desc.getExprStringForExplain();
+    } else {
+      return exprNodeEvaluator.evaluate(null).toString();
+    }
+  }
+
+  private static BloomKFilter evaluateBloomFilter(ExprNodeDesc desc, 
Configuration configuration,
+      boolean resolveDynamicValue
+  )
+      throws HiveException, IOException {
+    if(!resolveDynamicValue) {
+      // return a dummy bloom filter for explain
+      return new BloomKFilter(1);
+    } else {
+      BytesWritable bw = (BytesWritable) ExprNodeEvaluatorFactory.get(desc, 
configuration)
+              .evaluate(null);
+      return BloomKFilter.deserialize(ByteBuffer.wrap(bw.getBytes()));
+    }
+  }
+
+  public static String extractColName(ExprNodeDesc expr, List<VirtualColumn> 
virtualColumns) {
+    if(!druidSupportedTypeInfos.contains(expr.getTypeInfo())) {
+      // This column type is currently not supported in druid.(e.g boolean)
+      // We cannot pass the bloom filter to druid since bloom filter tests for 
exact object bytes.
+      return null;
+    }
+    if(expr instanceof ExprNodeColumnDesc) {
+      return ((ExprNodeColumnDesc) expr).getColumn();
+    }
+
+    ExprNodeGenericFuncDesc funcDesc = null;
+    if(expr instanceof ExprNodeGenericFuncDesc) {
+      funcDesc = (ExprNodeGenericFuncDesc) expr;
+    }
+    if(null == funcDesc) {
+      return null;
+    }
+    GenericUDF udf = funcDesc.getGenericUDF();
+    // bail out if its not a simple cast expression.
+    if(funcDesc.getChildren().size() == 1 && funcDesc.getChildren()
+        .get(0) instanceof ExprNodeColumnDesc) {
+      return null;
+    }
+    String columnName = ((ExprNodeColumnDesc) (funcDesc.getChildren()
+        .get(0))).getColumn();
+    ValueType targetType = null;
+    if(udf instanceof GenericUDFBridge) {
+      Class<? extends UDF> udfClass = ((GenericUDFBridge) udf).getUdfClass();
+      if(udfClass.equals(UDFToDouble.class)) {
+        targetType = ValueType.DOUBLE;
+      } else if(udfClass.equals(UDFToFloat.class)) {
+        targetType = ValueType.FLOAT;
+      } else if(udfClass.equals(UDFToLong.class)) {
+        targetType = ValueType.LONG;
+      } else if(udfClass.equals(GenericUDFToString.class)) {
 
 Review comment:
   this seems wrong to me the 2 types are un-convertible by definition `Class<? 
extends UDF> udfClass` != `GenericUDF`
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 308708)
    Time Spent: 3.5h  (was: 3h 20m)

> Add the Ability to push Dynamic Between and Bloom filters to Druid
> ------------------------------------------------------------------
>
>                 Key: HIVE-20683
>                 URL: https://issues.apache.org/jira/browse/HIVE-20683
>             Project: Hive
>          Issue Type: New Feature
>          Components: Druid integration
>            Reporter: Nishant Bangarwa
>            Assignee: Nishant Bangarwa
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-20683.1.patch, HIVE-20683.2.patch, 
> HIVE-20683.3.patch, HIVE-20683.4.patch, HIVE-20683.5.patch, 
> HIVE-20683.6.patch, HIVE-20683.8.patch, HIVE-20683.patch
>
>          Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> For optimizing joins, Hive generates BETWEEN filter with min-max and BLOOM 
> filter for filtering one side of semi-join.
> Druid 0.13.0 will have support for Bloom filters (Added via 
> https://github.com/apache/incubator-druid/pull/6222)
> Implementation details - 
> # Hive generates and passes the filters as part of 'filterExpr' in TableScan. 
> # DruidQueryBasedRecordReader gets this filter passed as part of the conf. 
> # During execution phase, before sending the query to druid in 
> DruidQueryBasedRecordReader we will deserialize this filter, translate it 
> into a DruidDimFilter and add it to existing DruidQuery.  Tez executor 
> already ensures that when we start reading results from the record reader, 
> all the dynamic values are initialized. 
> # Explaining a druid query also prints the query sent to druid as 
> {{druid.json.query}}. We also need to make sure to update the druid query 
> with the filters. During explain we do not have the actual values for the 
> dynamic values, so instead of values we will print the dynamic expression 
> itself as part of druid query. 
> Note:- This work needs druid to be updated to version 0.13.0



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

Reply via email to