Miksu82 commented on a change in pull request #895:
URL: https://github.com/apache/parquet-mr/pull/895#discussion_r616653104
##########
File path:
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
##########
@@ -279,4 +285,133 @@ public void testParquetFileWithBloomFilter() throws
IOException {
LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer())));
}
}
+
+ @Test
+ public void testParquetFileNotFlushedWhenRowCountLimitsAreNotExceeded()
throws IOException {
+ MessageType schema = Types
+ .buildMessage()
+ .required(BINARY)
+ .as(stringType())
+ .named("str")
+ .named("msg");
+
+ TestOutputFile file = new TestOutputFile();
+ ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+ .withMinRowCountForPageSizeCheck(4)
+ .withMaxRowCountForPageSizeCheck(4)
+ .build();
+
+ writeRecords(writer, schema);
+ assertHasNotFlushed(file);
+ }
+
+ @Test
+ public void testParquetFileIsFlushedWhenMinRowCountIsExceeded() throws
IOException {
+ MessageType schema = Types
+ .buildMessage()
+ .required(BINARY)
+ .as(stringType())
+ .named("str")
+ .named("msg");
+
+ TestOutputFile file = new TestOutputFile();
+ ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+ .withMinRowCountForPageSizeCheck(3)
+ .withMaxRowCountForPageSizeCheck(4)
+ .build();
+
+ writeRecords(writer, schema);
+
+ assertHasFlushed(file, 3, 1);
+ }
+
+ @Test
+ public void testParquetFileIsNotFlushedIfMinRowCountIsNotExceeded() throws
IOException {
+ MessageType schema = Types
+ .buildMessage()
+ .required(BINARY)
+ .as(stringType())
+ .named("str")
+ .named("msg");
+
+ TestOutputFile file = new TestOutputFile();
+ ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+ .withMinRowCountForPageSizeCheck(4)
+ .withMaxRowCountForPageSizeCheck(2)
+ .build();
+
+ writeRecords(writer, schema);
+
+ assertHasNotFlushed(file);
+ }
+
+ @Test
+ public void testParquetFileIsFlushedAfterEachRecord() throws IOException {
+ MessageType schema = Types
+ .buildMessage()
+ .required(BINARY)
+ .as(stringType())
+ .named("str")
+ .named("msg");
+
+ TestOutputFile file = new TestOutputFile();
+ ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+ .withMinRowCountForPageSizeCheck(1)
+ .withMaxRowCountForPageSizeCheck(4)
+ .build();
+
+ writeRecords(writer, schema);
+
+ assertHasFlushed(file, 3, 3);
+ }
+
+ @Test
+ public void testParquetFileNotFlushingAllRows() throws IOException {
+ MessageType schema = Types
+ .buildMessage()
+ .required(BINARY)
+ .as(stringType())
+ .named("str")
+ .named("msg");
+
+ TestOutputFile file = new TestOutputFile();
+ ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+ .withMinRowCountForPageSizeCheck(2)
+ .withMaxRowCountForPageSizeCheck(3)
+ .build();
+
+ writeRecords(writer, schema);
+
+ assertHasFlushed(file, 2, 1);
+ }
+
+ private ExampleParquetWriter.Builder getParquetWriterBuilder(MessageType
schema,
+ TestOutputFile
file) throws IOException {
+ Configuration conf = new Configuration();
+ GroupWriteSupport.setSchema(schema, conf);
+ return ExampleParquetWriter.builder(file)
+ .withConf(conf)
+ // Set row group size to 1, to make sure we flush every time
+ // minRowCountForPageSizeCheck or maxRowCountForPageSizeCheck is exceeded
+ .withRowGroupSize(1);
+ }
+
+ private void writeRecords(ParquetWriter<Group> writer, MessageType schema)
throws IOException {
+ SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+ writer.write(factory.newGroup().append("str", "foo"));
+ writer.write(factory.newGroup().append("str", "bar"));
+ writer.write(factory.newGroup().append("str", "baz"));
+ }
+
+ private void assertHasNotFlushed(TestOutputFile file) {
+ int emptyFileLength = ParquetFileWriter.MAGIC.length;
+ assertEquals(emptyFileLength, file.stats.getBytesWritten());
+ }
+
+ private void assertHasFlushed(TestOutputFile file, int numWrites, int
numFlushes) {
Review comment:
Hmm... no. Every time a record is flushed, it writes the actual record plus the
metadata that indicates that a row group has ended (the writer is created by
defining `rowGroupSize=1`).
It works like this:
```
write("foo") // writes 7B
flush() // writes 23B
write("bar") // writes 7B
flush() // writes 23B
// Total written: 60B
```
but
```
write("foo") // writes 7B
write("bar") // writes 7B
flush() // writes 23B
// Total written: 37B
```
By defining `page.size.row.check.min` and `page.size.row.check.max`,
I can control how often the `InternalParquetRecordWriter` flushes the records,
and I can assert that by calculating the total number of bytes written.
But you are absolutely right that this is complicated and in the wrong
place, so I'll remove these tests and just assert that the properties have been
set.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]