Repository: parquet-mr Updated Branches: refs/heads/master 3f36b7b50 -> 01fbf81e3
PARQUET-343 Caching nulls on group node to improve write performance on wide schema sparse data For really wide schema with sparse data, if a group node is empty, it could have a huge number of leaves underneath it. Calling writeNull for each leaf every time its ancestor group node is null is inefficient and is bad for data locality in memory, especially when the number of leaves is huge. Instead, null can be cached on the group node. Flushing is only triggered when a group node becomes non-null from null. This way, all the cached null values are flushed to the leaf nodes in a tight loop, which improves write performance. We tested this approach combined with PARQUET-341 on a really large schema and it gave us a ~2X improvement in write performance. Author: Tianshuo Deng <[email protected]> Closes #249 from tsdeng/batch_null and squashes the following commits: 0a61646 [Tianshuo Deng] use curly braces even for 1 line if statements a8964c0 [Tianshuo Deng] optimize writeNullToLeaves 5309612 [Tianshuo Deng] optimize cacheNullForGroup ecbdfca [Tianshuo Deng] add comments ed692c0 [Tianshuo Deng] WIP 0cae1b6 [Tianshuo Deng] remove unused class 8e07db4 [Tianshuo Deng] refactor dead618 [Tianshuo Deng] reformat c3c0c70 [Tianshuo Deng] refactor 636ab52 [Tianshuo Deng] remove unused method 767b4fd [Tianshuo Deng] use parent definition level 8f251a0 [Tianshuo Deng] use IntArrayList c549c84 [Tianshuo Deng] fix 9583d04 [Tianshuo Deng] wIP d8cb878 [Tianshuo Deng] WIP 35f1fa1 [Tianshuo Deng] cache columnWriter for each parent 46fd464 [Tianshuo Deng] address comments 8c83964 [Tianshuo Deng] flush null directly to leaves Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/01fbf81e Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/01fbf81e Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/01fbf81e Branch: refs/heads/master Commit: 01fbf81e34a36cedf505f20b1c52306afceedc3e Parents: 
3f36b7b Author: Tianshuo Deng <[email protected]> Authored: Thu Aug 20 14:21:12 2015 -0700 Committer: Tianshuo Deng <[email protected]> Committed: Thu Aug 20 14:21:12 2015 -0700 ---------------------------------------------------------------------- .../org/apache/parquet/io/MessageColumnIO.java | 139 ++++++++++++++++--- .../parquet/io/ValidatingRecordConsumer.java | 4 + .../apache/parquet/io/api/RecordConsumer.java | 7 + .../org/apache/parquet/io/TestColumnIO.java | 21 ++- .../org/apache/parquet/io/TestFiltered.java | 5 +- .../hadoop/InternalParquetRecordReader.java | 3 +- .../hadoop/InternalParquetRecordWriter.java | 6 +- .../parquet/thrift/TestParquetReadProtocol.java | 1 + 8 files changed, 154 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/01fbf81e/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java index 048dcc3..cb1c8d6 100644 --- a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java +++ b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java @@ -30,6 +30,7 @@ import org.apache.parquet.column.ColumnWriteStore; import org.apache.parquet.column.ColumnWriter; import org.apache.parquet.column.impl.ColumnReadStoreImpl; import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.column.values.dictionary.IntList; import org.apache.parquet.filter.UnboundRecordFilter; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.FilterCompat.Filter; @@ -46,6 +47,8 @@ import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; +import 
it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.ints.IntIterator; import static org.apache.parquet.Preconditions.checkNotNull; /** @@ -53,7 +56,6 @@ import static org.apache.parquet.Preconditions.checkNotNull; * * * @author Julien Le Dem - * */ public class MessageColumnIO extends GroupColumnIO { private static final Log logger = Log.getLog(MessageColumnIO.class); @@ -144,6 +146,38 @@ public class MessageColumnIO extends GroupColumnIO { }); } + /** + * To improve null writing performance, we cache null values on group nodes. We flush nulls when a + * non-null value hits the group node. + * + * Intuitively, when a group node hits a null value, all the leaves underneath it should be null. + * A direct way of doing it is to write nulls for all the leaves underneath it when a group node + * is null. This approach is not optimal, consider following case: + * + * - When the schema is really wide where for each group node, there are thousands of leaf + * nodes underneath it. + * - When the data being written is really sparse, group nodes could hit nulls frequently. + * + * With the direct approach, if a group node hit null values a thousand times, and there are a + * thousand nodes underneath it. + * For each null value, it iterates over a thousand leaf writers to write null values and it + * will do it for a thousand null values. + * + * In the above case, each leaf writer maintains it's own buffer of values, calling thousands of + * them in turn is very bad for memory locality. Instead each group node can remember the null values + * encountered and flush only when a non-null value hits the group node. In this way, when we flush + * null values, we only iterate through all the leaves 1 time and multiple cached null values are + * flushed to each leaf in a tight loop. This implementation has following characteristics. + * + * 1. When a group node hits a null value, it adds the repetition level of the null value to + * the groupNullCache. 
The definition level of the cached nulls should always be the same as + * the definition level of the group node so there is no need to store it. + * + * 2. When a group node hits a non null value and it has null value cached, it should flush null + * values and start from his children group nodes first. This make sure the order of null values + * being flushed is correct. + * + */ private class MessageColumnIORecordConsumer extends RecordConsumer { private ColumnIO currentColumnIO; private int currentLevel = 0; @@ -154,8 +188,8 @@ public class MessageColumnIO extends GroupColumnIO { @Override public String toString() { return "VistedIndex{" + - "vistedIndexes=" + vistedIndexes + - '}'; + "vistedIndexes=" + vistedIndexes + + '}'; } public void reset(int fieldsCount) { @@ -175,14 +209,24 @@ public class MessageColumnIO extends GroupColumnIO { private final FieldsMarker[] fieldsWritten; private final int[] r; private final ColumnWriter[] columnWriter; - /** maintain a map of a group and all the leaf nodes underneath it. It's used to optimize writing null for a group node - * all the leaves can be called directly without traversing the sub tree of the group node */ - private Map<GroupColumnIO, List<ColumnWriter>> groupToLeafWriter = new HashMap<GroupColumnIO, List<ColumnWriter>>(); + + /** + * Maintain a map of groups and all the leaf nodes underneath it. It's used to optimize writing null for a group node. + * Instead of using recursion calls, all the leaves can be called directly without traversing the sub tree of the group node + */ + private Map<GroupColumnIO, List<ColumnWriter>> groupToLeafWriter = new HashMap<GroupColumnIO, List<ColumnWriter>>(); + + + /* + * Cache nulls for each group node. It only stores the repetition level, since the definition level + * should always be the definition level of the group node. 
+ */ + private Map<GroupColumnIO, IntArrayList> groupNullCache = new HashMap<GroupColumnIO, IntArrayList>(); private final ColumnWriteStore columns; private boolean emptyField = true; private void buildGroupToLeafWriterMap(PrimitiveColumnIO primitive, ColumnWriter writer) { - GroupColumnIO parent = primitive.getParent(); + GroupColumnIO parent = primitive.getParent(); do { getLeafWriters(parent).add(writer); parent = parent.getParent(); @@ -227,7 +271,7 @@ public class MessageColumnIO extends GroupColumnIO { private void log(Object m) { String indent = ""; - for (int i = 0; i<currentLevel; ++i) { + for (int i = 0; i < currentLevel; ++i) { indent += " "; } logger.debug(indent + m); @@ -238,7 +282,7 @@ public class MessageColumnIO extends GroupColumnIO { if (DEBUG) log("< MESSAGE START >"); currentColumnIO = MessageColumnIO.this; r[0] = 0; - int numberOfFieldsToVisit = ((GroupColumnIO)currentColumnIO).getChildrenCount(); + int numberOfFieldsToVisit = ((GroupColumnIO) currentColumnIO).getChildrenCount(); fieldsWritten[0].reset(numberOfFieldsToVisit); if (DEBUG) printState(); } @@ -255,7 +299,7 @@ public class MessageColumnIO extends GroupColumnIO { public void startField(String field, int index) { try { if (DEBUG) log("startField(" + field + ", " + index + ")"); - currentColumnIO = ((GroupColumnIO)currentColumnIO).getChild(index); + currentColumnIO = ((GroupColumnIO) currentColumnIO).getChild(index); emptyField = true; if (DEBUG) printState(); } catch (RuntimeException e) { @@ -276,11 +320,11 @@ public class MessageColumnIO extends GroupColumnIO { } private void writeNullForMissingFieldsAtCurrentLevel() { - int currentFieldsCount = ((GroupColumnIO)currentColumnIO).getChildrenCount(); + int currentFieldsCount = ((GroupColumnIO) currentColumnIO).getChildrenCount(); for (int i = 0; i < currentFieldsCount; i++) { if (!fieldsWritten[currentLevel].isWritten(i)) { try { - ColumnIO undefinedField = ((GroupColumnIO)currentColumnIO).getChild(i); + ColumnIO undefinedField = 
((GroupColumnIO) currentColumnIO).getChild(i); int d = currentColumnIO.getDefinitionLevel(); if (DEBUG) log(Arrays.toString(undefinedField.getFieldPath()) + ".writeNull(" + r[currentLevel] + "," + d + ")"); @@ -294,17 +338,36 @@ public class MessageColumnIO extends GroupColumnIO { private void writeNull(ColumnIO undefinedField, int r, int d) { if (undefinedField.getType().isPrimitive()) { - columnWriter[((PrimitiveColumnIO)undefinedField).getId()].writeNull(r, d); + columnWriter[((PrimitiveColumnIO) undefinedField).getId()].writeNull(r, d); } else { - GroupColumnIO groupColumnIO = (GroupColumnIO)undefinedField; - writeNullToLeaves(groupColumnIO, r, d); + GroupColumnIO groupColumnIO = (GroupColumnIO) undefinedField; + // only cache the repetition level, the definition level should always be the definition level of the parent node + cacheNullForGroup(groupColumnIO, r); } } - private void writeNullToLeaves(GroupColumnIO group, int r, int d) { - for(ColumnWriter leafWriter: groupToLeafWriter.get(group)) { - leafWriter.writeNull(r,d); + private void cacheNullForGroup(GroupColumnIO group, int r) { + IntArrayList nulls = groupNullCache.get(group); + if (nulls == null) { + nulls = new IntArrayList(); + groupNullCache.put(group, nulls); } + nulls.add(r); + } + + private void writeNullToLeaves(GroupColumnIO group) { + IntArrayList nullCache = groupNullCache.get(group); + if (nullCache == null || nullCache.isEmpty()) + return; + + int parentDefinitionLevel = group.getParent().getDefinitionLevel(); + for (ColumnWriter leafWriter : groupToLeafWriter.get(group)) { + for (IntIterator iter = nullCache.iterator(); iter.hasNext();) { + int repetitionLevel = iter.nextInt(); + leafWriter.writeNull(repetitionLevel, parentDefinitionLevel); + } + } + nullCache.clear(); } private void setRepetitionLevel() { @@ -315,28 +378,52 @@ public class MessageColumnIO extends GroupColumnIO { @Override public void startGroup() { if (DEBUG) log("startGroup()"); + GroupColumnIO group = (GroupColumnIO) 
currentColumnIO; - ++ currentLevel; + // current group is not null, need to flush all the nulls that were cached before + if (hasNullCache(group)) { + flushCachedNulls(group); + } + + ++currentLevel; r[currentLevel] = r[currentLevel - 1]; - int fieldsCount = ((GroupColumnIO)currentColumnIO).getChildrenCount(); + int fieldsCount = ((GroupColumnIO) currentColumnIO).getChildrenCount(); fieldsWritten[currentLevel].reset(fieldsCount); if (DEBUG) printState(); } + private boolean hasNullCache(GroupColumnIO group) { + IntArrayList nulls = groupNullCache.get(group); + return nulls != null && !nulls.isEmpty(); + } + + + private void flushCachedNulls(GroupColumnIO group) { + //flush children first + for (int i = 0; i < group.getChildrenCount(); i++) { + ColumnIO child = group.getChild(i); + if (child instanceof GroupColumnIO) { + flushCachedNulls((GroupColumnIO) child); + } + } + //then flush itself + writeNullToLeaves(group); + } + @Override public void endGroup() { if (DEBUG) log("endGroup()"); emptyField = false; writeNullForMissingFieldsAtCurrentLevel(); - -- currentLevel; + --currentLevel; setRepetitionLevel(); if (DEBUG) printState(); } private ColumnWriter getColumnWriter() { - return columnWriter[((PrimitiveColumnIO)currentColumnIO).getId()]; + return columnWriter[((PrimitiveColumnIO) currentColumnIO).getId()]; } @Override @@ -399,6 +486,12 @@ public class MessageColumnIO extends GroupColumnIO { if (DEBUG) printState(); } + + //should flush null for all groups + @Override + public void flush() { + flushCachedNulls(MessageColumnIO.this); + } } public RecordConsumer getRecordWriter(ColumnWriteStore columns) { @@ -421,6 +514,6 @@ public class MessageColumnIO extends GroupColumnIO { @Override public MessageType getType() { - return (MessageType)super.getType(); + return (MessageType) super.getType(); } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/01fbf81e/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java 
---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java b/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java index e1d3ba7..bf4c196 100644 --- a/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java +++ b/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java @@ -126,6 +126,10 @@ public class ValidatingRecordConsumer extends RecordConsumer { types.pop(); previousField.pop(); } + @Override + public void flush(){ + delegate.flush(); + } private void validate(PrimitiveTypeName p) { Type currentType = types.peek().asGroupType().getType(fields.peek()); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/01fbf81e/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java index 953d87a..e11d763 100644 --- a/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java +++ b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java @@ -125,4 +125,11 @@ abstract public class RecordConsumer { */ abstract public void addDouble(double value); + /** + * NoOps by default + * Subclass class can implement its own flushing logic + */ + public void flush() { + } + } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/01fbf81e/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java b/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java index 7c7b72c..e7274cc 100644 --- 
a/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java +++ b/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java @@ -288,10 +288,12 @@ public class TestColumnIO { ColumnIOFactory columnIOFactory = new ColumnIOFactory(true); ColumnWriteStoreV1 columns = newColumnWriteStore(memPageStore); MessageColumnIO columnIO = columnIOFactory.getColumnIO(writtenSchema); - GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), writtenSchema); + RecordConsumer recordWriter = columnIO.getRecordWriter(columns); + GroupWriter groupWriter = new GroupWriter(recordWriter, writtenSchema); for (Group group : groups) { groupWriter.write(group); } + recordWriter.flush(); columns.flush(); } @@ -310,9 +312,12 @@ public class TestColumnIO { { MessageColumnIO columnIO = columnIOFactory.getColumnIO(schema); log(columnIO); - GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema); + RecordConsumer recordWriter = columnIO.getRecordWriter(columns); + GroupWriter groupWriter = new GroupWriter(recordWriter, schema); groupWriter.write(r1); groupWriter.write(r2); + + recordWriter.flush(); columns.flush(); log(columns); log("========="); @@ -461,11 +466,13 @@ public class TestColumnIO { log(columnIO); // Write groups. + RecordConsumer recordWriter = columnIO.getRecordWriter(columns); GroupWriter groupWriter = - new GroupWriter(columnIO.getRecordWriter(columns), messageSchema); + new GroupWriter(recordWriter, messageSchema); for (Group group : groups) { groupWriter.write(group); } + recordWriter.flush(); columns.flush(); // Read groups and verify. 
@@ -508,7 +515,9 @@ public class TestColumnIO { MemPageStore memPageStore = new MemPageStore(1); ColumnWriteStoreV1 columns = newColumnWriteStore(memPageStore); MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema); - new GroupWriter(columnIO.getRecordWriter(columns), schema).write(r1); + RecordConsumer recordWriter = columnIO.getRecordWriter(columns); + new GroupWriter(recordWriter, schema).write(r1); + recordWriter.flush(); columns.flush(); RecordReader<Void> recordReader = columnIO.getRecordReader(memPageStore, new ExpectationValidatingConverter(expectedEventsForR1, schema)); @@ -584,9 +593,11 @@ public class TestColumnIO { ValidatingColumnWriteStore columns = new ValidatingColumnWriteStore(expected); MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema); - GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema); + RecordConsumer recordWriter = columnIO.getRecordWriter(columns); + GroupWriter groupWriter = new GroupWriter(recordWriter, schema); groupWriter.write(r1); groupWriter.write(r2); + recordWriter.flush(); columns.validate(); } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/01fbf81e/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java b/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java index 479b138..9fde4b1 100644 --- a/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java +++ b/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java @@ -21,6 +21,7 @@ package org.apache.parquet.io; import java.util.ArrayList; import java.util.List; +import org.apache.parquet.io.api.RecordConsumer; import org.junit.Test; import org.apache.parquet.column.ParquetProperties.WriterVersion; @@ -259,11 +260,13 @@ public class TestFiltered { MemPageStore memPageStore = new MemPageStore(number * 2); 
ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, false, WriterVersion.PARQUET_1_0); - GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema); + RecordConsumer recordWriter = columnIO.getRecordWriter(columns); + GroupWriter groupWriter = new GroupWriter(recordWriter, schema); for ( int i = 0; i < number; i++ ) { groupWriter.write(r1); groupWriter.write(r2); } + recordWriter.flush(); columns.flush(); return memPageStore; } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/01fbf81e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java index 21e69b7..c1bd037 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java @@ -62,7 +62,6 @@ class InternalParquetRecordReader<T> { private MessageType requestedSchema; private MessageType fileSchema; - private MessageColumnIO columnIO; private int columnCount; private final ReadSupport<T> readSupport; @@ -137,6 +136,7 @@ class InternalParquetRecordReader<T> { BenchmarkCounter.incrementTime(timeSpentReading); if (Log.INFO) LOG.info("block read in memory in " + timeSpentReading + " ms. 
row count = " + pages.getRowCount()); if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema); + MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking); recordReader = columnIO.getRecordReader(pages, recordConverter, filter); startedAssemblingCurrentBlockAt = System.currentTimeMillis(); totalCountLoadedSoFar += pages.getRowCount(); @@ -174,7 +174,6 @@ class InternalParquetRecordReader<T> { this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy()); this.requestedSchema = readContext.getRequestedSchema(); this.fileSchema = fileSchema; - this.columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking); this.file = file; this.columnCount = requestedSchema.getPaths().size(); this.recordConverter = readSupport.prepareForRead( http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/01fbf81e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index 37e8db5..ab9cb3e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -37,6 +37,7 @@ import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext; import org.apache.parquet.io.ColumnIOFactory; import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.MessageType; class InternalParquetRecordWriter<T> { @@ -63,6 +64,7 @@ class InternalParquetRecordWriter<T> { private ColumnWriteStore columnStore; private ColumnChunkPageWriteStore 
pageStore; + private RecordConsumer recordConsumer; /** @@ -106,7 +108,8 @@ class InternalParquetRecordWriter<T> { pageStore, pageSize); MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); - writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore)); + this.recordConsumer = columnIO.getRecordWriter(columnStore); + writeSupport.prepareForWrite(recordConsumer); } public void close() throws IOException, InterruptedException { @@ -154,6 +157,7 @@ class InternalParquetRecordWriter<T> { private void flushRowGroupToStore() throws IOException { + recordConsumer.flush(); LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d", columnStore.getAllocatedSize())); if (columnStore.getAllocatedSize() > (3 * rowGroupSizeThreshold)) { LOG.warn("Too much memory used: " + columnStore.memUsageString()); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/01fbf81e/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java ---------------------------------------------------------------------- diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java index cc55a3f..f5f3ff1 100644 --- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java +++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java @@ -154,6 +154,7 @@ public class TestParquetReadProtocol { ParquetWriteProtocol parquetWriteProtocol = new ParquetWriteProtocol(recordWriter, columnIO, thriftType); expected.write(parquetWriteProtocol); + recordWriter.flush(); columns.flush(); ThriftRecordConverter<T> converter = new TBaseRecordConverter<T>(thriftClass, schema, thriftType);
