This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 55f68ce  Adding ORC Multi-value Column Support (#4009)
55f68ce is described below

commit 55f68ce0311cc28ee0fa29ef15f44b5526ed5b79
Author: Jennifer Dai <[email protected]>
AuthorDate: Mon Mar 25 20:14:24 2019 -0700

    Adding ORC Multi-value Column Support (#4009)
---
 .../pinot/orc/data/readers/ORCRecordReader.java    | 26 ++++++---
 .../orc/data/readers/ORCRecordReaderTest.java      | 62 +++++++++++++++++++++-
 2 files changed, 81 insertions(+), 7 deletions(-)

diff --git a/pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java b/pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java
index 0c1b2db..11f8743 100644
--- a/pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java
+++ b/pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java
@@ -19,13 +19,15 @@ package org.apache.pinot.orc.data.readers;
  * under the License.
  */
 
+import com.google.common.collect.ImmutableList;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
@@ -137,7 +139,8 @@ public class ORCRecordReader implements RecordReader {
         // ORC will keep your columns in the same order as the schema provided
         ColumnVector vector = rowBatch.cols[i];
         // Previous value set to null, not used except to save allocation memory in OrcMapredRecordReader
-        WritableComparable writableComparable = OrcMapredRecordReader.nextValue(vector, 0, currColumn, null);
+        WritableComparable writableComparable;
+        writableComparable = OrcMapredRecordReader.nextValue(vector, 0, currColumn, null);
         genericRow.putField(currColumnName, getBaseObject(writableComparable));
       }
     } else {
@@ -159,8 +162,6 @@ public class ORCRecordReader implements RecordReader {
       obj = null;
     } else if (BooleanWritable.class.isAssignableFrom(w.getClass())) {
       obj = ((BooleanWritable) w).get();
-    } else if (ByteWritable.class.isAssignableFrom(w.getClass())) {
-      obj = ((ByteWritable) w).get();
     } else if (ShortWritable.class.isAssignableFrom(w.getClass())) {
       obj = ((ShortWritable) w).get();
     } else if (IntWritable.class.isAssignableFrom(w.getClass())) {
@@ -176,8 +177,7 @@ public class ORCRecordReader implements RecordReader {
     } else if (Text.class.isAssignableFrom(w.getClass())) {
       obj = ((Text) w).toString();
     } else if (OrcList.class.isAssignableFrom(w.getClass())) {
-      // TODO: This is probably multivalue columns
-      LOGGER.info("Skipping unsupported type: list");
+      obj = translateList((OrcList) w);
     } else {
       LOGGER.info("Unknown type found: " + w.getClass().getSimpleName());
       throw new IllegalArgumentException("Unknown type: " + w.getClass().getSimpleName());
@@ -186,6 +186,20 @@ public class ORCRecordReader implements RecordReader {
     return obj;
   }
 
+  private List<Object> translateList(OrcList<? extends WritableComparable> l) {
+    if (l == null || l.isEmpty()) {
+      return ImmutableList.of();
+    }
+
+    List<Object> retArray = new ArrayList<>(l.size());
+
+    for (WritableComparable w : l) {
+      Object o = getBaseObject(w);
+      retArray.add(o);
+    }
+    return retArray;
+  }
+
   @Override
   public void rewind()
       throws IOException {
diff --git a/pinot-orc/src/test/java/org/apache/pinot/orc/data/readers/ORCRecordReaderTest.java b/pinot-orc/src/test/java/org/apache/pinot/orc/data/readers/ORCRecordReaderTest.java
index c96e55d..b3282ed 100644
--- a/pinot-orc/src/test/java/org/apache/pinot/orc/data/readers/ORCRecordReaderTest.java
+++ b/pinot-orc/src/test/java/org/apache/pinot/orc/data/readers/ORCRecordReaderTest.java
@@ -30,9 +30,14 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
+import org.apache.orc.mapred.OrcList;
+import org.apache.orc.mapred.OrcMapredRecordWriter;
+import org.apache.orc.mapred.OrcStruct;
 import org.apache.pinot.common.data.DimensionFieldSpec;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.Schema;
@@ -47,11 +52,13 @@ import org.testng.annotations.Test;
 public class ORCRecordReaderTest {
   private static final File TEMP_DIR = FileUtils.getTempDirectory();
   private static final File ORC_FILE = new File(TEMP_DIR.getAbsolutePath(), "my-file.orc");
+  private static final File MULTIVALUE_ORC_FILE = new File(TEMP_DIR.getAbsolutePath(), "mv-my-file.orc");
 
   @BeforeClass
   public void setUp()
       throws Exception {
     FileUtils.deleteQuietly(TEMP_DIR);
+
     TypeDescription schema =
         TypeDescription.fromString("struct<x:int,y:string>");
 
@@ -77,12 +84,34 @@ public class ORCRecordReaderTest {
       writer.addRowBatch(batch);
     }
     writer.close();
+
+    // Define the mv orc schema - TypeDescription
+    TypeDescription orcTypeDesc = TypeDescription.createStruct();
+    TypeDescription typeEmails = TypeDescription.createList(TypeDescription.createString());
+
+    orcTypeDesc.addField("emails", typeEmails);
+    orcTypeDesc.addField("x", TypeDescription.createInt());
+
+    OrcList<Text> emails = new OrcList<>(typeEmails);
+    emails.add(new Text("hello"));
+    emails.add(new Text("no"));
+
+    OrcStruct struct = new OrcStruct(orcTypeDesc);
+    struct.setFieldValue("emails", emails);
+    struct.setFieldValue("x", new IntWritable(1));
+
+    Writer mvWriter = OrcFile.createWriter(new Path(MULTIVALUE_ORC_FILE.getAbsolutePath()),
+        OrcFile.writerOptions(new Configuration())
+            .setSchema(orcTypeDesc));
+
+    OrcMapredRecordWriter mrRecordWriter = new OrcMapredRecordWriter(mvWriter);
+    mrRecordWriter.write(null, struct);
+    mrRecordWriter.close(null);
   }
 
   @Test
   public void testReadData()
       throws IOException {
-
     ORCRecordReader orcRecordReader = new ORCRecordReader();
 
     SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig();
@@ -108,6 +137,37 @@ public class ORCRecordReaderTest {
     }
   }
 
+  @Test
+  public void testReadMVData() throws IOException{
+    ORCRecordReader orcRecordReader = new ORCRecordReader();
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig();
+    segmentGeneratorConfig.setInputFilePath(MULTIVALUE_ORC_FILE.getAbsolutePath());
+    Schema schema = new Schema();
+    FieldSpec emailsFieldSpec = new DimensionFieldSpec("emails", FieldSpec.DataType.STRING, false);
+    schema.addField(emailsFieldSpec);
+    FieldSpec xFieldSpec = new DimensionFieldSpec("x", FieldSpec.DataType.INT, true);
+    schema.addField(xFieldSpec);
+    segmentGeneratorConfig.setSchema(schema);
+    orcRecordReader.init(segmentGeneratorConfig);
+
+    List<GenericRow> genericRows = new ArrayList<>();
+    while (orcRecordReader.hasNext()) {
+      genericRows.add(orcRecordReader.next());
+    }
+    orcRecordReader.close();
+
+    Assert.assertEquals(genericRows.size(), 1, "Generic row size must be 1");
+
+    List<Integer> l = (List) genericRows.get(0).getValue("emails");
+    Assert.assertTrue(l.size() == 2);
+    Assert.assertEquals(l.get(0), "hello");
+    Assert.assertEquals(l.get(1), "no");
+
+    int val = (Integer) genericRows.get(0).getValue("x");
+    Assert.assertTrue(val == 1);
+  }
+
   @AfterClass
   public void tearDown() {
     FileUtils.deleteQuietly(TEMP_DIR);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to