gparai commented on a change in pull request #1640: DRILL-7038: Queries on 
partitioned columns scan the entire datasets
URL: https://github.com/apache/drill/pull/1640#discussion_r258303798
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
 ##########
 @@ -550,4 +567,180 @@ private static void setPruneStatus(MetadataContext 
metaContext, PruneStatus prun
     }
   }
 
+  private static class PruneFilesOnScanRule extends PruneScanRule {
+
+    private final Pattern dirPattern;
+
+    private PruneFilesOnScanRule(OptimizerRulesContext optimizerRulesContext) {
+      super(RelOptHelper.some(Aggregate.class, DrillRel.DRILL_LOGICAL, 
RelOptHelper.any(TableScan.class)),
+          "PruneFilesOnScanRule:Prune_On_Scan", optimizerRulesContext);
+      String partitionColumnLabel = 
optimizerRulesContext.getPlannerSettings().getFsPartitionColumnLabel();
+      dirPattern = Pattern.compile(partitionColumnLabel + "\\d+");
+    }
+
+    @Override
+    public PartitionDescriptor getPartitionDescriptor(PlannerSettings 
settings, TableScan scanRel) {
+      return new FileSystemPartitionDescriptor(settings, scanRel);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      Aggregate aggregate = call.rel(0);
+      TableScan scan = call.rel(1);
+
+      if (!isQualifiedFilePruning(scan)
+          || scan.getRowType().getFieldCount() != 
aggregate.getRowType().getFieldCount()) {
+        return false;
+      }
+
+      List<String> fieldNames = scan.getRowType().getFieldNames();
+      // Check if select contains partition columns (dir0, dir1, dir2,..., 
dirN) only
+      for (String field : fieldNames) {
+        if (!dirPattern.matcher(field).matches()) {
+          return false;
+        }
+      }
+
+      return scan.isDistinct() || aggregate.getGroupCount() > 0;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      TableScan scan = call.rel(1);
+
+      String pruningClassName = getClass().getName();
+      logger.debug("Beginning file partition pruning, pruning class: {}", 
pruningClassName);
+      Stopwatch totalPruningTime = logger.isDebugEnabled() ? 
Stopwatch.createStarted() : null;
+
+      Object selection = getDrillTable(scan).getSelection();
+      MetadataContext metaContext = null;
+      FileSelection fileSelection = null;
+      if (selection instanceof FormatSelection) {
+        fileSelection = ((FormatSelection) selection).getSelection();
+        metaContext = fileSelection.getMetaContext();
+      }
+
+      PlannerSettings settings = 
PrelUtil.getPlannerSettings(call.getPlanner());
+      PartitionDescriptor descriptor = getPartitionDescriptor(settings, scan);
+
+      List<String> fieldNames = scan.getRowType().getFieldNames();
+      List<String> values = Collections.emptyList();
+      List<Integer> indexes = new ArrayList<>(fieldNames.size());
+      for (String field : fieldNames) {
+        int index = descriptor.getPartitionHierarchyIndex(field);
+        indexes.add(index);
+      }
+      if (metaContext != null && metaContext.getDirectories() != null) {
+        // Dir metadata cache file exists
+        logger.debug("Using Metadata Directories cache");
+        values = getValues(fileSelection.getSelectionRoot(), 
metaContext.getDirectories(), indexes);
+      }
+
+      if (values.isEmpty()) {
+        logger.debug("Not using Metadata Directories cache");
+        int batchIndex = 0;
+        // Outer loop: iterate over a list of batches of PartitionLocations
+        values = new ArrayList<>();
+        for (List<PartitionLocation> partitions : descriptor) {
+          logger.debug("Evaluating file partition pruning for batch {}", 
batchIndex);
+
+          try {
+            values.addAll(getValues(partitions, indexes));
+          } catch (Exception e) {
+            logger.warn("Exception while trying to prune files.", e);
+            if (totalPruningTime != null) {
+              logger.debug("Total pruning elapsed time: {} ms", 
totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+            }
+
+            // continue without partition pruning
+            return;
+          }
+          batchIndex++;
+        }
+
+        if (values.isEmpty()) {
+          // No changes are required
+          return;
+        }
+      }
+
+      try {
+        List<RelDataTypeField> typeFields = new ArrayList<>(fieldNames.size());
+        RelDataTypeFactory typeFactory = scan.getCluster().getTypeFactory();
+
+        int i = 0;
+        for (String field : fieldNames) {
+          RelDataType dataType = typeFactory.createTypeWithNullability(
+              typeFactory.createSqlType(SqlTypeName.VARCHAR, 
Types.MAX_VARCHAR_LENGTH), true);
+          typeFields.add(new RelDataTypeFieldImpl(field, i++, dataType));
+        }
+        RelRecordType t = new RelRecordType(scan.getRowType().getStructKind(), 
typeFields);
+        RelNode newInput = 
DrillRelFactories.LOGICAL_BUILDER.create(scan.getCluster(), null)
+            .values(t, values.toArray())
+            .build();
+
+        RelTraitSet traits = 
newInput.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
+        newInput = new DrillValuesRel(
+            newInput.getCluster(),
+            newInput.getRowType(),
+            ((LogicalValues) newInput).getTuples(), traits
+        );
+
+        Aggregate aggregate = call.rel(0);
+        Aggregate newAggregate = aggregate.copy(
+            aggregate.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+            newInput,
+            aggregate.indicator,
+            aggregate.getGroupSet(),
+            aggregate.getGroupSets(),
+            aggregate.getAggCallList()
+        );
+        call.transformTo(newAggregate);
+      } catch (Exception e) {
+        logger.warn("Exception while using the pruned partitions.", e);
+      } finally {
+        if (totalPruningTime != null) {
+          logger.debug("Total pruning elapsed time: {} ms", 
totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+        }
+      }
+    }
+
+    private List<String> getValues(String selectionRoot, List<String> 
directories, List<Integer> indexes) {
+      List<String> values = new ArrayList<>();
+      for (String dir : directories) {
+        List<String> parts = ColumnExplorer.listPartitionValuesDirectory(dir, 
selectionRoot);
+        for (int index : indexes) {
+          if (index < parts.size()) {
+            values.add(parts.get(index));
+          } else {
+            values.add(null);
 
 Review comment:
   Why do we add null here?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to