paul-rogers commented on a change in pull request #1778: Drill-7233: Format Plugin for HDF5 URL: https://github.com/apache/drill/pull/1778#discussion_r331769456
########## File path: contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java ########## @@ -0,0 +1,887 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.hdf5; + +import ch.systemsx.cisd.hdf5.HDF5CompoundMemberInformation; +import ch.systemsx.cisd.hdf5.HDF5DataClass; +import ch.systemsx.cisd.hdf5.HDF5DataSetInformation; +import ch.systemsx.cisd.hdf5.HDF5FactoryProvider; +import ch.systemsx.cisd.hdf5.HDF5LinkInformation; +import ch.systemsx.cisd.hdf5.IHDF5Factory; +import ch.systemsx.cisd.hdf5.IHDF5Reader; +import org.apache.commons.io.IOUtils; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.expr.holders.BigIntHolder; +import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; +import org.apache.drill.exec.physical.resultSet.ResultSetLoader; +import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.MetadataUtils; +import 
org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.exec.vector.accessor.TupleWriter; +import org.apache.drill.exec.vector.complex.writer.BaseWriter; +import org.apache.hadoop.mapred.FileSplit; +import org.joda.time.Instant; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.BitSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HDF5BatchReader.class); + private FileSplit split; + private HDF5FormatConfig formatConfig; + private ResultSetLoader loader; + private String tempFileName; + private IHDF5Reader HDF5reader; + private File infile; + private BufferedReader reader; + protected HDF5ReaderConfig readerConfig; + private boolean finish; + + + public static class HDF5ReaderConfig { + protected final HDF5FormatPlugin plugin; + protected TupleMetadata schema; + protected String defaultPath; + protected HDF5FormatConfig formatConfig; + + public HDF5ReaderConfig(HDF5FormatPlugin plugin, HDF5FormatConfig formatConfig) { + this.plugin = plugin; + this.formatConfig = formatConfig; + this.defaultPath = formatConfig.getDefaultPath(); + } + } + + + public HDF5BatchReader(HDF5ReaderConfig readerConfig) { + this.readerConfig = readerConfig; + this.formatConfig = readerConfig.formatConfig; + } + + @Override + public boolean open(FileSchemaNegotiator negotiator) { + split = negotiator.split(); + loader = negotiator.build(); + 
openFile(negotiator); + this.loader = negotiator.build(); + return true; + } + + private void openFile(FileSchemaNegotiator negotiator) { + InputStream in; + try { + in = negotiator.fileSystem().open(split.getPath()); + IHDF5Factory factory = HDF5FactoryProvider.get(); + this.infile = convertInputStreamToFile(in); + this.HDF5reader = factory.openForReading(infile); + } catch (Exception e) { + throw UserException + .dataReadError(e) + .message("Failed to open open input file: %s", split.getPath()) + .build(logger); + } + reader = new BufferedReader(new InputStreamReader(in)); + } + + /** + * This function converts the Drill inputstream into a File object for the HDF5 library. This function + * exists due to a known limitation in the HDF5 library which cannot parse HDF5 directly from an input stream. A future + * release of the library will support this. + * + * @param stream + * @return + * @throws IOException + */ + private File convertInputStreamToFile(InputStream stream) throws IOException { + this.tempFileName = "./~" + split.getPath().getName(); + File targetFile = new File(tempFileName); + java.nio.file.Files.copy(stream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + IOUtils.closeQuietly(stream); + return targetFile; + } + + @Override + public boolean next() { + RowSetLoader rowWriter = loader.writer(); + while (!rowWriter.isFull()) { + if (this.formatConfig.getDefaultPath() == null || this.formatConfig.getDefaultPath().isEmpty()) { + return projectFileMetadata(rowWriter); + } else { + return projectDataset(rowWriter, readerConfig.defaultPath, false); + } + + } + return false; + } + + private boolean projectFileMetadata(RowSetLoader rowWriter) { + List<HDF5DrillMetadata> metadata = getFileMetadata(this.HDF5reader.object().getGroupMemberInformation("/", true), new ArrayList<>()); + + for (HDF5DrillMetadata record : metadata) { + rowWriter.start(); + writeStringColumn(rowWriter, "path", record.getPath()); + writeStringColumn(rowWriter, 
"data_type", record.getDataType()); + writeStringColumn(rowWriter, "file_name",this.infile.getName().replace("~", "") ); + + //Write attributes if present + if (record.getAttributes().size() > 0) { + writeAttributes(rowWriter, record); + } + if (record.getDataType().equals("DATASET")) { + if (readerConfig.defaultPath != null) { + projectDataset(rowWriter, readerConfig.defaultPath, true); + } else { + projectDataset(rowWriter, record.getPath(), true); + } + } + rowWriter.save(); + } + return false; + } + + /** + * This helper function returns the name of a HDF5 record from a data path + * @param path Path to HDF5 data + * @return String name of data + */ + private String getNameFromPath(String path) { + String pattern = "/*.*/(.+?)$"; + Pattern r = Pattern.compile(pattern); + // Now create matcher object. + Matcher m = r.matcher(path); + if (m.find()) { + return m.group(1); + } else { + return ""; + } + } + + private List<HDF5DrillMetadata> getFileMetadata(List<HDF5LinkInformation> members, List<HDF5DrillMetadata> metadata) { + for (HDF5LinkInformation info : members) { + HDF5DrillMetadata metadataRow = new HDF5DrillMetadata(); + + metadataRow.setPath(info.getPath()); + metadataRow.setDataType(info.getType().toString()); + + switch (info.getType()) { + case DATASET: + metadataRow.setAttributes(getAttributes(HDF5reader, info.getPath())); + HDF5DataSetInformation dsInfo = HDF5reader.object().getDataSetInformation(info.getPath()); + metadata.add(metadataRow); + break; + case SOFT_LINK: + // Soft links cannot have attributes + metadata.add(metadataRow); + break; + case GROUP: + metadataRow.setAttributes(getAttributes(HDF5reader, info.getPath())); + metadata.add(metadataRow); + metadata = getFileMetadata(HDF5reader.object().getGroupMemberInformation(info.getPath(), true), metadata); + break; + default: + break; + } + } + return metadata; + } + + private HashMap getAttributes(IHDF5Reader reader, String path) { + HashMap<String, HDF5Attribute> attributes = new HashMap<>(); 
+ long attrCount = reader.object().getObjectInformation(path).getNumberOfAttributes(); + if (attrCount > 0) { + List<String> attrNames = reader.object().getAllAttributeNames(path); + for (String name : attrNames) { + try { + HDF5Attribute attribute = HDF5Utils.getAttribute(path, name, reader); + attributes.put(attribute.getKey(), attribute); + } catch (Exception e) { + logger.info("Couldn't add attribute: " + path + " " + name); + } + } + } + return attributes; + } + + + private boolean projectDataset(RowSetLoader rowWriter, String datapath, boolean isMetadataQuery) { + int resultCount = 0; + + String fieldName = getNameFromPath(datapath); + IHDF5Reader reader = this.HDF5reader; + HDF5DataSetInformation dsInfo = reader.object().getDataSetInformation(datapath); + HDF5DataClass dataType = dsInfo.getTypeInformation().getRawDataClass(); + long[] dimensions = dsInfo.getDimensions(); + //Case for single dimensional data + if (dimensions.length <= 1) { + TypeProtos.MinorType currentDataType = HDF5Utils.getDataType(dsInfo); + // Case for null or unknown data types: + if (currentDataType == null) { + System.out.println(dsInfo.getTypeInformation().tryGetJavaType()); + } + + switch (currentDataType) { + case GENERIC_OBJECT: + ArrayList enumData = new ArrayList(Arrays.asList(HDF5reader.readEnumArray(datapath).toStringArray())); + break; + case VARCHAR: + try { + if (readerConfig.defaultPath != null) { + String[] tempStringList = HDF5reader.readStringArray(datapath); + for (String value : tempStringList) { + rowWriter.start(); + writeStringColumn(rowWriter, fieldName, value); + rowWriter.save(); + } + } else { + String[] data = HDF5reader.readStringArray(datapath); + writeStringListColumn(rowWriter, fieldName, data); + } + } catch (Exception e) { + logger.warn("Unknown HDF5 data type: " + datapath); + } + break; + case TIMESTAMP: + long ts = HDF5reader.readTimeStamp(datapath); + writeTimestampColumn(rowWriter, fieldName, ts); + break; + case INT: + if 
(!dsInfo.getTypeInformation().isSigned()) { + if (dsInfo.getTypeInformation().getElementSize() > 4) { + if (readerConfig.defaultPath != null) { + long[] tempLongList = HDF5reader.readLongArray(datapath); + for (long i : tempLongList) { + rowWriter.start(); + writeLongColumn(rowWriter, fieldName, i); + rowWriter.save(); + } + } else { + long[] longList = HDF5reader.uint64().readArray(datapath); + if(!isMetadataQuery) { + rowWriter.start(); + } + writeLongListColumn(rowWriter, fieldName, longList); + if(!isMetadataQuery) { + rowWriter.save(); + } + } + } + } else { + if (readerConfig.defaultPath != null) { + int[] tempIntList = HDF5reader.readIntArray(datapath); + for (int i : tempIntList) { + rowWriter.start(); + writeIntColumn(rowWriter, fieldName, i); + rowWriter.save(); + } + + } else { + int[] intList = HDF5reader.readIntArray(datapath); + if(!isMetadataQuery) { + rowWriter.start(); + } + writeIntListColumn(rowWriter, fieldName, intList); + if(!isMetadataQuery) { + rowWriter.save(); + } + } + } + break; + case FLOAT4: + if (readerConfig.defaultPath != null) { + float[] tempFloatList = HDF5reader.readFloatArray(datapath); + for (float i : tempFloatList) { + rowWriter.start(); + writeFloat4Column(rowWriter, fieldName, i); + rowWriter.save(); + } + } else { + float[] tempFloatList = HDF5reader.readFloatArray(datapath); + if(!isMetadataQuery) { + rowWriter.start(); + } + writeFloat4ListColumn(rowWriter, fieldName, tempFloatList); + if(!isMetadataQuery) { + rowWriter.save(); + } + } + break; + case FLOAT8: // TODO Add auto-flatten here... 
+ if (readerConfig.defaultPath != null) { + double[] tempDoubleList = HDF5reader.readDoubleArray(datapath); + for (double i : tempDoubleList) { + rowWriter.start(); + writeFloat8Column(rowWriter, fieldName, i); + rowWriter.save(); + } + } else { + double[] tempFloatList = HDF5reader.readDoubleArray(datapath); + if(!isMetadataQuery) { + rowWriter.start(); + } + writeFloat8ListColumn(rowWriter, fieldName, tempFloatList); + if(!isMetadataQuery) { + rowWriter.save(); + } + } + break; + case BIGINT: + if (!dsInfo.getTypeInformation().isSigned()) { + logger.warn("Drill does not support unsigned 64bit integers."); + break; + } + if (readerConfig.defaultPath != null) { + long[] tempLongList = HDF5reader.readLongArray(datapath); + for (long i : tempLongList) { + rowWriter.start(); + writeLongColumn(rowWriter, fieldName, i); + rowWriter.save(); + } + + } else { + long[] tempLongList = HDF5reader.readLongArray(datapath); + if(!isMetadataQuery) { + rowWriter.start(); + } + writeLongListColumn(rowWriter, fieldName, tempLongList); + if(!isMetadataQuery) { + rowWriter.save(); + } + } + break; + case MAP: + //try { // TODO Auto flatten compound data type + getAndMapCompoundData(datapath, new ArrayList<>(), this.HDF5reader, rowWriter); + //} catch (Exception e) { + // throw UserException + // .dataWriteError() + // .addContext("Error writing Compound Field: ") + // .addContext(e.getMessage()) + // .build(logger); + //} + break; + default: + //Case for data types that cannot be read + logger.warn(dsInfo.getTypeInformation().tryGetJavaType() + " not implemented....yet."); + break; + } + } else if (dimensions.length == 2) { + long cols = dimensions[1]; + long rows = dimensions[0]; + switch (HDF5Utils.getDataType(dsInfo)) { + case INT: + int[][] colData = HDF5reader.readIntMatrix(datapath); + mapIntMatrixField(fieldName, colData, (int) cols, (int) rows, rowWriter); + break; + /*case FLOAT4: + float[][] floatData = HDF5reader.readFloatMatrix(datapath); + resultCount = 
mapFloatMatrixField(fieldName, floatData, (int) cols, (int) rows, map); + break; + case FLOAT8: + double[][] doubleData = HDF5reader.readDoubleMatrix(datapath); + resultCount = mapDoubleMatrixField(fieldName, doubleData, (int) cols, (int) rows, map); + break; + case BIGINT: + long[][] longData = HDF5reader.readLongMatrix(datapath); + resultCount = mapLongMatrixField(fieldName, longData, (int) cols, (int) rows, map); + break;*/ + default: + logger.info(HDF5Utils.getDataType(dsInfo) + " not implemented."); + break; + } + }/* else { + // Case for data sets with dimensions > 2 + long cols = dimensions[1]; + long rows = dimensions[0]; + switch (HDF5Utils.getDataType(dsInfo)) { + case INT: + int[][] colData = HDF5reader.int32().readMDArray(datapath).toMatrix(); + resultCount = mapIntMatrixField(fieldName, colData, (int) cols, (int) rows, map); + break; + case FLOAT4: + float[][] floatData = HDF5reader.float32().readMDArray(datapath).toMatrix(); + resultCount = mapFloatMatrixField(fieldName, floatData, (int) cols, (int) rows, map); + break; + case FLOAT8: + double[][] doubleData = HDF5reader.float64().readMDArray(datapath).toMatrix(); + resultCount = mapDoubleMatrixField(fieldName, doubleData, (int) cols, (int) rows, map); + break; + case BIGINT: + long[][] longData = HDF5reader.int64().readMDArray(datapath).toMatrix(); + resultCount = mapLongMatrixField(fieldName, longData, (int) cols, (int) rows, map); + break; + default: + logger.info(HDF5Utils.getDataType(dsInfo) + " not implemented."); + break; + }*/ + //} + //} + + return false; + } + + private void writeBooleanColumn(TupleWriter rowWriter, String name, int value) { + boolean bool_value = true; + if(value == 0) { + bool_value = false; + } + writeBooleanColumn(rowWriter, name, bool_value); + } + + private void writeBooleanColumn(TupleWriter rowWriter, String name, boolean value) { + int index = rowWriter.tupleSchema().index(name); + if (index == -1) { + ColumnMetadata colSchema = MetadataUtils.newScalar(name, 
TypeProtos.MinorType.BIT, TypeProtos.DataMode.OPTIONAL); + index = rowWriter.addColumn(colSchema); + } + ScalarWriter colWriter = rowWriter.scalar(index); + colWriter.setBoolean(value); + } + + private void writeIntColumn(TupleWriter rowWriter, String name, int value) { + int index = rowWriter.tupleSchema().index(name); + if (index == -1) { + ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL); + index = rowWriter.addColumn(colSchema); + } + ScalarWriter colWriter = rowWriter.scalar(index); + colWriter.setInt(value); + } + + private void writeIntListColumn(TupleWriter rowWriter, String name, Integer[] list) { + int[] data = new int[list.length]; + for(int i = 0; i < list.length; i++) { + data[i] = list[i].intValue(); + } + writeIntListColumn(rowWriter, name, data); + } + + private void writeIntListColumn(TupleWriter rowWriter, String name, int[] list){ + int index = rowWriter.tupleSchema().index(name); + if (index == -1) { + ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.INT, TypeProtos.DataMode.REPEATED); + index = rowWriter.addColumn(colSchema); + } + + ScalarWriter arrayWriter = rowWriter.column(index).array().scalar(); + for (int i = 0; i < list.length; i++) { + int j = i+1; + arrayWriter.setInt(list[i]); + } + } + + private void mapIntMatrixField(String fieldName, int[][] colData, int cols, int rows, RowSetLoader rowWriter) { + String colName = "int_data"; + + // If the default path is not null, auto flatten the data + // The end result are that a 2D array gets mapped to Drill columns + if (readerConfig.defaultPath != null) { + for (int i = 0; i < rows; i++) { + rowWriter.start(); + for (int k = 0; k < cols; k++) { + String tempColumnName = "int_col_" + k; + writeIntColumn(rowWriter, tempColumnName, colData[i][k]); + } + rowWriter.save(); + } + } else { + mapHelper(fieldName, colData, cols, rows, rowWriter); + // This is the case where a dataset is projected in a 
metadata query. The result should be a list of lists + /* + int index = rowWriter.tupleSchema().index(colName); + if (index == -1) { + ColumnMetadata innerColumnSchema = MetadataUtils.newScalar("inner_int_list", TypeProtos.MinorType.INT, TypeProtos.DataMode.REPEATED); + ColumnMetadata colSchema = MetadataUtils.newRepeatedList(colName, innerColumnSchema ); + try { + index = rowWriter.addColumn(innerColumnSchema); + index = rowWriter.addColumn(colSchema); + } catch (Exception e){ + e.getMessage(); + } + } + + ScalarWriter arrayWriter = rowWriter.column(index).scalar(); + for (int i = 0; i < rows; i++) { + for (int k = 0; k < cols; k++) { + arrayWriter.setInt(colData[i][k]); + } + } + */ + } + } + + private void mapHelper(String fieldName, int[][] colData, int cols, int rows, TupleWriter rowWriter){ + // This is the case where a dataset is projected in a metadata query. The result should be a list of lists + String colName = "int_data"; + int index = rowWriter.tupleSchema().index(colName); + if (index == -1) { + ColumnMetadata innerColumnSchema = MetadataUtils.newScalar("inner_int_list", TypeProtos.MinorType.INT, TypeProtos.DataMode.REPEATED); + ColumnMetadata colSchema = MetadataUtils.newRepeatedList(colName, innerColumnSchema ); + //index = rowWriter.addColumn(innerColumnSchema); + index = rowWriter.addColumn(colSchema); + } + ScalarWriter arrayWriter = rowWriter.column(index).array().array().scalar(); + for (int i = 0; i < rows; i++) { + for (int k = 0; k < cols; k++) { + arrayWriter.setInt(colData[i][k]); + } + } + } + + private void writeLongColumn(TupleWriter rowWriter, String name, long value) { + int index = rowWriter.tupleSchema().index(name); + if (index == -1) { + ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.BIGINT, TypeProtos.DataMode.OPTIONAL); + index = rowWriter.addColumn(colSchema); + } + ScalarWriter colWriter = rowWriter.scalar(index); + colWriter.setLong(value); + } + + private void writeLongListColumn(TupleWriter 
rowWriter, String name, long[] list){ + int index = rowWriter.tupleSchema().index(name); + if (index == -1) { + ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.BIGINT, TypeProtos.DataMode.REPEATED); + index = rowWriter.addColumn(colSchema); + } + + ScalarWriter arrayWriter = rowWriter.column(index).array().scalar(); + for (int i = 0; i < list.length; i++) { + int j = i+1; + arrayWriter.setLong(list[i]); + } + } + + //Helper function to write Long arrays + private void mapLongListField(TupleWriter rowWriter, String fieldName, ArrayList<Long> data, BaseWriter.MapWriter map) { + for (Long fieldValue : data) { + BigIntHolder rowHolder = new BigIntHolder(); + rowHolder.value = fieldValue.longValue(); + map.list(fieldName).bigInt().write(rowHolder); + } + } + + + private void writeStringColumn(TupleWriter rowWriter, String name, String value) { + int index = rowWriter.tupleSchema().index(name); + if (index == -1) { + ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL); + index = rowWriter.addColumn(colSchema); + } + ScalarWriter colWriter = rowWriter.scalar(index); + colWriter.setString(value); + } + + private void writeStringListColumn(TupleWriter rowWriter, String name, String[] list){ + int index = rowWriter.tupleSchema().index(name); + if (index == -1) { + ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REPEATED); + index = rowWriter.addColumn(colSchema); + } + + ScalarWriter arrayWriter = rowWriter.column(index).array().scalar(); + for (int i = 0; i < list.length; i++) { + int j = i+1; + arrayWriter.setString(list[i]); + } + } + + private void writeFloat8Column(TupleWriter rowWriter, String name, double value) { + int index = rowWriter.tupleSchema().index(name); + if (index == -1) { + ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.OPTIONAL); + index = 
rowWriter.addColumn(colSchema); + } + ScalarWriter colWriter = rowWriter.scalar(index); + colWriter.setDouble(value); + } + + private void writeFloat8ListColumn(TupleWriter rowWriter, String name, double[] list){ + int index = rowWriter.tupleSchema().index(name); + if (index == -1) { + ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REPEATED); + index = rowWriter.addColumn(colSchema); + } + + ScalarWriter arrayWriter = rowWriter.column(index).array().scalar(); + for (int i = 0; i < list.length; i++) { + int j = i+1; + arrayWriter.setDouble(list[i]); + } + } + + private void writeFloat4Column(TupleWriter rowWriter, String name, float value) { + int index = rowWriter.tupleSchema().index(name); + if (index == -1) { + ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.FLOAT4, TypeProtos.DataMode.OPTIONAL); + index = rowWriter.addColumn(colSchema); + } + ScalarWriter colWriter = rowWriter.scalar(index); + colWriter.setDouble(value); + } + + private void writeFloat4ListColumn(TupleWriter rowWriter, String name, float[] list){ + int index = rowWriter.tupleSchema().index(name); + if (index == -1) { + ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.FLOAT4, TypeProtos.DataMode.REPEATED); + index = rowWriter.addColumn(colSchema); + } + + ScalarWriter arrayWriter = rowWriter.column(index).array().scalar(); + for (int i = 0; i < list.length; i++) { + int j = i+1; + arrayWriter.setDouble(list[i]); + } + } + + private void writeTimestampColumn(TupleWriter rowWriter, String name, long timestamp) { + Instant ts = new Instant(timestamp); + int index = rowWriter.tupleSchema().index(name); + if (index == -1) { + ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.TIMESTAMP, TypeProtos.DataMode.OPTIONAL); + index = rowWriter.addColumn(colSchema); + } + ScalarWriter colWriter = rowWriter.scalar(index); + colWriter.setTimestamp(ts); + } + + private 
void writeTimestampColumn(TupleWriter rowWriter, String name, Instant ts) { + int index = rowWriter.tupleSchema().index(name); + if (index == -1) { + ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.TIMESTAMP, TypeProtos.DataMode.OPTIONAL); + index = rowWriter.addColumn(colSchema); + } + ScalarWriter colWriter = rowWriter.scalar(index); + colWriter.setTimestamp(ts); + } + + + private void writeAttributes(TupleWriter rowWriter, HDF5DrillMetadata record) { + HashMap attribs = getAttributes(this.HDF5reader, record.getPath()); + Iterator entries = attribs.entrySet().iterator(); + + int index = rowWriter.tupleSchema().index("attributes"); + if (index == -1) { + index = rowWriter.addColumn(SchemaBuilder.columnSchema("attributes", TypeProtos.MinorType.MAP, TypeProtos.DataMode.OPTIONAL)); + } + TupleWriter mapWriter = rowWriter.tuple(index); + + while (entries.hasNext()) { + Map.Entry entry = (Map.Entry) entries.next(); + String key = (String) entry.getKey(); + + HDF5Attribute attrib = (HDF5Attribute) entry.getValue(); + switch (attrib.getDataType()) { + case BIT: + boolean value = (Boolean) attrib.getValue(); + writeBooleanColumn(mapWriter, key, value); + break; + case BIGINT: + writeLongColumn(mapWriter, key, (Long) attrib.getValue()); + break; + case INT: + writeIntColumn(mapWriter, key, (Integer) attrib.getValue()); + break; + case FLOAT8: + writeFloat8Column(mapWriter, key, (Double) attrib.getValue()); + break; + case FLOAT4: + writeFloat4Column(mapWriter, key, (Float) attrib.getValue()); + break; + case VARCHAR: + writeStringColumn(mapWriter, key, (String) attrib.getValue()); + break; + case TIMESTAMP: + writeTimestampColumn(mapWriter, key, (Long) attrib.getValue()); + case GENERIC_OBJECT: + //This is the case for HDF5 enums + String enumText = attrib.getValue().toString(); + writeStringColumn(mapWriter, key, enumText); + break; + } + } + } + + /** + * This function processes the MAP data type which can be found in HDF5 files. 
+ * It automatically flattens anything greater than 2 dimensions. + * + * @param path + * @param fieldNames + * @param reader + * @param rowWriter + */ + + public void getAndMapCompoundData(String path, + List<String> fieldNames, + IHDF5Reader reader, + RowSetLoader rowWriter) { + + final String COMPOUND_DATA_FIELD_NAME = "compound_data"; + try { + path = HDF5Utils.resolvePath(path, reader); + Class<?> dataClass = HDF5Utils.getDatasetClass(path, reader); + if (dataClass == Map.class) { + if (fieldNames != null) { + HDF5CompoundMemberInformation[] infos = reader.compounds().getDataSetInfo(path); + for (HDF5CompoundMemberInformation info : infos) { + fieldNames.add(info.getName()); + } + } + + Object[][] values = reader.compounds().readArray(path, Object[].class); + String currentFieldName; + + // Case for auto-flatten + if (readerConfig.defaultPath != null) { + for (int row = 0; row < values.length; row++) { + rowWriter.start(); + for (int col = 0; col < values[row].length; col++) { + currentFieldName = fieldNames.get(col); + + if (values[row][col] instanceof Integer) { + writeIntColumn(rowWriter, currentFieldName, ((Integer) values[row][col]).intValue()); + } else if (values[row][col] instanceof Short) { + writeIntColumn(rowWriter, currentFieldName, ((Short) values[row][col]).intValue()); + } else if (values[row][col] instanceof Byte) { + writeIntColumn(rowWriter, currentFieldName,((Byte) values[row][col]).intValue() ); + } else if (values[row][col] instanceof Long) { + writeLongColumn(rowWriter, currentFieldName, ((Long) values[row][col]).longValue()); + } else if (values[row][col] instanceof Float) { + writeFloat4Column(rowWriter, currentFieldName, ((Float) values[row][col]).floatValue()); + } else if (values[row][col] instanceof Double) { + writeFloat8Column(rowWriter, currentFieldName,((Double) values[row][col]).doubleValue() ); + } else if (values[row][col] instanceof BitSet || values[row][col] instanceof Boolean) { + writeBooleanColumn(rowWriter, 
currentFieldName, ((Integer) values[row][col]).intValue()); + } else if (values[row][col] instanceof String) { + String stringValue = (String) values[row][col]; + writeStringColumn(rowWriter, currentFieldName, stringValue); + } + } + rowWriter.save(); + } + } else { Review comment: This method is also a bit too large to follow. Suggestion: break it into three methods: one for the case above (the auto-flatten branch), one for the case below, and a third that calls one or the other based on the appropriate condition. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services