Miksu82 commented on a change in pull request #895:
URL: https://github.com/apache/parquet-mr/pull/895#discussion_r616693569
##########
File path: parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
##########
@@ -279,4 +285,133 @@ public void testParquetFileWithBloomFilter() throws IOException {
LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer())));
}
}
+
+ @Test
+ public void testParquetFileNotFlushedWhenRowCountLimitsAreNotExceeded() throws IOException {
+ MessageType schema = Types
+ .buildMessage()
+ .required(BINARY)
+ .as(stringType())
+ .named("str")
+ .named("msg");
+
+ TestOutputFile file = new TestOutputFile();
+ ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+ .withMinRowCountForPageSizeCheck(4)
+ .withMaxRowCountForPageSizeCheck(4)
+ .build();
+
+ writeRecords(writer, schema);
+ assertHasNotFlushed(file);
+ }
+
+ @Test
+ public void testParquetFileIsFlushedWhenMinRowCountIsExceeded() throws IOException {
+ MessageType schema = Types
+ .buildMessage()
+ .required(BINARY)
+ .as(stringType())
+ .named("str")
+ .named("msg");
+
+ TestOutputFile file = new TestOutputFile();
+ ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+ .withMinRowCountForPageSizeCheck(3)
+ .withMaxRowCountForPageSizeCheck(4)
+ .build();
+
+ writeRecords(writer, schema);
+
+ assertHasFlushed(file, 3, 1);
+ }
+
+ @Test
+ public void testParquetFileIsNotFlushedIfMinRowCountIsNotExceeded() throws IOException {
+ MessageType schema = Types
+ .buildMessage()
+ .required(BINARY)
+ .as(stringType())
+ .named("str")
+ .named("msg");
+
+ TestOutputFile file = new TestOutputFile();
+ ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+ .withMinRowCountForPageSizeCheck(4)
+ .withMaxRowCountForPageSizeCheck(2)
+ .build();
+
+ writeRecords(writer, schema);
+
+ assertHasNotFlushed(file);
+ }
+
+ @Test
+ public void testParquetFileIsFlushedAfterEachRecord() throws IOException {
+ MessageType schema = Types
+ .buildMessage()
+ .required(BINARY)
+ .as(stringType())
+ .named("str")
+ .named("msg");
+
+ TestOutputFile file = new TestOutputFile();
+ ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+ .withMinRowCountForPageSizeCheck(1)
+ .withMaxRowCountForPageSizeCheck(4)
+ .build();
+
+ writeRecords(writer, schema);
+
+ assertHasFlushed(file, 3, 3);
+ }
+
+ @Test
+ public void testParquetFileNotFlushingAllRows() throws IOException {
+ MessageType schema = Types
+ .buildMessage()
+ .required(BINARY)
+ .as(stringType())
+ .named("str")
+ .named("msg");
+
+ TestOutputFile file = new TestOutputFile();
+ ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+ .withMinRowCountForPageSizeCheck(2)
+ .withMaxRowCountForPageSizeCheck(3)
+ .build();
+
+ writeRecords(writer, schema);
+
+ assertHasFlushed(file, 2, 1);
+ }
+
+ private ExampleParquetWriter.Builder getParquetWriterBuilder(MessageType schema,
+                                                               TestOutputFile file) throws IOException {
+ Configuration conf = new Configuration();
+ GroupWriteSupport.setSchema(schema, conf);
+ return ExampleParquetWriter.builder(file)
+ .withConf(conf)
+ // Set row group size to 1, to make sure we flush every time
+ // minRowCountForPageSizeCheck or maxRowCountForPageSizeCheck is exceeded
+ .withRowGroupSize(1);
+ }
+
+ private void writeRecords(ParquetWriter<Group> writer, MessageType schema) throws IOException {
+ SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+ writer.write(factory.newGroup().append("str", "foo"));
+ writer.write(factory.newGroup().append("str", "bar"));
+ writer.write(factory.newGroup().append("str", "baz"));
+ }
+
+ private void assertHasNotFlushed(TestOutputFile file) {
+ int emptyFileLength = ParquetFileWriter.MAGIC.length;
+ assertEquals(emptyFileLength, file.stats.getBytesWritten());
+ }
+
+ private void assertHasFlushed(TestOutputFile file, int numWrites, int numFlushes) {
Review comment:
It seems really hard to assert that the properties are just set. See my comment below.
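
For context, the tests in this hunk rely on a `TestOutputFile` helper and an `assertHasFlushed` assertion whose bodies are not shown here. Below is a minimal sketch of what such a byte-counting `OutputFile` could look like; it is an illustration only, not the implementation from this PR, and the nested `Stats` holder is a hypothetical stand-in for whatever `file.stats` actually is in the real test.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;

// Hypothetical sketch only: an in-memory OutputFile that counts every byte
// the ParquetWriter emits, so a test can compare the count against
// ParquetFileWriter.MAGIC.length to tell whether any page data was flushed.
class TestOutputFile implements OutputFile {

  // Minimal stats holder mirroring the file.stats.getBytesWritten() calls
  // in the diff above (the real helper may expose this differently).
  static class Stats {
    private long bytesWritten;
    long getBytesWritten() { return bytesWritten; }
  }

  final Stats stats = new Stats();

  @Override
  public PositionOutputStream create(long blockSizeHint) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    return new PositionOutputStream() {
      @Override
      public long getPos() {
        return buffer.size();
      }

      @Override
      public void write(int b) {
        buffer.write(b);
        stats.bytesWritten++;
      }
    };
  }

  @Override
  public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
    return create(blockSizeHint);
  }

  @Override
  public boolean supportsBlockSize() {
    return false;
  }

  @Override
  public long defaultBlockSize() {
    return 0;
  }
}
```

Under that assumption, `assertHasNotFlushed` (shown in the diff) only needs to check that exactly `ParquetFileWriter.MAGIC.length` bytes were written, i.e. nothing beyond the header reached the file, while `assertHasFlushed` presumably verifies that the expected number of row writes and flushes produced additional bytes.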
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: [email protected]