paul-rogers commented on code in PR #2937:
URL: https://github.com/apache/drill/pull/2937#discussion_r1739400611


##########
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java:
##########
@@ -73,7 +74,10 @@ static ColumnReader<?> createFixedColumnReader(ParquetRecordReader recordReader,
     ConvertedType convertedType = schemaElement.getConverted_type();
     // if the column is required, or repeated (in which case we just want to use this to generate our appropriate
     // ColumnReader for actually transferring data into the data vector inside of our repeated vector
-    if (descriptor.getMaxDefinitionLevel() == 0 || descriptor.getMaxRepetitionLevel() > 0) {
+    // Choose a reader based on a ValueVector DataMode since we might want to put
+    // parquet's REQUIRED column into a Drill's OPTIONAL ValueVector
+    // see ParquetSchema#tableSchema for details
+    if (v.getField().getDataMode() != TypeProtos.DataMode.OPTIONAL) {

Review Comment:
   If we are enforcing a planner-defined type, then use the entire type (major and minor) from the planner-provided schema. It won't work to have the reader second-guessing the common schema that the planner selected.
   
   Again, EVF handles all of this. You can try setting up a unit test case using EVF to establish the baseline behavior that the Parquet reader should replicate in its own logic.



##########
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java:
##########
@@ -56,10 +56,10 @@ public ParquetColumnMetadata(ColumnDescriptor column) {
     this.column = column;
   }
 
-  public void resolveDrillType(Map<String, SchemaElement> schemaElements, OptionManager options) {
+  public void resolveDrillType(Map<String, SchemaElement> schemaElements, OptionManager options, boolean isEnforcedOptional) {

Review Comment:
   Again, if we are enforcing a planner-provided schema, the job is to map whatever the Parquet type is into the given, fixed Drill type. There is only one right answer when the schema is provided. See EVF for how this works.
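   
   A rough sketch of that idea (method and parameter names here are illustrative, not the ones used in this PR):
   
   ```java
   import org.apache.drill.common.types.TypeProtos.MajorType;
   import org.apache.drill.exec.record.metadata.ColumnMetadata;
   import org.apache.drill.exec.record.metadata.TupleMetadata;

   /**
    * When a planner/table schema is supplied, take that column's MajorType
    * verbatim; only fall back to the type inferred from the Parquet footer
    * when no provided schema covers the column.
    */
   static MajorType resolveType(TupleMetadata providedSchema, String columnName,
                                MajorType typeInferredFromParquet) {
     ColumnMetadata provided = providedSchema == null
         ? null : providedSchema.metadata(columnName);
     return provided != null
         ? provided.majorType()        // the planner's answer is authoritative
         : typeInferredFromParquet;    // no provided schema: keep the inferred type
   }
   ```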



##########
exec/java-exec/src/test/java/org/apache/drill/TestParquetPartiallyMissingColumns.java:
##########
@@ -0,0 +1,128 @@
+package org.apache.drill;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchemaBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.file.Paths;
+
+/**
+ * Covers querying a table in which some parquet files do contain selected columns, and
+ * others do not (or have them as OPTIONALs).
+ *
+ * Expected behavior for the missing columns is following:
+ * 1) If at least 1 parquet file to be read has the column, take the minor type from there.
+ * Otherwise, default to INT.
+ * 2) If at least 1 parquet file to be read doesn't have the column, or has it as OPTIONAL,
+ * enforce the overall scan output schema to have it as OPTIONAL
+ *
+ * We need to control ordering of scanning batches to cover different erroneous cases, and we assume
+ * that parquet files in a table would be read in alphabetic order (not a real use case though). So
+ * we name our files 0.parquet and 1.parquet expecting that they would be scanned in that order
+ * (not guaranteed though, but seems to work). We use such tables for such scenarios:
+ *
+ * - parquet/partially_missing/o_m -- optional, then missing
+ * - parquet/partially_missing/m_o -- missing, then optional
+ * - parquet/partially_missing/r_m -- required, then missing
+ * - parquet/partially_missing/r_o -- required, then optional

Review Comment:
   Thanks for explaining the schemas here in the test comments!



##########
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java:
##########
@@ -64,6 +65,19 @@ public final class ParquetSchema {
   private final int rowGroupIndex;
   private final ParquetMetadata footer;
 
+  /**
+   * Schema for the whole table constructed by a GroupScan from all the parquet files to read.
+   * If we don't find a selected column in our parquet file, type for the null-filled vector
+   * to create would be tried to find in this schema. That is, if some other parquet file contains
+   * the column, we'll take their type. Otherwise, default to Nullable Int.
+   * Also, if at least 1 file does not contain the selected column, then the overall table schema
+   * should have this field with OPTIONAL data mode. GroupScan catches this case and sets the
+   * appropriate data mode in this schema. Our mission here is to enforce that OPTIONAL mode in our
+   * output schema, even if the particular parquet file we're reading from has this field REQUIRED,
+   * to provide consistency across all scan batches.
+   */

Review Comment:
   Great start! See the other cases described above.
   
   Also, I seem to remember creating code to handle evolving column types as part of EVF. Perhaps you can find that code. The code likely has a large number of unit tests (I'm a test-driven kinda guy) which you can reuse to test your parallel implementation.



##########
exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java:
##########
@@ -751,6 +752,17 @@ public static MinorType getLeastRestrictiveType(MinorType... types) {
     return result;
   }
 
+  public static MajorType getLeastRestrictiveMajorType(MajorType... majorTypes) {

Review Comment:
   Here we are passing in major types and finding the least restrictive among them. In general, the least restrictive can be a type NOT in the list. For example, given a `FLOAT` and an `INT`, the least restrictive is `DOUBLE`, which is able to hold either a 32-bit `FLOAT` or a 32-bit `INT`.
   
   Similarly, given a `VARCHAR` and a `MAP`, the least restrictive is `UNION`. (Though a word of warning: `UNION` has never worked right, is slow, and is a resource hog. No SQL client understands it either.)
   
   I suspect this logic must exist somewhere. Of course, finding it may be difficult. If we want to create a new version, then this kind of thing needs extensive unit tests with many different column combinations.
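   
   To make the point concrete, a minimal sketch of the kind of widening meant here (hypothetical, NOT the current rules; only one pair is shown):
   
   ```java
   import org.apache.drill.common.types.TypeProtos.MinorType;

   /**
    * Hypothetical pairwise widening: the result may be a type absent from the
    * inputs. FLOAT4 + INT widens to FLOAT8 because a 64-bit double can hold
    * every 32-bit float and every 32-bit int exactly, while neither input type
    * can hold the other.
    */
   static MinorType widen(MinorType a, MinorType b) {
     if (a == b) {
       return a;
     }
     if ((a == MinorType.FLOAT4 && b == MinorType.INT)
         || (a == MinorType.INT && b == MinorType.FLOAT4)) {
       return MinorType.FLOAT8;
     }
     // A real implementation would enumerate (or compute) the remaining pairs,
     // including the VARCHAR + MAP -> UNION case mentioned above.
     return null; // no known widening for this pair
   }
   ```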



##########
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java:
##########
@@ -661,6 +663,12 @@ static Map<SchemaPath, TypeProtos.MajorType> resolveFields(MetadataBase.ParquetT
       // row groups in the file have the same schema, so using the first one
       Map<SchemaPath, TypeProtos.MajorType> fileColumns = getFileFields(parquetTableMetadata, file);
       fileColumns.forEach((columnPath, type) -> putType(columns, columnPath, type));
+      // If at least 1 parquet file to read doesn't contain a column, enforce this column
+      // DataMode to OPTIONAL in the overall table schema

Review Comment:
   The general rule has to be:
   
   * For all columns that exist, define a common type that can hold all of the associated column types.
   * If any column is optional (or missing), assign the OPTIONAL type -- but only if the other types are REQUIRED.
   * If all columns are REPEATED, then the missing column is also REPEATED. (In Drill, a zero-length array is the same as NULL: there is no such thing as a NULL array in Drill.)
   * If any column is REPEATED, and some column is OPTIONAL or REQUIRED, then choose REPEATED as the column type. Ensure that the runtime code handles the case of writing a single value into the array when we read the file with the OPTIONAL or REQUIRED column.
   
   IIRC, EVF handles all the above for dynamic columns. If Drill had type logic in the Calcite planner, it should handle these same rules.
   
   Again, this kind of logic requires extensive unit tests of all the cases above, plus any others you can think up.
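   
   For the data-mode half of this, a minimal sketch of the rules above (names are illustrative, not taken from this PR):
   
   ```java
   import java.util.Collection;

   import org.apache.drill.common.types.TypeProtos.DataMode;

   /**
    * Merges the data modes seen for one column across all files in the scan.
    * REPEATED dominates; any OPTIONAL or missing occurrence forces OPTIONAL;
    * only a column that is REQUIRED in every file stays REQUIRED.
    */
   static DataMode mergeModes(Collection<DataMode> fileModes, boolean missingInSomeFile) {
     if (fileModes.contains(DataMode.REPEATED)) {
       // All REPEATED: a missing column is just an empty array (Drill has no NULL arrays).
       // Mixed with OPTIONAL/REQUIRED: still REPEATED, and the reader must wrap scalar
       // values from the non-repeated files into single-element arrays.
       return DataMode.REPEATED;
     }
     if (missingInSomeFile || fileModes.contains(DataMode.OPTIONAL)) {
       return DataMode.OPTIONAL; // any nullable or missing occurrence forces OPTIONAL
     }
     return DataMode.REQUIRED;   // present and non-nullable in every file
   }
   ```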



##########
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java:
##########
@@ -72,6 +73,11 @@ public class ParquetRecordReader extends CommonParquetRecordReader {
 
   private final boolean useBulkReader;
 
+  /**
+   * See {@link ParquetSchema#tableSchema}
+   */
+  private final TupleMetadata tableSchema;

Review Comment:
   Is this optional? Or, as a result of this, will we ALWAYS do a Parquet prescan to set the schema at plan time? Can a user interested in performance turn this off?
   
   Also, is this wired into the Drill Metastore? Into the provided schema framework?
   
   We had the old Parquet schema cache. How did that pass a schema into the Parquet readers? Or maybe it didn't actually provide a schema to the readers?



##########
exec/java-exec/src/test/java/org/apache/drill/TestParquetMissingColumns.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.drill;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchemaBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Covers selecting completely missing columns from a parquet table. Should create Nullable Int
+ * ValueVector in that case since there is no chance to guess the correct data type here.
+ */

Review Comment:
   Note that `NULLABLE INT` is just a guess; there is nothing special about it. EVF allows the reader to define its own "default" column type. For example, for CSV, columns are NEVER `INT`, so it is better to guess `VARCHAR`, which is the only type CSV supports. Parquet can play the same trick if doing so is useful.
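   
   For reference, a sketch from memory of how an EVF-based reader picks its own default; treat the builder type and setter name as assumptions to verify against the current EVF code:
   
   ```java
   import org.apache.drill.common.types.TypeProtos.MinorType;
   import org.apache.drill.common.types.Types;
   import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;

   /**
    * Illustrative only: with a hook like this, missing columns surface as
    * nullable VARCHAR rather than the generic nullable INT guess.
    */
   static void configureMissingColumnType(FileScanBuilder builder) {
     builder.setNullType(Types.optional(MinorType.VARCHAR));
   }
   ```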


