paul-rogers commented on code in PR #2937:
URL: https://github.com/apache/drill/pull/2937#discussion_r1739400611
##########
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java:
##########
@@ -73,7 +74,10 @@ static ColumnReader<?> createFixedColumnReader(ParquetRecordReader recordReader,
     ConvertedType convertedType = schemaElement.getConverted_type();
     // if the column is required, or repeated (in which case we just want to use this to generate our appropriate
     // ColumnReader for actually transferring data into the data vector inside of our repeated vector
-    if (descriptor.getMaxDefinitionLevel() == 0 || descriptor.getMaxRepetitionLevel() > 0) {
+    // Choose a reader based on a ValueVector DataMode since we might want to put
+    // parquet's REQUIRED column into a Drill's OPTIONAL ValueVector
+    // see ParquetSchema#tableSchema for details
+    if (v.getField().getDataMode() != TypeProtos.DataMode.OPTIONAL) {

Review Comment:
   If we are enforcing a planner-defined type, then use the entire type (the full MajorType: minor type and data mode) from the planner-provided schema. It won't work to have the reader second-guessing the common schema that the planner selected. Again, EVF handles all this. You can try setting up a unit test case using EVF to establish the baseline for what Parquet should repeat in its logic.

##########
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java:
##########
@@ -56,10 +56,10 @@ public ParquetColumnMetadata(ColumnDescriptor column) {
     this.column = column;
   }
-  public void resolveDrillType(Map<String, SchemaElement> schemaElements, OptionManager options) {
+  public void resolveDrillType(Map<String, SchemaElement> schemaElements, OptionManager options, boolean isEnforcedOptional) {

Review Comment:
   Again, if we are enforcing a planner-provided schema, the job is to map whatever the Parquet type is into the given, fixed Drill type. There is only one right answer when the schema is provided. Again, see EVF for how this works.
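To make the last two comments concrete: below is a minimal sketch of the resolution rule, with hypothetical names (this is not the PR's code and not EVF). Once a planner-provided schema covers a column, the reader takes that column's complete MajorType verbatim rather than re-deriving the data mode from the Parquet descriptor; the Map merely stands in for whatever structure carries the provided schema.

```java
import java.util.Map;

import org.apache.drill.common.types.TypeProtos.MajorType;

class ProvidedTypeResolver {

  /**
   * The planner-provided type, when present, is the output type: both the minor
   * type and the data mode. The reader's only remaining job is to coerce the
   * file's data into it. Only a column absent from the provided schema keeps
   * the type derived from the Parquet file itself.
   */
  static MajorType resolveOutputType(Map<String, MajorType> providedSchema,
                                     String columnName,
                                     MajorType typeFromParquetFile) {
    MajorType provided = providedSchema == null ? null : providedSchema.get(columnName);
    return provided != null ? provided : typeFromParquetFile;
  }
}
```

In other words, there is no merge step left at read time once the schema is provided; the reader never widens or narrows the planner's answer.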
##########
exec/java-exec/src/test/java/org/apache/drill/TestParquetPartiallyMissingColumns.java:
##########
@@ -0,0 +1,128 @@
+package org.apache.drill;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchemaBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.file.Paths;
+
+/**
+ * Covers querying a table in which some parquet files do contain selected columns, and
+ * others do not (or have them as OPTIONALs).
+ *
+ * Expected behavior for the missing columns is following:
+ * 1) If at least 1 parquet file to be read has the column, take the minor type from there.
+ * Otherwise, default to INT.
+ * 2) If at least 1 parquet file to be read doesn't have the column, or has it as OPTIONAL,
+ * enforce the overall scan output schema to have it as OPTIONAL
+ *
+ * We need to control ordering of scanning batches to cover different erroneous cases, and we assume
+ * that parquet files in a table would be read in alphabetic order (not a real use case though). So
+ * we name our files 0.parquet and 1.parquet expecting that they would be scanned in that order
+ * (not guaranteed though, but seems to work). We use such tables for such scenarios:
+ *
+ * - parquet/partially_missing/o_m -- optional, then missing
+ * - parquet/partially_missing/m_o -- missing, then optional
+ * - parquet/partially_missing/r_m -- required, then missing
+ * - parquet/partially_missing/r_o -- required, then optional

Review Comment:
   Thanks for explaining the schemas here in the test comments!

##########
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java:
##########
@@ -64,6 +65,19 @@ public final class ParquetSchema {
   private final int rowGroupIndex;
   private final ParquetMetadata footer;
+  /**
+   * Schema for the whole table constructed by a GroupScan from all the parquet files to read.
+   * If we don't find a selected column in our parquet file, type for the null-filled vector
+   * to create would be tried to find in this schema. That is, if some other parquet file contains
+   * the column, we'll take their type. Otherwise, default to Nullable Int.
+   * Also, if at least 1 file does not contain the selected column, then the overall table schema
+   * should have this field with OPTIONAL data mode. GroupScan catches this case and sets the
+   * appropriate data mode in this schema. Our mission here is to enforce that OPTIONAL mode in our
+   * output schema, even if the particular parquet file we're reading from has this field REQUIRED,
+   * to provide consistency across all scan batches.
+   */

Review Comment:
   Great start! See the other cases described above. Also, I seem to remember creating code to handle evolving column types as part of EVF. Perhaps you can find that code. The code likely has a large number of unit tests (I'm a test-driven kinda guy) which you can reuse to test your parallel implementation.

##########
exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java:
##########
@@ -751,6 +752,17 @@ public static MinorType getLeastRestrictiveType(MinorType... types) {
     return result;
   }
+  public static MajorType getLeastRestrictiveMajorType(MajorType... majorTypes) {

Review Comment:
   Here we are passing in major types and finding the least restrictive among them. In general, the least restrictive can be a type NOT in the list. For example, given a `FLOAT` and an `INT`, the least restrictive is `DOUBLE`, which is able to hold either a 32-bit `FLOAT` or a 32-bit `INT`. Similarly, given a `VARCHAR` and a `MAP`, the least restrictive is `UNION`. (Though a word of warning: `UNION` has never worked right, is slow, and is a resource hog. No SQL client understands it either.)
   
   I suspect this logic must exist somewhere. Of course, finding it may be difficult. If we want to create a new version, then this kind of thing needs extensive unit tests with many different column combinations.

##########
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java:
##########
@@ -661,6 +663,12 @@ static Map<SchemaPath, TypeProtos.MajorType> resolveFields(MetadataBase.ParquetT
       // row groups in the file have the same schema, so using the first one
       Map<SchemaPath, TypeProtos.MajorType> fileColumns = getFileFields(parquetTableMetadata, file);
       fileColumns.forEach((columnPath, type) -> putType(columns, columnPath, type));
+      // If at least 1 parquet file to read doesn't contain a column, enforce this column
+      // DataMode to OPTIONAL in the overall table schema

Review Comment:
   The general rule has to be:
   
   * For all columns that exist, define a common type that can hold all of the associated column types.
   * If any column is optional (or missing), assign the OPTIONAL type -- but only if the other types are REQUIRED.
   * If all columns are REPEATED, then the missing column is also REPEATED. (In Drill, a zero-length array is the same as NULL: there is no such thing as a NULL array in Drill.)
   * If any column is REPEATED, and some column is OPTIONAL or REQUIRED, then choose REPEATED as the column type. Ensure that the runtime code handles the case of writing a single value into the array when we read the file with the OPTIONAL or REQUIRED column.
   
   IIRC, EVF handles all the above for dynamic columns. If Drill had type logic in the Calcite planner, it should handle these same rules.
   
   Again, this kind of logic requires extensive unit tests of all the cases above, plus any others you can think up.
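To pin down the merge rules from the two comments above (the least restrictive MajorType plus the data-mode bullets), here is a small, self-contained sketch over Drill's TypeProtos. It is illustrative only: not the PR's TypeCastRules change and not EVF's logic; only a couple of minor-type pairs are spelled out, and complex types are glossed over.

```java
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;

class MajorTypeMergeSketch {

  /** Data-mode merge per the bullets above; a missing column counts as OPTIONAL. */
  static DataMode mergeModes(DataMode a, DataMode b) {
    if (a == DataMode.REPEATED || b == DataMode.REPEATED) {
      return DataMode.REPEATED; // arrays win; a single value becomes a one-element array
    }
    if (a == DataMode.OPTIONAL || b == DataMode.OPTIONAL) {
      return DataMode.OPTIONAL; // any nullable (or missing) column forces OPTIONAL
    }
    return DataMode.REQUIRED;   // only when every file has the column as REQUIRED
  }

  /**
   * Least restrictive minor type. The result may be a type that is not in the
   * input list, e.g. FLOAT4 + INT widens to FLOAT8. A real implementation needs
   * the full implicit-cast precedence table plus extensive unit tests.
   */
  static MinorType mergeMinorTypes(MinorType a, MinorType b) {
    if (a == b) {
      return a;
    }
    if ((a == MinorType.FLOAT4 && b == MinorType.INT)
        || (a == MinorType.INT && b == MinorType.FLOAT4)) {
      return MinorType.FLOAT8; // DOUBLE can hold either a 32-bit float or a 32-bit int
    }
    return MinorType.UNION;    // last resort, e.g. VARCHAR + MAP; see the caveats above
  }

  static MajorType merge(MajorType a, MajorType b) {
    return MajorType.newBuilder()
        .setMinorType(mergeMinorTypes(a.getMinorType(), b.getMinorType()))
        .setMode(mergeModes(a.getMode(), b.getMode()))
        .build();
  }
}
```

A table of expected results for pairs such as (REQUIRED INT, missing), (REPEATED VARCHAR, OPTIONAL VARCHAR), and (FLOAT4, INT) would be a natural seed for the unit tests asked for above.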
##########
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java:
##########
@@ -72,6 +73,11 @@ public class ParquetRecordReader extends CommonParquetRecordReader {
   private final boolean useBulkReader;
+  /**
+   * See {@link ParquetSchema#tableSchema}
+   */
+  private final TupleMetadata tableSchema;

Review Comment:
   Is this optional? Or, as a result of this, will we ALWAYS do a Parquet prescan to set the schema at plan time? Can a user interested in performance turn this off?
   
   Also, is this wired into the Drill Metastore? Into the provided schema framework? We had the old Parquet schema cache. How did that pass the schema into the Parquet readers? Or, maybe it didn't actually provide a schema to the readers?

##########
exec/java-exec/src/test/java/org/apache/drill/TestParquetMissingColumns.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.drill;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchemaBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Covers selecting completely missing columns from a parquet table. Should create Nullable Int
+ * ValueVector in that case since there is no chance to guess the correct data type here.
+ */

Review Comment:
   Note that `NULLABLE INT` is just a guess; there is nothing special about it. EVF allows the reader to define its own "default" column type. For example, for CSV, columns are NEVER `INT`, so it is better to guess `VARCHAR`, which is the only type CSV supports. Parquet can play the same trick if doing so is useful.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@drill.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org