This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 4aefcef  DRILL-7885: HDF5 Plugin not resolving links (#2190)
4aefcef is described below

commit 4aefcef2b665c5737471664912a26ef6ed9a6cfc
Author: Charles S. Givre <[email protected]>
AuthorDate: Mon Mar 22 00:19:23 2021 -0400

    DRILL-7885: HDF5 Plugin not resolving links (#2190)
    
    * Initial Commit
    
    * Addressed Review Comment
---
 .../drill/exec/store/hdf5/HDF5BatchReader.java     | 97 ++++++++++++++--------
 .../drill/exec/store/hdf5/HDF5DrillMetadata.java   | 10 +++
 .../drill/exec/store/hdf5/TestHDF5Format.java      |  5 +-
 3 files changed, 74 insertions(+), 38 deletions(-)

diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
index 9d2a7ef..6305ddc 100644
--- a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
@@ -24,6 +24,7 @@ import io.jhdf.api.Dataset;
 import io.jhdf.api.Group;
 import io.jhdf.api.Node;
 import io.jhdf.exceptions.HdfException;
+import io.jhdf.links.SoftLink;
 import io.jhdf.object.datatype.CompoundDataType;
 import io.jhdf.object.datatype.CompoundDataType.CompoundDataMember;
 import org.apache.drill.common.AutoCloseables;
@@ -56,6 +57,7 @@ import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.TupleWriter;
 
 import org.apache.hadoop.mapred.FileSplit;
+import org.jvnet.libpam.impl.CLibrary.group;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,6 +81,8 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
 
   private static final String DATA_TYPE_COLUMN_NAME = "data_type";
 
+  private static final String IS_LINK_COLUMN_NAME = "is_link";
+
   private static final String FILE_NAME_COLUMN_NAME = "file_name";
 
   private static final String INT_COLUMN_PREFIX = "int_col_";
@@ -107,6 +111,8 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
 
   private static final int PREVIEW_ROW_LIMIT = 20;
 
+  private static final int PREVIEW_COL_LIMIT = 100;
+
   private static final int MAX_DATASET_SIZE = ValueVector.MAX_BUFFER_SIZE;
 
   private final HDF5ReaderConfig readerConfig;
@@ -135,6 +141,8 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
 
   private ScalarWriter fileNameWriter;
 
+  private ScalarWriter linkWriter;
+
   private ScalarWriter dataSizeWriter;
 
   private ScalarWriter elementCountWriter;
@@ -192,13 +200,14 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
 
       // Schema for Metadata query
       SchemaBuilder builder = new SchemaBuilder()
-        .addNullable(PATH_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
-        .addNullable(DATA_TYPE_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
-        .addNullable(FILE_NAME_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
-        .addNullable(DATA_SIZE_COLUMN_NAME,  TypeProtos.MinorType.BIGINT )
-        .addNullable(ELEMENT_COUNT_NAME,  TypeProtos.MinorType.BIGINT)
-        .addNullable(DATASET_DATA_TYPE_NAME, TypeProtos.MinorType.VARCHAR)
-        .addNullable(DIMENSIONS_FIELD_NAME, TypeProtos.MinorType.VARCHAR);
+        .addNullable(PATH_COLUMN_NAME, MinorType.VARCHAR)
+        .addNullable(DATA_TYPE_COLUMN_NAME, MinorType.VARCHAR)
+        .addNullable(FILE_NAME_COLUMN_NAME, MinorType.VARCHAR)
+        .addNullable(DATA_SIZE_COLUMN_NAME, MinorType.BIGINT)
+        .addNullable(IS_LINK_COLUMN_NAME, MinorType.BIT)
+        .addNullable(ELEMENT_COUNT_NAME, MinorType.BIGINT)
+        .addNullable(DATASET_DATA_TYPE_NAME, MinorType.VARCHAR)
+        .addNullable(DIMENSIONS_FIELD_NAME, MinorType.VARCHAR);
 
       negotiator.tableSchema(builder.buildSchema(), false);
 
@@ -231,6 +240,7 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
       dataTypeWriter = rowWriter.scalar(DATA_TYPE_COLUMN_NAME);
       fileNameWriter = rowWriter.scalar(FILE_NAME_COLUMN_NAME);
       dataSizeWriter = rowWriter.scalar(DATA_SIZE_COLUMN_NAME);
+      linkWriter = rowWriter.scalar(IS_LINK_COLUMN_NAME);
       elementCountWriter = rowWriter.scalar(ELEMENT_COUNT_NAME);
       datasetTypeWriter = rowWriter.scalar(DATASET_DATA_TYPE_NAME);
       dimensionsWriter = rowWriter.scalar(DIMENSIONS_FIELD_NAME);
@@ -408,6 +418,7 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
     pathWriter.setString(metadataRow.getPath());
     dataTypeWriter.setString(metadataRow.getDataType());
     fileNameWriter.setString(fileName);
+    linkWriter.setBoolean(metadataRow.isLink());
 
     //Write attributes if present
     if (metadataRow.getAttributes().size() > 0) {
@@ -422,7 +433,10 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
       datasetTypeWriter.setString(dataset.getJavaType().getName());
       dimensionsWriter.setString(Arrays.toString(dataset.getDimensions()));
 
-      projectDataset(rowWriter, metadataRow.getPath());
+      // Do not project links
+      if (! metadataRow.isLink()) {
+        projectDataset(rowWriter, metadataRow.getPath());
+      }
     }
     rowWriter.save();
   }
@@ -448,26 +462,30 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
     }
     for (Node node : group) {
       HDF5DrillMetadata metadataRow = new HDF5DrillMetadata();
-      metadataRow.setPath(node.getPath());
-      metadataRow.setDataType(node.getType().name());
-
-      switch (node.getType()) {
-        case DATASET:
-          attribs = getAttributes(node.getPath());
-          metadataRow.setAttributes(attribs);
-          metadata.add(metadataRow);
-          break;
-        case GROUP:
-          attribs = getAttributes(node.getPath());
-          metadataRow.setAttributes(attribs);
-          metadata.add(metadataRow);
-          if (!node.isLink()) {
-            // Links don't have metadata
+
+      if (node.isLink()) {
+        SoftLink link = (SoftLink) node;
+        metadataRow.setPath(link.getTargetPath());
+        metadataRow.setLink(true);
+      } else {
+        metadataRow.setPath(node.getPath());
+        metadataRow.setDataType(node.getType().name());
+        metadataRow.setLink(false);
+        switch (node.getType()) {
+          case DATASET:
+            attribs = getAttributes(node.getPath());
+            metadataRow.setAttributes(attribs);
+            metadata.add(metadataRow);
+            break;
+          case GROUP:
+            attribs = getAttributes(node.getPath());
+            metadataRow.setAttributes(attribs);
+            metadata.add(metadataRow);
             getFileMetadata((Group) node, metadata);
-          }
-          break;
-        default:
-          logger.warn("Unknown data type: {}", node.getType());
+            break;
+          default:
+            logger.warn("Unknown data type: {}", node.getType());
+        }
       }
     }
     return metadata;
@@ -548,6 +566,12 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
         return;
       }
       assert currentDataType != null;
+
+      // Skip null datasets
+      if (data == null) {
+        return;
+      }
+
       switch (currentDataType) {
         case GENERIC_OBJECT:
           logger.warn("Couldn't read {}", datapath );
@@ -570,7 +594,7 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
           break;
         case TINYINT:
           byte[] byteList = (byte[])data;
-          writeByteColumn(rowWriter, fieldName, byteList);
+          writeByteListColumn(rowWriter, fieldName, byteList);
           break;
         case FLOAT4:
           float[] tempFloatList = (float[])data;
@@ -725,7 +749,7 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
    * @param name the name of the outer list
    * @param list the list of data
    */
-  private void writeByteColumn(TupleWriter rowWriter, String name, byte[] list) {
+  private void writeByteListColumn(TupleWriter rowWriter, String name, byte[] list) {
     int index = rowWriter.tupleSchema().index(name);
     if (index == -1) {
       ColumnMetadata colSchema = MetadataUtils.newScalar(name, MinorType.TINYINT, TypeProtos.DataMode.REPEATED);
@@ -932,9 +956,10 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
     ArrayWriter innerWriter = listWriter.array();
     // The strings within the inner array
     ScalarWriter floatWriter = innerWriter.scalar();
-    int maxElements = Math.min(rows, PREVIEW_ROW_LIMIT);
+    int maxElements = Math.min(colData.length, PREVIEW_ROW_LIMIT);
+    int maxCols = Math.min(colData[0].length, PREVIEW_COL_LIMIT);
     for (int i = 0; i < maxElements; i++) {
-      for (int k = 0; k < cols; k++) {
+      for (int k = 0; k < maxCols; k++) {
         floatWriter.setDouble(colData[i][k]);
       }
       listWriter.save();
@@ -979,9 +1004,10 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
     // The strings within the inner array
     ScalarWriter floatWriter = innerWriter.scalar();
 
-    int maxElements = Math.min(rows, PREVIEW_ROW_LIMIT);
+    int maxElements = Math.min(colData.length, PREVIEW_ROW_LIMIT);
+    int maxCols = Math.min(colData[0].length, PREVIEW_COL_LIMIT);
     for (int i = 0; i < maxElements; i++) {
-      for (int k = 0; k < cols; k++) {
+      for (int k = 0; k < maxCols; k++) {
         floatWriter.setDouble(colData[i][k]);
       }
       listWriter.save();
@@ -1026,9 +1052,10 @@ public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
     // The strings within the inner array
     ScalarWriter bigintWriter = innerWriter.scalar();
 
-    int maxElements = Math.min(rows, PREVIEW_ROW_LIMIT);
+    int maxElements = Math.min(colData.length, PREVIEW_ROW_LIMIT);
+    int maxCols = Math.min(colData[0].length, PREVIEW_COL_LIMIT);
     for (int i = 0; i < maxElements; i++) {
-      for (int k = 0; k < cols; k++) {
+      for (int k = 0; k < maxCols; k++) {
         bigintWriter.setLong(colData[i][k]);
       }
       listWriter.save();
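
A note on the link handling added above: it reduces to a small jHDF traversal. The
sketch below is not Drill code; the class name LinkWalkExample and the file name
example.h5 are placeholders, and it uses only jHDF calls that already appear in the
diff (Group iteration, Node.isLink(), Node.getPath(), SoftLink.getTargetPath()):

    import io.jhdf.HdfFile;
    import io.jhdf.api.Group;
    import io.jhdf.api.Node;
    import io.jhdf.links.SoftLink;

    import java.nio.file.Paths;

    public class LinkWalkExample {
      public static void main(String[] args) throws Exception {
        // example.h5 is a placeholder; any HDF5 file containing soft links will do.
        try (HdfFile file = new HdfFile(Paths.get("example.h5"))) {
          walk(file);
        }
      }

      private static void walk(Group group) {
        for (Node node : group) {
          if (node.isLink()) {
            // Mirror the reader's new behaviour: record the link's target path
            // instead of following the link, so the dataset behind it is not
            // projected again. As in the diff, links are assumed to be soft links.
            SoftLink link = (SoftLink) node;
            System.out.println("link -> " + link.getTargetPath());
          } else if (node instanceof Group) {
            System.out.println("group: " + node.getPath());
            walk((Group) node);
          } else {
            System.out.println("dataset: " + node.getPath());
          }
        }
      }
    }

As in HDF5BatchReader.getFileMetadata(), nothing is read through the link itself;
only its target path is reported, with is_link set to true.
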
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5DrillMetadata.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5DrillMetadata.java
index 557214b..f779e80 100644
--- a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5DrillMetadata.java
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5DrillMetadata.java
@@ -28,6 +28,8 @@ public class HDF5DrillMetadata {
 
   private Map<String, HDF5Attribute> attributes;
 
+  private boolean isLink;
+
   public HDF5DrillMetadata() {
     attributes = new HashMap<>();
   }
@@ -58,4 +60,12 @@ public class HDF5DrillMetadata {
   public void setAttributes(Map<String, HDF5Attribute> attribs) {
     this.attributes = attribs;
   }
+
+  public boolean isLink() {
+    return isLink;
+  }
+
+  public void setLink(boolean linkStatus) {
+    isLink = linkStatus;
+  }
 }
diff --git a/contrib/format-hdf5/src/test/java/org/apache/drill/exec/store/hdf5/TestHDF5Format.java b/contrib/format-hdf5/src/test/java/org/apache/drill/exec/store/hdf5/TestHDF5Format.java
index c6e8e52..48e6d80 100644
--- a/contrib/format-hdf5/src/test/java/org/apache/drill/exec/store/hdf5/TestHDF5Format.java
+++ b/contrib/format-hdf5/src/test/java/org/apache/drill/exec/store/hdf5/TestHDF5Format.java
@@ -86,8 +86,8 @@ public class TestHDF5Format extends ClusterTest {
     testBuilder()
       .sqlQuery("SELECT * FROM dfs.`hdf5/dset.h5`")
       .unOrdered()
-      .baselineColumns("path", "data_type", "file_name", "data_size", "element_count", "dataset_data_type", "dimensions", "int_data")
-      .baselineValues("/dset", "DATASET", "dset.h5", 96L, 24L, "int", "[4, 6]", finalList)
+      .baselineColumns("path", "data_type", "file_name", "data_size", "element_count", "dataset_data_type", "dimensions", "int_data", "is_link")
+      .baselineValues("/dset", "DATASET", "dset.h5", 96L, 24L, "int", "[4, 6]", finalList, false)
       .go();
   }
 
@@ -413,7 +413,6 @@ public class TestHDF5Format extends ClusterTest {
     new RowSetComparison(expected).unorderedVerifyAndClearAll(results);
   }
 
-
   @Test
   public void testUnicodeScalarQuery() throws Exception {
     String sql = "SELECT flatten(unicode) AS string_col\n" +
