rdblue commented on a change in pull request #3902:
URL: https://github.com/apache/iceberg/pull/3902#discussion_r786306286



##########
File path: 
flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
##########
@@ -86,4 +117,141 @@ protected void writeAndValidate(Schema schema) throws IOException {
         RandomGenericData.generateFallbackRecords(schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)),
         schema);
   }
+
+  @Test
+  public void testParquetProperties() throws Exception {
+    final MessageType schemaSimple =
+        MessageTypeParser.parseMessageType(
+            "message m {" +
+                "  optional int32 id = 1;" +
+                "  optional binary data (STRING) = 2;" +
+                "}");
+
+    final ColumnDescriptor colADesc = schemaSimple.getColumns().get(0);
+    final ColumnDescriptor colBDesc = schemaSimple.getColumns().get(1);
+
+    List<ColumnDescriptor> columnDescriptors = Arrays.asList(colADesc, colBDesc);
+
+    int expectedRowCount = 100000;
+    List<RowData> rows = Lists.newArrayListWithCapacity(expectedRowCount);
+    for (int i = 0; i < expectedRowCount; i++) {
+      rows.add(SimpleDataUtil.createRowData(1, UUID.randomUUID().toString().substring(0, 10)));
+    }
+
+    String location = temp.getRoot().getAbsolutePath();
+
+    ImmutableMap<String, String> properties = ImmutableMap.of(
+        TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(expectedRowCount * 10),
+        TableProperties.PARQUET_PAGE_SIZE_BYTES, String.valueOf(expectedRowCount),
+        TableProperties.PARQUET_DICT_SIZE_BYTES, String.valueOf(expectedRowCount),
+        TableProperties.PARQUET_COMPRESSION, "uncompressed");
+
+    Table table = SimpleDataUtil.createTable(location, properties, false);
+
+    writeAndCommit(table, ImmutableList.of(), false, rows);
+    table.refresh();
+
+    CloseableIterator<DataFile> iterator =
+        FindFiles.in(table).collect().iterator();
+
+    Assert.assertTrue(iterator.hasNext());
+
+    DataFile dataFile = iterator.next();
+    Path path = new Path(dataFile.path().toString());
+
+    Configuration configuration = new Configuration();
+    ParquetMetadata footer = ParquetFileReader.readFooter(configuration, path);
+    ParquetFileReader parquetFileReader = new ParquetFileReader(
+        configuration, footer.getFileMetaData(), path, footer.getBlocks(), columnDescriptors);
+    Assert.assertEquals("Should have to 2 block!", 2, 
footer.getBlocks().size());
+
+    int colAPageNum = 0;
+    int colBPageNum = 0;
+    long rowCount = 0;
+    PageReadStore pageReadStore;
+    while ((pageReadStore = parquetFileReader.readNextRowGroup()) != null) {
+
+      rowCount += pageReadStore.getRowCount();
+
+      DictionaryPage colADictionaryPage = readDictionaryPage(colADesc, pageReadStore);
+      String colAEncodingName = colADictionaryPage.getEncoding().name();
+      Assert.assertEquals("PLAIN_DICTIONARY", colAEncodingName);
+      Assert.assertTrue(
+          "The dictionary page should be no larger than " + expectedRowCount + " bytes",
+          colADictionaryPage.getUncompressedSize() <= expectedRowCount);
+
+      DictionaryPage colBDictionaryPage = readDictionaryPage(colBDesc, pageReadStore);
+      String colBEncodingName = colBDictionaryPage.getEncoding().name();
+      Assert.assertEquals("PLAIN_DICTIONARY", colBEncodingName);
+      Assert.assertTrue(
+          "The dictionary page should be no larger than " + expectedRowCount + " bytes",
+          colBDictionaryPage.getUncompressedSize() <= expectedRowCount);
+
+      DataPageV1 colADataPage;
+      while ((colADataPage = readNextPage(colADesc, pageReadStore)) != null) {
+        Assert.assertTrue(
+            "Each data page should be no larger than " + expectedRowCount + " bytes",
+            colADataPage.getUncompressedSize() <= expectedRowCount);
+        colAPageNum++;
+      }
+      DataPageV1 colBDataPage;
+      while ((colBDataPage = readNextPage(colBDesc, pageReadStore)) != null) {
+        Assert.assertTrue(
+            "Each data page should be no larger than " + expectedRowCount + " bytes",
+            colBDataPage.getUncompressedSize() <= expectedRowCount);
+        colBPageNum++;
+      }
+    }
+
+    Assert.assertEquals("should have 6 pages.", 6, colAPageANum);
+    Assert.assertEquals("should have 16 pages.", 16, colBPageNum);
+    Assert.assertEquals(expectedRowCount, rowCount);
+  }
+
+  private void writeAndCommit(Table table, List<Integer> eqFieldIds, boolean upsert, List<RowData> rows)
+      throws IOException {
+    TaskWriter<RowData> writer = createTaskWriter(table);

Review comment:
       The file factory used by `createTaskWriter` doesn't appear to be based 
on `table`, so I'm skeptical that this is testing what you intend.
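
       For illustration, here is a minimal sketch of a table-based helper. The shape is an assumption, not the PR's actual code: it assumes the `RowDataTaskWriterFactory(Table, RowType, long, FileFormat, List<Integer>, boolean)` constructor from this module and a `FlinkSchemaUtil.convert(Schema)` overload that returns a `RowType`. The point is that the factory is built from `table`, so the Parquet properties set on the table are what the file factory and writer actually see:

```java
import java.util.List;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
import org.apache.iceberg.flink.sink.TaskWriterFactory;
import org.apache.iceberg.io.TaskWriter;

private TaskWriter<RowData> createTaskWriter(Table table, List<Integer> eqFieldIds, boolean upsert) {
  // Convert the table's Iceberg schema to Flink's RowType for the writer.
  RowType flinkType = FlinkSchemaUtil.convert(table.schema());

  // Because the factory is constructed from `table`, its output file factory
  // and the underlying Parquet writer pick up the table's write properties
  // (row group size, page size, dictionary size, compression).
  TaskWriterFactory<RowData> factory = new RowDataTaskWriterFactory(
      table,
      flinkType,
      TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
      FileFormat.PARQUET,
      eqFieldIds,
      upsert);

  // Any stable taskId / attemptId pair is fine for a single-threaded test.
  factory.initialize(1, 1);
  return factory.create();
}
```

       With the writer wired up this way, the block and page count assertions above would be exercising the configured row group and page sizes rather than whatever defaults the current factory falls back to.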




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


