This is an automated email from the ASF dual-hosted git repository.
arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 0928427 DRILL-7418: MetadataDirectGroupScan improvements
0928427 is described below
commit 09284270b85d35f7a1d521539c42876bb613a6e3
Author: Arina Ielchiieva <[email protected]>
AuthorDate: Tue Oct 22 17:33:04 2019 +0300
DRILL-7418: MetadataDirectGroupScan improvements
1. Replaced files listing with selection root information to reduce query
plan size in MetadataDirectGroupScan.
2. Fixed MetadataDirectGroupScan ser / de issues.
3. Added PlanMatcher to QueryBuilder for more convenient plan matching (see the usage sketch after this list).
4. Rewrote TestConvertCountToDirectScan to use ClusterTest.
5. Refactored and cleaned up code.
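For reference, item 3's PlanMatcher gives tests a fluent way to assert on query plans. A minimal usage sketch, as exercised by the rewritten tests in this commit (the query itself is illustrative):

    // The plan must contain DynamicPojoRecordReader (the count was rewritten
    // into a direct scan) and must not contain a full ParquetGroupScan;
    // match() fails the test if either pattern check does not hold.
    queryBuilder()
        .sql("select count(*) as cnt from cp.`tpch/nation.parquet`")
        .planMatcher()
        .include("DynamicPojoRecordReader")
        .exclude("ParquetGroupScan")
        .match();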
---
.../exec/TestHiveDrillNativeParquetReader.java | 19 +-
.../exec/physical/base/AbstractGroupScan.java | 5 +
.../apache/drill/exec/physical/base/GroupScan.java | 9 +-
.../apache/drill/exec/physical/base/ScanStats.java | 46 ++-
.../logical/ConvertCountToDirectScanRule.java | 27 +-
.../physical/ConvertCountToDirectScanPrule.java | 17 +-
.../drill/exec/store/direct/DirectGroupScan.java | 24 +-
.../exec/store/direct/MetadataDirectGroupScan.java | 49 ++-
.../parquet/AbstractParquetScanBatchCreator.java | 6 +-
.../drill/TestFunctionsWithTypeExpoQueries.java | 2 +-
.../logical/TestConvertCountToDirectScan.java | 444 ++++++++++++---------
.../java/org/apache/drill/test/ClientFixture.java | 76 ++--
.../java/org/apache/drill/test/QueryBuilder.java | 152 +++++--
13 files changed, 530 insertions(+), 346 deletions(-)
diff --git
a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
index 5490640..6b9a7cd 100644
---
a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
+++
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
@@ -64,8 +64,7 @@ public class TestHiveDrillNativeParquetReader extends
HiveTestBase {
int actualRowCount = testSql(query);
assertEquals("Expected and actual row count should match", 2,
actualRowCount);
- testPlanMatchingPatterns(query,
- new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, null);
+ testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", "numFiles=1");
}
@Test
@@ -75,8 +74,7 @@ public class TestHiveDrillNativeParquetReader extends
HiveTestBase {
int actualRowCount = testSql(query);
assertEquals("Expected and actual row count should match", 1,
actualRowCount);
- testPlanMatchingPatterns(query,
- new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, null);
+ testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", "numFiles=1");
}
@Test
@@ -114,15 +112,14 @@ public class TestHiveDrillNativeParquetReader extends
HiveTestBase {
int actualRowCount = testSql(query);
assertEquals("Expected and actual row count should match", 2,
actualRowCount);
- testPlanMatchingPatterns(query,
- new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, null);
+ testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", "numFiles=1");
}
@Test
public void testPartitionedExternalTable() throws Exception {
String query = "select * from hive.kv_native_ext";
- testPlanMatchingPatterns(query, new String[]{"HiveDrillNativeParquetScan", "numFiles=2"}, null);
+ testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", "numFiles=2");
testBuilder()
.sqlQuery(query)
@@ -185,14 +182,16 @@ public class TestHiveDrillNativeParquetReader extends
HiveTestBase {
int actualRowCount = testSql(query);
assertEquals("Expected and actual row count should match", 2,
actualRowCount);
- testPlanMatchingPatterns(query, new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, null);
+ testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", "numFiles=1");
}
@Test
public void testConvertCountToDirectScanOptimization() throws Exception {
String query = "select count(1) as cnt from hive.kv_native";
- testPlanMatchingPatterns(query, new String[]{"DynamicPojoRecordReader"}, null);
+ testPlanMatchingPatterns(query, "DynamicPojoRecordReader");
+
+ testPhysicalPlanExecutionBasedOnQuery(query);
testBuilder()
.sqlQuery(query)
@@ -224,7 +223,7 @@ public class TestHiveDrillNativeParquetReader extends
HiveTestBase {
public void testReadAllSupportedHiveDataTypesNativeParquet() throws
Exception {
String query = "select * from hive.readtest_parquet";
- testPlanMatchingPatterns(query, new String[]{"HiveDrillNativeParquetScan"}, null);
+ testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan");
testBuilder()
.sqlQuery(query)
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 4916370..bc21550 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -174,6 +174,11 @@ public abstract class AbstractGroupScan extends
AbstractBase implements GroupSca
}
@Override
+ public Path getSelectionRoot() {
+ return null;
+ }
+
+ @Override
public Collection<Path> getFiles() {
return null;
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index 1ba1dd9..ebbb717 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -141,8 +141,15 @@ public interface GroupScan extends Scan, HasAffinity {
boolean hasFiles();
/**
+ * Returns the path to the selection root. If this GroupScan cannot provide a selection root, it returns null.
+ *
+ * @return path to the selection root
+ */
+ Path getSelectionRoot();
+
+ /**
* Returns a collection of file names associated with this GroupScan. This
should be called after checking
- * hasFiles(). If this GroupScan cannot provide file names, it returns null.
+ * hasFiles(). If this GroupScan cannot provide file names, it returns null.
*
* @return collection of files paths
*/
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java
index 721f723..596dc5b 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/ScanStats.java
@@ -17,22 +17,44 @@
*/
package org.apache.drill.exec.physical.base;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
public class ScanStats {
public static final ScanStats TRIVIAL_TABLE = new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, 20, 1, 1);
public static final ScanStats ZERO_RECORD_TABLE = new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, 0, 1, 1);
+ @JsonProperty
+ private final GroupScanProperty groupScanProperty;
+ @JsonProperty
private final double recordCount;
+ @JsonProperty
private final double cpuCost;
+ @JsonProperty
private final double diskCost;
- private final GroupScanProperty property;
- public ScanStats(GroupScanProperty property, double recordCount, double
cpuCost, double diskCost) {
+ @JsonCreator
+ public ScanStats(@JsonProperty("groupScanProperty") GroupScanProperty groupScanProperty,
+ @JsonProperty("recordCount") double recordCount,
+ @JsonProperty("cpuCost") double cpuCost,
+ @JsonProperty("diskCost") double diskCost) {
+ this.groupScanProperty = groupScanProperty;
this.recordCount = recordCount;
this.cpuCost = cpuCost;
this.diskCost = diskCost;
- this.property = property;
+ }
+
+ /**
+ * Returns whether the GroupScan knows the exact row count returned by the getSize() call.
+ * By default, a group scan does not know the exact row count before it scans every row.
+ * Currently, the Parquet group scan returns the exact row count.
+ *
+ * @return group scan property
+ */
+ public GroupScanProperty getGroupScanProperty() {
+ return groupScanProperty;
}
public double getRecordCount() {
@@ -49,20 +71,14 @@ public class ScanStats {
@Override
public String toString() {
- return "ScanStats{" + "recordCount=" + recordCount + ", cpuCost=" +
cpuCost + ", diskCost=" + diskCost + ", property=" + property + '}';
+ return "ScanStats{" +
+ "recordCount=" + recordCount +
+ ", cpuCost=" + cpuCost +
+ ", diskCost=" + diskCost +
+ ", groupScanProperty=" + groupScanProperty +
+ '}';
}
- /**
- * Return if GroupScan knows the exact row count in the result of getSize() call.
- * By default, groupscan does not know the exact row count, before it scans every rows.
- * Currently, parquet group scan will return the exact row count.
- */
- public GroupScanProperty getGroupScanProperty() {
- return property;
- }
-
-
-
public enum GroupScanProperty {
NO_EXACT_ROW_COUNT(false, false),
EXACT_ROW_COUNT(true, true);
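The @JsonCreator / @JsonProperty annotations above are what make ScanStats round-trip through Jackson, which the plan ser / de fix (item 2) relies on. A minimal sketch, assuming a plain Jackson ObjectMapper (Drill wires its own configured mapper into plan serialization) inside a method declared with throws Exception:

    import com.fasterxml.jackson.databind.ObjectMapper;

    ObjectMapper mapper = new ObjectMapper();
    ScanStats stats = new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 25, 1, 4);
    // Fields are written out via the @JsonProperty annotations ...
    String json = mapper.writeValueAsString(stats);
    // ... and read back through the @JsonCreator constructor, which the
    // old class (plain constructor, no annotations) did not offer Jackson.
    ScanStats restored = mapper.readValue(json, ScanStats.class);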
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
index fb1bd2f..28c23f0 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
@@ -48,14 +48,14 @@ import
org.apache.drill.exec.store.parquet.ParquetReaderConfig;
import org.apache.drill.exec.store.parquet.metadata.Metadata;
import org.apache.drill.exec.store.parquet.metadata.Metadata_V4;
import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.Set;
@@ -99,9 +99,9 @@ public class ConvertCountToDirectScanRule extends RelOptRule {
RelOptHelper.some(Aggregate.class,
RelOptHelper.any(TableScan.class)), "Agg_on_scan");
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConvertCountToDirectScanRule.class);
+ private static final Logger logger = LoggerFactory.getLogger(ConvertCountToDirectScanRule.class);
- protected ConvertCountToDirectScanRule(RelOptRuleOperand rule, String id) {
+ private ConvertCountToDirectScanRule(RelOptRuleOperand rule, String id) {
super(rule, "ConvertCountToDirectScanRule:" + id);
}
@@ -153,7 +153,7 @@ public class ConvertCountToDirectScanRule extends
RelOptRule {
Metadata_V4.MetadataSummary metadataSummary = status.getRight();
Map<String, Long> result = collectCounts(settings, metadataSummary, agg,
scan, project);
- logger.trace("Calculated the following aggregate counts: ", result);
+ logger.trace("Calculated the following aggregate counts: {}", result);
// if counts could not be determined, rule won't be applied
if (result.isEmpty()) {
@@ -161,17 +161,16 @@ public class ConvertCountToDirectScanRule extends
RelOptRule {
return;
}
- List<Path> fileList = ImmutableList.of(Metadata.getSummaryFileName(formatSelection.getSelection().getSelectionRoot()));
+ Path summaryFileName = Metadata.getSummaryFileName(formatSelection.getSelection().getSelectionRoot());
final RelDataType scanRowType =
CountToDirectScanUtils.constructDataType(agg, result.keySet());
final DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>(
CountToDirectScanUtils.buildSchema(scanRowType.getFieldNames()),
- Collections.singletonList((List<Long>) new ArrayList<>(result.values())));
+ Collections.singletonList(new ArrayList<>(result.values())));
final ScanStats scanStats = new
ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1,
scanRowType.getFieldCount());
- final MetadataDirectGroupScan directScan = new MetadataDirectGroupScan(reader, fileList, scanStats, true);
+ final MetadataDirectGroupScan directScan = new MetadataDirectGroupScan(reader, summaryFileName, 1, scanStats, true);
final DrillDirectScanRel newScan = new
DrillDirectScanRel(scan.getCluster(),
scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
directScan, scanRowType);
@@ -190,16 +189,16 @@ public class ConvertCountToDirectScanRule extends
RelOptRule {
if (!((formatConfig instanceof ParquetFormatConfig)
|| ((formatConfig instanceof NamedFormatPluginConfig)
&& ((NamedFormatPluginConfig) formatConfig).name.equals("parquet")))) {
- return new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(false, null);
+ return new ImmutablePair<>(false, null);
}
FileSystemPlugin plugin = (FileSystemPlugin) drillTable.getPlugin();
- DrillFileSystem fs = null;
+ DrillFileSystem fs;
try {
fs = new
DrillFileSystem(plugin.getFormatPlugin(formatSelection.getFormat()).getFsConf());
} catch (IOException e) {
logger.warn("Unable to create the file system object for retrieving
statistics from metadata cache file ", e);
- return new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(false, null);
+ return new ImmutablePair<>(false, null);
}
// check if the cacheFileRoot has been set: this is needed because after
directory pruning, the
@@ -215,8 +214,7 @@ public class ConvertCountToDirectScanRule extends
RelOptRule {
Metadata_V4.MetadataSummary metadataSummary = Metadata.getSummary(fs,
selectionRoot, false, parquetReaderConfig);
- return metadataSummary != null ? new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(true, metadataSummary) :
- new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(false, null);
+ return metadataSummary != null ? new ImmutablePair<>(true, metadataSummary) : new ImmutablePair<>(false, null);
}
/**
@@ -311,5 +309,4 @@ public class ConvertCountToDirectScanRule extends
RelOptRule {
return ImmutableMap.copyOf(result);
}
-
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScanPrule.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScanPrule.java
index ac8f3ca..0900ba7 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScanPrule.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScanPrule.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.planner.physical;
-import java.util.List;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -44,6 +43,8 @@ import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.drill.exec.store.direct.MetadataDirectGroupScan;
import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* <p>
@@ -89,9 +90,9 @@ public class ConvertCountToDirectScanPrule extends Prule {
RelOptHelper.some(DrillAggregateRel.class,
RelOptHelper.any(DrillScanRel.class)),
"Agg_on_scan");
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConvertCountToDirectScanPrule.class);
+ private static final Logger logger = LoggerFactory.getLogger(ConvertCountToDirectScanPrule.class);
- protected ConvertCountToDirectScanPrule(RelOptRuleOperand rule, String id) {
+ private ConvertCountToDirectScanPrule(RelOptRuleOperand rule, String id) {
super(rule, "ConvertCountToDirectScanPrule:" + id);
}
@@ -125,10 +126,11 @@ public class ConvertCountToDirectScanPrule extends Prule {
final DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>(
CountToDirectScanUtils.buildSchema(scanRowType.getFieldNames()),
- Collections.singletonList((List<Long>) new ArrayList<>(result.values())));
+ Collections.singletonList(new ArrayList<>(result.values())));
final ScanStats scanStats = new
ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1,
scanRowType.getFieldCount());
- final GroupScan directScan = new MetadataDirectGroupScan(reader, oldGrpScan.getFiles(), scanStats, false);
+ final int numFiles = oldGrpScan.hasFiles() ? oldGrpScan.getFiles().size() : -1;
+ final GroupScan directScan = new MetadataDirectGroupScan(reader, oldGrpScan.getSelectionRoot(), numFiles, scanStats, false);
final DirectScanPrel newScan = DirectScanPrel.create(scan,
scan.getTraitSet().plus(Prel.DRILL_PHYSICAL)
.plus(DrillDistributionTrait.SINGLETON), directScan, scanRowType);
@@ -145,7 +147,7 @@ public class ConvertCountToDirectScanPrule extends Prule {
*
* For each aggregate call will determine if count can be calculated.
Collects counts only for COUNT function.
* For star, not null expressions and implicit columns sets count to total
record number.
- * For other cases obtains counts from group scan operator. Also count can not be calculated for parition columns.
+ * For other cases obtains counts from group scan operator. Also, count cannot be calculated for partition columns.
*
* @param agg aggregate relational expression
* @param scan scan relational expression
@@ -155,7 +157,7 @@ public class ConvertCountToDirectScanPrule extends Prule {
private Map<String, Long> collectCounts(PlannerSettings settings,
DrillAggregateRel agg, DrillScanRel scan, DrillProjectRel project) {
final Set<String> implicitColumnsNames =
ColumnExplorer.initImplicitFileColumns(settings.getOptions()).keySet();
final GroupScan oldGrpScan = scan.getGroupScan();
- final long totalRecordCount = (long)oldGrpScan.getScanStats(settings).getRecordCount();
+ final long totalRecordCount = (long) oldGrpScan.getScanStats(settings).getRecordCount();
final LinkedHashMap<String, Long> result = new LinkedHashMap<>();
for (int i = 0; i < agg.getAggCallList().size(); i++) {
@@ -218,5 +220,4 @@ public class ConvertCountToDirectScanPrule extends Prule {
return ImmutableMap.copyOf(result);
}
-
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
index 67b2e5c..6c49943 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
@@ -17,10 +17,11 @@
*/
package org.apache.drill.exec.store.direct;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -34,26 +35,30 @@ import java.util.List;
@JsonTypeName("direct-scan")
public class DirectGroupScan extends AbstractGroupScan {
+ @JsonProperty
protected final RecordReader reader;
+ @JsonProperty
protected final ScanStats stats;
public DirectGroupScan(RecordReader reader) {
this(reader, ScanStats.TRIVIAL_TABLE);
}
- public DirectGroupScan(RecordReader reader, ScanStats stats) {
+ @JsonCreator
+ public DirectGroupScan(@JsonProperty("reader") RecordReader reader,
+ @JsonProperty("stats") ScanStats stats) {
super((String) null);
this.reader = reader;
this.stats = stats;
}
@Override
- public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
+ public void applyAssignments(List<DrillbitEndpoint> endpoints) {
assert endpoints.size() == 1;
}
@Override
- public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+ public SubScan getSpecificScan(int minorFragmentId) {
assert minorFragmentId == 0;
return new DirectSubScan(reader);
}
@@ -68,8 +73,14 @@ public class DirectGroupScan extends AbstractGroupScan {
return stats;
}
+ @JsonIgnore
@Override
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+ public List<SchemaPath> getColumns() {
+ return super.getColumns();
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
assert children == null || children.isEmpty();
return new DirectGroupScan(reader, stats);
}
@@ -83,5 +94,4 @@ public class DirectGroupScan extends AbstractGroupScan {
public GroupScan clone(List<SchemaPath> columns) {
return this;
}
-
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
index 4fea456..da63f80 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.store.direct;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.GroupScan;
@@ -25,7 +27,6 @@ import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.store.RecordReader;
import org.apache.hadoop.fs.Path;
-import java.util.Collection;
import java.util.List;
/**
@@ -37,20 +38,34 @@ import java.util.List;
@JsonTypeName("metadata-direct-scan")
public class MetadataDirectGroupScan extends DirectGroupScan {
- private final Collection<Path> files;
- private boolean usedMetadataSummaryFile = false;
+ @JsonProperty
+ private final Path selectionRoot;
+ @JsonProperty
+ private final int numFiles;
+ @JsonProperty
+ private boolean usedMetadataSummaryFile;
- public MetadataDirectGroupScan(RecordReader reader, Collection<Path> files, ScanStats stats,
- boolean usedMetadataSummaryFile) {
+ @JsonCreator
+ public MetadataDirectGroupScan(@JsonProperty("reader") RecordReader reader,
+ @JsonProperty("selectionRoot") Path selectionRoot,
+ @JsonProperty("numFiles") int numFiles,
+ @JsonProperty("stats") ScanStats stats,
+ @JsonProperty("usedMetadataSummaryFile") boolean usedMetadataSummaryFile) {
super(reader, stats);
- this.files = files;
+ this.selectionRoot = selectionRoot;
+ this.numFiles = numFiles;
this.usedMetadataSummaryFile = usedMetadataSummaryFile;
}
@Override
+ public Path getSelectionRoot() {
+ return selectionRoot;
+ }
+
+ @Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
assert children == null || children.isEmpty();
- return new MetadataDirectGroupScan(reader, files, stats, usedMetadataSummaryFile);
+ return new MetadataDirectGroupScan(reader, selectionRoot, numFiles, stats, usedMetadataSummaryFile);
}
@Override
@@ -61,25 +76,25 @@ public class MetadataDirectGroupScan extends
DirectGroupScan {
/**
* <p>
* Returns string representation of group scan data.
- * Includes list of files if present.
+ * Includes the selection root (if present), the number of files, and whether the metadata summary file was used.
* </p>
*
* <p>
- * Example: [files = [/tmp/0_0_0.parquet], numFiles = 1]
+ * Example: [selectionRoot = /tmp/users, numFiles = 1, usedMetadataSummaryFile = false]
* </p>
*
* @return string representation of group scan data
*/
@Override
public String getDigest() {
- if (files != null) {
- StringBuilder builder = new StringBuilder();
- builder.append("files = ").append(files).append(", ");
- builder.append("numFiles = ").append(files.size()).append(", ");
- builder.append("usedMetadataSummaryFile =
").append(usedMetadataSummaryFile).append(", ");
- return builder.append(super.getDigest()).toString();
+ StringBuilder builder = new StringBuilder();
+ if (selectionRoot != null) {
+ builder.append("selectionRoot = ").append(selectionRoot).append(", ");
}
- return super.getDigest();
+ builder.append("numFiles = ").append(numFiles).append(", ");
+ builder.append("usedMetadataSummaryFile =
").append(usedMetadataSummaryFile).append(", ");
+ builder.append(super.getDigest());
+ return builder.toString();
}
-
}
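With the planner rule changes above, a MetadataDirectGroupScan now carries a selection root and a file count instead of a full file list, so its digest — and with it the serialized plan — stays small regardless of how many files the table spans. A sketch of how the physical rule assembles it, with names as in the diff:

    // numFiles = -1 signals that the file count is unknown for this group scan.
    int numFiles = oldGrpScan.hasFiles() ? oldGrpScan.getFiles().size() : -1;
    GroupScan directScan = new MetadataDirectGroupScan(
        reader, oldGrpScan.getSelectionRoot(), numFiles, scanStats, false);
    // Resulting digest, e.g.:
    // selectionRoot = /tmp/users, numFiles = 1, usedMetadataSummaryFile = false, ...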
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index eae151e..838180d 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -273,8 +273,8 @@ public abstract class AbstractParquetScanBatchCreator {
* @param rowGroup create a reader for this specific row group
* @param fs file system
* @param footer this file's footer
- * // @param readSchemaOnly - if true sets the number of rows to read to be zero
- * @return the (possibly modified) input mapWithMaxColumns
+ * @param readSchemaOnly if true, sets the number of rows to read to zero
+ * @return the (possibly modified) input mapWithMaxColumns
*/
private Map<String, String>
createReaderAndImplicitColumns(ExecutorFragmentContext context,
AbstractParquetRowGroupScan rowGroupScan,
@@ -347,7 +347,7 @@ public abstract class AbstractParquetScanBatchCreator {
/**
* Helper class responsible for creating and managing DrillFileSystem.
*/
- protected abstract class AbstractDrillFileSystemManager {
+ protected static abstract class AbstractDrillFileSystemManager {
protected final OperatorContext operatorContext;
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/TestFunctionsWithTypeExpoQueries.java
b/exec/java-exec/src/test/java/org/apache/drill/TestFunctionsWithTypeExpoQueries.java
index 207638d..0c892da 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/TestFunctionsWithTypeExpoQueries.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/TestFunctionsWithTypeExpoQueries.java
@@ -277,7 +277,7 @@ public class TestFunctionsWithTypeExpoQueries extends
BaseTestQuery {
"where concat(a, 'asdf') = 'asdf'";
// Validate the plan
- final String[] expectedPlan = {"Scan.*a.parquet.*numFiles = 1"};
+ final String[] expectedPlan = {"Scan.*metadata_caching.*numFiles = 1"};
final String[] excludedPlan = {"Filter"};
PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java
index c35ab2d..ae33f0a 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java
@@ -17,364 +17,426 @@
*/
package org.apache.drill.exec.planner.logical;
-import org.apache.drill.PlanTestBase;
import org.apache.drill.categories.PlannerTest;
+import org.apache.drill.categories.UnlikelyTest;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.nio.file.Paths;
-@Category(PlannerTest.class)
-public class TestConvertCountToDirectScan extends PlanTestBase {
+import static org.junit.Assert.assertEquals;
+
+@Category({PlannerTest.class, UnlikelyTest.class})
+public class TestConvertCountToDirectScan extends ClusterTest {
@BeforeClass
- public static void setupTestFiles() {
+ public static void setup() throws Exception {
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
dirTestWatcher.copyResourceToRoot(Paths.get("directcount.parquet"));
+ startCluster(builder);
}
@Test
- public void ensureCaseDoesNotConvertToDirectScan() throws Exception {
- testPlanMatchingPatterns(
- "select count(case when n_name = 'ALGERIA' and n_regionkey = 2 then
n_nationkey else null end) as cnt\n" +
- "from dfs.`directcount.parquet`", new String[]{"CASE"});
+ public void testCaseDoesNotConvertToDirectScan() throws Exception {
+ queryBuilder()
+ .sql("select " +
+ "count(case when n_name = 'ALGERIA' and n_regionkey = 2 then n_nationkey
else null end) as cnt " +
+ "from dfs.`directcount.parquet`")
+ .planMatcher()
+ .include("CASE")
+ .match();
}
@Test
- public void ensureConvertSimpleCountToDirectScan() throws Exception {
+ public void testConvertSimpleCountToDirectScan() throws Exception {
String sql = "select count(*) as cnt from cp.`tpch/nation.parquet`";
- testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});
+
+ queryBuilder()
+ .sql(sql)
+ .planMatcher()
+ .include("DynamicPojoRecordReader")
+ .match();
testBuilder()
- .sqlQuery(sql)
- .unOrdered()
- .baselineColumns("cnt")
- .baselineValues(25L)
- .go();
+ .sqlQuery(sql)
+ .unOrdered()
+ .baselineColumns("cnt")
+ .baselineValues(25L)
+ .go();
}
@Test
- public void ensureConvertSimpleCountConstToDirectScan() throws Exception {
+ public void testConvertSimpleCountConstToDirectScan() throws Exception {
String sql = "select count(100) as cnt from cp.`tpch/nation.parquet`";
- testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});
+
+ queryBuilder()
+ .sql(sql)
+ .planMatcher()
+ .include("DynamicPojoRecordReader")
+ .match();
testBuilder()
- .sqlQuery(sql)
- .unOrdered()
- .baselineColumns("cnt")
- .baselineValues(25L)
- .go();
+ .sqlQuery(sql)
+ .unOrdered()
+ .baselineColumns("cnt")
+ .baselineValues(25L)
+ .go();
}
@Test
- public void ensureConvertSimpleCountConstExprToDirectScan() throws Exception
{
+ public void testConvertSimpleCountConstExprToDirectScan() throws Exception {
String sql = "select count(1 + 2) as cnt from cp.`tpch/nation.parquet`";
- testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});
+
+ queryBuilder()
+ .sql(sql)
+ .planMatcher()
+ .include("DynamicPojoRecordReader")
+ .match();
testBuilder()
- .sqlQuery(sql)
- .unOrdered()
- .baselineColumns("cnt")
- .baselineValues(25L)
- .go();
+ .sqlQuery(sql)
+ .unOrdered()
+ .baselineColumns("cnt")
+ .baselineValues(25L)
+ .go();
}
@Test
- public void ensureDoesNotConvertForDirectoryColumns() throws Exception {
+ public void testDoesNotConvertForDirectoryColumns() throws Exception {
String sql = "select count(dir0) as cnt from cp.`tpch/nation.parquet`";
- testPlanMatchingPatterns(sql, new String[]{"ParquetGroupScan"});
+
+ queryBuilder()
+ .sql(sql)
+ .planMatcher()
+ .include("ParquetGroupScan")
+ .match();
testBuilder()
- .sqlQuery(sql)
- .unOrdered()
- .baselineColumns("cnt")
- .baselineValues(0L)
- .go();
+ .sqlQuery(sql)
+ .unOrdered()
+ .baselineColumns("cnt")
+ .baselineValues(0L)
+ .go();
}
@Test
- public void ensureConvertForImplicitColumns() throws Exception {
+ public void testConvertForImplicitColumns() throws Exception {
String sql = "select count(fqn) as cnt from cp.`tpch/nation.parquet`";
- testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});
+
+ queryBuilder()
+ .sql(sql)
+ .planMatcher()
+ .include("DynamicPojoRecordReader")
+ .match();
testBuilder()
- .sqlQuery(sql)
- .unOrdered()
- .baselineColumns("cnt")
- .baselineValues(25L)
- .go();
+ .sqlQuery(sql)
+ .unOrdered()
+ .baselineColumns("cnt")
+ .baselineValues(25L)
+ .go();
}
@Test
public void ensureConvertForSeveralColumns() throws Exception {
- test("use dfs.tmp");
+ run("use dfs.tmp");
String tableName = "parquet_table_counts";
try {
String newFqnColumnName = "new_fqn";
- test("alter session set `%s` = '%s'",
ExecConstants.IMPLICIT_FQN_COLUMN_LABEL, newFqnColumnName);
- test("create table %s as select * from
cp.`parquet/alltypes_optional.parquet`", tableName);
- test("refresh table metadata %s", tableName);
+ client.alterSession(ExecConstants.IMPLICIT_FQN_COLUMN_LABEL,
newFqnColumnName);
+ run("create table %s as select * from
cp.`parquet/alltypes_optional.parquet`", tableName);
+ run("refresh table metadata %s", tableName);
String sql = String.format("select\n" +
- "count(%s) as implicit_count,\n" +
- "count(*) as star_count,\n" +
- "count(col_int) as int_column_count,\n" +
- "count(col_vrchr) as vrchr_column_count\n" +
- "from %s", newFqnColumnName, tableName);
-
- testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});
+ "count(%s) as implicit_count,\n" +
+ "count(*) as star_count,\n" +
+ "count(col_int) as int_column_count,\n" +
+ "count(col_vrchr) as vrchr_column_count\n" +
+ "from %s", newFqnColumnName, tableName);
+
+ queryBuilder()
+ .sql(sql)
+ .planMatcher()
+ .include("DynamicPojoRecordReader")
+ .match();
testBuilder()
- .sqlQuery(sql)
- .unOrdered()
- .baselineColumns("implicit_count", "star_count", "int_column_count",
"vrchr_column_count")
- .baselineValues(6L, 6L, 2L, 3L)
- .go();
+ .sqlQuery(sql)
+ .unOrdered()
+ .baselineColumns("implicit_count", "star_count", "int_column_count",
"vrchr_column_count")
+ .baselineValues(6L, 6L, 2L, 3L)
+ .go();
} finally {
- test("alter session reset `%s`",
ExecConstants.IMPLICIT_FQN_COLUMN_LABEL);
- test("drop table if exists %s", tableName);
+ client.resetSession(ExecConstants.IMPLICIT_FQN_COLUMN_LABEL);
+ run("drop table if exists %s", tableName);
}
}
@Test
- public void ensureCorrectCountWithMissingStatistics() throws Exception {
- test("use dfs.tmp");
+ public void testCorrectCountWithMissingStatistics() throws Exception {
+ run("use dfs.tmp");
String tableName = "wide_str_table";
try {
// table will contain two partitions: one with a null value, the second with a non-null value
- test("create table %s partition by (col_str) as select * from cp.`parquet/wide_string.parquet`", tableName);
+ run("create table %s partition by (col_str) as select * from cp.`parquet/wide_string.parquet`", tableName);
- String query = String.format("select count(col_str) as cnt_str, count(*) as cnt_total from %s", tableName);
+ String sql = String.format("select count(col_str) as cnt_str, count(*) as cnt_total from %s", tableName);
// direct scan should not be applied since we don't have statistics
- testPlanMatchingPatterns(query, null, new String[]{"DynamicPojoRecordReader"});
+ queryBuilder()
+ .sql(sql)
+ .planMatcher()
+ .exclude("DynamicPojoRecordReader")
+ .match();
testBuilder()
- .sqlQuery(query)
+ .sqlQuery(sql)
.unOrdered()
.baselineColumns("cnt_str", "cnt_total")
.baselineValues(1L, 2L)
.go();
} finally {
- test("drop table if exists %s", tableName);
+ run("drop table if exists %s", tableName);
}
}
@Test
public void testCountsWithMetadataCacheSummary() throws Exception {
- test("use dfs.tmp");
+ run("use dfs.tmp");
+
String tableName = "parquet_table_counts";
try {
- test(String.format("create table `%s/1` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName));
- test(String.format("create table `%s/2` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName));
- test(String.format("create table `%s/3` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName));
- test(String.format("create table `%s/4` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName));
+ run("create table `%s/1` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName);
+ run("create table `%s/2` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName);
+ run("create table `%s/3` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName);
+ run("create table `%s/4` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName);
- test("refresh table metadata %s", tableName);
+ run("refresh table metadata %s", tableName);
String sql = String.format("select\n" +
- "count(*) as star_count,\n" +
- "count(col_int) as int_column_count,\n" +
- "count(col_vrchr) as vrchr_column_count\n" +
- "from %s", tableName);
-
- int expectedNumFiles = 1;
- String numFilesPattern = "numFiles = " + expectedNumFiles;
- String usedMetaSummaryPattern = "usedMetadataSummaryFile = true";
- String recordReaderPattern = "DynamicPojoRecordReader";
-
- testPlanMatchingPatterns(sql, new String[]{numFilesPattern, usedMetaSummaryPattern, recordReaderPattern});
+ "count(*) as star_count,\n" +
+ "count(col_int) as int_column_count,\n" +
+ "count(col_vrchr) as vrchr_column_count\n" +
+ "from %s", tableName);
+
+ queryBuilder()
+ .sql(sql)
+ .planMatcher()
+ .include("numFiles = 1")
+ .include("usedMetadataSummaryFile = true")
+ .include("DynamicPojoRecordReader")
+ .match();
testBuilder()
- .sqlQuery(sql)
- .unOrdered()
- .baselineColumns("star_count", "int_column_count",
"vrchr_column_count")
- .baselineValues(24L, 8L, 12L)
- .go();
+ .sqlQuery(sql)
+ .unOrdered()
+ .baselineColumns("star_count", "int_column_count",
"vrchr_column_count")
+ .baselineValues(24L, 8L, 12L)
+ .go();
} finally {
- test("drop table if exists %s", tableName);
+ run("drop table if exists %s", tableName);
}
}
@Test
public void testCountsWithMetadataCacheSummaryAndDirPruning() throws
Exception {
- test("use dfs.tmp");
+ run("use dfs.tmp");
String tableName = "parquet_table_counts";
try {
- test(String.format("create table `%s/1` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName));
- test(String.format("create table `%s/2` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName));
- test(String.format("create table `%s/3` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName));
- test(String.format("create table `%s/4` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName));
+ run("create table `%s/1` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName);
+ run("create table `%s/2` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName);
+ run("create table `%s/3` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName);
+ run("create table `%s/4` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName);
- test("refresh table metadata %s", tableName);
+ run("refresh table metadata %s", tableName);
String sql = String.format("select\n" +
- "count(*) as star_count,\n" +
- "count(col_int) as int_column_count,\n" +
- "count(col_vrchr) as vrchr_column_count\n" +
- "from %s where dir0 = 1 ", tableName);
-
- int expectedNumFiles = 1;
- String numFilesPattern = "numFiles = " + expectedNumFiles;
- String usedMetaSummaryPattern = "usedMetadataSummaryFile = true";
- String recordReaderPattern = "DynamicPojoRecordReader";
-
- testPlanMatchingPatterns(sql, new String[]{numFilesPattern, usedMetaSummaryPattern, recordReaderPattern});
+ "count(*) as star_count,\n" +
+ "count(col_int) as int_column_count,\n" +
+ "count(col_vrchr) as vrchr_column_count\n" +
+ "from %s where dir0 = 1 ", tableName);
+
+ queryBuilder()
+ .sql(sql)
+ .planMatcher()
+ .include("numFiles = 1")
+ .include("usedMetadataSummaryFile = true")
+ .include("DynamicPojoRecordReader")
+ .match();
testBuilder()
- .sqlQuery(sql)
- .unOrdered()
- .baselineColumns("star_count", "int_column_count",
"vrchr_column_count")
- .baselineValues(6L, 2L, 3L)
- .go();
+ .sqlQuery(sql)
+ .unOrdered()
+ .baselineColumns("star_count", "int_column_count",
"vrchr_column_count")
+ .baselineValues(6L, 2L, 3L)
+ .go();
} finally {
- test("drop table if exists %s", tableName);
+ run("drop table if exists %s", tableName);
}
}
@Test
public void testCountsWithWildCard() throws Exception {
- test("use dfs.tmp");
+ run("use dfs.tmp");
String tableName = "parquet_table_counts";
try {
for (int i = 0; i < 10; i++) {
- test(String.format("create table `%s/12/%s` as select * from
cp.`tpch/nation.parquet`", tableName, i));
+ run("create table `%s/12/%s` as select * from
cp.`tpch/nation.parquet`", tableName, i);
}
- test(String.format("create table `%s/2` as select * from
cp.`tpch/nation.parquet`", tableName));
- test(String.format("create table `%s/2/11` as select * from
cp.`tpch/nation.parquet`", tableName));
- test(String.format("create table `%s/2/12` as select * from
cp.`tpch/nation.parquet`", tableName));
+ run("create table `%s/2` as select * from cp.`tpch/nation.parquet`",
tableName);
+ run("create table `%s/2/11` as select * from cp.`tpch/nation.parquet`",
tableName);
+ run("create table `%s/2/12` as select * from cp.`tpch/nation.parquet`",
tableName);
- test("refresh table metadata %s", tableName);
+ run("refresh table metadata %s", tableName);
String sql = String.format("select\n" +
- "count(*) as star_count\n" +
- "from `%s/1*`", tableName);
-
- String usedMetaSummaryPattern = "usedMetadataSummaryFile = false";
- String recordReaderPattern = "DynamicPojoRecordReader";
+ "count(*) as star_count\n" +
+ "from `%s/1*`", tableName);
- testPlanMatchingPatterns(sql, new String[]{usedMetaSummaryPattern, recordReaderPattern});
+ queryBuilder()
+ .sql(sql)
+ .planMatcher()
+ .include("usedMetadataSummaryFile = false")
+ .include("DynamicPojoRecordReader")
+ .match();
testBuilder()
- .sqlQuery(sql)
- .unOrdered()
- .baselineColumns("star_count")
- .baselineValues(250L)
- .go();
+ .sqlQuery(sql)
+ .unOrdered()
+ .baselineColumns("star_count")
+ .baselineValues(250L)
+ .go();
} finally {
- test("drop table if exists %s", tableName);
+ run("drop table if exists %s", tableName);
}
}
@Test
public void testCountsForLeafDirectories() throws Exception {
- test("use dfs.tmp");
+ run("use dfs.tmp");
String tableName = "parquet_table_counts";
try {
- test("create table `%s/1` as select * from cp.`tpch/nation.parquet`",
tableName);
- test("create table `%s/2` as select * from cp.`tpch/nation.parquet`",
tableName);
- test("create table `%s/3` as select * from cp.`tpch/nation.parquet`",
tableName);
- test("refresh table metadata %s", tableName);
+ run("create table `%s/1` as select * from cp.`tpch/nation.parquet`",
tableName);
+ run("create table `%s/2` as select * from cp.`tpch/nation.parquet`",
tableName);
+ run("create table `%s/3` as select * from cp.`tpch/nation.parquet`",
tableName);
+ run("refresh table metadata %s", tableName);
String sql = String.format("select\n" +
- "count(*) as star_count\n" +
- "from `%s/1`", tableName);
-
- int expectedNumFiles = 1;
- String numFilesPattern = "numFiles = " + expectedNumFiles;
- String usedMetaSummaryPattern = "usedMetadataSummaryFile = true";
- String recordReaderPattern = "DynamicPojoRecordReader";
+ "count(*) as star_count\n" +
+ "from `%s/1`", tableName);
- testPlanMatchingPatterns(sql, new String[]{numFilesPattern, usedMetaSummaryPattern, recordReaderPattern});
+ queryBuilder()
+ .sql(sql)
+ .planMatcher()
+ .include("numFiles = 1")
+ .include("usedMetadataSummaryFile = true")
+ .include("DynamicPojoRecordReader")
+ .match();
testBuilder()
- .sqlQuery(sql)
- .unOrdered()
- .baselineColumns("star_count")
- .baselineValues(25L)
- .go();
+ .sqlQuery(sql)
+ .unOrdered()
+ .baselineColumns("star_count")
+ .baselineValues(25L)
+ .go();
} finally {
- test("drop table if exists %s", tableName);
+ run("drop table if exists %s", tableName);
}
}
@Test
public void testCountsForDirWithFilesAndDir() throws Exception {
- test("use dfs.tmp");
+ run("use dfs.tmp");
String tableName = "parquet_table_counts";
try {
- test("create table `%s/1` as select * from cp.`tpch/nation.parquet`",
tableName);
- test("create table `%s/1/2` as select * from cp.`tpch/nation.parquet`",
tableName);
- test("create table `%s/1/3` as select * from cp.`tpch/nation.parquet`",
tableName);
- test("refresh table metadata %s", tableName);
+ run("create table `%s/1` as select * from cp.`tpch/nation.parquet`",
tableName);
+ run("create table `%s/1/2` as select * from cp.`tpch/nation.parquet`",
tableName);
+ run("create table `%s/1/3` as select * from cp.`tpch/nation.parquet`",
tableName);
+ run("refresh table metadata %s", tableName);
String sql = String.format("select count(*) as star_count from `%s/1`",
tableName);
- int expectedNumFiles = 1;
- String numFilesPattern = "numFiles = " + expectedNumFiles;
- String usedMetaSummaryPattern = "usedMetadataSummaryFile = true";
- String recordReaderPattern = "DynamicPojoRecordReader";
-
- testPlanMatchingPatterns(sql, new String[]{numFilesPattern, usedMetaSummaryPattern, recordReaderPattern});
+ queryBuilder()
+ .sql(sql)
+ .planMatcher()
+ .include("numFiles = 1")
+ .include("usedMetadataSummaryFile = true")
+ .include("DynamicPojoRecordReader")
+ .match();
testBuilder()
- .sqlQuery(sql)
- .unOrdered()
- .baselineColumns("star_count")
- .baselineValues(75L)
- .go();
+ .sqlQuery(sql)
+ .unOrdered()
+ .baselineColumns("star_count")
+ .baselineValues(75L)
+ .go();
} finally {
- test("drop table if exists %s", tableName);
+ run("drop table if exists %s", tableName);
}
}
@Test
- public void testCountsWithNonExColumn() throws Exception {
- test("use dfs.tmp");
+ public void testCountsWithNonExistingColumn() throws Exception {
+ run("use dfs.tmp");
String tableName = "parquet_table_counts_nonex";
try {
- test(String.format("create table `%s/1` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName));
- test(String.format("create table `%s/2` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName));
- test(String.format("create table `%s/3` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName));
- test(String.format("create table `%s/4` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName));
+ run("create table `%s/1` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName);
+ run("create table `%s/2` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName);
+ run("create table `%s/3` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName);
+ run("create table `%s/4` as select * from
cp.`parquet/alltypes_optional.parquet`", tableName);
- test("refresh table metadata %s", tableName);
+ run("refresh table metadata %s", tableName);
String sql = String.format("select\n" +
- "count(*) as star_count,\n" +
- "count(col_int) as int_column_count,\n" +
- "count(col_vrchr) as vrchr_column_count,\n" +
- "count(non_existent) as non_existent\n" +
- "from %s", tableName);
-
- String usedMetaSummaryPattern = "usedMetadataSummaryFile = true";
- String recordReaderPattern = "DynamicPojoRecordReader";
-
- testPlanMatchingPatterns(sql, new String[]{usedMetaSummaryPattern, recordReaderPattern});
+ "count(*) as star_count,\n" +
+ "count(col_int) as int_column_count,\n" +
+ "count(col_vrchr) as vrchr_column_count,\n" +
+ "count(non_existent) as non_existent\n" +
+ "from %s", tableName);
+
+ queryBuilder()
+ .sql(sql)
+ .planMatcher()
+ .include("numFiles = 1")
+ .include("usedMetadataSummaryFile = true")
+ .include("DynamicPojoRecordReader")
+ .match();
testBuilder()
- .sqlQuery(sql)
- .unOrdered()
- .baselineColumns("star_count", "int_column_count",
"vrchr_column_count", "non_existent" )
- .baselineValues(24L, 8L, 12L, 0L)
- .go();
+ .sqlQuery(sql)
+ .unOrdered()
+ .baselineColumns("star_count", "int_column_count",
"vrchr_column_count", "non_existent" )
+ .baselineValues(24L, 8L, 12L, 0L)
+ .go();
} finally {
- test("drop table if exists %s", tableName);
+ run("drop table if exists %s", tableName);
}
}
+
+ @Test
+ public void testSerDe() throws Exception {
+ String sql = "select count(*) as cnt from cp.`tpch/nation.parquet`";
+ String plan = queryBuilder().sql(sql).explainJson();
+ long cnt = queryBuilder().physical(plan).singletonLong();
+ assertEquals("Counts should match", 25L, cnt);
+ }
}
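The new testSerDe above condenses the ser / de fix into one round trip: the physical plan is serialized to JSON and re-submitted for execution, so any operator that fails to deserialize — as MetadataDirectGroupScan previously did — fails the query. The same idiom works for any query under test (the table here is illustrative):

    String sql = "select count(*) as cnt from dfs.tmp.`some_table`";
    String plan = queryBuilder().sql(sql).explainJson();      // plan as JSON
    long cnt = queryBuilder().physical(plan).singletonLong(); // execute that JSON plan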
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
index 17b838c..bbf684e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
@@ -19,7 +19,6 @@ package org.apache.drill.test;
import java.io.BufferedReader;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
@@ -43,6 +42,8 @@ import org.apache.drill.exec.testing.ControlsInjectionUtil;
import org.apache.drill.test.ClusterFixture.FixtureTestServices;
import org.apache.drill.test.QueryBuilder.QuerySummary;
import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Represents a Drill client. Provides many useful test-specific operations
such
@@ -52,7 +53,8 @@ import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
*/
public class ClientFixture implements AutoCloseable {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClientFixture.class);
+
+ private static final Logger logger = LoggerFactory.getLogger(ClientFixture.class);
public static class ClientBuilder {
@@ -119,11 +121,9 @@ public class ClientFixture implements AutoCloseable {
/**
* Set a runtime option.
*
- * @param key
- * @param value
- * @throws RpcException
+ * @param key option name
+ * @param value option value
*/
-
public void alterSession(String key, Object value) {
String sql = "ALTER SESSION SET `" + key + "` = " +
ClusterFixture.stringify(value);
runSqlSilently(sql);
@@ -136,9 +136,8 @@ public class ClientFixture implements AutoCloseable {
/**
* Reset a system option
- * @param key
+ * @param key option name
*/
-
public void resetSession(String key) {
runSqlSilently("ALTER SESSION RESET `" + key + "`");
}
@@ -148,19 +147,19 @@ public class ClientFixture implements AutoCloseable {
}
/**
- * Run SQL silently (discard results.)
+ * Run SQL silently (discard results).
*
- * @param sql
- * @throws RpcException
+ * @param sql query
+ * @param args format params
+ * @throws IllegalStateException if something goes wrong
*/
-
- public void runSqlSilently(String sql) {
+ public void runSqlSilently(String sql, Object... args) {
try {
- queryBuilder().sql(sql).run();
+ queryBuilder().sql(sql, args).run();
} catch (Exception e) {
// Should not fail during tests. Convert exception to unchecked
// to simplify test code.
- new IllegalStateException(e);
+ throw new IllegalStateException(e);
}
}
@@ -182,9 +181,11 @@ public class ClientFixture implements AutoCloseable {
/**
* Run zero or more queries and output the results in TSV format.
+ *
+ * @param queryString query string
+ * @param print if query result should be printed
*/
- private void runQueriesAndOutput(final String queryString,
- final boolean print) throws Exception {
+ private void runQueriesAndOutput(final String queryString, final boolean print) {
final String query = QueryTestUtil.normalizeQuery(queryString);
String[] queries = query.split(";");
for (String q : queries) {
@@ -203,24 +204,30 @@ public class ClientFixture implements AutoCloseable {
/**
* Run zero or more queries and log the output in TSV format.
+ *
+ * @param queryString query string
*/
- public void runQueriesAndLog(final String queryString) throws Exception {
+ public void runQueriesAndLog(final String queryString) {
runQueriesAndOutput(queryString, false);
}
/**
* Run zero or more queries and print the output in TSV format.
+ *
+ * @param queryString query string
*/
- public void runQueriesAndPrint(final String queryString) throws Exception {
+ public void runQueriesAndPrint(final String queryString) {
runQueriesAndOutput(queryString, true);
}
/**
* Plan a query without execution.
- * @throws ExecutionException
- * @throws InterruptedException
+ *
+ * @param type query type
+ * @param query query string
+ * @param isSplitPlan whether to return a single plan or split plans for the query
+ * @return query plan fragments
*/
-
public QueryPlanFragments planQuery(QueryType type, String query, boolean
isSplitPlan) {
DrillRpcFuture<QueryPlanFragments> queryFragmentsFutures =
client.planQuery(type, query, isSplitPlan);
try {
@@ -251,11 +258,10 @@ public class ClientFixture implements AutoCloseable {
* Return a parsed query profile for a query summary. Saving of profiles
* must be turned on.
*
- * @param summary
- * @return
- * @throws IOException
+ * @param summary query summary
+ * @return profile parser
+ * @throws IOException if unable to parse query profile
*/
-
public ProfileParser parseProfile(QuerySummary summary) throws IOException {
return parseProfile(summary.queryIdString());
}
@@ -264,9 +270,11 @@ public class ClientFixture implements AutoCloseable {
* Parse a query profile from the local storage location given the
* query ID. Saving of profiles must be turned on. This is a bit of
* a hack: the profile should be available directly from the server.
- * @throws IOException
+ *
+ * @param queryId query ID
+ * @return profile parser
+ * @throws IOException if unable to parse the profile
*/
-
public ProfileParser parseProfile(String queryId) throws IOException {
File file = new File(cluster.getProfileDir(), queryId + ".sys.drill");
return new ProfileParser(file);
@@ -281,7 +289,6 @@ public class ClientFixture implements AutoCloseable {
* @param controls the controls string created by
* {@link org.apache.drill.exec.testing.Controls#newBuilder()} builder.
*/
-
public void setControls(String controls) {
ControlsInjectionUtil.validateControlsString(controls);
alterSession(ExecConstants.DRILLBIT_CONTROL_INJECTIONS, controls);
@@ -309,10 +316,8 @@ public class ClientFixture implements AutoCloseable {
* <li><tt>ALTER SESSION SET `foo` = ";"</tt></li>
* <li><tt>SELECT * FROM bar WHERE x = "\";"</tt></li>
*/
-
public static class StatementParser {
private final Reader in;
- private StringBuilder buf;
public StatementParser(Reader in) {
this.in = in;
@@ -320,7 +325,7 @@ public class ClientFixture implements AutoCloseable {
public String parseNext() throws IOException {
boolean eof = false;
- buf = new StringBuilder();
+ StringBuilder buf = new StringBuilder();
for (;;) {
int c = in.read();
if (c == -1) {
@@ -377,11 +382,12 @@ public class ClientFixture implements AutoCloseable {
/**
* Execute a set of statements from a file.
+ *
* @param source the set of statements, separated by semicolons
* @return the number of statements executed
+ * @throws IOException if unable to execute statements from the file
*/
-
- public int exec(File source) throws FileNotFoundException, IOException {
+ public int exec(File source) throws IOException {
try (Reader in = new BufferedReader(new FileReader(source))) {
return exec(in);
}
@@ -389,10 +395,10 @@ public class ClientFixture implements AutoCloseable {
/**
* Execute a set of statements from a string.
+ *
* @param stmts the set of statements, separated by semicolons
* @return the number of statements executed
*/
-
public int exec(String stmts) {
try (Reader in = new StringReader(stmts)) {
return exec(in);
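Note that the runSqlSilently() change above also fixes a latent bug: the old code constructed the IllegalStateException but never threw it, silently swallowing failures. With the new varargs signature, test cleanup reads naturally (table name illustrative):

    // Failures now surface as unchecked exceptions instead of being dropped.
    client.runSqlSilently("drop table if exists %s", "parquet_table_counts");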
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 94f5d85..6895d93 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -19,17 +19,17 @@ package org.apache.drill.test;
import java.io.BufferedReader;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
-import org.apache.drill.PlanTestBase;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.common.expression.SchemaPath;
@@ -61,6 +61,8 @@ import
org.apache.drill.test.BufferingQueryEventListener.QueryEvent;
import org.apache.drill.test.ClientFixture.StatementParser;
import org.joda.time.Period;
+import static org.junit.Assert.assertEquals;
+
/**
* Builder for a Drill query. Provides all types of query formats,
* and a variety of ways to run the query.
@@ -73,7 +75,7 @@ public class QueryBuilder {
* using a {@link QuerySummaryFuture}.
*/
- public class SummaryOnlyQueryEventListener implements UserResultsListener {
+ public static class SummaryOnlyQueryEventListener implements UserResultsListener {
/**
* The future to be notified. Created here and returned by the
@@ -123,7 +125,7 @@ public class QueryBuilder {
* just the summary of the query.
*/
- public class QuerySummaryFuture implements Future<QuerySummary> {
+ public static class QuerySummaryFuture implements Future<QuerySummary> {
/**
* Synchronizes the listener thread and the test thread that
@@ -153,7 +155,7 @@ public class QueryBuilder {
public boolean isDone() { return summary != null; }
@Override
- public QuerySummary get() throws InterruptedException, ExecutionException {
+ public QuerySummary get() throws InterruptedException {
lock.await();
return summary;
}
@@ -163,8 +165,7 @@ public class QueryBuilder {
*/
@Override
- public QuerySummary get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
+ public QuerySummary get(long timeout, TimeUnit unit) throws InterruptedException {
return get();
}
@@ -177,7 +178,6 @@ public class QueryBuilder {
/**
* Summary results of a query: records, batches, run time.
*/
-
public static class QuerySummary {
private final QueryId queryId;
private final int records;
@@ -261,7 +261,6 @@ public class QueryBuilder {
* @param planFragments fragments that make up the plan
* @return this builder
*/
-
public QueryBuilder plan(List<PlanFragment> planFragments) {
queryType = QueryType.EXECUTION;
this.planFragments = planFragments;
@@ -275,8 +274,7 @@ public class QueryBuilder {
* optional ending semi-colon
* @return this builder
*/
-
- public QueryBuilder sql(File file) throws FileNotFoundException, IOException {
+ public QueryBuilder sql(File file) throws IOException {
try (BufferedReader in = new BufferedReader(new FileReader(file))) {
StatementParser parser = new StatementParser(in);
String sql = parser.parseNext();
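A small sketch of the file-based entry point; note it parses only the first statement from the file. The path is hypothetical and client is assumed to be a ClientFixture:

  QueryBuilder builder = client.queryBuilder()
      .sql(new File("/tmp/query.sql")); // reads the first semicolon-terminated statement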
@@ -297,7 +295,6 @@ public class QueryBuilder {
* @param resource Name of the resource
* @return this builder
*/
-
public QueryBuilder sqlResource(String resource) {
sql(ClusterFixture.loadResource(resource));
return this;
@@ -321,7 +318,6 @@ public class QueryBuilder {
* @return the query summary
* @throws Exception if anything goes wrong anywhere in the execution
*/
-
public QuerySummary run() throws Exception {
return produceSummary(withEventListener());
}
@@ -330,9 +326,8 @@ public class QueryBuilder {
* Run the query and return a list of the result batches. Use
* if the batch count is small and you want to work with them.
* @return a list of batches resulting from the query
- * @throws RpcException
+ * @throws RpcException if anything goes wrong
*/
-
public List<QueryDataBatch> results() throws RpcException {
Preconditions.checkNotNull(queryType, "Query not provided.");
Preconditions.checkNotNull(queryText, "Query not provided.");
@@ -345,13 +340,12 @@ public class QueryBuilder {
* by the code using a {@link RowSetReader}.
* <p>
*
- * @see {@link #rowSetIterator()} for a version that reads a series of
+ * @see #rowSetIterator() for a version that reads a series of
* batches as row sets.
* @return a row set that represents the first non-empty batch returned from
* the query
* @throws RpcException if anything goes wrong
*/
-
public DirectRowSet rowSet() throws RpcException {
// Ignore all but the first non-empty batch.
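A hedged sketch of consuming the returned row set; the RowSetReader scalar access is an assumption about Drill's row set tooling:

  DirectRowSet rowSet = client.queryBuilder()
      .sql("SELECT n_nationkey FROM cp.`tpch/nation.parquet` LIMIT 3")
      .rowSet();
  try {
    RowSetReader reader = rowSet.reader();
    while (reader.next()) {
      System.out.println(reader.scalar(0).getInt()); // assumed scalar accessor
    }
  } finally {
    rowSet.clear(); // release the underlying buffers
  }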
@@ -435,7 +429,6 @@ public class QueryBuilder {
* @param <V> vector class
* @param <T> return type
* @return result produced by {@code reader} lambda or {@code null} if no records returned from the query
- *
*/
@SuppressWarnings("unchecked")
public <T, V> T vectorValue(String columnName, Class<V> vectorClass,
VectorQueryReader<T, V> reader)
@@ -543,7 +536,6 @@ public class QueryBuilder {
* @return the value of the first column of the first row
* @throws RpcException if anything goes wrong
*/
-
public String singletonString() throws RpcException {
return singletonGeneric(ScalarReader::getString);
}
@@ -554,7 +546,6 @@ public class QueryBuilder {
*
* @param listener the Drill listener
*/
-
public void withListener(UserResultsListener listener) {
Preconditions.checkNotNull(queryType, "Query not provided.");
if (planFragments != null) {
@@ -578,7 +569,6 @@ public class QueryBuilder {
*
* @return the query event listener
*/
-
public BufferingQueryEventListener withEventListener() {
BufferingQueryEventListener listener = new BufferingQueryEventListener();
withListener(listener);
@@ -598,15 +588,12 @@ public class QueryBuilder {
}
/**
- * <p>
- * Run a query and logs the output in TSV format.
- * Similar to {@link QueryTestUtil#testRunAndLog} with one query.
- * </p>
+ * Runs a query and logs the output in TSV format.
+ * Similar to {@link QueryTestUtil#testRunAndLog} with one query.
*
* @return The number of rows returned.
- * @throws Exception If anything goes wrong with query execution.
*/
- public long log() throws Exception {
+ public long log() {
return log(Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH);
}
@@ -623,15 +610,12 @@ public class QueryBuilder {
}
/**
- * <p>
- * Runs a query and prints the output to stdout in TSV format.
- * Similar to {@link QueryTestUtil#testRunAndLog} with one query.
- * </p>
+ * Runs a query and prints the output to stdout in TSV format.
+ * Similar to {@link QueryTestUtil#testRunAndPrint} with one query.
*
* @return The number of rows returned.
- * @throws Exception If anything goes wrong with query execution.
*/
- public long print() throws Exception {
+ public long print() {
return print(Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH);
}
@@ -639,8 +623,9 @@ public class QueryBuilder {
* Run the query asynchronously, returning a future to be used
* to check for query completion, wait for completion, and obtain
* the result summary.
+ *
+ * @return query summary future
*/
-
public QuerySummaryFuture futureSummary() {
QuerySummaryFuture future = new QuerySummaryFuture();
withListener(new SummaryOnlyQueryEventListener(future));
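A minimal asynchronous sketch; apart from the illustrative query, everything here appears in the diff:

  QuerySummaryFuture future = client.queryBuilder()
      .sql("SELECT count(*) FROM cp.`tpch/nation.parquet`")
      .futureSummary();
  // ... do other work, then block until the query completes.
  QuerySummary summary = future.get();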
@@ -663,9 +648,10 @@ public class QueryBuilder {
/**
* Submit an "EXPLAIN" statement, and return text form of the
* plan.
+ *
+ * @return explain plan
* @throws Exception if the query fails
*/
-
public String explainText() throws Exception {
return explain(ClusterFixture.EXPLAIN_PLAN_TEXT);
}
@@ -673,9 +659,10 @@ public class QueryBuilder {
/**
* Submit an "EXPLAIN" statement, and return the JSON form of the
* plan.
+ *
+ * @return explain plan
* @throws Exception if the query fails
*/
-
public String explainJson() throws Exception {
return explain(ClusterFixture.EXPLAIN_PLAN_JSON);
}
@@ -685,12 +672,24 @@ public class QueryBuilder {
return queryPlan(format);
}
+ /**
+ * Submits an explain plan statement
+ * and creates a plan matcher instance based on the returned query plan.
+ *
+ * @return plan matcher
+ * @throws Exception if the query fails
+ */
+ public PlanMatcher planMatcher() throws Exception {
+ String plan = explainText();
+ return new PlanMatcher(plan);
+ }
+
private QuerySummary produceSummary(BufferingQueryEventListener listener)
throws Exception {
long start = System.currentTimeMillis();
int recordCount = 0;
int batchCount = 0;
QueryId queryId = null;
- QueryState state = null;
+ QueryState state;
loop:
for (;;) {
QueryEvent event = listener.get();
@@ -727,11 +726,13 @@ public class QueryBuilder {
* Submit an "EXPLAIN" statement, and return the column value which
* contains the plan's string.
* <p>
- * Cribbed from {@link PlanTestBase#getPlanInString(String, String)}
- * @throws Exception if anything goes wrogn in the query
+ * Cribbed from PlanTestBase#getPlanInString(String, String)
+ *
+ * @param columnName column name to extract from result
+ * @return query plan
+ * @throws Exception if anything goes wrong in the query
*/
-
- protected String queryPlan(String columnName) throws Exception {
+ private String queryPlan(String columnName) throws Exception {
Preconditions.checkArgument(queryType == QueryType.SQL, "Can only explain an SQL query.");
final List<QueryDataBatch> results = results();
final RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
@@ -764,4 +765,69 @@ public class QueryBuilder {
return builder.toString();
}
+
+ /**
+ * Collects expected and unexpected query patterns.
+ * Upon {@link #match()} call, matches the given patterns against the query plan.
+ */
+ public static class PlanMatcher {
+
+ private static final String EXPECTED_NOT_FOUND = "Did not find expected pattern";
+ private static final String UNEXPECTED_FOUND = "Found unwanted pattern";
+
+ private final String plan;
+ private final List<String> included = new ArrayList<>();
+ private final List<String> excluded = new ArrayList<>();
+
+ public PlanMatcher(String plan) {
+ this.plan = plan;
+ }
+
+ public PlanMatcher include(String... patterns) {
+ included.addAll(Arrays.asList(patterns));
+ return this;
+ }
+
+ public PlanMatcher exclude(String... patterns) {
+ excluded.addAll(Arrays.asList(patterns));
+ return this;
+ }
+
+ /**
+ * Checks if stored patterns (string parts) are included or excluded in the given plan.
+ *
+ * <p/>
+ * Example: <br/>
+ * For the plan:
+ * <pre>
+ * 00-00 Screen
+ * 00-01 Project(cnt=[$0])
+ * 00-02 DirectScan(groupscan=[selectionRoot = classpath:/tpch/nation.parquet,
+ * numFiles = 1, usedMetadataSummaryFile = false, DynamicPojoRecordReader{records = [[25]]}])
+ * </pre>
+ *
+ * <ul>
+ * <li>To check that the number of files is 1 and DynamicPojoRecordReader is used:
+ * <code>planMatcher.include("numFiles = 1", "DynamicPojoRecordReader")</code></li>
+ * <li>To check that the metadata summary file was not used:
+ * <code>planMatcher.exclude("usedMetadataSummaryFile = true")</code></li>
+ * </ul>
+ *
+ * Calling <code>planMatcher.match()</code> checks that the given patterns are present
+ * or absent in the plan. Execution fails with an {@link AssertionError}
+ * only if an expected pattern was not matched or an unexpected pattern was matched.
+ */
+ public void match() {
+ included.forEach(pattern -> match(pattern, true));
+ excluded.forEach(pattern -> match(pattern, false));
+ }
+
+ private void match(String patternString, boolean expectedResult) {
+ Pattern pattern = Pattern.compile(patternString);
+ Matcher matcher = pattern.matcher(plan);
+ String message = String.format("%s in plan: %s\n%s",
+ expectedResult ? EXPECTED_NOT_FOUND : UNEXPECTED_FOUND, patternString, plan);
+ assertEquals(message, expectedResult, matcher.find());
+ }
+ }
}
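Putting the pieces together, a test-style sketch of the new fluent plan matching; the patterns come from the javadoc example above and the query is illustrative:

  @Test
  public void testCountConvertedToDirectScan() throws Exception {
    client.queryBuilder()
        .sql("SELECT count(*) AS cnt FROM cp.`tpch/nation.parquet`")
        .planMatcher()
        .include("DynamicPojoRecordReader", "numFiles = 1")
        .exclude("usedMetadataSummaryFile = true")
        .match(); // AssertionError on any mismatch
  }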