[ 
https://issues.apache.org/jira/browse/DRILL-3735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745632#comment-14745632
 ] 

ASF GitHub Bot commented on DRILL-3735:
---------------------------------------

Github user amansinha100 commented on a diff in the pull request:

    https://github.com/apache/drill/pull/156#discussion_r39527473
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
 ---
    @@ -176,81 +177,103 @@ protected void doOnMatch(RelOptRuleCall call, 
DrillFilterRel filterRel, DrillPro
         RexNode pruneCondition = c.getFinalCondition();
     
         if (pruneCondition == null) {
    +      logger.debug("No conditions were found eligible for partition 
pruning.");
           return;
         }
     
     
         // set up the partitions
    -    final GroupScan groupScan = scanRel.getGroupScan();
    -    List<PartitionLocation> partitions = descriptor.getPartitions();
    -
    -    if (partitions.size() > Character.MAX_VALUE) {
    -      return;
    -    }
    -
    -    final NullableBitVector output = new 
NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), 
allocator);
    -    final VectorContainer container = new VectorContainer();
    -
    -    try {
    -      final ValueVector[] vectors = new 
ValueVector[descriptor.getMaxHierarchyLevel()];
    -      for (int partitionColumnIndex : 
BitSets.toIter(partitionColumnBitSet)) {
    -        SchemaPath column = 
SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
    -        MajorType type = descriptor.getVectorType(column, settings);
    -        MaterializedField field = MaterializedField.create(column, type);
    -        ValueVector v = TypeHelper.getNewVector(field, allocator);
    -        v.allocateNew();
    -        vectors[partitionColumnIndex] = v;
    -        container.add(v);
    -      }
    -
    -      // populate partition vectors.
    -      descriptor.populatePartitionVectors(vectors, partitions, 
partitionColumnBitSet, fieldNameMap);
    -
    -      // materialize the expression
    -      logger.debug("Attempting to prune {}", pruneCondition);
    -      final LogicalExpression expr = DrillOptiq.toDrill(new 
DrillParseContext(settings), scanRel, pruneCondition);
    -      final ErrorCollectorImpl errors = new ErrorCollectorImpl();
    -
    -      LogicalExpression materializedExpr = 
ExpressionTreeMaterializer.materialize(expr, container, errors, 
optimizerContext.getFunctionRegistry());
    -      // Make sure pruneCondition's materialized expression is always of 
BitType, so that
    -      // it's same as the type of output vector.
    -      if (materializedExpr.getMajorType().getMode() == 
TypeProtos.DataMode.REQUIRED) {
    -        materializedExpr = 
ExpressionTreeMaterializer.convertToNullableType(
    -            materializedExpr,
    -            materializedExpr.getMajorType().getMinorType(),
    -            optimizerContext.getFunctionRegistry(),
    -            errors);
    +    List<String> newFiles = Lists.newArrayList();
    +    long numTotal = 0; // total number of partitions
    +    int batchIndex = 0;
    +    String firstLocation = null;
    +
    +    // Outer loop: iterate over a list of batches of PartitionLocations
    +    for (List<PartitionLocation> partitions : descriptor) {
    +      numTotal += partitions.size();
    +      logger.debug("Evaluating partition pruning for batch {}", 
batchIndex);
    +      if (batchIndex == 0) { // save the first location in case everything 
is pruned
    +        firstLocation = partitions.get(0).getEntirePartitionLocation();
           }
    +      final NullableBitVector output = new 
NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), 
allocator);
    +      final VectorContainer container = new VectorContainer();
    +
    +      try {
    +        final ValueVector[] vectors = new 
ValueVector[descriptor.getMaxHierarchyLevel()];
    +          for (int partitionColumnIndex : 
BitSets.toIter(partitionColumnBitSet)) {
    +          SchemaPath column = 
SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
    +          MajorType type = descriptor.getVectorType(column, settings);
    +          MaterializedField field = MaterializedField.create(column, type);
    +          ValueVector v = TypeHelper.getNewVector(field, allocator);
    +          v.allocateNew();
    +          vectors[partitionColumnIndex] = v;
    +          container.add(v);
    +        }
     
    -      if (errors.getErrorCount() != 0) {
    -        logger.warn("Failure while materializing expression [{}].  Errors: 
{}", expr, errors);
    -      }
    +        // populate partition vectors.
    +        descriptor.populatePartitionVectors(vectors, partitions, 
partitionColumnBitSet, fieldNameMap);
    +
    +        // materialize the expression
    +        logger.debug("Attempting to prune {}", pruneCondition);
    +        final LogicalExpression expr = DrillOptiq.toDrill(new 
DrillParseContext(settings), scanRel, pruneCondition);
    +        final ErrorCollectorImpl errors = new ErrorCollectorImpl();
    +
    +        LogicalExpression materializedExpr = 
ExpressionTreeMaterializer.materialize(expr, container, errors, 
optimizerContext.getFunctionRegistry());
    +        // Make sure pruneCondition's materialized expression is always of 
BitType, so that
    +        // it's same as the type of output vector.
    +        if (materializedExpr.getMajorType().getMode() == 
TypeProtos.DataMode.REQUIRED) {
    +          materializedExpr = 
ExpressionTreeMaterializer.convertToNullableType(
    +              materializedExpr,
    +              materializedExpr.getMajorType().getMinorType(),
    +              optimizerContext.getFunctionRegistry(),
    +              errors);
    +        }
     
    -      output.allocateNew(partitions.size());
    -      InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, 
container, output, materializedExpr);
    -      int record = 0;
    +        if (errors.getErrorCount() != 0) {
    +          logger.warn("Failure while materializing expression [{}].  
Errors: {}", expr, errors);
    +        }
     
    -      List<String> newFiles = Lists.newArrayList();
    -      for(PartitionLocation part: partitions){
    -        if(!output.getAccessor().isNull(record) && 
output.getAccessor().get(record) == 1){
    -          newFiles.add(part.getEntirePartitionLocation());
    +        output.allocateNew(partitions.size());
    +        InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, 
container, output, materializedExpr);
    +        int recordCount = 0;
    +        int qualifiedCount = 0;
    +
    +        // Inner loop: within each batch iterate over the 
PartitionLocations
    +        for(PartitionLocation part: partitions){
    +          if(!output.getAccessor().isNull(recordCount) && 
output.getAccessor().get(recordCount) == 1){
    +            newFiles.add(part.getEntirePartitionLocation());
    +            qualifiedCount++;
    +          }
    +          recordCount++;
    +        }
    +        logger.debug("Within batch {}: total records: {}, qualified 
records: {}", batchIndex, recordCount, qualifiedCount);
    +        batchIndex++;
    +      } catch (Exception e) {
    +        logger.warn("Exception while trying to prune partition.", e);
    --- End diff --
    
    Yes, we should do an early return once an exception is caught. I will add 
that. 


> Directory pruning is not happening when number of files is larger than 64k
> --------------------------------------------------------------------------
>
>                 Key: DRILL-3735
>                 URL: https://issues.apache.org/jira/browse/DRILL-3735
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Query Planning & Optimization
>    Affects Versions: 1.1.0
>            Reporter: Hao Zhu
>            Assignee: Mehant Baid
>             Fix For: 1.2.0
>
>
> When the number of files exceeds the 64k limit, directory pruning does not 
> happen. 
> We need to increase this limit further to handle most use cases.
> My proposal is to separate the code for directory pruning and partition 
> pruning. 
> Say a parent directory contains 100 directories and 1 million files.
> If we only query files from one directory, we should first read the 100 
> directories and narrow down to the relevant directory, and then read the file 
> paths in that directory into memory and do the rest of the work.
> The current behavior is that Drill first reads the file paths of all 1 million 
> files into memory, and then does directory pruning or partition pruning. 
> This is neither performance-efficient nor memory-efficient, and it does not 
> scale.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to