[
https://issues.apache.org/jira/browse/DRILL-3735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14744578#comment-14744578
]
ASF GitHub Bot commented on DRILL-3735:
---------------------------------------
Github user jinfengni commented on a diff in the pull request:
https://github.com/apache/drill/pull/156#discussion_r39464031
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java ---
@@ -176,81 +177,103 @@ protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillPro
       RexNode pruneCondition = c.getFinalCondition();

       if (pruneCondition == null) {
+        logger.debug("No conditions were found eligible for partition pruning.");
         return;
       }

       // set up the partitions
-      final GroupScan groupScan = scanRel.getGroupScan();
-      List<PartitionLocation> partitions = descriptor.getPartitions();
-
-      if (partitions.size() > Character.MAX_VALUE) {
-        return;
-      }
-
-      final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
-      final VectorContainer container = new VectorContainer();
-
-      try {
-        final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
-        for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
-          SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
-          MajorType type = descriptor.getVectorType(column, settings);
-          MaterializedField field = MaterializedField.create(column, type);
-          ValueVector v = TypeHelper.getNewVector(field, allocator);
-          v.allocateNew();
-          vectors[partitionColumnIndex] = v;
-          container.add(v);
-        }
-
-        // populate partition vectors.
-        descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
-
-        // materialize the expression
-        logger.debug("Attempting to prune {}", pruneCondition);
-        final LogicalExpression expr = DrillOptiq.toDrill(new DrillParseContext(settings), scanRel, pruneCondition);
-        final ErrorCollectorImpl errors = new ErrorCollectorImpl();
-
-        LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, container, errors, optimizerContext.getFunctionRegistry());
-        // Make sure pruneCondition's materialized expression is always of BitType, so that
-        // it's same as the type of output vector.
-        if (materializedExpr.getMajorType().getMode() == TypeProtos.DataMode.REQUIRED) {
-          materializedExpr = ExpressionTreeMaterializer.convertToNullableType(
-              materializedExpr,
-              materializedExpr.getMajorType().getMinorType(),
-              optimizerContext.getFunctionRegistry(),
-              errors);
+      List<String> newFiles = Lists.newArrayList();
+      long numTotal = 0; // total number of partitions
+      int batchIndex = 0;
+      String firstLocation = null;
+
+      // Outer loop: iterate over a list of batches of PartitionLocations
+      for (List<PartitionLocation> partitions : descriptor) {
+        numTotal += partitions.size();
+        logger.debug("Evaluating partition pruning for batch {}", batchIndex);
+        if (batchIndex == 0) { // save the first location in case everything is pruned
+          firstLocation = partitions.get(0).getEntirePartitionLocation();
         }
+        final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
+        final VectorContainer container = new VectorContainer();
+
+        try {
+          final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
+          for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
+            SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
+            MajorType type = descriptor.getVectorType(column, settings);
+            MaterializedField field = MaterializedField.create(column, type);
+            ValueVector v = TypeHelper.getNewVector(field, allocator);
+            v.allocateNew();
+            vectors[partitionColumnIndex] = v;
+            container.add(v);
+          }

-        if (errors.getErrorCount() != 0) {
-          logger.warn("Failure while materializing expression [{}]. Errors: {}", expr, errors);
-        }
+          // populate partition vectors.
+          descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
+
+          // materialize the expression
+          logger.debug("Attempting to prune {}", pruneCondition);
+          final LogicalExpression expr = DrillOptiq.toDrill(new DrillParseContext(settings), scanRel, pruneCondition);
+          final ErrorCollectorImpl errors = new ErrorCollectorImpl();
+
+          LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, container, errors, optimizerContext.getFunctionRegistry());
+          // Make sure pruneCondition's materialized expression is always of BitType, so that
+          // it's same as the type of output vector.
+          if (materializedExpr.getMajorType().getMode() == TypeProtos.DataMode.REQUIRED) {
+            materializedExpr = ExpressionTreeMaterializer.convertToNullableType(
+                materializedExpr,
+                materializedExpr.getMajorType().getMinorType(),
+                optimizerContext.getFunctionRegistry(),
+                errors);
+          }

-        output.allocateNew(partitions.size());
-        InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);
-        int record = 0;
+          if (errors.getErrorCount() != 0) {
+            logger.warn("Failure while materializing expression [{}]. Errors: {}", expr, errors);
+          }

-        List<String> newFiles = Lists.newArrayList();
-        for(PartitionLocation part: partitions){
-          if(!output.getAccessor().isNull(record) && output.getAccessor().get(record) == 1){
-            newFiles.add(part.getEntirePartitionLocation());
+          output.allocateNew(partitions.size());
+          InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);
+          int recordCount = 0;
+          int qualifiedCount = 0;
+
+          // Inner loop: within each batch iterate over the PartitionLocations
+          for (PartitionLocation part : partitions) {
+            if (!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1) {
+              newFiles.add(part.getEntirePartitionLocation());
+              qualifiedCount++;
+            }
+            recordCount++;
+          }
+          logger.debug("Within batch {}: total records: {}, qualified records: {}", batchIndex, recordCount, qualifiedCount);
+          batchIndex++;
+        } catch (Exception e) {
+          logger.warn("Exception while trying to prune partition.", e);
--- End diff --
If there is an Exception during partition pruning for one sublist, it seems
we just log the error as a warning, which means the code continues past the
for-loop. I feel that might produce an incorrect result, since the list of
new partitions might be invalid.
Should we stop the execution of partition pruning once an Exception is
caught here?
> Directory pruning is not happening when number of files is larger than 64k
> --------------------------------------------------------------------------
>
> Key: DRILL-3735
> URL: https://issues.apache.org/jira/browse/DRILL-3735
> Project: Apache Drill
> Issue Type: Bug
> Components: Query Planning & Optimization
> Affects Versions: 1.1.0
> Reporter: Hao Zhu
> Assignee: Mehant Baid
> Fix For: 1.2.0
>
>
> When the number of files exceeds the 64k limit, directory pruning does not
> happen.
> We need to raise this limit further to handle most use cases.
> My proposal is to separate the code for directory pruning from the code for
> partition pruning.
> Say a parent directory contains 100 directories and 1 million files.
> If we only query files from one directory, we should first read the 100
> directories and narrow down to the right one, then read only that
> directory's file paths into memory and do the rest of the work.
> The current behavior is that Drill first reads all 1 million file paths into
> memory, and only then does directory pruning or partition pruning.
> That is neither performance efficient nor memory efficient, and it does not
> scale.
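The two-phase approach proposed above could look roughly like this. This is a hypothetical sketch under stated assumptions, not Drill code: `FileSystemView` and `prune` are illustrative names, and the point is simply that directories are filtered first and files are listed only under the survivors, so the full 1-million-file listing is never materialized in memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical two-phase pruning: prune at directory granularity first,
// then expand only the surviving directories into file lists.
public class TwoPhasePruning {

  // Minimal abstraction over directory/file listing (illustrative only).
  interface FileSystemView {
    List<String> listDirectories(String root);
    List<String> listFiles(String directory);
  }

  static List<String> prune(FileSystemView fs, String root, Predicate<String> dirFilter) {
    List<String> files = new ArrayList<>();
    for (String dir : fs.listDirectories(root)) { // e.g. 100 entries, cheap
      if (dirFilter.test(dir)) {                  // phase 1: directory pruning
        files.addAll(fs.listFiles(dir));          // phase 2: expand survivors only
      }
    }
    return files;
  }
}
```

With 100 directories and 1 million files, phase 1 touches only 100 paths; the expensive file listing happens only for the directories the query actually needs.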
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)