arina-ielchiieva closed pull request #976: DRILL-5797: Choose parquet reader
from read columns
URL: https://github.com/apache/drill/pull/976
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 60179482fc3..82764285e99 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -26,6 +26,8 @@
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
@@ -37,12 +39,15 @@
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
import org.apache.drill.exec.store.parquet2.DrillParquetReader;
+import org.apache.drill.exec.util.Utilities;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.InvalidRecordException;
+import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
@@ -119,7 +124,7 @@ public ScanBatch getBatch(FragmentContext context,
ParquetRowGroupScan rowGroupS
if (logger.isDebugEnabled()) {
logger.debug(containsCorruptDates.toString());
}
- if
(!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val
&& !isComplex(footers.get(e.getPath()))) {
+ if
(!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val
&& !isComplex(footers.get(e.getPath()), rowGroupScan.getColumns())) {
readers.add(
new ParquetRecordReader(
context, e.getPath(), e.getRowGroupIndex(),
e.getNumRecordsToRead(), fs,
@@ -156,20 +161,49 @@ public ScanBatch getBatch(FragmentContext context,
ParquetRowGroupScan rowGroupS
return new ScanBatch(rowGroupScan, context, oContext, readers,
implicitColumns);
}
- private static boolean isComplex(ParquetMetadata footer) {
+ private static boolean isComplex(ParquetMetadata footer, List<SchemaPath>
columns) {
+ /**
+ * ParquetRecordReader is not able to read any nested columns and is not
able to handle repeated columns.
+ * It only handles flat column and optional column.
+ * If it is a wildcard query, we check every columns in the metadata.
+ * If not, we only check the projected columns.
+ * We only check the first level columns because :
+ * - if we need a.b, it means a is a complex type, no need to check b as
we don't handle complex type.
+ * - if we need a[10], a is repeated, ie its repetiton level is greater
than 0
+ * - if we need a, it is at the first level of the schema.
+ */
MessageType schema = footer.getFileMetaData().getSchema();
-
- for (Type type : schema.getFields()) {
- if (!type.isPrimitive()) {
- return true;
+ if (Utilities.isStarQuery(columns)) {
+ for (Type type : schema.getFields()) {
+ if (!type.isPrimitive()) {
+ return true;
+ }
}
- }
- for (ColumnDescriptor col : schema.getColumns()) {
- if (col.getMaxRepetitionLevel() > 0) {
- return true;
+ for (ColumnDescriptor col : schema.getColumns()) {
+ if (col.getMaxRepetitionLevel() > 0) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ for (SchemaPath column : columns) {
+ if (isColumnComplex(schema, column)) {
+ return true;
+ }
}
+ return false;
+ }
+ }
+
+ private static boolean isColumnComplex(GroupType grouptype, SchemaPath
column) {
+ try {
+ Type type =
grouptype.getType(column.getRootSegment().getPath().toLowerCase());
+ return type.isRepetition(Type.Repetition.REPEATED) ||
!type.isPrimitive();
+ }
+ catch(InvalidRecordException e) {
+ //if the type does exist, we will fill with null so it is a simple type.
+ return false;
}
- return false;
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services