paul-rogers commented on code in PR #2515:
URL: https://github.com/apache/drill/pull/2515#discussion_r852601292
##########
contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java:
##########
@@ -171,107 +164,104 @@ public HDF5ReaderConfig(HDF5FormatPlugin plugin, HDF5FormatConfig formatConfig)
}
}
- public HDF5BatchReader(HDF5ReaderConfig readerConfig, int maxRecords) {
- this.readerConfig = readerConfig;
- this.maxRecords = maxRecords;
+ public HDF5BatchReader(HDF5ReaderConfig config, EasySubScan scan, FileSchemaNegotiator negotiator) {
+ errorContext = negotiator.parentErrorContext();
+ file = negotiator.file();
+ readerConfig = config;
dataWriters = new ArrayList<>();
- this.showMetadataPreview = readerConfig.formatConfig.showPreview();
- }
+ showMetadataPreview = readerConfig.formatConfig.showPreview();
- @Override
- public boolean open(FileSchemaNegotiator negotiator) {
- split = negotiator.split();
- errorContext = negotiator.parentErrorContext();
// Since the HDF file reader uses a stream to actually read the file, the file name from the
// module is incorrect.
- fileName = split.getPath().getName();
- try {
- openFile(negotiator);
- } catch (IOException e) {
- throw UserException
- .dataReadError(e)
- .addContext("Failed to close input file: %s", split.getPath())
- .addContext(errorContext)
- .build(logger);
+ fileName = file.split().getPath().getName();
+
+ { // Opens an HDF5 file
+ try (InputStream in = file.fileSystem().openPossiblyCompressedStream(file.split().getPath())) {
+ /*
+ * As a possible future improvement, the jhdf reader has the ability to read hdf5 files from
+ * a byte array or byte buffer. This implementation is better in that it does not require creating
+ * a temporary file which must be deleted later. However, it could result in memory issues in the
+ * event of large files.
+ */
+ hdfFile = HdfFile.fromInputStream(in);
+ } catch (IOException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: %s", file.split().getPath())
+ .addContext(errorContext)
+ .build(logger);
+ }
}
- ResultSetLoader loader;
- if (readerConfig.defaultPath == null) {
- // Get file metadata
- List<HDF5DrillMetadata> metadata = getFileMetadata(hdfFile, new ArrayList<>());
- metadataIterator = metadata.iterator();
-
- // Schema for Metadata query
- SchemaBuilder builder = new SchemaBuilder()
- .addNullable(PATH_COLUMN_NAME, MinorType.VARCHAR)
- .addNullable(DATA_TYPE_COLUMN_NAME, MinorType.VARCHAR)
- .addNullable(FILE_NAME_COLUMN_NAME, MinorType.VARCHAR)
- .addNullable(DATA_SIZE_COLUMN_NAME, MinorType.BIGINT)
- .addNullable(IS_LINK_COLUMN_NAME, MinorType.BIT)
- .addNullable(ELEMENT_COUNT_NAME, MinorType.BIGINT)
- .addNullable(DATASET_DATA_TYPE_NAME, MinorType.VARCHAR)
- .addNullable(DIMENSIONS_FIELD_NAME, MinorType.VARCHAR);
-
- negotiator.tableSchema(builder.buildSchema(), false);
-
- loader = negotiator.build();
- dimensions = new int[0];
- rowWriter = loader.writer();
-
- } else {
- // This is the case when the default path is specified. Since the user is explicitly asking for a dataset
- // Drill can obtain the schema by getting the datatypes below and ultimately mapping that schema to columns
- Dataset dataSet = hdfFile.getDatasetByPath(readerConfig.defaultPath);
- dimensions = dataSet.getDimensions();
-
- loader = negotiator.build();
- rowWriter = loader.writer();
- writerSpec = new WriterSpec(rowWriter, negotiator.providedSchema(),
- negotiator.parentErrorContext());
- if (dimensions.length <= 1) {
- buildSchemaFor1DimensionalDataset(dataSet);
- } else if (dimensions.length == 2) {
- buildSchemaFor2DimensionalDataset(dataSet);
+ { // Build the schema and initialize the writer
+ ResultSetLoader loader;
+ if (readerConfig.defaultPath == null) {
+ // Get file metadata
+ List<HDF5DrillMetadata> metadata = getFileMetadata(hdfFile, new ArrayList<>());
+ metadataIterator = metadata.iterator();
+
+ // Schema for Metadata query
+ SchemaBuilder builder = new SchemaBuilder()
+ .addNullable(PATH_COLUMN_NAME, MinorType.VARCHAR)
+ .addNullable(DATA_TYPE_COLUMN_NAME, MinorType.VARCHAR)
+ .addNullable(FILE_NAME_COLUMN_NAME, MinorType.VARCHAR)
+ .addNullable(DATA_SIZE_COLUMN_NAME, MinorType.BIGINT)
+ .addNullable(IS_LINK_COLUMN_NAME, MinorType.BIT)
+ .addNullable(ELEMENT_COUNT_NAME, MinorType.BIGINT)
+ .addNullable(DATASET_DATA_TYPE_NAME, MinorType.VARCHAR)
+ .addNullable(DIMENSIONS_FIELD_NAME, MinorType.VARCHAR);
+
+ negotiator.tableSchema(builder.buildSchema(), false);
Review Comment:
In this path we are telling EVF2 the schema to use. The `false` argument
says we are free to add columns later.
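
For anyone following the thread, here is a minimal sketch of what that pattern looks like in an EVF2 reader. The class, the `extra_col` column, and the surrounding methods are illustrative only, not code from this PR, and the imports reflect my reading of the current packages:

```java
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;

// Illustrative only: a reader that declares a partial schema up front and
// materializes one more column while reading.
class OpenSchemaSketch {

  RowSetLoader open(SchemaNegotiator negotiator) {
    TupleMetadata declared = new SchemaBuilder()
        .addNullable("path", MinorType.VARCHAR)
        .buildSchema();

    // The second argument (false) marks the schema as not complete: the
    // reader remains free to add columns after the scan starts.
    negotiator.tableSchema(declared, false);
    ResultSetLoader loader = negotiator.build();
    return loader.writer();
  }

  void writeRow(RowSetLoader rowWriter) {
    // Called from next(), once EVF has started a batch.
    rowWriter.start();
    // A column that was not declared up front can still be added and written.
    int extraIdx = rowWriter.addColumn(
        MetadataUtils.newScalar("extra_col", MinorType.BIGINT, DataMode.OPTIONAL));
    rowWriter.scalar(extraIdx).setLong(1L);
    rowWriter.save();
  }
}
```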
##########
contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java:
##########
@@ -171,107 +164,104 @@ public HDF5ReaderConfig(HDF5FormatPlugin plugin, HDF5FormatConfig formatConfig)
}
}
- public HDF5BatchReader(HDF5ReaderConfig readerConfig, int maxRecords) {
- this.readerConfig = readerConfig;
- this.maxRecords = maxRecords;
+ public HDF5BatchReader(HDF5ReaderConfig config, EasySubScan scan, FileSchemaNegotiator negotiator) {
+ errorContext = negotiator.parentErrorContext();
+ file = negotiator.file();
+ readerConfig = config;
dataWriters = new ArrayList<>();
- this.showMetadataPreview = readerConfig.formatConfig.showPreview();
- }
+ showMetadataPreview = readerConfig.formatConfig.showPreview();
- @Override
- public boolean open(FileSchemaNegotiator negotiator) {
- split = negotiator.split();
- errorContext = negotiator.parentErrorContext();
// Since the HDF file reader uses a stream to actually read the file, the file name from the
// module is incorrect.
- fileName = split.getPath().getName();
- try {
- openFile(negotiator);
- } catch (IOException e) {
- throw UserException
- .dataReadError(e)
- .addContext("Failed to close input file: %s", split.getPath())
- .addContext(errorContext)
- .build(logger);
+ fileName = file.split().getPath().getName();
+
+ { // Opens an HDF5 file
+ try (InputStream in = file.fileSystem().openPossiblyCompressedStream(file.split().getPath())) {
+ /*
+ * As a possible future improvement, the jhdf reader has the ability to read hdf5 files from
+ * a byte array or byte buffer. This implementation is better in that it does not require creating
+ * a temporary file which must be deleted later. However, it could result in memory issues in the
+ * event of large files.
+ */
+ hdfFile = HdfFile.fromInputStream(in);
+ } catch (IOException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: %s", file.split().getPath())
+ .addContext(errorContext)
+ .build(logger);
+ }
}
- ResultSetLoader loader;
- if (readerConfig.defaultPath == null) {
- // Get file metadata
- List<HDF5DrillMetadata> metadata = getFileMetadata(hdfFile, new ArrayList<>());
- metadataIterator = metadata.iterator();
-
- // Schema for Metadata query
- SchemaBuilder builder = new SchemaBuilder()
- .addNullable(PATH_COLUMN_NAME, MinorType.VARCHAR)
- .addNullable(DATA_TYPE_COLUMN_NAME, MinorType.VARCHAR)
- .addNullable(FILE_NAME_COLUMN_NAME, MinorType.VARCHAR)
- .addNullable(DATA_SIZE_COLUMN_NAME, MinorType.BIGINT)
- .addNullable(IS_LINK_COLUMN_NAME, MinorType.BIT)
- .addNullable(ELEMENT_COUNT_NAME, MinorType.BIGINT)
- .addNullable(DATASET_DATA_TYPE_NAME, MinorType.VARCHAR)
- .addNullable(DIMENSIONS_FIELD_NAME, MinorType.VARCHAR);
-
- negotiator.tableSchema(builder.buildSchema(), false);
-
- loader = negotiator.build();
- dimensions = new int[0];
- rowWriter = loader.writer();
-
- } else {
- // This is the case when the default path is specified. Since the user is explicitly asking for a dataset
- // Drill can obtain the schema by getting the datatypes below and ultimately mapping that schema to columns
- Dataset dataSet = hdfFile.getDatasetByPath(readerConfig.defaultPath);
- dimensions = dataSet.getDimensions();
-
- loader = negotiator.build();
- rowWriter = loader.writer();
- writerSpec = new WriterSpec(rowWriter, negotiator.providedSchema(),
- negotiator.parentErrorContext());
- if (dimensions.length <= 1) {
- buildSchemaFor1DimensionalDataset(dataSet);
- } else if (dimensions.length == 2) {
- buildSchemaFor2DimensionalDataset(dataSet);
+ { // Build the schema and initialize the writer
+ ResultSetLoader loader;
+ if (readerConfig.defaultPath == null) {
+ // Get file metadata
+ List<HDF5DrillMetadata> metadata = getFileMetadata(hdfFile, new ArrayList<>());
+ metadataIterator = metadata.iterator();
+
+ // Schema for Metadata query
+ SchemaBuilder builder = new SchemaBuilder()
+ .addNullable(PATH_COLUMN_NAME, MinorType.VARCHAR)
+ .addNullable(DATA_TYPE_COLUMN_NAME, MinorType.VARCHAR)
+ .addNullable(FILE_NAME_COLUMN_NAME, MinorType.VARCHAR)
+ .addNullable(DATA_SIZE_COLUMN_NAME, MinorType.BIGINT)
+ .addNullable(IS_LINK_COLUMN_NAME, MinorType.BIT)
+ .addNullable(ELEMENT_COUNT_NAME, MinorType.BIGINT)
+ .addNullable(DATASET_DATA_TYPE_NAME, MinorType.VARCHAR)
+ .addNullable(DIMENSIONS_FIELD_NAME, MinorType.VARCHAR);
+
+ negotiator.tableSchema(builder.buildSchema(), false);
+
+ loader = negotiator.build();
+ rowWriter = loader.writer();
+
+ dimensions = null;
+ writerSpec = null;
+ pathWriter = rowWriter.scalar(PATH_COLUMN_NAME);
+ dataTypeWriter = rowWriter.scalar(DATA_TYPE_COLUMN_NAME);
+ fileNameWriter = rowWriter.scalar(FILE_NAME_COLUMN_NAME);
+ dataSizeWriter = rowWriter.scalar(DATA_SIZE_COLUMN_NAME);
+ linkWriter = rowWriter.scalar(IS_LINK_COLUMN_NAME);
+ elementCountWriter = rowWriter.scalar(ELEMENT_COUNT_NAME);
+ datasetTypeWriter = rowWriter.scalar(DATASET_DATA_TYPE_NAME);
+ dimensionsWriter = rowWriter.scalar(DIMENSIONS_FIELD_NAME);
} else {
- // Case for datasets of greater than 2D
- // These are automatically flattened
- buildSchemaFor2DimensionalDataset(dataSet);
+ // This is the case when the default path is specified. Since the user is explicitly asking for a dataset
+ // Drill can obtain the schema by getting the datatypes below and ultimately mapping that schema to columns
+ Dataset dataSet = hdfFile.getDatasetByPath(readerConfig.defaultPath);
+ dimensions = dataSet.getDimensions();
+
+ loader = negotiator.build();
Review Comment:
In this path, we did not tell EVF2 about the schema. This means EVF2 expects
us to discover columns as we go along. Is this the intent? The comment can be
read as saying either "Drill can obtain the schema NOW" or "Drill can obtain
the schema LATER".
But it is odd, within the same reader, to provide a schema down one path and not down the other. It almost seems that there are two distinct readers in this case.
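
For contrast with the sketch above, here is a hedged sketch of the discover-as-you-go style this branch appears to rely on. Again, the class and column names are illustrative, not code from this PR:

```java
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.MetadataUtils;

// Illustrative only: no schema is declared, so EVF2 learns about columns as
// the reader materializes them.
class DiscoveredSchemaSketch {

  RowSetLoader open(SchemaNegotiator negotiator) {
    // No tableSchema() call: EVF2 starts with an empty reader schema.
    ResultSetLoader loader = negotiator.build();
    return loader.writer();
  }

  void writeValue(RowSetLoader rowWriter, String colName, int value) {
    // Add the column the first time the reader encounters it.
    if (rowWriter.tupleSchema().index(colName) == -1) {
      rowWriter.addColumn(
          MetadataUtils.newScalar(colName, MinorType.INT, DataMode.OPTIONAL));
    }
    rowWriter.scalar(colName).setInt(value);
  }
}
```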
##########
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaResolver.java:
##########
@@ -189,7 +189,7 @@ private void insertColumn(ColumnMetadata col) {
switch (mode) {
case FIRST_READER_SCHEMA:
case READER_SCHEMA:
- if (schema.projectionType() != ProjectionType.ALL) {
+ if (schema.projectionType() != ProjectionType.ALL && !col.isArray()) {
Review Comment:
This error check is correct: the problem must be in the `ResultSetLoader`. The error says that the `ResultSetLoader` is trying to add a materialized column that is not projected. The `ResultSetLoader` should have created a dummy column and not passed the column along to this mechanism.
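
To make the expectation concrete, a hedged sketch from the reader's point of view; the wrapper class and column `b` are illustrative, not code from this PR:

```java
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.vector.accessor.ScalarWriter;

// Illustrative only: what a reader expects when it adds a column the query
// did not project.
class UnprojectedColumnSketch {

  void writeRow(RowSetLoader rowWriter) {
    // Suppose the query projects only column `a`. Adding `b` should hand the
    // reader a dummy writer backed by no vector, rather than surfacing `b`
    // to the scan schema mechanism.
    int bIdx = rowWriter.addColumn(
        MetadataUtils.newScalar("b", MinorType.VARCHAR, DataMode.OPTIONAL));
    ScalarWriter bWriter = rowWriter.scalar(bIdx);

    // isProjected() reports whether the writer is backed by a real vector.
    // Either way the reader can write without checking; values written to a
    // dummy writer are silently discarded.
    boolean materialized = bWriter.isProjected();
    bWriter.setString("discarded when b is not projected");
  }
}
```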
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]