Repository: parquet-mr
Updated Branches:
  refs/heads/master b86f68e39 -> 2f956f465


PARQUET-341 improve write performance for wide schema sparse data

In the write path, when the data is very sparse, most of the time is spent 
writing nulls.
Currently, writing nulls follows the same code path as writing values: it 
recursively traverses all the leaves when a group is null.
Because all the leaves beneath a null group must be written as null with the 
same repetition level and definition level, we can eliminate the recursive 
calls used to reach the leaves.

This PR caches the leaves for each group node, so when a group node is null, 
its leaves can be flushed with null values directly.

We tested it with a really wide schema on one of our production datasets. It 
improved write performance by ~20%.

Author: Tianshuo Deng <[email protected]>

Closes #247 from tsdeng/flush_null_directly and squashes the following commits:

253f2e3 [Tianshuo Deng] address comments
8676cd7 [Tianshuo Deng] flush null directly to leaves


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/2f956f46
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/2f956f46
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/2f956f46

Branch: refs/heads/master
Commit: 2f956f46580e5b4752173e885d37a20fe31a78d8
Parents: b86f68e
Author: Tianshuo Deng <[email protected]>
Authored: Wed Aug 5 16:29:00 2015 -0700
Committer: Tianshuo Deng <[email protected]>
Committed: Wed Aug 5 16:29:00 2015 -0700

----------------------------------------------------------------------
 .../org/apache/parquet/io/MessageColumnIO.java  | 39 +++++++++++++++++---
 .../hadoop/InternalParquetRecordReader.java     |  3 +-
 2 files changed, 36 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/2f956f46/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
----------------------------------------------------------------------
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java 
b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
index 9a8f88e..048dcc3 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
@@ -18,9 +18,12 @@
  */
 package org.apache.parquet.io;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.parquet.Log;
 import org.apache.parquet.column.ColumnWriteStore;
@@ -172,16 +175,39 @@ public class MessageColumnIO extends GroupColumnIO {
     private final FieldsMarker[] fieldsWritten;
     private final int[] r;
     private final ColumnWriter[] columnWriter;
+    /** maintain a map of a group and all the leaf nodes underneath it. It's 
used to optimize writing null for a group node
+     * all the leaves can be called directly without traversing the sub tree 
of the group node */
+    private Map<GroupColumnIO, List<ColumnWriter>>  groupToLeafWriter = new 
HashMap<GroupColumnIO, List<ColumnWriter>>();
     private final ColumnWriteStore columns;
     private boolean emptyField = true;
 
+    private void buildGroupToLeafWriterMap(PrimitiveColumnIO primitive, 
ColumnWriter writer) {
+      GroupColumnIO  parent = primitive.getParent();
+      do {
+        getLeafWriters(parent).add(writer);
+        parent = parent.getParent();
+      } while (parent != null);
+    }
+
+    private List<ColumnWriter> getLeafWriters(GroupColumnIO group) {
+      List<ColumnWriter> writers = groupToLeafWriter.get(group);
+      if (writers == null) {
+        writers = new ArrayList<ColumnWriter>();
+        groupToLeafWriter.put(group, writers);
+      }
+      return writers;
+    }
+
     public MessageColumnIORecordConsumer(ColumnWriteStore columns) {
       this.columns = columns;
       int maxDepth = 0;
       this.columnWriter = new 
ColumnWriter[MessageColumnIO.this.getLeaves().size()];
+
       for (PrimitiveColumnIO primitiveColumnIO : 
MessageColumnIO.this.getLeaves()) {
+        ColumnWriter w = 
columns.getColumnWriter(primitiveColumnIO.getColumnDescriptor());
         maxDepth = Math.max(maxDepth, primitiveColumnIO.getFieldPath().length);
-        columnWriter[primitiveColumnIO.getId()] = 
columns.getColumnWriter(primitiveColumnIO.getColumnDescriptor());
+        columnWriter[primitiveColumnIO.getId()] = w;
+        buildGroupToLeafWriterMap(primitiveColumnIO, w);
       }
 
       fieldsWritten = new FieldsMarker[maxDepth];
@@ -271,10 +297,13 @@ public class MessageColumnIO extends GroupColumnIO {
         columnWriter[((PrimitiveColumnIO)undefinedField).getId()].writeNull(r, 
d);
       } else {
         GroupColumnIO groupColumnIO = (GroupColumnIO)undefinedField;
-        int childrenCount = groupColumnIO.getChildrenCount();
-        for (int i = 0; i < childrenCount; i++) {
-          writeNull(groupColumnIO.getChild(i), r, d);
-        }
+        writeNullToLeaves(groupColumnIO, r, d);
+      }
+    }
+
+    private void writeNullToLeaves(GroupColumnIO group, int r, int d) {
+      for(ColumnWriter leafWriter: groupToLeafWriter.get(group)) {
+        leafWriter.writeNull(r,d);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/2f956f46/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index c1bd037..21e69b7 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -62,6 +62,7 @@ class InternalParquetRecordReader<T> {
 
   private MessageType requestedSchema;
   private MessageType fileSchema;
+  private MessageColumnIO columnIO;
   private int columnCount;
   private final ReadSupport<T> readSupport;
 
@@ -136,7 +137,6 @@ class InternalParquetRecordReader<T> {
       BenchmarkCounter.incrementTime(timeSpentReading);
       if (Log.INFO) LOG.info("block read in memory in " + timeSpentReading + " 
ms. row count = " + pages.getRowCount());
       if (Log.DEBUG) LOG.debug("initializing Record assembly with requested 
schema " + requestedSchema);
-      MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, 
fileSchema, strictTypeChecking);
       recordReader = columnIO.getRecordReader(pages, recordConverter, filter);
       startedAssemblingCurrentBlockAt = System.currentTimeMillis();
       totalCountLoadedSoFar += pages.getRowCount();
@@ -174,6 +174,7 @@ class InternalParquetRecordReader<T> {
     this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
     this.requestedSchema = readContext.getRequestedSchema();
     this.fileSchema = fileSchema;
+    this.columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, 
strictTypeChecking);
     this.file = file;
     this.columnCount = requestedSchema.getPaths().size();
     this.recordConverter = readSupport.prepareForRead(

Reply via email to