Miksu82 commented on a change in pull request #895:
URL: https://github.com/apache/parquet-mr/pull/895#discussion_r616653104



##########
File path: parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
##########
@@ -279,4 +285,133 @@ public void testParquetFileWithBloomFilter() throws IOException {
         LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer())));
     }
   }
+
+  @Test
+  public void testParquetFileNotFlushedWhenRowCountLimitsAreNotExceeded() throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    TestOutputFile file = new TestOutputFile();
+    ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+      .withMinRowCountForPageSizeCheck(4)
+      .withMaxRowCountForPageSizeCheck(4)
+      .build();
+
+    writeRecords(writer, schema);
+    assertHasNotFlushed(file);
+  }
+
+  @Test
+  public void testParquetFileIsFlushedWhenMinRowCountIsExceeded() throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    TestOutputFile file = new TestOutputFile();
+    ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+      .withMinRowCountForPageSizeCheck(3)
+      .withMaxRowCountForPageSizeCheck(4)
+      .build();
+
+    writeRecords(writer, schema);
+
+    assertHasFlushed(file, 3, 1);
+  }
+
+  @Test
+  public void testParquetFileIsNotFlushedIfMinRowCountIsNotExceeded() throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    TestOutputFile file = new TestOutputFile();
+    ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+      .withMinRowCountForPageSizeCheck(4)
+      .withMaxRowCountForPageSizeCheck(2)
+      .build();
+
+    writeRecords(writer, schema);
+
+    assertHasNotFlushed(file);
+  }
+
+  @Test
+  public void testParquetFileIsFlushedAfterEachRecord() throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    TestOutputFile file = new TestOutputFile();
+    ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+      .withMinRowCountForPageSizeCheck(1)
+      .withMaxRowCountForPageSizeCheck(4)
+      .build();
+
+    writeRecords(writer, schema);
+
+    assertHasFlushed(file, 3, 3);
+  }
+
+  @Test
+  public void testParquetFileNotFlushingAllRows() throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    TestOutputFile file = new TestOutputFile();
+    ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+      .withMinRowCountForPageSizeCheck(2)
+      .withMaxRowCountForPageSizeCheck(3)
+      .build();
+
+    writeRecords(writer, schema);
+
+    assertHasFlushed(file, 2, 1);
+  }
+
+  private ExampleParquetWriter.Builder getParquetWriterBuilder(MessageType schema,
+                                                               TestOutputFile file) throws IOException {
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+    return ExampleParquetWriter.builder(file)
+      .withConf(conf)
+      // Set row group size to 1, to make sure we flush every time
+      // minRowCountForPageSizeCheck or maxRowCountForPageSizeCheck is exceeded
+      .withRowGroupSize(1);
+  }
+
+  private void writeRecords(ParquetWriter<Group> writer, MessageType schema) throws IOException {
+    SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+    writer.write(factory.newGroup().append("str", "foo"));
+    writer.write(factory.newGroup().append("str", "bar"));
+    writer.write(factory.newGroup().append("str", "baz"));
+  }
+
+  private void assertHasNotFlushed(TestOutputFile file) {
+    int emptyFileLength = ParquetFileWriter.MAGIC.length;
+    assertEquals(emptyFileLength, file.stats.getBytesWritten());
+  }
+
+  private void assertHasFlushed(TestOutputFile file, int numWrites, int numFlushes) {

Review comment:
       Hmm.. no. Every time a record is flushed it writes the actual record plus the metadata indicating that a row group has ended (the writer is created with `rowGroupSize=1`).
   
   It works like this
   
   ```
   write("foo") // writes 7B
   flush() // writes 23B
   write("bar") // writes 7B
   flush() // writes 23B
   
   // Total bytes written: 60B
   ```
   
   but
   
   ```
   write("foo") // writes 7B
   write("bar") // writes 7B
   flush()  // writes 23B
   
   // Total bytes written: 37B
   ```
   
   And by defining the `page.size.row.check.min` and `page.size.row.check.max` properties I can control how often the `InternalParquetRecordWriter` flushes the records, and I can assert that by calculating the total number of bytes written.
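   
   For illustration, a rough sketch of that arithmetic (the 7B and 23B figures are just the per-record and per-flush sizes observed in this particular test, not Parquet constants, and the leading magic bytes are ignored):
   
   ```
   // Hypothetical helper mirroring the example above; recordBytes and
   // flushOverheadBytes are sizes observed in this test run, not constants.
   private long expectedBodyBytes(int numWrites, int numFlushes) {
     long recordBytes = 7;         // bytes written per record in this example
     long flushOverheadBytes = 23; // row-group metadata written per flush
     return numWrites * recordBytes + numFlushes * flushOverheadBytes;
   }

   // write, flush, write, flush -> expectedBodyBytes(2, 2) == 60
   // write, write, flush        -> expectedBodyBytes(2, 1) == 37
   ```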
   
   But you are absolutely right that this is complicated and in the wrong place, so I'll remove these tests and just assert that the properties have been set.
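   
   A minimal sketch of that simpler check, assuming the limits live on `org.apache.parquet.column.ParquetProperties` and are readable through `getMinRowCountForPageSizeCheck()` / `getMaxRowCountForPageSizeCheck()`:
   
   ```
   // Sketch only: assumes ParquetProperties exposes getters for these limits
   // and that the builder methods used in the diff above delegate to it.
   @Test
   public void testRowCountLimitsForPageSizeCheckAreSet() {
     ParquetProperties props = ParquetProperties.builder()
       .withMinRowCountForPageSizeCheck(3)
       .withMaxRowCountForPageSizeCheck(4)
       .build();

     assertEquals(3, props.getMinRowCountForPageSizeCheck());
     assertEquals(4, props.getMaxRowCountForPageSizeCheck());
   }
   ```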
   
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

