This is an automated email from the ASF dual-hosted git repository.
jenniferdai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 55f68ce Adding ORC Multi-value Column Support (#4009)
55f68ce is described below
commit 55f68ce0311cc28ee0fa29ef15f44b5526ed5b79
Author: Jennifer Dai <[email protected]>
AuthorDate: Mon Mar 25 20:14:24 2019 -0700
Adding ORC Multi-value Column Support (#4009)
---
.../pinot/orc/data/readers/ORCRecordReader.java | 26 ++++++---
.../orc/data/readers/ORCRecordReaderTest.java | 62 +++++++++++++++++++++-
2 files changed, 81 insertions(+), 7 deletions(-)
diff --git a/pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java b/pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java
index 0c1b2db..11f8743 100644
--- a/pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java
+++ b/pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java
@@ -19,13 +19,15 @@ package org.apache.pinot.orc.data.readers;
* under the License.
*/
+import com.google.common.collect.ImmutableList;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
@@ -137,7 +139,8 @@ public class ORCRecordReader implements RecordReader {
// ORC will keep your columns in the same order as the schema provided
ColumnVector vector = rowBatch.cols[i];
// Previous value set to null, not used except to save allocation memory in OrcMapredRecordReader
- WritableComparable writableComparable = OrcMapredRecordReader.nextValue(vector, 0, currColumn, null);
+ WritableComparable writableComparable;
+ writableComparable = OrcMapredRecordReader.nextValue(vector, 0, currColumn, null);
genericRow.putField(currColumnName, getBaseObject(writableComparable));
}
} else {
@@ -159,8 +162,6 @@ public class ORCRecordReader implements RecordReader {
obj = null;
} else if (BooleanWritable.class.isAssignableFrom(w.getClass())) {
obj = ((BooleanWritable) w).get();
- } else if (ByteWritable.class.isAssignableFrom(w.getClass())) {
- obj = ((ByteWritable) w).get();
} else if (ShortWritable.class.isAssignableFrom(w.getClass())) {
obj = ((ShortWritable) w).get();
} else if (IntWritable.class.isAssignableFrom(w.getClass())) {
@@ -176,8 +177,7 @@ public class ORCRecordReader implements RecordReader {
} else if (Text.class.isAssignableFrom(w.getClass())) {
obj = ((Text) w).toString();
} else if (OrcList.class.isAssignableFrom(w.getClass())) {
- // TODO: This is probably multivalue columns
- LOGGER.info("Skipping unsupported type: list");
+ obj = translateList((OrcList) w);
} else {
LOGGER.info("Unknown type found: " + w.getClass().getSimpleName());
throw new IllegalArgumentException("Unknown type: " + w.getClass().getSimpleName());
@@ -186,6 +186,20 @@ public class ORCRecordReader implements RecordReader {
return obj;
}
+ private List<Object> translateList(OrcList<? extends WritableComparable> l) {
+ if (l == null || l.isEmpty()) {
+ return ImmutableList.of();
+ }
+
+ List<Object> retArray = new ArrayList<>(l.size());
+
+ for (WritableComparable w : l) {
+ Object o = getBaseObject(w);
+ retArray.add(o);
+ }
+ return retArray;
+ }
+
@Override
public void rewind()
throws IOException {
diff --git a/pinot-orc/src/test/java/org/apache/pinot/orc/data/readers/ORCRecordReaderTest.java b/pinot-orc/src/test/java/org/apache/pinot/orc/data/readers/ORCRecordReaderTest.java
index c96e55d..b3282ed 100644
--- a/pinot-orc/src/test/java/org/apache/pinot/orc/data/readers/ORCRecordReaderTest.java
+++ b/pinot-orc/src/test/java/org/apache/pinot/orc/data/readers/ORCRecordReaderTest.java
@@ -30,9 +30,14 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
+import org.apache.orc.mapred.OrcList;
+import org.apache.orc.mapred.OrcMapredRecordWriter;
+import org.apache.orc.mapred.OrcStruct;
import org.apache.pinot.common.data.DimensionFieldSpec;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
@@ -47,11 +52,13 @@ import org.testng.annotations.Test;
public class ORCRecordReaderTest {
private static final File TEMP_DIR = FileUtils.getTempDirectory();
private static final File ORC_FILE = new File(TEMP_DIR.getAbsolutePath(), "my-file.orc");
+ private static final File MULTIVALUE_ORC_FILE = new File(TEMP_DIR.getAbsolutePath(), "mv-my-file.orc");
@BeforeClass
public void setUp()
throws Exception {
FileUtils.deleteQuietly(TEMP_DIR);
+
TypeDescription schema = TypeDescription.fromString("struct<x:int,y:string>");
@@ -77,12 +84,34 @@ public class ORCRecordReaderTest {
writer.addRowBatch(batch);
}
writer.close();
+
+ // Define the mv orc schema - TypeDescription
+ TypeDescription orcTypeDesc = TypeDescription.createStruct();
+ TypeDescription typeEmails = TypeDescription.createList(TypeDescription.createString());
+
+ orcTypeDesc.addField("emails", typeEmails);
+ orcTypeDesc.addField("x", TypeDescription.createInt());
+
+ OrcList<Text> emails = new OrcList<>(typeEmails);
+ emails.add(new Text("hello"));
+ emails.add(new Text("no"));
+
+ OrcStruct struct = new OrcStruct(orcTypeDesc);
+ struct.setFieldValue("emails", emails);
+ struct.setFieldValue("x", new IntWritable(1));
+
+ Writer mvWriter = OrcFile.createWriter(new Path(MULTIVALUE_ORC_FILE.getAbsolutePath()),
+ OrcFile.writerOptions(new Configuration()).setSchema(orcTypeDesc));
+
+ OrcMapredRecordWriter mrRecordWriter = new OrcMapredRecordWriter(mvWriter);
+ mrRecordWriter.write(null, struct);
+ mrRecordWriter.close(null);
}
@Test
public void testReadData()
throws IOException {
-
ORCRecordReader orcRecordReader = new ORCRecordReader();
SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig();
@@ -108,6 +137,37 @@ public class ORCRecordReaderTest {
}
}
+ @Test
+ public void testReadMVData() throws IOException{
+ ORCRecordReader orcRecordReader = new ORCRecordReader();
+
+ SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig();
+ segmentGeneratorConfig.setInputFilePath(MULTIVALUE_ORC_FILE.getAbsolutePath());
+ Schema schema = new Schema();
+ FieldSpec emailsFieldSpec = new DimensionFieldSpec("emails", FieldSpec.DataType.STRING, false);
+ schema.addField(emailsFieldSpec);
+ FieldSpec xFieldSpec = new DimensionFieldSpec("x", FieldSpec.DataType.INT, true);
+ schema.addField(xFieldSpec);
+ segmentGeneratorConfig.setSchema(schema);
+ orcRecordReader.init(segmentGeneratorConfig);
+
+ List<GenericRow> genericRows = new ArrayList<>();
+ while (orcRecordReader.hasNext()) {
+ genericRows.add(orcRecordReader.next());
+ }
+ orcRecordReader.close();
+
+ Assert.assertEquals(genericRows.size(), 1, "Generic row size must be 1");
+
+ List<Integer> l = (List) genericRows.get(0).getValue("emails");
+ Assert.assertTrue(l.size() == 2);
+ Assert.assertEquals(l.get(0), "hello");
+ Assert.assertEquals(l.get(1), "no");
+
+ int val = (Integer) genericRows.get(0).getValue("x");
+ Assert.assertTrue(val == 1);
+ }
+
@AfterClass
public void tearDown() {
FileUtils.deleteQuietly(TEMP_DIR);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]