This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 4aefcef DRILL-7885: HDF5 Plugin not resolving links (#2190)
4aefcef is described below
commit 4aefcef2b665c5737471664912a26ef6ed9a6cfc
Author: Charles S. Givre <[email protected]>
AuthorDate: Mon Mar 22 00:19:23 2021 -0400
DRILL-7885: HDF5 Plugin not resolving links (#2190)
* Initial Commit
* Addressed Review Comment
---
.../drill/exec/store/hdf5/HDF5BatchReader.java | 97 ++++++++++++++--------
.../drill/exec/store/hdf5/HDF5DrillMetadata.java | 10 +++
.../drill/exec/store/hdf5/TestHDF5Format.java | 5 +-
3 files changed, 74 insertions(+), 38 deletions(-)
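
For context, a minimal standalone sketch of the traversal pattern this patch adopts: link nodes are reported by their target path instead of being opened as datasets. It assumes only the jhdf Node/SoftLink calls visible in the diff below (with an instanceof guard added before the cast); the class name and file name are hypothetical.

import java.nio.file.Paths;

import io.jhdf.HdfFile;
import io.jhdf.api.Group;
import io.jhdf.api.Node;
import io.jhdf.links.SoftLink;

public class SoftLinkWalkSketch {

  // Recursively visit a group; soft links are recorded by target path and
  // never followed, mirroring the behavior getFileMetadata() takes below.
  static void walk(Group group) {
    for (Node node : group) {
      if (node.isLink() && node instanceof SoftLink) {
        System.out.println("soft link -> " + ((SoftLink) node).getTargetPath());
      } else if (node instanceof Group) {
        System.out.println("group: " + node.getPath());
        walk((Group) node);
      } else {
        System.out.println("dataset: " + node.getPath());
      }
    }
  }

  public static void main(String[] args) throws Exception {
    // dset.h5 is a placeholder file name.
    try (HdfFile hdfFile = new HdfFile(Paths.get("dset.h5"))) {
      walk(hdfFile);
    }
  }
}
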
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
index 9d2a7ef..6305ddc 100644
--- a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
@@ -24,6 +24,7 @@ import io.jhdf.api.Dataset;
import io.jhdf.api.Group;
import io.jhdf.api.Node;
import io.jhdf.exceptions.HdfException;
+import io.jhdf.links.SoftLink;
import io.jhdf.object.datatype.CompoundDataType;
import io.jhdf.object.datatype.CompoundDataType.CompoundDataMember;
import org.apache.drill.common.AutoCloseables;
@@ -56,6 +57,7 @@ import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.TupleWriter;
import org.apache.hadoop.mapred.FileSplit;
+import org.jvnet.libpam.impl.CLibrary.group;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,6 +81,8 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
private static final String DATA_TYPE_COLUMN_NAME = "data_type";
+ private static final String IS_LINK_COLUMN_NAME = "is_link";
+
private static final String FILE_NAME_COLUMN_NAME = "file_name";
private static final String INT_COLUMN_PREFIX = "int_col_";
@@ -107,6 +111,8 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
private static final int PREVIEW_ROW_LIMIT = 20;
+ private static final int PREVIEW_COL_LIMIT = 100;
+
private static final int MAX_DATASET_SIZE = ValueVector.MAX_BUFFER_SIZE;
private final HDF5ReaderConfig readerConfig;
@@ -135,6 +141,8 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
private ScalarWriter fileNameWriter;
+ private ScalarWriter linkWriter;
+
private ScalarWriter dataSizeWriter;
private ScalarWriter elementCountWriter;
@@ -192,13 +200,14 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
// Schema for Metadata query
SchemaBuilder builder = new SchemaBuilder()
- .addNullable(PATH_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
- .addNullable(DATA_TYPE_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
- .addNullable(FILE_NAME_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
- .addNullable(DATA_SIZE_COLUMN_NAME, TypeProtos.MinorType.BIGINT )
- .addNullable(ELEMENT_COUNT_NAME, TypeProtos.MinorType.BIGINT)
- .addNullable(DATASET_DATA_TYPE_NAME, TypeProtos.MinorType.VARCHAR)
- .addNullable(DIMENSIONS_FIELD_NAME, TypeProtos.MinorType.VARCHAR);
+ .addNullable(PATH_COLUMN_NAME, MinorType.VARCHAR)
+ .addNullable(DATA_TYPE_COLUMN_NAME, MinorType.VARCHAR)
+ .addNullable(FILE_NAME_COLUMN_NAME, MinorType.VARCHAR)
+ .addNullable(DATA_SIZE_COLUMN_NAME, MinorType.BIGINT)
+ .addNullable(IS_LINK_COLUMN_NAME, MinorType.BIT)
+ .addNullable(ELEMENT_COUNT_NAME, MinorType.BIGINT)
+ .addNullable(DATASET_DATA_TYPE_NAME, MinorType.VARCHAR)
+ .addNullable(DIMENSIONS_FIELD_NAME, MinorType.VARCHAR);
negotiator.tableSchema(builder.buildSchema(), false);
@@ -231,6 +240,7 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
dataTypeWriter = rowWriter.scalar(DATA_TYPE_COLUMN_NAME);
fileNameWriter = rowWriter.scalar(FILE_NAME_COLUMN_NAME);
dataSizeWriter = rowWriter.scalar(DATA_SIZE_COLUMN_NAME);
+ linkWriter = rowWriter.scalar(IS_LINK_COLUMN_NAME);
elementCountWriter = rowWriter.scalar(ELEMENT_COUNT_NAME);
datasetTypeWriter = rowWriter.scalar(DATASET_DATA_TYPE_NAME);
dimensionsWriter = rowWriter.scalar(DIMENSIONS_FIELD_NAME);
@@ -408,6 +418,7 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
pathWriter.setString(metadataRow.getPath());
dataTypeWriter.setString(metadataRow.getDataType());
fileNameWriter.setString(fileName);
+ linkWriter.setBoolean(metadataRow.isLink());
//Write attributes if present
if (metadataRow.getAttributes().size() > 0) {
@@ -422,7 +433,10 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
datasetTypeWriter.setString(dataset.getJavaType().getName());
dimensionsWriter.setString(Arrays.toString(dataset.getDimensions()));
- projectDataset(rowWriter, metadataRow.getPath());
+ // Do not project links
+ if (! metadataRow.isLink()) {
+ projectDataset(rowWriter, metadataRow.getPath());
+ }
}
rowWriter.save();
}
@@ -448,26 +462,30 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
}
for (Node node : group) {
HDF5DrillMetadata metadataRow = new HDF5DrillMetadata();
- metadataRow.setPath(node.getPath());
- metadataRow.setDataType(node.getType().name());
-
- switch (node.getType()) {
- case DATASET:
- attribs = getAttributes(node.getPath());
- metadataRow.setAttributes(attribs);
- metadata.add(metadataRow);
- break;
- case GROUP:
- attribs = getAttributes(node.getPath());
- metadataRow.setAttributes(attribs);
- metadata.add(metadataRow);
- if (!node.isLink()) {
- // Links don't have metadata
+
+ if (node.isLink()) {
+ SoftLink link = (SoftLink) node;
+ metadataRow.setPath(link.getTargetPath());
+ metadataRow.setLink(true);
+ } else {
+ metadataRow.setPath(node.getPath());
+ metadataRow.setDataType(node.getType().name());
+ metadataRow.setLink(false);
+ switch (node.getType()) {
+ case DATASET:
+ attribs = getAttributes(node.getPath());
+ metadataRow.setAttributes(attribs);
+ metadata.add(metadataRow);
+ break;
+ case GROUP:
+ attribs = getAttributes(node.getPath());
+ metadataRow.setAttributes(attribs);
+ metadata.add(metadataRow);
getFileMetadata((Group) node, metadata);
- }
- break;
- default:
- logger.warn("Unknown data type: {}", node.getType());
+ break;
+ default:
+ logger.warn("Unknown data type: {}", node.getType());
+ }
}
}
return metadata;
@@ -548,6 +566,12 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
return;
}
assert currentDataType != null;
+
+ // Skip null datasets
+ if (data == null) {
+ return;
+ }
+
switch (currentDataType) {
case GENERIC_OBJECT:
logger.warn("Couldn't read {}", datapath );
@@ -570,7 +594,7 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
break;
case TINYINT:
byte[] byteList = (byte[])data;
- writeByteColumn(rowWriter, fieldName, byteList);
+ writeByteListColumn(rowWriter, fieldName, byteList);
break;
case FLOAT4:
float[] tempFloatList = (float[])data;
@@ -725,7 +749,7 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
* @param name the name of the outer list
* @param list the list of data
*/
- private void writeByteColumn(TupleWriter rowWriter, String name, byte[] list) {
+ private void writeByteListColumn(TupleWriter rowWriter, String name, byte[] list) {
int index = rowWriter.tupleSchema().index(name);
if (index == -1) {
ColumnMetadata colSchema = MetadataUtils.newScalar(name, MinorType.TINYINT, TypeProtos.DataMode.REPEATED);
@@ -932,9 +956,10 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
ArrayWriter innerWriter = listWriter.array();
// The strings within the inner array
ScalarWriter floatWriter = innerWriter.scalar();
- int maxElements = Math.min(rows, PREVIEW_ROW_LIMIT);
+ int maxElements = Math.min(colData.length, PREVIEW_ROW_LIMIT);
+ int maxCols = Math.min(colData[0].length, PREVIEW_COL_LIMIT);
for (int i = 0; i < maxElements; i++) {
- for (int k = 0; k < cols; k++) {
+ for (int k = 0; k < maxCols; k++) {
floatWriter.setDouble(colData[i][k]);
}
listWriter.save();
@@ -979,9 +1004,10 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
// The strings within the inner array
ScalarWriter floatWriter = innerWriter.scalar();
- int maxElements = Math.min(rows, PREVIEW_ROW_LIMIT);
+ int maxElements = Math.min(colData.length, PREVIEW_ROW_LIMIT);
+ int maxCols = Math.min(colData[0].length, PREVIEW_COL_LIMIT);
for (int i = 0; i < maxElements; i++) {
- for (int k = 0; k < cols; k++) {
+ for (int k = 0; k < maxCols; k++) {
floatWriter.setDouble(colData[i][k]);
}
listWriter.save();
@@ -1026,9 +1052,10 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
// The strings within the inner array
ScalarWriter bigintWriter = innerWriter.scalar();
- int maxElements = Math.min(rows, PREVIEW_ROW_LIMIT);
+ int maxElements = Math.min(colData.length, PREVIEW_ROW_LIMIT);
+ int maxCols = Math.min(colData[0].length, PREVIEW_COL_LIMIT);
for (int i = 0; i < maxElements; i++) {
- for (int k = 0; k < cols; k++) {
+ for (int k = 0; k < maxCols; k++) {
bigintWriter.setLong(colData[i][k]);
}
listWriter.save();
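
The three preview hunks above apply the same fix: bound both dimensions of the 2-D data before writing, rather than trusting the caller-supplied rows/cols counts. A minimal sketch of that guard, with a hypothetical helper name, reusing the limits defined earlier in the class:

  // Hypothetical helper showing the clamping pattern used above: never read
  // past PREVIEW_ROW_LIMIT rows or PREVIEW_COL_LIMIT columns of the source.
  static final int PREVIEW_ROW_LIMIT = 20;
  static final int PREVIEW_COL_LIMIT = 100;

  static double[][] previewSlice(double[][] colData) {
    int maxRows = Math.min(colData.length, PREVIEW_ROW_LIMIT);
    int maxCols = Math.min(colData[0].length, PREVIEW_COL_LIMIT);
    double[][] preview = new double[maxRows][maxCols];
    for (int i = 0; i < maxRows; i++) {
      for (int k = 0; k < maxCols; k++) {
        preview[i][k] = colData[i][k];
      }
    }
    return preview;
  }
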
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5DrillMetadata.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5DrillMetadata.java
index 557214b..f779e80 100644
--- a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5DrillMetadata.java
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5DrillMetadata.java
@@ -28,6 +28,8 @@ public class HDF5DrillMetadata {
private Map<String, HDF5Attribute> attributes;
+ private boolean isLink;
+
public HDF5DrillMetadata() {
attributes = new HashMap<>();
}
@@ -58,4 +60,12 @@ public class HDF5DrillMetadata {
public void setAttributes(Map<String, HDF5Attribute> attribs) {
this.attributes = attribs;
}
+
+ public boolean isLink() {
+ return isLink;
+ }
+
+ public void setLink(boolean linkStatus) {
+ isLink = linkStatus;
+ }
}
diff --git a/contrib/format-hdf5/src/test/java/org/apache/drill/exec/store/hdf5/TestHDF5Format.java b/contrib/format-hdf5/src/test/java/org/apache/drill/exec/store/hdf5/TestHDF5Format.java
index c6e8e52..48e6d80 100644
--- a/contrib/format-hdf5/src/test/java/org/apache/drill/exec/store/hdf5/TestHDF5Format.java
+++ b/contrib/format-hdf5/src/test/java/org/apache/drill/exec/store/hdf5/TestHDF5Format.java
@@ -86,8 +86,8 @@ public class TestHDF5Format extends ClusterTest {
testBuilder()
.sqlQuery("SELECT * FROM dfs.`hdf5/dset.h5`")
.unOrdered()
- .baselineColumns("path", "data_type", "file_name", "data_size", "element_count", "dataset_data_type", "dimensions", "int_data")
- .baselineValues("/dset", "DATASET", "dset.h5", 96L, 24L, "int", "[4, 6]", finalList)
+ .baselineColumns("path", "data_type", "file_name", "data_size", "element_count", "dataset_data_type", "dimensions", "int_data", "is_link")
+ .baselineValues("/dset", "DATASET", "dset.h5", 96L, 24L, "int", "[4, 6]", finalList, false)
.go();
}
@@ -413,7 +413,6 @@ public class TestHDF5Format extends ClusterTest {
new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
}
-
@Test
public void testUnicodeScalarQuery() throws Exception {
String sql = "SELECT flatten(unicode) AS string_col\n" +