This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git

The following commit(s) were added to refs/heads/master by this push:
     new 892dedb  PARQUET-1531: Page row count limit causes empty pages to be written from MessageColumnIO (#620)
892dedb is described below

commit 892dedb23591bb4e38a061d5ea607637fd4e210f
Author: Gabor Szadovszky <ga...@apache.org>
AuthorDate: Wed Mar 13 08:25:56 2019 +0100

    PARQUET-1531: Page row count limit causes empty pages to be written from MessageColumnIO (#620)
---
 .../benchmarks/NestedNullWritingBenchmarks.java    | 151 +++++++++++++++++++++
 .../apache/parquet/column/ColumnWriteStore.java    |  12 +-
 .../parquet/column/impl/ColumnWriteStoreBase.java  |   5 +
 .../parquet/column/impl/ColumnWriterBase.java      |   3 +
 .../org/apache/parquet/io/MessageColumnIO.java     |   7 +
 .../hadoop/example/ExampleParquetWriter.java       |  16 +++
 .../apache/parquet/hadoop/TestParquetWriter.java   |  43 +++++-
 7 files changed, 235 insertions(+), 2 deletions(-)

diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/NestedNullWritingBenchmarks.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/NestedNullWritingBenchmarks.java
new file mode 100644
index 0000000..324775b
--- /dev/null
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/NestedNullWritingBenchmarks.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.benchmarks;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.openjdk.jmh.annotations.Mode.SingleShotTime;
+import static org.openjdk.jmh.annotations.Scope.Benchmark;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * Benchmark to measure writing nested null values. (See PARQUET-343 for details.)
+ * <p>
+ * To execute this benchmark a jar file shall be created of this module. Then the jar file can be executed using the JMH
+ * framework.<br>
+ * The following one-liner (shall be executed in the parquet-benchmarks submodule) generates result statistics in the
+ * file {@code jmh-result.json}. This json might be visualized by using the tool at
+ * <a href="https://jmh.morethan.io">https://jmh.morethan.io</a>.
+ *
+ * <pre>
+ * mvn clean package && java -jar target/parquet-benchmarks.jar org.apache.parquet.benchmarks.NestedNullWritingBenchmarks -rf json
+ * </pre>
+ */
+@BenchmarkMode(SingleShotTime)
+@Fork(1)
+@Warmup(iterations = 10, batchSize = 1)
+@Measurement(iterations = 50, batchSize = 1)
+@OutputTimeUnit(MILLISECONDS)
+@State(Benchmark)
+public class NestedNullWritingBenchmarks {
+  private static final MessageType SCHEMA = Types.buildMessage()
+      .optionalList()
+          .optionalElement(INT32)
+      .named("int_list")
+      .optionalList()
+          .optionalListElement()
+              .optionalElement(BINARY)
+      .named("dummy_list")
+      .optionalMap()
+          .key(BINARY)
+          .value(BINARY, OPTIONAL)
+      .named("dummy_map")
+      .optionalGroup()
+          .optional(BINARY).named("dummy_group_value1")
+          .optional(BINARY).named("dummy_group_value2")
+          .optional(BINARY).named("dummy_group_value3")
+      .named("dummy_group")
+      .named("msg");
+  private static final int RECORD_COUNT = 10_000_000;
+  private static final double NULL_RATIO = 0.99;
+
+  private static final OutputFile BLACK_HOLE = new OutputFile() {
+    @Override
+    public boolean supportsBlockSize() {
+      return false;
+    }
+
+    @Override
+    public long defaultBlockSize() {
+      return -1L;
+    }
+
+    @Override
+    public PositionOutputStream createOrOverwrite(long blockSizeHint) {
+      return create(blockSizeHint);
+    }
+
+    @Override
+    public PositionOutputStream create(long blockSizeHint) {
+      return new PositionOutputStream() {
+        private long pos;
+
+        @Override
+        public long getPos() throws IOException {
+          return pos;
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+          ++pos;
+        }
+      };
+    }
+  };
+
+  private static class ValueGenerator {
+    private static final GroupFactory FACTORY = new SimpleGroupFactory(SCHEMA);
+    private static final Group NULL = FACTORY.newGroup();
+    private final Random random = new Random(42);
+
+    public Group nextValue() {
+      if (random.nextDouble() > NULL_RATIO) {
+        Group group = FACTORY.newGroup();
+        group.addGroup("int_list").addGroup("list").append("element", random.nextInt());
+        return group;
+      } else {
+        return NULL;
+      }
+    }
+  }
+
+  @Benchmark
+  public void benchmarkWriting() throws IOException {
+    ValueGenerator generator = new ValueGenerator();
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(BLACK_HOLE)
+        .withWriteMode(Mode.OVERWRITE)
+        .withType(SCHEMA)
+        .build()) {
+      for (int i = 0; i < RECORD_COUNT; ++i) {
+        writer.write(generator.nextValue());
+      }
+    }
+  }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java
index e14c7dc..db5320a 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java
@@ -52,7 +52,7 @@ public interface ColumnWriteStore {
   abstract public long getBufferedSize();
 
   /**
-   * used for debugging pupose
+   * used for debugging purpose
    * @return a formated string representing memory usage per column
    */
   abstract public String memUsageString();
@@ -62,4 +62,14 @@ public interface ColumnWriteStore {
    */
   abstract public void close();
 
+  /**
+   * Returns whether flushing the possibly cached values (or nulls) to the underlying column writers is necessary,
+   * because the pages might be closed after the next invocation of {@link #endRecord()}.
+   *
+   * @return {@code true} if all the values shall be written to the underlying column writers before calling
+   *         {@link #endRecord()}
+   */
+  default boolean isColumnFlushNeeded() {
+    return false;
+  }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
index ac9aaca..2670c31 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -229,4 +229,9 @@ abstract class ColumnWriteStoreBase implements ColumnWriteStore {
       rowCountForNextSizeCheck = rowCountForNextRowCountCheck;
     }
   }
+
+  @Override
+  public boolean isColumnFlushNeeded() {
+    return rowCount + 1 >= rowCountForNextSizeCheck;
+  }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index 3788c82..8fc7d31 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -305,6 +305,9 @@ abstract class ColumnWriterBase implements ColumnWriter {
    * Writes the current data to a new page in the page store
    */
  void writePage() {
+    if (valueCount == 0) {
+      throw new ParquetEncodingException("writing empty page");
+    }
    this.rowsWrittenSoFar += pageRowCount;
    if (DEBUG)
      LOG.debug("write page");
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
index f1da363..8fc4f91 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
@@ -297,6 +297,13 @@ public class MessageColumnIO extends GroupColumnIO {
     @Override
     public void endMessage() {
       writeNullForMissingFieldsAtCurrentLevel();
+
+      // We need to flush the cached null values before ending the record to ensure that everything is sent to the
+      // writer before the current page would be closed
+      if (columns.isColumnFlushNeeded()) {
+        flush();
+      }
+
       columns.endRecord();
       if (DEBUG) log("< MESSAGE END >");
       if (DEBUG) printState();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
index 88879c2..12a67d3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
@@ -25,6 +25,7 @@ import org.apache.parquet.example.data.Group;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.OutputFile;
 import org.apache.parquet.schema.MessageType;
 import java.io.IOException;
 import java.util.HashMap;
@@ -48,6 +49,17 @@ public class ExampleParquetWriter extends ParquetWriter<Group> {
   }
 
   /**
+   * Creates a Builder for configuring ParquetWriter with the example object
+   * model. THIS IS AN EXAMPLE ONLY AND NOT INTENDED FOR USE.
+   *
+   * @param file the output file to create
+   * @return a {@link Builder} to create a {@link ParquetWriter}
+   */
+  public static Builder builder(OutputFile file) {
+    return new Builder(file);
+  }
+
+  /**
    * Create a new {@link ExampleParquetWriter}.
    *
    * @param file The file name to write to.
@@ -78,6 +90,10 @@ public class ExampleParquetWriter extends ParquetWriter<Group> {
       super(file);
     }
 
+    private Builder(OutputFile file) {
+      super(file);
+    }
+
     public Builder withType(MessageType type) {
       this.type = type;
       return this;
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 6fc3c72..25c9608 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -21,6 +21,7 @@ package org.apache.parquet.hadoop;
 import static java.util.Arrays.asList;
 import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.apache.parquet.column.Encoding.DELTA_BYTE_ARRAY;
 import static org.apache.parquet.column.Encoding.PLAIN;
@@ -32,20 +33,24 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FI
 import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
 import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir;
 import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
 import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.example.ExampleInputFormat;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
-import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Types;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -54,6 +59,7 @@ import org.junit.Test;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
 import org.apache.parquet.example.data.simple.SimpleGroupFactory;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.apache.parquet.hadoop.example.GroupWriteSupport;
@@ -166,4 +172,39 @@ public class TestParquetWriter {
       Assert.assertFalse("Should not create a file when schema is rejected", file.exists());
     }
   }
+
+  // Testing the issue of PARQUET-1531 where writing null nested rows leads to empty pages if the page row count limit
+  // is reached.
+  @Test
+  public void testNullValuesWithPageRowLimit() throws IOException {
+    MessageType schema = Types.buildMessage().optionalList().optionalElement(BINARY).as(stringType()).named("str_list")
+        .named("msg");
+    final int recordCount = 100;
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+
+    GroupFactory factory = new SimpleGroupFactory(schema);
+    Group listNull = factory.newGroup();
+
+    File file = temp.newFile();
+    file.delete();
+    Path path = new Path(file.getAbsolutePath());
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+        .withPageRowCountLimit(10)
+        .withConf(conf)
+        .build()) {
+      for (int i = 0; i < recordCount; ++i) {
+        writer.write(listNull);
+      }
+    }
+
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path).build()) {
+      int readRecordCount = 0;
+      for (Group group = reader.read(); group != null; group = reader.read()) {
+        assertEquals(listNull.toString(), group.toString());
+        ++readRecordCount;
+      }
+      assertEquals("Number of written records should be equal to the read one", recordCount, readRecordCount);
+    }
+  }
 }
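
For context, the failure mode this patch fixes can be hit through the public writer API alone: a small page row count limit combined with records whose optional nested fields are all null. The sketch below is distilled from the new testNullValuesWithPageRowLimit test rather than taken from the patch itself; the class name, output path and record count are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

public class NullListWriteSketch {
  public static void main(String[] args) throws Exception {
    // Same shape as the schema used by the new test: a single optional list of strings.
    MessageType schema = Types.buildMessage()
        .optionalList().optionalElement(BINARY).as(stringType()).named("str_list")
        .named("msg");
    Configuration conf = new Configuration();
    GroupWriteSupport.setSchema(schema, conf);

    // Every record leaves the optional list null, so only null definition levels get buffered.
    Group listNull = new SimpleGroupFactory(schema).newGroup();

    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path("/tmp/null-lists.parquet"))
        .withPageRowCountLimit(10) // small limit forces frequent page boundaries
        .withConf(conf)
        .build()) {
      for (int i = 0; i < 100; ++i) {
        writer.write(listNull); // before this fix, page boundaries hit here could produce empty pages
      }
    }
  }
}

With the patch applied, MessageColumnIO flushes the cached nulls before a record that would close the current pages, so the new empty-page guard in ColumnWriterBase.writePage() is not expected to trigger for this workload.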