This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 92fbed8 DRILL-6603: Set num_nulls for parquet statistics to -1 when
actual number is not defined.
92fbed8 is described below
commit 92fbed887ca4ca0f2208f367a8f86f8aa4940513
Author: Arina Ielchiieva <[email protected]>
AuthorDate: Tue Jul 17 16:41:48 2018 +0300
DRILL-6603: Set num_nulls for parquet statistics to -1 when actual number
is not defined.
---
.../exec/expr/stat/ParquetPredicatesHelper.java | 2 +-
.../store/parquet/ParquetGroupScanStatistics.java | 4 +-
.../exec/store/parquet/metadata/Metadata.java | 37 ++++------
.../exec/store/parquet/metadata/MetadataBase.java | 17 +++--
.../exec/store/parquet/metadata/Metadata_V3.java | 22 ++----
.../parquet/stat/ParquetMetaStatCollector.java | 13 ++--
.../logical/TestConvertCountToDirectScan.java | 76 +++++++++++----------
.../store/parquet/TestParquetFilterPushDown.java | 32 +++++++++
.../src/test/resources/parquet/wide_string.parquet | Bin 0 -> 2292 bytes
9 files changed, 109 insertions(+), 94 deletions(-)
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
index de4df1f..f02976b 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
@@ -54,7 +54,7 @@ class ParquetPredicatesHelper {
* @return <tt>true</tt> if the parquet file does not have nulls and
<tt>false</tt> otherwise
*/
static boolean hasNoNulls(Statistics stat) {
- return stat.getNumNulls() <= 0;
+ return stat.getNumNulls() == 0;
}
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
index 5a3e0c2..f7d5687 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
@@ -89,12 +89,12 @@ public class ParquetGroupScanStatistics {
SchemaPath schemaPath = SchemaPath.getCompoundPath(column.getName());
Long previousCount = columnValueCounts.get(schemaPath);
if (previousCount != null) {
- if (previousCount != GroupScan.NO_COLUMN_STATS && column.getNulls()
!= null) {
+ if (previousCount != GroupScan.NO_COLUMN_STATS &&
column.isNumNullsSet()) {
Long newCount = rowCount - column.getNulls();
columnValueCounts.put(schemaPath,
columnValueCounts.get(schemaPath) + newCount);
}
} else {
- if (column.getNulls() != null) {
+ if (column.isNumNullsSet()) {
Long newCount = rowCount - column.getNulls();
columnValueCounts.put(schemaPath, newCount);
} else {
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index 2259169..b92f647 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -64,7 +64,6 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -447,47 +446,40 @@ public class Metadata {
logger.debug(containsCorruptDates.toString());
}
for (BlockMetaData rowGroup : metadata.getBlocks()) {
- List<ColumnMetadata_v3> columnMetadataList = Lists.newArrayList();
+ List<ColumnMetadata_v3> columnMetadataList = new ArrayList<>();
long length = 0;
for (ColumnChunkMetaData col : rowGroup.getColumns()) {
- ColumnMetadata_v3 columnMetadata;
-
- boolean statsAvailable = (col.getStatistics() != null &&
!col.getStatistics().isEmpty());
-
Statistics<?> stats = col.getStatistics();
String[] columnName = col.getPath().toArray();
SchemaPath columnSchemaName = SchemaPath.getCompoundPath(columnName);
ColTypeInfo colTypeInfo = colTypeInfoMap.get(columnSchemaName);
ColumnTypeMetadata_v3 columnTypeMetadata =
- new ColumnTypeMetadata_v3(columnName, col.getType(),
colTypeInfo.originalType,
+ new ColumnTypeMetadata_v3(columnName,
col.getPrimitiveType().getPrimitiveTypeName(), colTypeInfo.originalType,
colTypeInfo.precision, colTypeInfo.scale,
colTypeInfo.repetitionLevel, colTypeInfo.definitionLevel);
if (parquetTableMetadata.columnTypeInfo == null) {
parquetTableMetadata.columnTypeInfo = new ConcurrentHashMap<>();
}
+ parquetTableMetadata.columnTypeInfo.put(new
ColumnTypeMetadata_v3.Key(columnTypeMetadata.name), columnTypeMetadata);
+
// Save the column schema info. We'll merge it into one list
- parquetTableMetadata.columnTypeInfo
- .put(new ColumnTypeMetadata_v3.Key(columnTypeMetadata.name),
columnTypeMetadata);
+ Object minValue = null;
+ Object maxValue = null;
+ long numNulls = -1;
+ boolean statsAvailable = stats != null && !stats.isEmpty();
if (statsAvailable) {
- // Write stats when they are not null
- Object minValue = null;
- Object maxValue = null;
if (stats.hasNonNullValue()) {
minValue = stats.genericGetMin();
maxValue = stats.genericGetMax();
- if (containsCorruptDates ==
ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION
- && columnTypeMetadata.originalType == OriginalType.DATE) {
+ if (containsCorruptDates ==
ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION &&
columnTypeMetadata.originalType == OriginalType.DATE) {
minValue =
ParquetReaderUtility.autoCorrectCorruptedDate((Integer) minValue);
maxValue =
ParquetReaderUtility.autoCorrectCorruptedDate((Integer) maxValue);
}
-
}
- columnMetadata =
- new ColumnMetadata_v3(columnTypeMetadata.name, col.getType(),
minValue, maxValue, stats.getNumNulls());
- } else {
- columnMetadata = new ColumnMetadata_v3(columnTypeMetadata.name,
col.getType(), null, null, null);
+ numNulls = stats.getNumNulls();
}
+ ColumnMetadata_v3 columnMetadata = new
ColumnMetadata_v3(columnTypeMetadata.name,
col.getPrimitiveType().getPrimitiveTypeName(), minValue, maxValue, numNulls);
columnMetadataList.add(columnMetadata);
length += col.getTotalSize();
}
@@ -632,12 +624,7 @@ public class Metadata {
List<? extends ParquetFileMetadata> files =
parquetTableMetadata.getFiles();
for (ParquetFileMetadata file : files) {
List<? extends RowGroupMetadata> rowGroups = file.getRowGroups();
- for (Iterator<? extends RowGroupMetadata> iter =
rowGroups.iterator(); iter.hasNext(); ) {
- RowGroupMetadata r = iter.next();
- if (r.getRowCount() == 0) {
- iter.remove();
- }
- }
+ rowGroups.removeIf(r -> r.getRowCount() == 0);
}
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java
index d7d56c3..bed8be6 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java
@@ -45,10 +45,7 @@ public class MetadataBase {
* <p>
* Note: keep metadata versions synchronized with {@link
MetadataVersion.Constants}
*/
- @JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
- include = JsonTypeInfo.As.PROPERTY,
- property = "metadata_version",
- visible = true)
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "metadata_version",
visible = true)
@JsonSubTypes({
@JsonSubTypes.Type(value = Metadata_V1.ParquetTableMetadata_v1.class,
name = V1),
@JsonSubTypes.Type(value = Metadata_V2.ParquetTableMetadata_v2.class,
name = V2),
@@ -108,6 +105,18 @@ public class MetadataBase {
public static abstract class ColumnMetadata {
+
+ /**
+ * Number of nulls is considered to be valid if its value is not null and not
-1.
+ *
+ * @return true if nulls value is defined, false otherwise
+ */
+ @JsonIgnore
+ public boolean isNumNullsSet() {
+ Long nulls = getNulls();
+ return nulls != null && nulls != -1;
+ }
+
public abstract String[] getName();
public abstract Long getNulls();
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java
index f378a70..4bb07f7 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java
@@ -21,10 +21,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.KeyDeserializer;
import com.fasterxml.jackson.databind.SerializerProvider;
@@ -317,8 +313,7 @@ public class Metadata_V3 {
}
@Override
- public Object deserializeKey(String key,
com.fasterxml.jackson.databind.DeserializationContext ctxt)
- throws IOException,
com.fasterxml.jackson.core.JsonProcessingException {
+ public Object deserializeKey(String key,
com.fasterxml.jackson.databind.DeserializationContext ctxt) {
// key string should contain '`' char if the field was serialized as
SchemaPath object
if (key.contains("`")) {
return new Key(SchemaPath.parseFromString(key));
@@ -391,10 +386,10 @@ public class Metadata_V3 {
*/
@Override
public boolean hasSingleValue(long rowCount) {
- if (nulls != null) {
+ if (isNumNullsSet()) {
if (minValue != null) {
// Objects.deepEquals() is used here, since min and max may be byte
arrays
- return Objects.deepEquals(minValue, maxValue) && (nulls == 0 ||
nulls == rowCount);
+ return (nulls == 0 || nulls == rowCount) &&
Objects.deepEquals(minValue, maxValue);
} else {
return nulls == rowCount && maxValue == null;
}
@@ -418,19 +413,10 @@ public class Metadata_V3 {
return null;
}
- public static class DeSerializer extends
JsonDeserializer<ColumnMetadata_v3> {
- @Override public ColumnMetadata_v3 deserialize(JsonParser jp,
DeserializationContext ctxt)
- throws IOException, JsonProcessingException {
- return null;
- }
- }
-
-
// We use a custom serializer and write only non null values.
public static class Serializer extends JsonSerializer<ColumnMetadata_v3> {
@Override
- public void serialize(ColumnMetadata_v3 value, JsonGenerator jgen,
SerializerProvider provider)
- throws IOException, JsonProcessingException {
+ public void serialize(ColumnMetadata_v3 value, JsonGenerator jgen,
SerializerProvider provider) throws IOException {
jgen.writeStartObject();
jgen.writeArrayFieldStart("name");
for (String n : value.name) {
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
index 933d8ee..437074e 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
@@ -95,7 +95,7 @@ public class ParquetMetaStatCollector implements
ColumnStatCollector {
if (columnMetadata != null) {
final Object min = columnMetadata.getMinValue();
final Object max = columnMetadata.getMaxValue();
- final Long numNull = columnMetadata.getNulls();
+ final long numNulls = columnMetadata.getNulls() == null ? -1 :
columnMetadata.getNulls();
primitiveType =
this.parquetTableMetadata.getPrimitiveType(columnMetadata.getName());
originalType =
this.parquetTableMetadata.getOriginalType(columnMetadata.getName());
@@ -109,7 +109,7 @@ public class ParquetMetaStatCollector implements
ColumnStatCollector {
precision = columnTypeInfo.precision;
}
- statMap.put(field, getStat(min, max, numNull, primitiveType,
originalType, scale, precision));
+ statMap.put(field, getStat(min, max, numNulls, primitiveType,
originalType, scale, precision));
} else {
final String columnName = field.getRootSegment().getPath();
if (implicitColValues.containsKey(columnName)) {
@@ -137,24 +137,21 @@ public class ParquetMetaStatCollector implements
ColumnStatCollector {
*
* @param min min value for statistics
* @param max max value for statistics
- * @param numNull num_nulls for statistics
+ * @param numNulls num_nulls for statistics
* @param primitiveType type that determines statistics class
* @param originalType type that determines statistics class
* @param scale scale value (used for DECIMAL type)
* @param precision precision value (used for DECIMAL type)
* @return column statistics
*/
- private ColumnStatistics getStat(Object min, Object max, Long numNull,
+ private ColumnStatistics getStat(Object min, Object max, long numNulls,
PrimitiveType.PrimitiveTypeName
primitiveType, OriginalType originalType,
int scale, int precision) {
Statistics stat = Statistics.getStatsBasedOnType(primitiveType);
Statistics convertedStat = stat;
TypeProtos.MajorType type = ParquetReaderUtility.getType(primitiveType,
originalType, scale, precision);
-
- if (numNull != null) {
- stat.setNumNulls(numNull);
- }
+ stat.setNumNulls(numNulls);
if (min != null && max != null ) {
switch (type.getMinorType()) {
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java
index cb7f22c..fd2c1b2 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java
@@ -35,21 +35,16 @@ public class TestConvertCountToDirectScan extends
PlanTestBase {
}
@Test
- public void ensureCaseDoesntConvertToDirectScan() throws Exception {
+ public void ensureCaseDoesNotConvertToDirectScan() throws Exception {
testPlanMatchingPatterns(
"select count(case when n_name = 'ALGERIA' and n_regionkey = 2 then
n_nationkey else null end) as cnt\n" +
- "from dfs.`directcount.parquet`",
- new String[] { "CASE" },
- new String[]{});
+ "from dfs.`directcount.parquet`", new String[]{"CASE"});
}
@Test
public void ensureConvertSimpleCountToDirectScan() throws Exception {
- final String sql = "select count(*) as cnt from cp.`tpch/nation.parquet`";
- testPlanMatchingPatterns(
- sql,
- new String[] { "DynamicPojoRecordReader" },
- new String[]{});
+ String sql = "select count(*) as cnt from cp.`tpch/nation.parquet`";
+ testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});
testBuilder()
.sqlQuery(sql)
@@ -61,11 +56,8 @@ public class TestConvertCountToDirectScan extends
PlanTestBase {
@Test
public void ensureConvertSimpleCountConstToDirectScan() throws Exception {
- final String sql = "select count(100) as cnt from
cp.`tpch/nation.parquet`";
- testPlanMatchingPatterns(
- sql,
- new String[] { "DynamicPojoRecordReader" },
- new String[]{});
+ String sql = "select count(100) as cnt from cp.`tpch/nation.parquet`";
+ testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});
testBuilder()
.sqlQuery(sql)
@@ -77,11 +69,8 @@ public class TestConvertCountToDirectScan extends
PlanTestBase {
@Test
public void ensureConvertSimpleCountConstExprToDirectScan() throws Exception
{
- final String sql = "select count(1 + 2) as cnt from
cp.`tpch/nation.parquet`";
- testPlanMatchingPatterns(
- sql,
- new String[] { "DynamicPojoRecordReader" },
- new String[]{});
+ String sql = "select count(1 + 2) as cnt from cp.`tpch/nation.parquet`";
+ testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});
testBuilder()
.sqlQuery(sql)
@@ -93,11 +82,8 @@ public class TestConvertCountToDirectScan extends
PlanTestBase {
@Test
public void ensureDoesNotConvertForDirectoryColumns() throws Exception {
- final String sql = "select count(dir0) as cnt from
cp.`tpch/nation.parquet`";
- testPlanMatchingPatterns(
- sql,
- new String[] { "ParquetGroupScan" },
- new String[]{});
+ String sql = "select count(dir0) as cnt from cp.`tpch/nation.parquet`";
+ testPlanMatchingPatterns(sql, new String[]{"ParquetGroupScan"});
testBuilder()
.sqlQuery(sql)
@@ -109,11 +95,8 @@ public class TestConvertCountToDirectScan extends
PlanTestBase {
@Test
public void ensureConvertForImplicitColumns() throws Exception {
- final String sql = "select count(fqn) as cnt from
cp.`tpch/nation.parquet`";
- testPlanMatchingPatterns(
- sql,
- new String[] { "DynamicPojoRecordReader" },
- new String[]{});
+ String sql = "select count(fqn) as cnt from cp.`tpch/nation.parquet`";
+ testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});
testBuilder()
.sqlQuery(sql)
@@ -126,25 +109,22 @@ public class TestConvertCountToDirectScan extends
PlanTestBase {
@Test
public void ensureConvertForSeveralColumns() throws Exception {
test("use dfs.tmp");
- final String tableName = "parquet_table_counts";
+ String tableName = "parquet_table_counts";
try {
- final String newFqnColumnName = "new_fqn";
+ String newFqnColumnName = "new_fqn";
test("alter session set `%s` = '%s'",
ExecConstants.IMPLICIT_FQN_COLUMN_LABEL, newFqnColumnName);
test("create table %s as select * from
cp.`parquet/alltypes_optional.parquet`", tableName);
test("refresh table metadata %s", tableName);
- final String sql = String.format("select\n" +
+ String sql = String.format("select\n" +
"count(%s) as implicit_count,\n" +
"count(*) as star_count,\n" +
"count(col_int) as int_column_count,\n" +
"count(col_vrchr) as vrchr_column_count\n" +
"from %s", newFqnColumnName, tableName);
- testPlanMatchingPatterns(
- sql,
- new String[] { "DynamicPojoRecordReader" },
- new String[]{});
+ testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});
testBuilder()
.sqlQuery(sql)
@@ -159,4 +139,28 @@ public class TestConvertCountToDirectScan extends
PlanTestBase {
}
}
+ @Test
+ public void ensureCorrectCountWithMissingStatistics() throws Exception {
+ test("use dfs.tmp");
+ String tableName = "wide_str_table";
+ try {
+ // table will contain two partitions: one - with null value, second -
with non null value
+ test("create table %s partition by (col_str) as select * from
cp.`parquet/wide_string.parquet`", tableName);
+
+ String query = String.format("select count(col_str) as cnt_str, count(*)
as cnt_total from %s", tableName);
+
+ // direct scan should not be applied since we don't have statistics
+ testPlanMatchingPatterns(query, null, new
String[]{"DynamicPojoRecordReader"});
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("cnt_str", "cnt_total")
+ .baselineValues(1L, 2L)
+ .go();
+ } finally {
+ test("drop table if exists %s", tableName);
+ }
+ }
+
}
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
index 3bbd397..c871ccc 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
@@ -454,6 +454,38 @@ public class TestParquetFilterPushDown extends
PlanTestBase {
PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan);
}
+ @Test
+ public void testWithMissingStatistics() throws Exception {
+ /*
+ wide_string.parquet
+
+ Schema:
+ message root {
+ optional binary col_str (UTF8);
+ }
+
+ Content:
+ first row -> `a` character repeated 2050 times
+ second row -> null
+ */
+ String tableName = "wide_string_table";
+ java.nio.file.Path wideStringFilePath = Paths.get("parquet",
"wide_string.parquet");
+ dirTestWatcher.copyResourceToRoot(wideStringFilePath, Paths.get(tableName,
"0_0_0.parquet"));
+ dirTestWatcher.copyResourceToRoot(wideStringFilePath, Paths.get(tableName,
"0_0_1.parquet"));
+
+ String query = String.format("select count(1) as cnt from dfs.`%s` where
col_str is null", tableName);
+
+ String[] expectedPlan = {"numRowGroups=2"};
+ PlanTestBase.testPlanMatchingPatterns(query, expectedPlan);
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("cnt")
+ .baselineValues(2L)
+ .go();
+ }
+
//////////////////////////////////////////////////////////////////////////////////////////////////
// Some test helper functions.
//////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/exec/java-exec/src/test/resources/parquet/wide_string.parquet
b/exec/java-exec/src/test/resources/parquet/wide_string.parquet
new file mode 100644
index 0000000..a783bb8
Binary files /dev/null and
b/exec/java-exec/src/test/resources/parquet/wide_string.parquet differ